class Fluent::Plugin::ConcatFilter

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_concat.rb, line 51
# Builds the plugin's in-memory state: the per-stream buffer, the mutex that
# guards the timeout map, and the timeout map itself.
def initialize
  super

  # Unknown stream identities start out with their own empty entry list.
  @buffer = Hash.new { |store, identity| store[identity] = [] }
  @timeout_map_mutex = Thread::Mutex.new
  # Unknown stream identities default to the current engine time.
  @timeout_map = @timeout_map_mutex.synchronize do
    Hash.new { |store, identity| store[identity] = Fluent::Engine.now }
  end
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_concat.rb, line 67
# Validates the concat settings and derives @mode from them.
#
# Raises Fluent::ConfigError when every mode-selecting parameter reported by
# required_params is unset, or when the first violated rule in the ordered
# rule list below is found. Afterwards @mode is set according to which
# parameter is present, and mode-specific instance variables are prepared
# (partial metadata field names, CRI logtag constants, compiled regexps).
def configure(conf)
  super

  unset_flags, param_names = required_params
  if unset_flags.all?
    *leading, trailing = param_names
    raise Fluent::ConfigError, "Either #{[leading.join(", "), trailing].join(" or ")} is required"
  end

  uses_regexp = @multiline_start_regexp || @multiline_end_regexp
  # Ordered [violated?, message] pairs; the first violated rule is reported.
  rules = [
    [@n_lines && uses_regexp, "n_lines and multiline_start_regexp/multiline_end_regexp are exclusive"],
    [@partial_key && @n_lines, "partial_key and n_lines are exclusive"],
    [@partial_key && uses_regexp, "partial_key and multiline_start_regexp/multiline_end_regexp are exclusive"],
    [@partial_key && @partial_value.nil?, "partial_value is required when partial_key is specified"],
    [@use_partial_metadata && @n_lines, "use_partial_metadata and n_lines are exclusive"],
    [@use_partial_metadata && uses_regexp, "use_partial_metadata and multiline_start_regexp/multiline_end_regexp are exclusive"],
    [@use_partial_metadata && @partial_key, "use_partial_metadata and partial_key are exclusive"],
    [@use_partial_cri_logtag && @n_lines, "use_partial_cri_logtag and n_lines are exclusive"],
    [@use_partial_cri_logtag && uses_regexp, "use_partial_cri_logtag and multiline_start_regexp/multiline_end_regexp are exclusive"],
    [@use_partial_cri_logtag && @partial_key, "use_partial_cri_logtag and partial_key are exclusive"],
  ]
  rules.each do |violated, message|
    raise Fluent::ConfigError, message if violated
  end

  @mode = nil
  if @n_lines
    @mode = :line
  elsif @partial_key
    @mode = :partial
  elsif @use_partial_metadata
    @mode = :partial_metadata
    # Field names in the order: message, id, ordinal, last.
    field_names = {
      :"docker-fluentd" => ["partial_message", "partial_id", "partial_ordinal", "partial_last"],
      :"docker-journald" => ["CONTAINER_PARTIAL_MESSAGE", "CONTAINER_PARTIAL_ID", "CONTAINER_PARTIAL_ORDINAL", "CONTAINER_PARTIAL_LAST"],
      :"docker-journald-lowercase" => ["container_partial_message", "container_partial_id", "container_partial_ordinal", "container_partial_last"],
    }[@partial_metadata_format]
    if field_names
      @partial_message_field, @partial_id_field, @partial_ordinal_field, @partial_last_field =
        field_names.map(&:freeze)
      # the journald log driver does not add CONTAINER_PARTIAL_MESSAGE to the last message
      # so for the journald formats the partial id field is used as the indicator instead
      @partial_message_indicator =
        @partial_metadata_format == :"docker-fluentd" ? @partial_message_field : @partial_id_field
    end
  elsif @use_partial_cri_logtag
    @mode = :partial_cri
    @partial_logtag_delimiter = ":".freeze
    @partial_logtag_continue = "P".freeze
    @partial_logtag_full = "F".freeze
  elsif uses_regexp
    @mode = :regexp
    # The configured values are sliced with [1..-2] (first and last character
    # dropped) before being compiled.
    @multiline_start_regexp &&= Regexp.compile(@multiline_start_regexp[1..-2])
    @multiline_end_regexp &&= Regexp.compile(@multiline_end_regexp[1..-2])
    @continuous_line_regexp &&= Regexp.compile(@continuous_line_regexp[1..-2])
  end
