module Fluent::RecordReformerOutputCore

Constants

BUILTIN_CONFIGURATIONS

Public Class Methods

included(klass) click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 10
# Hook run when this module is included into +klass+: declares every
# configuration parameter of the plugin on the including class.
#
# Each table row is [name, type, default, description]; rows are registered
# in the order listed.
def self.included(klass)
  [
    # obsolete
    [:output_tag, :string, nil,
     'The output tag name. This option is deprecated. Use `tag` option instead.'],
    [:tag, :string, nil,
     'The output tag name.'],
    [:remove_keys, :string, nil,
     'Specify record keys to be removed by a string separated by , (comma).'],
    [:keep_keys, :string, nil,
     'Specify record keys to be kept by a string separated by , (comma).'],
    [:renew_record, :bool, false,
     'Creates an output record newly without extending (merging) the input record fields.'],
    [:renew_time_key, :string, nil,
     'Overwrites the time of events with a value of the record field.'],
    # true for lower version compatibility
    [:enable_ruby, :bool, true,
     'Enable to use ruby codes in placeholders.'],
    # false for lower version compatibility
    [:auto_typecast, :bool, false,
     'Automatically cast the field types.'],
  ].each do |param_name, param_type, param_default, param_desc|
    klass.config_param param_name, param_type, :default => param_default, :desc => param_desc
  end
end
new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_record_reformer/core.rb, line 6
# Constructor. Sets up no state of its own; it only passes the call on to
# the superclass initializer.
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_record_reformer/core.rb, line 31
# Reads the plugin configuration and prepares everything #process needs.
#
# After delegating to the superclass, this:
# * collects the record field map from every top-level key that is not
#   listed in BUILTIN_CONFIGURATIONS and from every <record> section
#   (later entries overwrite earlier ones with the same key),
# * turns the comma separated +remove_keys+ / +keep_keys+ strings into arrays,
# * falls back to the deprecated +output_tag+ when +tag+ is not given,
# * builds the placeholder expander and pre-processes the map and the tag,
# * remembers the local hostname.
#
# Raises Fluent::ConfigError when +keep_keys+ is used without
# +renew_record+, or when no tag is configured.
def configure(conf)
  super

  field_map = {}
  conf.each_pair do |key, raw_value|
    unless BUILTIN_CONFIGURATIONS.include?(key)
      conf.has_key?(key) # to suppress unread configuration warning
      field_map[key] = parse_value(raw_value)
    end
  end
  # <record></record> directive
  conf.elements.each do |section|
    next unless section.name == 'record'
    section.each_pair do |key, raw_value|
      section.has_key?(key) # to suppress unread configuration warning
      field_map[key] = parse_value(raw_value)
    end
  end

  @remove_keys = @remove_keys.split(',') if @remove_keys

  if @keep_keys
    unless @renew_record
      raise Fluent::ConfigError, "out_record_reformer: `renew_record` must be true to use `keep_keys`"
    end
    @keep_keys = @keep_keys.split(',')
  end

  # for lower version compatibility
  if @tag.nil? && @output_tag
    log.warn "out_record_reformer: `output_tag` is deprecated. Use `tag` option instead."
    @tag = @output_tag
  end
  raise Fluent::ConfigError, "out_record_reformer: `tag` must be specified" if @tag.nil?

  expander_options = {
    :log           => log,
    :auto_typecast => @auto_typecast,
  }
  if @enable_ruby
    # require utilities which would be used in ruby placeholders
    require 'pathname'
    require 'uri'
    require 'cgi'
    @placeholder_expander = RubyPlaceholderExpander.new(expander_options)
  else
    @placeholder_expander = PlaceholderExpander.new(expander_options)
  end
  @map = @placeholder_expander.preprocess_map(field_map)
  @tag = @placeholder_expander.preprocess_map(@tag)

  @hostname = Socket.gethostname
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 85
# Always answers true.
# NOTE(review): presumably this is the hook Fluentd consults to decide whether
# the plugin may run with multiple workers - confirm against the plugin API.
def multi_workers_ready?
  true
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 89
# Reforms every event of the stream +es+ that arrived under +tag+ and emits
# the result through +router+.
#
# For each (time, record) pair the placeholder values are refreshed with the
# event time and record, #reform produces the new tag and record, the event
# time is replaced by the integer value of the +renew_time_key+ field when
# that field is present, and the keys listed in +remove_keys+ are dropped.
#
# Any StandardError raised while handling the stream is logged (warn plus a
# debug line with the current state) instead of being propagated, which also
# ends processing of the remaining events of this stream.
def process(tag, es)
  parts = tag.split('.')
  placeholder_values = {
    'tag'        => tag,
    'tags'       => parts, # for old version compatibility
    'tag_parts'  => parts,
    'tag_prefix' => tag_prefix(parts),
    'tag_suffix' => tag_suffix(parts),
    'hostname'   => @hostname,
  }
  last_record = nil
  es.each do |time, record|
    last_record = record # for debug log
    expanded_time = @placeholder_expander.time_value(time)
    placeholder_values['time'] = expanded_time
    placeholder_values['record'] = record

    new_tag, new_record = reform(@tag, record, placeholder_values)
    next unless new_tag

    emit_time = time
    if @renew_time_key && new_record.has_key?(@renew_time_key)
      emit_time = new_record[@renew_time_key].to_i
    end
    if @remove_keys
      @remove_keys.each { |key| new_record.delete(key) }
    end
    router.emit(new_tag, emit_time, new_record)
  end
