class Fluent::CollectdUnrollOutput

Public Instance Methods

emit(tag, es, chain)
# File lib/fluent/plugin/out_collectd_unroll.rb, line 9
# Re-emits every event of the stream under a rewritten tag.
#
# The incoming tag is passed through update_tag once; each record is passed
# through normalize_record before being handed to Engine.emit together with
# its original time. After the stream is exhausted, chain.next is called and
# its result is returned.
#
# tag   - tag of the incoming events
# es    - event stream; must yield (time, record) pairs from #each
# chain - object responding to #next
def emit(tag, es, chain)
  rewritten_tag = update_tag(tag)

  es.each do |time, record|
    Engine.emit(rewritten_tag, time, normalize_record(record))
  end

  chain.next
end
update_tag(tag)
# File lib/fluent/plugin/out_collectd_unroll.rb, line 19
# Rewrites a tag according to remove_tag_prefix and add_tag_prefix.
#
# Removal runs first: a tag equal to remove_tag_prefix becomes '', and a tag
# starting with "<remove_tag_prefix>." loses that prefix and the dot. Any
# other tag is left alone. Then, when add_tag_prefix is set, it is prepended
# with a "." separator, or returned on its own when the tag is nil or empty.
#
# tag - the tag to rewrite
#
# Returns the rewritten tag.
def update_tag(tag)
  strip = remove_tag_prefix
  if strip
    if strip == tag
      tag = ''
    elsif tag.to_s.start_with?(strip + '.')
      # Skip the prefix plus the separating dot.
      tag = tag[(strip.length + 1)..-1]
    end
  end

  prepend = add_tag_prefix
  return tag unless prepend
  return prepend unless tag && tag.length > 0

  "#{prepend}.#{tag}"
end

Private Instance Methods

normalize_record(record)
# File lib/fluent/plugin/out_collectd_unroll.rb, line 35
# Unrolls a collectd record so every data-source value gets its own keys.
#
# A nil record, or one missing any of the keys 'values', 'dsnames',
# 'dstypes', 'host', 'plugin', 'plugin_instance', 'type' or 'type_instance',
# is returned untouched.
#
# Otherwise, for each entry of record['values'] (paired by index with
# 'dsnames' and 'dstypes') the record gains:
#
#   "<host>.<plugin>.<plugin_instance>.<type>.<type_instance>.<dsname>" => value
#   "<dsname>"        => value
#   "dstype_<dsname>" => dstype
#   "dstype"          => dstype   (the last data source wins)
#
# In the long key, dots inside the host are replaced by "/", runs of dots are
# collapsed to one, and a trailing dot is stripped. Finally 'dstypes',
# 'dsnames' and 'values' are removed.
#
# record - Hash with String keys, or nil
#
# Returns the same record object, mutated in place (or nil).
def normalize_record(record)
  return record if record.nil?

  required = %w[values dsnames dstypes host plugin plugin_instance type type_instance]
  return record unless required.all? { |key| record.has_key?(key) }

  # Capture everything the loop reads before it starts writing to the record,
  # so a data-source name that collides with one of these keys cannot corrupt
  # the tags built for later values.
  values, dsnames, dstypes = record.values_at('values', 'dsnames', 'dstypes')
  host, plugin, plugin_instance, type, type_instance =
    record.values_at('host', 'plugin', 'plugin_instance', 'type', 'type_instance')

  values.each_with_index do |value, index|
    dsname = dsnames[index]
    dstype = dstypes[index]

    # Local rather than an instance variable: this is per-call scratch state.
    segments = [host.tr('.', '/'), plugin, plugin_instance, type, type_instance, dsname]
    tag = segments.join('.').squeeze('.').gsub(/\.$/, '')

    record[tag] = value
    record[dsname] = value
    record['dstype_' + dsname] = dstype
    record['dstype'] = dstype
  end

  record.delete('dstypes')
  record.delete('dsnames')
  record.delete('values')
  record
end