end
filter_stream(tag, es) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 166
# Runs every event of +es+ through the concat logic and returns a new
# Fluent::MultiEventStream holding the untouched and the flushed events.
#
# Streams tagged fluent.<level> (trace, debug, info, warn, error, fatal) are
# returned as-is. Records that lack @key, or lack the partial marker the
# current mode looks for, are copied to the output unchanged. Any exception
# raised while handling a record is reported via router.emit_error_event.
def filter_stream(tag, es)
  return es if /\Afluent\.(?:trace|debug|info|warn|error|fatal)\z/.match?(tag)

  output = Fluent::MultiEventStream.new
  es.each do |time, record|
    untouched = !record.key?(@key) ||
                (@mode == :partial && !record.key?(@partial_key)) ||
                (@mode == :partial_metadata && !record.key?(@partial_message_indicator))
    if untouched
      output.add(time, record)
      next
    end

    begin
      flushed = process(tag, time, record)
      next if flushed.empty?

      flushed.each do |flushed_time, flushed_record|
        time = flushed_time if @use_first_timestamp
        # Fields of the current record are kept unless the flushed record overrides them.
        merged = record.merge(flushed_record)
        if @mode == :partial
          merged.delete(@partial_key) unless @keep_partial_key
        elsif @mode == :partial_metadata
          unless @keep_partial_metadata
            [@partial_message_field, @partial_id_field, @partial_ordinal_field, @partial_last_field].each do |field|
              merged.delete(field)
            end
          end
        elsif @mode == :partial_cri
          merged.delete(@partial_cri_logtag_key) unless @keep_partial_key
          merged.delete(@partial_cri_stream_key)
        end
        output.add(time, merged)
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end
  output
end
required_params() click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 61
# Returns two parallel arrays: for each mode-selecting parameter, whether it
# is unset (nil for the value parameters, falsy for the use_* flags), and the
# parameter names in the same order.
def required_params
  pairs = [
    ["n_lines", @n_lines.nil?],
    ["multiline_start_regexp", @multiline_start_regexp.nil?],
    ["multiline_end_regexp", @multiline_end_regexp.nil?],
    ["partial_key", @partial_key.nil?],
    ["use_partial_metadata", !@use_partial_metadata],
    ["use_partial_cri_logtag", !@use_partial_cri_logtag],
  ]
  names, params = pairs.transpose
  return params, names
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_concat.rb, line 160
# Marks the plugin as finished, flushes whatever is still buffered via
# flush_remaining_buffer, then runs the superclass shutdown.
def shutdown
  # on_timer returns early once @finished is set.
  @finished = true
  flush_remaining_buffer
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_concat.rb, line 154
# Runs the superclass start, clears the finished flag and registers a timer
# named :filter_concat_timer that invokes on_timer.
def start
  super
  @finished = false
  # NOTE(review): the literal 1 is presumably the timer interval in seconds -
  # confirm against the timer_execute helper.
  timer_execute(:filter_concat_timer, 1, &method(:on_timer))
end

Private Instance Methods

continuous_line?(text) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 366
# Tells whether +text+ continues the buffered lines. Without a
# @continuous_line_regexp every line counts as a continuation; otherwise the
# regexp decides.
def continuous_line?(text)
  return true unless @continuous_line_regexp

  @continuous_line_regexp.match?(text)
