class Fluent::Plugin::RecordModifierOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_record_modifier.rb, line 55
# Validates the configuration and precomputes the state used while events are
# processed.
#
# In order, this:
# * parses +char_encoding+ ("FROM" or "FROM:TO") and binds +change_encoding+
#   to +convert_encoding+ when a target encoding is given, otherwise to
#   +set_encoding+;
# * builds <tt>@map</tt> with one DynamicExpander per key of every
#   <tt>record</tt> element of +conf+;
# * splits the comma-separated +remove_keys+ / +whitelist_keys+ strings into
#   arrays (the whitelist additionally keeps every key present in @map);
# * builds <tt>@tag_ex</tt>, the expander for the output tag, and records in
#   <tt>@has_tag_parts</tt> whether any template mentions 'tag_parts'.
#
# Raises Fluent::ConfigError when +char_encoding+ names an encoding Ruby does
# not know, or when both +remove_keys+ and +whitelist_keys+ are set.
def configure(conf)
  super

  @map = {}
  @to_enc = nil
  if @char_encoding
    from, to = @char_encoding.split(':', 2)
    begin
      @from_enc = Encoding.find(from)
      @to_enc = Encoding.find(to) if to
    rescue ArgumentError, TypeError => e
      # Encoding.find raises ArgumentError for unknown names and TypeError
      # when the value is empty; report both as a configuration problem.
      raise Fluent::ConfigError, "invalid char_encoding '#{@char_encoding}': #{e.message}"
    end

    handler = @to_enc ? method(:convert_encoding) : method(:set_encoding)
    # Bind the chosen handler once so per-record code can call
    # change_encoding without re-checking which mode is active.
    define_singleton_method(:change_encoding, handler)
  end

  @has_tag_parts = false
  conf.elements.select { |element| element.name == 'record' }.each do |element|
    element.each_pair do |k, v|
      element.has_key?(k) # to suppress unread configuration warning
      @has_tag_parts = true if v.include?('tag_parts')
      @map[k] = DynamicExpander.new(k, v, @prepare_value)
    end
  end

  if @remove_keys && @whitelist_keys
    raise Fluent::ConfigError, "remove_keys and whitelist_keys are exclusive with each other."
  elsif @remove_keys
    @remove_keys = @remove_keys.split(',').map(&:strip)
  elsif @whitelist_keys
    @whitelist_keys = @whitelist_keys.split(',').map(&:strip)
    # Keys injected through <record> must survive the whitelist filter.
    @whitelist_keys.concat(@map.keys).uniq!
  end

  @has_tag_parts = true if @tag.include?('tag_parts')
  @tag_ex = DynamicExpander.new('tag', @tag, @prepare_value)

  # Collect DynamicExpander related garbage instructions
  GC.start
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_record_modifier.rb, line 101
# Unconditionally answers true.
# NOTE(review): the name suggests this is the flag the host framework checks
# before running the plugin under several workers -- confirm against the
# Fluentd plugin API.
def multi_workers_ready?
  true
end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_record_modifier.rb, line 105
# Rewrites every event of +es+ with +modify_record+ and re-emits the results
# through the router.
#
# When the tag expander carries no fixed value (+param_value+ is nil) the
# output tag is expanded for each event, so events are grouped into one
# stream per expanded tag. Otherwise every event goes out in a single stream
# under <tt>@tag</tt>.
#
# tag:: tag of the incoming event stream
# es::  event stream; iterated with +each+, yielding time and record
def process(tag, es)
  # Split the tag only when some template actually references tag_parts.
  tag_parts = tag.split('.') if @has_tag_parts

  unless @tag_ex.param_value.nil?
    single_stream = MultiEventStream.new
    es.each do |time, record|
      single_stream.add(time, modify_record(tag, time, record, tag_parts))
    end
    return router.emit_stream(@tag, single_stream)
  end

  streams_by_tag = {}
  es.each do |time, record|
    rewritten = modify_record(tag, time, record, tag_parts)
    out_tag = @tag_ex.expand(tag, time, rewritten, tag_parts)
    (streams_by_tag[out_tag] ||= MultiEventStream.new).add(time, rewritten)
  end
  streams_by_tag.each { |out_tag, stream| router.emit_stream(out_tag, stream) }
