class Fluent::Plugin::RecordsMergerOutput

fluentd records merger

Constants

PATTERN_MAX_NUM

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_records_merger.rb, line 51
# Reads the plugin settings from +conf+ and initialises the merge state.
#
# Values are read from +conf+ by string key: 'tag', 'main_tag',
# 'sub_tag1'..'sub_tag<PATTERN_MAX_NUM>', 'merge_timing',
# 'tolerable_time_range', 'force_emit', 'condition', 'keep_records' and
# 'sub_is_must'. Every <record> child element contributes key/value pairs to
# the output record template (@map).
#
# @param conf the configuration element handed to the plugin; it must answer
#   +[]+ with string keys and +elements+
def configure(conf)
  super
  # conf: {"@type"=>"records_merger", "tag"=>"merged1", "main_tag"=>"accesslog.1.main", "sub_tag1"=>"accesslog.1.sub1", "sub_tag2"=>"accesslog.1.sub2"}, []
  @tag = conf['tag'] # Will probably end up unused once placeholders such as ${tag} are enabled.
  # set the tags set by the user
  @last_records = {}
  @main_tag = conf['main_tag']

  # Configured sub tags in numeric order; unset sub_tagN entries are skipped,
  # so positions in @sub_tags are contiguous.
  @sub_tags = (1..PATTERN_MAX_NUM).map { |i| conf["sub_tag#{i}"] }.compact

  # Only overwrite @merge_timing when the setting is actually present.
  @merge_timing = conf['merge_timing'] if conf['merge_timing']

  @tolerable_time_range = conf['tolerable_time_range'].to_i

  # One flag per tag: index 0 is the main tag, index n is @sub_tags[n - 1].
  @flags4merge = Array.new(@sub_tags.size + 1, 0)
  @force_emit = conf['force_emit']

  # from out-record-reformer
  map = {}
  conf.elements.select { |element| element.name == 'record' }.each do |element|
    element.each_pair do |k, v|
      element.key?(k) # to suppress unread configuration warning
      map[k] = parse_value(v)
    end
  end

  placeholder_expander_params = {
    log: log,
    auto_typecast: @auto_typecast
  }
  @placeholder_expander = PlaceholderExpander.new(placeholder_expander_params)
  @map = @placeholder_expander.preprocess_map(map)
  @tag = @placeholder_expander.preprocess_map(@tag)
  @hostname = Socket.gethostname
  @condition = conf['condition']

  # Configuration values arrive as strings, and the string "false" is truthy
  # in Ruby. Normalise to a real boolean so that `keep_records false` really
  # disables record keeping; any other present value still enables it.
  keep_records = conf['keep_records']
  @keep_records = !keep_records.nil? && keep_records != 'false'

  # Sub records are mandatory unless explicitly switched off with "false".
  @sub_is_must = conf['sub_is_must'] != 'false'
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_records_merger.rb, line 109
# Handles one incoming event stream and emits a merged event when the
# configured merge timing says so.
#
# Behaviour per @merge_timing:
# * 'before' - main and sub records are stored; only a main record triggers
#   an emit. Tags that are neither main nor sub are ignored silently.
# * 'after'  - main records are stored; sub records are stored only once the
#   main flag is set, and only they trigger an emit.
# * 'simple' - main and sub records are stored and both trigger an emit.
# Any other value (including an unset @merge_timing) does nothing.
#
# An emit happens only when every merge flag is 1 or @sub_is_must is false.
# Errors raised while processing are logged and swallowed.
#
# @param tag the tag of the incoming event stream
# @param es the event stream; passed on to store_to_lastrecords and
#   emit_new_event
def process(tag, es)
  return unless %w[before after simple].include?(@merge_timing)

  from_main = @main_tag == tag
  unless from_main || @sub_tags.include?(tag)
    # Report through the plugin logger rather than printing to stdout.
    # 'before' mode has always ignored unknown tags without a message; keep that.
    log.warn "records_merger: something else has come! check it! tag:#{tag}" unless @merge_timing == 'before'
    return
  end

  # In 'after' mode a sub record only counts once a main record has been seen.
  return if @merge_timing == 'after' && !from_main && @flags4merge[0] != 1

  store_to_lastrecords(tag, es)
  @flags4merge[from_main ? 0 : @sub_tags.index(tag) + 1] = 1
  check_timegap if @tolerable_time_range > 0

  triggers_emit =
    case @merge_timing
    when 'before' then from_main
    when 'after' then !from_main
    else true
    end
  return unless triggers_emit

  emit_new_event(es) if @sub_is_must == false || @flags4merge.all? { |flag| flag == 1 }
rescue StandardError => e
  # Array() guards against an exception that carries no backtrace.
  log.warn "records_merger: #{e.class} #{e.message} #{Array(e.backtrace).first}"
  log.debug "records_merger: tag:#{@tag} map:#{@map} record:#{@last_records}"
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_records_merger.rb, line 105
# Shutdown hook; adds no behaviour of its own and only calls the superclass
# implementation.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_records_merger.rb, line 101
# Start hook; adds no behaviour of its own and only calls the superclass
# implementation.
def start
  super