end
firstline?(text) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 358
# Tells whether +text+ matches @multiline_start_regexp. When no start regexp
# is set the (falsy) value of @multiline_start_regexp itself is returned.
def firstline?(text)
  pattern = @multiline_start_regexp
  pattern ? pattern.match?(text) : pattern
end
flush_buffer(stream_identity, new_element = nil) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 374
# Joins the buffered lines of +stream_identity+ with @separator and resets
# that stream's buffer.
#
# In :partial_metadata mode the entries are first ordered by the integer
# value of their @partial_ordinal_field. The buffer is reset to an empty list,
# or to a list holding only +new_element+ when one is given.
#
# Returns [time, record]: the time of the first buffered entry and that
# entry's record with @key replaced by the joined text.
def flush_buffer(stream_identity, new_element = nil)
  entries = @buffer[stream_identity]
  ordered =
    if @mode == :partial_metadata
      entries.sort_by { |_tag, _time, rec| rec[@partial_ordinal_field].to_i }
    else
      entries
    end
  joined_text = ordered.map { |entry| entry[2][@key] }.join(@separator)
  # Time and base record always come from the first entry in arrival order.
  _tag, time, first_record = entries.first
  @buffer[stream_identity] = new_element ? [new_element] : []
  [time, first_record.merge({ @key => joined_text })]
end
flush_remaining_buffer() click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 411
# Emits every non-empty stream buffer through handle_timeout_error, logs a
# "Flush remaining buffer" message for each, and finally clears @buffer.
#
# The emitted record is the first buffered record of the stream (arrival
# order) with @key replaced by all buffered lines joined with @separator.
# The tag and time also come from that first buffered entry.
def flush_remaining_buffer
  @buffer.each do |stream_identity, elements|
    next if elements.empty?

    # Partial-metadata pieces are ordered by their ordinal, exactly as
    # flush_buffer does, so an out-of-order arrival is not emitted scrambled.
    ordered = if @mode == :partial_metadata
                elements.sort_by {|_tag, _time, record| record[@partial_ordinal_field].to_i }
              else
                elements
              end
    lines = ordered.map {|_tag, _time, record| record[@key] }
    new_record = {
      @key => lines.join(@separator)
    }
    tag, time, record = elements.first
    message = "Flush remaining buffer: #{stream_identity}"
    handle_timeout_error(tag, time, record.merge(new_record), message)
    log.info(message)
  end
  @buffer.clear
end
flush_timeout_buffer() click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 391
# Flushes every stream whose last recorded timestamp in @timeout_map is at
# least @flush_interval old and whose buffer is not empty.
#
# Each flushed record is handed to handle_timeout_error together with a
# "Timeout flush" message, which is also logged. The event time is the first
# buffered timestamp when @use_first_timestamp is set, otherwise the current
# engine time. Flushed identities are removed from @timeout_map. The whole
# pass runs while holding @timeout_map_mutex.
def flush_timeout_buffer
  now = Fluent::Engine.now
  flushed_identities = []
  @timeout_map_mutex.synchronize do
    @timeout_map.each do |stream_identity, previous_timestamp|
      next if @flush_interval > (now - previous_timestamp)
      elements = @buffer[stream_identity]
      next if elements.empty?
      # Take the tag stored with the buffered entry (before flush_buffer
      # resets the buffer). Splitting the stream identity on ":" would cut
      # off any tag that itself contains a colon.
      tag = elements.first[0]
      time, flushed_record = flush_buffer(stream_identity)
      flushed_identities << stream_identity
      message = "Timeout flush: #{stream_identity}"
      handle_timeout_error(tag, @use_first_timestamp ? time : now, flushed_record, message)
      log.info(message)
    end
    # Removal happens after the iteration so the map is not changed while
    # it is being walked.
    flushed_identities.each do |stream_identity|
      @timeout_map.delete(stream_identity)
    end
  end