rescue => e
  log.warn "record_reformer: #{e.class} #{e.message} #{e.backtrace.first}"
  log.debug "record_reformer: tag:#{@tag} map:#{@map} record:#{last_record} placeholder_values:#{placeholder_values}"
end

Private Instance Methods

expand_placeholders(value, placeholders) click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 147
# Recursively expands placeholders inside +value+.
#
# * String - expanded through the placeholder expander.
# * Hash   - a new hash whose keys are expanded (with the expander's third
#            argument set to true) and whose values are expanded recursively.
# * Array  - a new array with every element expanded recursively.
# * anything else is returned unchanged.
def expand_placeholders(value, placeholders)
  case value
  when String
    @placeholder_expander.expand(value, placeholders)
  when Hash
    value.each_with_object({}) do |(key, item), expanded|
      expanded_key = @placeholder_expander.expand(key, placeholders, true)
      expanded[expanded_key] = expand_placeholders(item, placeholders)
    end
  when Array
    value.map { |item| expand_placeholders(item, placeholders) }
  else
    value
  end
end
parse_value(value_str) click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 124
# Interprets a configuration value.
#
# A value starting with "{" or "[" is parsed as JSON and the parsed object is
# returned; every other value is returned unchanged. When anything goes
# wrong (for example invalid JSON) a warning is logged and the original
# string is returned instead.
def parse_value(value_str)
  looks_like_json = value_str.start_with?('{', '[')
  return value_str unless looks_like_json

  JSON.parse(value_str)
rescue => error
  log.warn "failed to parse #{value_str} as json. Assuming #{value_str} is a string", :error_class => error.class, :error => error.message
  value_str # emit as string
end
reform(tag, record, placeholder_values) click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 135
# Builds the output tag and output record for one event.
#
# tag                - the (pre-processed) tag template to expand.
# record             - the input record; it is not modified.
# placeholder_values - raw placeholder values handed to the expander.
#
# The new record starts as a shallow copy of +record+, or empty when
# +renew_record+ is set. With +renew_record+ and +keep_keys+, only the listed
# keys that are actually present in the input record are carried over; a
# listed key the record does not have is left out rather than being added
# with a nil value. Finally the expanded field map is merged on top.
#
# Returns [new_tag, new_record].
def reform(tag, record, placeholder_values)
  placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)

  new_tag = expand_placeholders(tag, placeholders)

  new_record = @renew_record ? {} : record.dup
  if @keep_keys && @renew_record
    @keep_keys.each do |k|
      # "keeping" a key must not invent it: skip keys missing from the input
      new_record[k] = record[k] if record.has_key?(k)
    end
  end
  new_record.merge!(expand_placeholders(@map, placeholders))

  [new_tag, new_record]
end
tag_prefix(tag_parts) click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 167
# Returns the cumulative dot-joined prefixes of +tag_parts+.
#
#   tag_prefix(%w[a b c]) #=> ["a", "a.b", "a.b.c"]
#
# An empty input yields an empty array.
def tag_prefix(tag_parts)
  tag_parts.each_with_object([]) do |part, prefixes|
    # the first entry is the part itself; later ones extend the previous prefix
    prefixes << (prefixes.empty? ? part : "#{prefixes.last}.#{part}")
  end
end
tag_suffix(tag_parts) click to toggle source
# File lib/fluent/plugin/out_record_reformer/core.rb, line 176
# Returns the cumulative dot-joined suffixes of +tag_parts+, longest first.
#
#   tag_suffix(%w[a b c]) #=> ["a.b.c", "b.c", "c"]
#
# An empty input yields an empty array.
def tag_suffix(tag_parts)
  # build from the last part backwards (shortest suffix first) ...
  shortest_first = tag_parts.reverse_each.each_with_object([]) do |part, suffixes|
    suffixes << (suffixes.empty? ? part : "#{part}.#{suffixes.last}")
  end
  # ... then flip so the longest suffix comes first
  shortest_first.reverse
end