end
Private Instance Methods
convert_encoding(value)
click to toggle source
# File lib/fluent/plugin/out_record_modifier.rb, line 178
# Recursively transcodes every String reachable through Hash values and Array
# elements from <tt>@from_enc</tt> to <tt>@to_enc</tt>. Hash keys are not
# touched.
#
# Strings are transcoded in place; bytes that are invalid in the source
# encoding or undefined in the target are substituted rather than raising.
# A frozen String held by a Hash or an Array cannot be mutated, so the
# container slot is overwritten with a transcoded copy instead.
#
# Returns +value+ itself (containers are updated in place); objects that are
# neither String, Hash nor Array are returned unchanged.
def convert_encoding(value)
  case value
  when String
    # Binary-tagged data is relabelled first so encode! reads it as @from_enc.
    value.force_encoding(@from_enc) if value.encoding == Encoding::BINARY
    value.encode!(@to_enc, @from_enc, invalid: :replace, undef: :replace)
  when Hash
    value.each_pair do |k, v|
      if v.frozen? && v.is_a?(String)
        value[k] = convert_encoding(v.dup)
      else
        convert_encoding(v)
      end
    end
  when Array
    value.each_with_index do |v, i|
      if v.frozen? && v.is_a?(String)
        # Same treatment as hash values: replace the slot with a mutable copy.
        value[i] = convert_encoding(v.dup)
      else
        convert_encoding(v)
      end
    end
  else
    value
  end
end
modify_record(tag, time, record, tag_parts)
click to toggle source
# File lib/fluent/plugin/out_record_modifier.rb, line 130
# Applies the configured modifications to a single record and returns the
# resulting record.
#
# Steps, in order:
# 1. every expander in <tt>@map</tt> writes its expanded value into +record+;
# 2. keys listed in <tt>@remove_keys</tt> are deleted, or, when
#    <tt>@whitelist_keys</tt> is set instead, a new hash holding only the
#    whitelisted keys replaces the record;
# 3. each entry of <tt>@replaces</tt> rewrites the String stored under its
#    key with +gsub+ when its expression matches;
# 4. +change_encoding+ is applied when <tt>@char_encoding</tt> is set.
#
# Steps 1 and 2 (remove_keys) mutate the hash passed in as +record+.
#
# tag::       tag of the event
# time::      event time
# record::    Hash holding the event fields
# tag_parts:: the tag split on '.', or nil when not needed
def modify_record(tag, time, record, tag_parts)
  @map.each_pair { |k, v| record[k] = v.expand(tag, time, record, tag_parts) }

  if @remove_keys
    @remove_keys.each { |key| record.delete(key) }
  elsif @whitelist_keys
    record = record.each_with_object({}) do |(k, v), kept|
      kept[k] = v if @whitelist_keys.include?(k)
    end
  end

  @replaces.each do |replace|
    target_key = replace.key
    next unless record.include?(target_key)

    current = record[target_key]
    # Only strings can be rewritten; numbers, symbols and nested structures
    # would make match/gsub raise, so they are left as they are.
    next unless current.is_a?(String) && replace.expression.match(current)

    record[target_key] = current.gsub(replace.expression, replace.replace)
  end

  record = change_encoding(record) if @char_encoding
  record
end
set_encoding(value)
click to toggle source
# File lib/fluent/plugin/out_record_modifier.rb, line 160
# Recursively relabels every String reachable through Hash values and Array
# elements as <tt>@from_enc</tt> without changing its bytes. Hash keys are
# not touched.
#
# Strings are relabelled in place. A frozen String held by a Hash or an Array
# cannot be mutated, so the container slot is overwritten with a relabelled
# copy instead.
#
# Returns +value+ itself (containers are updated in place); objects that are
# neither String, Hash nor Array are returned unchanged.
def set_encoding(value)
  case value
  when String
    value.force_encoding(@from_enc)
  when Hash
    value.each_pair do |k, v|
      if v.frozen? && v.is_a?(String)
        value[k] = set_encoding(v.dup)
      else
        set_encoding(v)
      end
    end
  when Array
    value.each_with_index do |v, i|
      if v.frozen? && v.is_a?(String)
        # Same treatment as hash values: replace the slot with a mutable copy.
        value[i] = set_encoding(v.dup)
      else
        set_encoding(v)
      end
    end
  else
    value
  end
end