end

Private Instance Methods

check_timegap() click to toggle source
# File lib/fluent/plugin/out_records_merger.rb, line 234
# Drops the oldest stored record when it lags behind the newest one by more
# than @tolerable_time_range, and clears that record's merge flag.
#
# Does nothing while fewer than two records are stored. Only the single
# oldest record is removed per call.
def check_timegap
  return if @last_records.size <= 1

  # Keep the tag together with its timestamp so the stale tag can be removed.
  seconds_by_tag = @last_records.each_with_object({}) do |(tag, record), acc|
    acc[tag] = record['time'].sec
  end
  _newest_tag, newest_sec = seconds_by_tag.max_by { |_tag, sec| sec }
  stale_tag, stale_sec = seconds_by_tag.min_by { |_tag, sec| sec }
  return unless newest_sec - stale_sec > @tolerable_time_range

  @last_records.delete(stale_tag)
  # Flag slot 0 belongs to the main tag; sub tags follow in @sub_tags order.
  slot = stale_tag == @main_tag ? 0 : @sub_tags.index(stale_tag) + 1
  @flags4merge[slot] = 0
end
condition_is_ok?(placeholder_values) click to toggle source
# File lib/fluent/plugin/out_records_merger.rb, line 229
# Evaluates the configured @condition string and returns the result of that
# evaluation.
#
# Every `main["key"]`, `subN["key"]`, `subN` and `main` token found in
# @condition is rewritten to `placeholders['${token}']`, so the condition
# reads its values from the prepared placeholder hash.
#
# @param placeholder_values values handed to the expander's
#   prepare_placeholders
# @return whatever the rewritten condition evaluates to
def condition_is_ok?(placeholder_values)
  # The local must keep the name `placeholders`: the rewritten condition
  # string evaluated below refers to it by that name.
  placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)
  # NOTE(review): instance_eval executes the configured text as Ruby code with
  # access to this plugin instance, so the configuration must be trusted.
  # NOTE(review): the bare `main` alternative has no word boundary, so it also
  # matches inside longer identifiers - confirm this is intended.
  instance_eval(@condition.gsub(/((main\["\w+"\])|(sub[0-9]{1,2}\["\w+"\])|(sub[0-9]{1,2})|main)/, 'placeholders[\'${\1}\']'))
end
emit_new_event(es) click to toggle source
# File lib/fluent/plugin/out_records_merger.rb, line 205
# Emits one merged event per entry of +es+, built from the records currently
# held in @last_records.
#
# An entry is skipped when no records are stored, when reform yields no tag,
# or when @condition is set and does not hold. After each emit every merge
# flag is reset to 0 unless @keep_records is truthy.
#
# @param es the triggering event stream; only each entry's time is used
def emit_new_event(es)
  es.each do |time, _record|
    values = generate_placeholders
    next if @last_records.empty?

    merged_tag, merged_record = reform(@tag, @last_records, values)
    # Do not emit without a tag, or when the configured condition is not met.
    next if !merged_tag || (!@condition.nil? && !condition_is_ok?(values))

    router.emit(merged_tag, time, merged_record)
    # Reset the merge flags for the next round.
    @flags4merge.fill(0) unless @keep_records
  end
end
expand_placeholders(value, placeholders) click to toggle source
# File lib/fluent/plugin/out_records_merger.rb, line 281
# Recursively expands placeholders inside +value+.
#
# Strings are expanded directly; hashes (the <record> template) get both
# their keys and their values expanded; arrays are expanded element by
# element; any other value is returned unchanged.
#
# @param value the String, Hash, Array or other object to expand
# @param placeholders the prepared placeholders passed to the expander
# @return the expanded copy, or +value+ itself for other types
def expand_placeholders(value, placeholders)
  case value
  when String
    @placeholder_expander.expand(value, placeholders)
  when Hash
    # This is the part that handles <record></record>: expand each key, then
    # recurse into its value.
    value.each_with_object({}) do |(key, inner), expanded|
      expanded_key = @placeholder_expander.expand(key, placeholders, true)
      expanded[expanded_key] = expand_placeholders(inner, placeholders)
    end
  when Array
    # Array values are expanded one element at a time, keeping their order.
    value.map { |item| expand_placeholders(item, placeholders) }
  else
    value
  end