end
handle_timeout_error(tag, time, record, message) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 427
# Delivers a record that was flushed without completing normally.
#
# With @timeout_label set, the record is emitted to the router obtained from
# event_emitter_router for that label. Otherwise it is reported through
# router.emit_error_event with a TimeoutError carrying +message+.
def handle_timeout_error(tag, time, record, message)
  return router.emit_error_event(tag, time, record, TimeoutError.new(message)) unless @timeout_label

  event_emitter_router(@timeout_label).emit(tag, time, record)
end
lastline?(text) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 362
# Tells whether +text+ matches @multiline_end_regexp. When no end regexp is
# set the (falsy) value of @multiline_end_regexp itself is returned.
def lastline?(text)
  pattern = @multiline_end_regexp
  pattern ? pattern.match?(text) : pattern
end
on_timer() click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 221
# Timer callback: flushes timed-out buffers unless @flush_interval is zero or
# negative, or the plugin has been marked finished. Any error raised on the
# way is logged instead of propagating.
def on_timer
  flush_timeout_buffer unless @flush_interval <= 0 || @finished
rescue => e
  log.error "failed to flush timeout buffer", error: e
end
process(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 229
# Routes one record to the handler for the current @mode and returns that
# handler's result (nil when @mode matches no handler).
#
# The stream identity is "<tag>:<value of @stream_identity_key>" or
# "<tag>:default"; in :partial_metadata mode the record's partial id is
# appended after the colon instead of "default". The identity's entry in
# @timeout_map is refreshed with the current engine time before dispatching.
def process(tag, time, record)
  stream_identity =
    if @mode != :partial_metadata
      @stream_identity_key ? "#{tag}:#{record[@stream_identity_key]}" : "#{tag}:default"
    elsif @stream_identity_key
      "#{tag}:#{record[@stream_identity_key]}#{record[@partial_id_field]}"
    else
      "#{tag}:#{record[@partial_id_field]}"
    end

  @timeout_map_mutex.synchronize { @timeout_map[stream_identity] = Fluent::Engine.now }

  handler = {
    line: :process_line,
    partial: :process_partial,
    partial_metadata: :process_partial_metadata,
    partial_cri: :process_partial_cri,
    regexp: :process_regexp,
  }[@mode]
  send(handler, stream_identity, tag, time, record) if handler
end
process_line(stream_identity, tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 260
# Line-count mode: buffers the record and, once the stream holds at least
# @n_lines entries, flushes them into a single event.
#
# Returns a Fluent::MultiEventStream that is empty or holds the flushed
# event, timestamped with the first buffered time when @use_first_timestamp
# is set and with +time+ otherwise.
def process_line(stream_identity, tag, time, record)
  Fluent::MultiEventStream.new.tap do |emitted|
    pending = @buffer[stream_identity]
    pending << [tag, time, record]
    next if pending.size < @n_lines

    first_time, joined_record = flush_buffer(stream_identity)
    emitted.add(@use_first_timestamp ? first_time : time, joined_record)
  end
end
process_partial(stream_identity, tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 271
# Partial-key mode: buffers the record; when the record's @partial_key value
# differs from @partial_value the buffered lines are flushed into one event
# with @partial_key removed.
#
# Returns a Fluent::MultiEventStream that is empty or holds the flushed
# event, timestamped with the first buffered time when @use_first_timestamp
# is set and with +time+ otherwise.
def process_partial(stream_identity, tag, time, record)
  result = Fluent::MultiEventStream.new
  @buffer[stream_identity] << [tag, time, record]
  # Still marked as partial: keep buffering.
  return result if @partial_value == record[@partial_key]

  first_time, combined = flush_buffer(stream_identity)
  combined.delete(@partial_key)
  result.add(@use_first_timestamp ? first_time : time, combined)
  result
end
process_partial_cri(stream_identity, tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 283
# CRI logtag mode: buffers the record; when the first delimiter-separated
# segment of the record's logtag equals @partial_logtag_full the buffered
# lines are flushed into one event with the logtag key removed.
#
# Returns a Fluent::MultiEventStream that is empty or holds the flushed
# event, timestamped with the first buffered time when @use_first_timestamp
# is set and with +time+ otherwise.
#
# Raises (NoMethodError from the split call) when the record carries no
# usable logtag value; in that case the buffer is left untouched.
def process_partial_cri(stream_identity, tag, time, record)
  new_es = Fluent::MultiEventStream.new
  # Inspect the logtag BEFORE buffering. If the record were buffered first,
  # a record without a logtag would raise here, be reported as an error by
  # the caller, and still sit in the buffer to be emitted a second time.
  full_line = record[@partial_cri_logtag_key].split(@partial_logtag_delimiter)[0] == @partial_logtag_full
  @buffer[stream_identity] << [tag, time, record]
  if full_line
    new_time, new_record = flush_buffer(stream_identity)
    time = new_time if @use_first_timestamp
    new_record.delete(@partial_cri_logtag_key)
    new_es.add(time, new_record)
  end
  new_es
end
process_partial_metadata(stream_identity, tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 295
# Partial-metadata mode: buffers the record; when the record's
# @partial_last_field equals the string "true" the buffered pieces are
# flushed into one event, from which the @partial_key entry is deleted.
#
# Returns a Fluent::MultiEventStream that is empty or holds the flushed
# event, timestamped with the first buffered time when @use_first_timestamp
# is set and with +time+ otherwise.
def process_partial_metadata(stream_identity, tag, time, record)
  result = Fluent::MultiEventStream.new
  @buffer[stream_identity] << [tag, time, record]
  last_piece = record[@partial_last_field] == "true"
  return result unless last_piece

  first_time, combined = flush_buffer(stream_identity)
  combined.delete(@partial_key)
  result.add(@use_first_timestamp ? first_time : time, combined)
  result
end
process_regexp(stream_identity, tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_concat.rb, line 307
# Regexp mode: decides from firstline?/lastline?/continuous_line? whether the
# record starts, ends or continues a multi-line group, buffering or flushing
# accordingly.
#
# Returns a Fluent::MultiEventStream with zero, one or two events:
# * a start line arriving while lines are buffered flushes the old group and
#   begins a new one (flushed too, if the line also matches the end regexp);
# * an end line is buffered and the group is flushed;
# * any other line is buffered when it continues a group (or when no start
#   regexp is configured), passed through unchanged when nothing is buffered
#   and a start regexp exists, or - when it is not a continuation - emitted
#   right after the flushed group.
#
# Flushed groups carry the group's first timestamp when @use_first_timestamp
# is set, otherwise +time+. A record that is passed through on its own always
# keeps its own +time+; it must not inherit the first timestamp of the group
# that was flushed just before it.
def process_regexp(stream_identity, tag, time, record)
  new_es = Fluent::MultiEventStream.new
  text = record[@key]
  entry = [tag, time, record]
  # Flushes the stream's buffer (optionally seeding it with next_entry) and
  # appends the joined record to new_es without touching +time+.
  flush = lambda do |next_entry = nil|
    first_time, joined_record = flush_buffer(stream_identity, next_entry)
    new_es.add(@use_first_timestamp ? first_time : time, joined_record)
  end

  if firstline?(text)
    if @buffer[stream_identity].empty?
      @buffer[stream_identity] << entry
    else
      # A new group starts: emit the previous one and keep this line buffered.
      flush.call(entry)
    end
    # A line that is both start and end forms a complete group by itself.
    flush.call if lastline?(text)
  elsif lastline?(text)
    @buffer[stream_identity] << entry
    flush.call
  elsif @buffer[stream_identity].empty?
    if @multiline_start_regexp
      # Nothing buffered and not a start line: not part of any group.
      new_es.add(time, record)
    else
      @buffer[stream_identity] << entry
    end
  elsif continuous_line?(text)
    # Continuation of the previous line
    @buffer[stream_identity] << entry
  else
    flush.call
    new_es.add(time, record)
  end
  new_es
end