class Fluent::CollectdUnrollOutput
Public Instance Methods
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_collectd_unroll.rb, line 9 def emit(tag, es, chain) tag = update_tag(tag) es.each { |time, record| Engine.emit(tag, time, normalize_record(record)) } chain.next end
update_tag(tag)
click to toggle source
# File lib/fluent/plugin/out_collectd_unroll.rb, line 19 def update_tag(tag) if remove_tag_prefix if remove_tag_prefix == tag tag = '' elsif tag.to_s.start_with?(remove_tag_prefix+'.') tag = tag[remove_tag_prefix.length+1 .. -1] end end if add_tag_prefix tag = tag && tag.length > 0 ? "#{add_tag_prefix}.#{tag}" : add_tag_prefix end return tag end
Private Instance Methods
normalize_record(record)
click to toggle source
# File lib/fluent/plugin/out_collectd_unroll.rb, line 35 def normalize_record(record) if record.nil? return record end if !(record.has_key?('values')) || !(record.has_key?('dsnames')) || !(record.has_key?('dstypes')) || !(record.has_key?('host')) || !(record.has_key?('plugin')) || !(record.has_key?('plugin_instance')) || !(record.has_key?('type')) || !(record.has_key?('type_instance')) return record end record['values'].each_with_index { |value, index| @tags = [record['host'].gsub(".","/"), record['plugin'], record['plugin_instance'], record['type'], record['type_instance'], record['dsnames'][index]] tag = @tags.join(".").squeeze(".").gsub(/\.$/, '') record[tag] = value record[record['dsnames'][index]] = value record['dstype_' + record['dsnames'][index]] = record['dstypes'][index] record['dstype'] = record['dstypes'][index] } record.delete('dstypes') record.delete('dsnames') record.delete('values') record end