class Fluent::DetectExceptionsOutput

This output plugin consumes a log stream of JSON objects which contain single-line log messages. If a consecutive sequence of log messages form an exception stack trace, they forwarded as a single, combined JSON object. Otherwise, the input log data is forwarded as is.

Public Instance Methods

before_shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_detect_exceptions.rb, line 67
def before_shutdown
  flush_buffers
  super if defined?(super)
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_detect_exceptions.rb, line 44
def configure(conf)
  super

  if multiline_flush_interval
    @check_flush_interval = [multiline_flush_interval * 0.1, 1].max
  end

  @languages = languages.map(&:to_sym)

  # Maps log stream tags to a corresponding TraceAccumulator.
  @accumulators = {}
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_detect_exceptions.rb, line 80
def emit(tag, es, chain)
  es.each do |time_sec, record|
    process_record(tag, time_sec, record)
  end
  chain.next
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_detect_exceptions.rb, line 72
def shutdown
  # Before shutdown is not available in older fluentd versions.
  # Hence, we make sure that we flush the buffers here as well.
  flush_buffers
  @thread.join if @multiline_flush_interval
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_detect_exceptions.rb, line 57
def start
  super

  if multiline_flush_interval
    @flush_buffer_mutex = Mutex.new
    @stop_check = false
    @thread = Thread.new(&method(:check_flush_loop))
  end
end

Private Instance Methods

check_flush_loop() click to toggle source
# File lib/fluent/plugin/out_detect_exceptions.rb, line 115
def check_flush_loop
  @flush_buffer_mutex.synchronize do
    loop do
      @flush_buffer_mutex.sleep(@check_flush_interval)
      now = Time.now
      break if @stop_check
      @accumulators.each_value do |acc|
        acc.force_flush if now - acc.buffer_start_time >
                           @multiline_flush_interval
      end
    end
  end
rescue
  log.error 'error in check_flush_loop', error: $ERROR_INFO.to_s
  log.error_backtrace
end
flush_buffers() click to toggle source
# File lib/fluent/plugin/out_detect_exceptions.rb, line 108
def flush_buffers
  synchronize do
    @stop_check = true
    @accumulators.each_value(&:force_flush)
  end
end
process_record(tag, time_sec, record) click to toggle source
# File lib/fluent/plugin/out_detect_exceptions.rb, line 89
def process_record(tag, time_sec, record)
  synchronize do
    log_id = [tag]
    log_id.push(record.fetch(@stream, '')) unless @stream.empty?
    unless @accumulators.key?(log_id)
      out_tag = tag.sub(/^#{Regexp.escape(@remove_tag_prefix)}\./, '')
      @accumulators[log_id] =
        Fluent::TraceAccumulator.new(@message, @languages,
                                     max_lines: @max_lines,
                                     max_bytes: @max_bytes,
                                     join_separator: @join_separator) do |t, r|
          router.emit(out_tag, t, r)
        end
    end

    @accumulators[log_id].push(time_sec, record)
  end
end
synchronize() { || ... } click to toggle source
# File lib/fluent/plugin/out_detect_exceptions.rb, line 132
def synchronize(&block)
  if @multiline_flush_interval
    @flush_buffer_mutex.synchronize(&block)
  else
    yield
  end
end