module Fluent::RecordReformerOutputCore
Constants
- BUILTIN_CONFIGURATIONS
Public Class Methods
included(klass)
click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 10 def self.included(klass) klass.config_param :output_tag, :string, :default => nil, # obsolete :desc => 'The output tag name. This option is deprecated. Use `tag` option instead.' klass.config_param :tag, :string, :default => nil, :desc => 'The output tag name.' klass.config_param :remove_keys, :string, :default => nil, :desc => 'Specify record keys to be removed by a string separated by , (comma).' klass.config_param :keep_keys, :string, :default => nil, :desc => 'Specify record keys to be kept by a string separated by , (comma).' klass.config_param :renew_record, :bool, :default => false, :desc => 'Creates an output record newly without extending (merging) the input record fields.' klass.config_param :renew_time_key, :string, :default => nil, :desc => 'Overwrites the time of events with a value of the record field.' klass.config_param :enable_ruby, :bool, :default => true, # true for lower version compatibility :desc => 'Enable to use ruby codes in placeholders.' klass.config_param :auto_typecast, :bool, :default => false, # false for lower version compatibility :desc => 'Automatically cast the field types.' end
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_record_reformer/core.rb, line 6 def initialize super end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_record_reformer/core.rb, line 31 def configure(conf) super map = {} conf.each_pair { |k, v| next if BUILTIN_CONFIGURATIONS.include?(k) conf.has_key?(k) # to suppress unread configuration warning map[k] = parse_value(v) } # <record></record> directive conf.elements.select { |element| element.name == 'record' }.each { |element| element.each_pair { |k, v| element.has_key?(k) # to suppress unread configuration warning map[k] = parse_value(v) } } if @remove_keys @remove_keys = @remove_keys.split(',') end if @keep_keys raise Fluent::ConfigError, "out_record_reformer: `renew_record` must be true to use `keep_keys`" unless @renew_record @keep_keys = @keep_keys.split(',') end if @output_tag and @tag.nil? # for lower version compatibility log.warn "out_record_reformer: `output_tag` is deprecated. Use `tag` option instead." @tag = @output_tag end if @tag.nil? raise Fluent::ConfigError, "out_record_reformer: `tag` must be specified" end placeholder_expander_params = { :log => log, :auto_typecast => @auto_typecast, } @placeholder_expander = if @enable_ruby # require utilities which would be used in ruby placeholders require 'pathname' require 'uri' require 'cgi' RubyPlaceholderExpander.new(placeholder_expander_params) else PlaceholderExpander.new(placeholder_expander_params) end @map = @placeholder_expander.preprocess_map(map) @tag = @placeholder_expander.preprocess_map(@tag) @hostname = Socket.gethostname end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 85 def multi_workers_ready? true end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 89 def process(tag, es) tag_parts = tag.split('.') tag_prefix = tag_prefix(tag_parts) tag_suffix = tag_suffix(tag_parts) placeholder_values = { 'tag' => tag, 'tags' => tag_parts, # for old version compatibility 'tag_parts' => tag_parts, 'tag_prefix' => tag_prefix, 'tag_suffix' => tag_suffix, 'hostname' => @hostname, } last_record = nil es.each {|time, record| last_record = record # for debug log placeholder_values.merge!({ 'time' => @placeholder_expander.time_value(time), 'record' => record, }) new_tag, new_record = reform(@tag, record, placeholder_values) if new_tag if @renew_time_key && new_record.has_key?(@renew_time_key) time = new_record[@renew_time_key].to_i end @remove_keys.each {|k| new_record.delete(k) } if @remove_keys router.emit(new_tag, time, new_record) end } rescue => e log.warn "record_reformer: #{e.class} #{e.message} #{e.backtrace.first}" log.debug "record_reformer: tag:#{@tag} map:#{@map} record:#{last_record} placeholder_values:#{placeholder_values}" end
Private Instance Methods
expand_placeholders(value, placeholders)
click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 147 def expand_placeholders(value, placeholders) if value.is_a?(String) new_value = @placeholder_expander.expand(value, placeholders) elsif value.is_a?(Hash) new_value = {} value.each_pair do |k, v| new_key = @placeholder_expander.expand(k, placeholders, true) new_value[new_key] = expand_placeholders(v, placeholders) end elsif value.is_a?(Array) new_value = [] value.each_with_index do |v, i| new_value[i] = expand_placeholders(v, placeholders) end else new_value = value end new_value end
parse_value(value_str)
click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 124 def parse_value(value_str) if value_str.start_with?('{', '[') JSON.parse(value_str) else value_str end rescue => e log.warn "failed to parse #{value_str} as json. Assuming #{value_str} is a string", :error_class => e.class, :error => e.message value_str # emit as string end
reform(tag, record, placeholder_values)
click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 135 def reform(tag, record, placeholder_values) placeholders = @placeholder_expander.prepare_placeholders(placeholder_values) new_tag = expand_placeholders(tag, placeholders) new_record = @renew_record ? {} : record.dup @keep_keys.each {|k| new_record[k] = record[k]} if @keep_keys and @renew_record new_record.merge!(expand_placeholders(@map, placeholders)) [new_tag, new_record] end
tag_prefix(tag_parts)
click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 167 def tag_prefix(tag_parts) return [] if tag_parts.empty? tag_prefix = [tag_parts.first] 1.upto(tag_parts.size-1).each do |i| tag_prefix[i] = "#{tag_prefix[i-1]}.#{tag_parts[i]}" end tag_prefix end
tag_suffix(tag_parts)
click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 176 def tag_suffix(tag_parts) return [] if tag_parts.empty? rev_tag_parts = tag_parts.reverse rev_tag_suffix = [rev_tag_parts.first] 1.upto(tag_parts.size-1).each do |i| rev_tag_suffix[i] = "#{rev_tag_parts[i]}.#{rev_tag_suffix[i-1]}" end rev_tag_suffix.reverse! end