end
generate_placeholders() click to toggle source
# File lib/fluent/plugin/out_records_merger.rb, line 168
# Builds the placeholder values for the main tag and for every sub tag that
# currently has a stored record.
#
# Main entries use the keys 'main', 'main_tag', 'main_tags',
# 'main_tag_parts', 'main_tag_prefix' and 'main_tag_suffix'; 'hostname' is
# added as well. Sub tag N (1-based position in @sub_tags) uses the same key
# set with the prefix "subN".
#
# @return [Hash] placeholder name => value
def generate_placeholders
  # The main tag is configured, so it can be split directly.
  main_parts = @main_tag.split('.')

  values = {
    'main' => @last_records[@main_tag],
    'main_tag' => @main_tag,
    'main_tags' => main_parts,
    'main_tag_parts' => main_parts,
    'main_tag_prefix' => tag_prefix(main_parts),
    'main_tag_suffix' => tag_suffix(main_parts),
    'hostname' => @hostname
  }

  # Add the entries for each sub tag that has a stored record.
  1.upto(PATTERN_MAX_NUM) do |n|
    sub_tag = @sub_tags[n - 1]
    next unless @last_records.key?(sub_tag)

    sub_parts = sub_tag.split('.')
    values["sub#{n}"] = @last_records[sub_tag]
    values["sub#{n}_tag"] = sub_tag
    values["sub#{n}_tags"] = sub_parts
    values["sub#{n}_tag_parts"] = sub_parts
    values["sub#{n}_tag_prefix"] = tag_prefix(sub_parts)
    values["sub#{n}_tag_suffix"] = tag_suffix(sub_parts)
  end

  values
end
parse_value(value_str) click to toggle source
# File lib/fluent/plugin/out_records_merger.rb, line 255
# Converts a <record> value into a Ruby object when it looks like JSON.
#
# Strings starting with '{' or '[' are parsed as JSON; everything else is
# returned as is. If parsing (or the prefix check) raises, a warning is
# logged and the original value is returned.
#
# @param value_str the raw configuration value
# @return the parsed JSON structure, or +value_str+ itself
def parse_value(value_str)
  return value_str unless value_str.start_with?('{', '[')

  JSON.parse(value_str)
rescue StandardError => e
  log.warn "failed to parse #{value_str} as json. Assuming #{value_str} is a string", error_class: e.class, error: e.message
  value_str # fall back to the raw string
end
reform(tag, _record, placeholder_values) click to toggle source
# File lib/fluent/plugin/out_records_merger.rb, line 266
# Builds the output tag and the output record for a merged event.
#
# The record always starts from an empty hash and is filled from the expanded
# @map template, so nothing is copied from +_record+.
#
# @param tag the tag template to expand
# @param _record unused
# @param placeholder_values values handed to the expander's
#   prepare_placeholders
# @return [Array] a two-element array: expanded tag, new record hash
def reform(tag, _record, placeholder_values)
  prepared = @placeholder_expander.prepare_placeholders(placeholder_values)

  # Expand the tag first, then the record template.
  expanded_tag = expand_placeholders(tag, prepared)
  expanded_record = {}.merge!(expand_placeholders(@map, prepared))

  [expanded_tag, expanded_record]
end
store_to_lastrecords(tag, es) click to toggle source

そのまま持ってきてしまっているので,rewriteしたほうが良さそう?

# File lib/fluent/plugin/out_records_merger.rb, line 160
# Remembers the most recent record seen for +tag+.
#
# Each record gets its event time written under the 'time' key and then
# replaces the entry for +tag+ in @last_records, so the last record of the
# stream is the one that remains.
#
# @param tag the tag the records arrived with
# @param es the event stream yielding time/record pairs
def store_to_lastrecords(tag, es)
  es.each do |time, record|
    record['time'] = time
    # @last_records keeps the whole record, keyed by its tag.
    @last_records[tag] = record
  end
end
tag_prefix(tag_parts) click to toggle source
# File lib/fluent/plugin/out_records_merger.rb, line 304
# Returns the cumulative dot-joined prefixes of a split tag.
#
# For %w[a b c] the result is ["a", "a.b", "a.b.c"]; an empty input yields an
# empty array.
#
# @param tag_parts [Array<String>] the tag split on '.'
# @return [Array<String>] prefixes from shortest to longest
def tag_prefix(tag_parts)
  prefixes = []
  tag_parts.each do |part|
    # Extend the previous prefix; the first part stands on its own.
    prefixes << (prefixes.empty? ? part : "#{prefixes.last}.#{part}")
  end
  prefixes
end
tag_suffix(tag_parts) click to toggle source
# File lib/fluent/plugin/out_records_merger.rb, line 314
# Returns the cumulative dot-joined suffixes of a split tag.
#
# For %w[a b c] the result is ["a.b.c", "b.c", "c"]; an empty input yields an
# empty array.
#
# @param tag_parts [Array<String>] the tag split on '.'
# @return [Array<String>] suffixes from longest to shortest
def tag_suffix(tag_parts)
  suffixes = []
  tag_parts.reverse_each do |part|
    # Walk from the last part backwards, putting each longer suffix in front.
    suffixes.unshift(suffixes.empty? ? part : "#{part}.#{suffixes.first}")
  end
  suffixes
end