class LogStash::Codecs::Joinlines

Public Instance Methods

accept(listener) click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 144
def accept(listener)
  # memoize references to listener that holds upstream state
  @previous_listener = @last_seen_listener || listener
  @last_seen_listener = listener

  internal_decode(listener.data) do |event,what|
    what_based_listener(what).process_event(event)
  end
end
auto_flush(listener = @last_seen_listener) click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 226
def auto_flush(listener = @last_seen_listener)
  return if listener.nil?

  flush do |event|
    listener.process_event(event)
  end
end
auto_flush_active?() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 288
def auto_flush_active?
  !@auto_flush_interval.nil?
end
auto_flush_runner() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 292
def auto_flush_runner
  @auto_flush_runner || AutoFlushUnset.new(nil, nil)
end
buffer(text) click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 204
def buffer(text)
  @buffer_bytes += text.bytesize
  @buffer.push(text)
end
buffer_over_limits?() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 275
def buffer_over_limits?
  over_maximum_lines? || over_maximum_bytes?
end
close() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 284
def close
  auto_flush_runner.stop
end
decode(text) { |event| ... } click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 198
def decode(text, &block)
  internal_decode(text) do |event,what|
    yield(event)
  end
end
do_next(text, matched, &block) click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 255
def do_next(text, matched, &block)
  buffer(text)
  auto_flush_runner.start
  flush(&block) if !matched || buffer_over_limits?
end
do_previous(text, matched, &block) click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 261
def do_previous(text, matched, &block)
  flush(&block) if !matched || buffer_over_limits?
  auto_flush_runner.start
  buffer(text)
end
doing_previous?(what) click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 247
def doing_previous?(what)
  what != "next"
end
encode(event) click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 279
def encode(event)
  # Nothing to do.
  @on_event.call(event, event)
end
flush() { |events| ... } click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 209
def flush(&block)
  if block_given? && @buffer.any?
    no_error = true
    events = merge_events
    begin
      yield events
    rescue ::Exception => e
      # need to rescue everything
      # likliest cause: backpressure or timeout by exception
      # can't really do anything but leave the data in the buffer for next time if there is one
      @logger.error("Joinlines: flush downstream error", :exception => e)
      no_error = false
    end
    reset_buffer if no_error
  end
end
initialize_copy(source) click to toggle source
Calls superclass method
# File lib/logstash/codecs/joinlines.rb, line 296
def initialize_copy(source)
  super
  register
end
internal_decode(text) { |event,matching| ... } click to toggle source

private

# File lib/logstash/codecs/joinlines.rb, line 159
def internal_decode(text, &block)
  do_flush = false
  text = @converter.convert(text)
  text.split("\n").each do |line|
    matched = false
    zip_config.each do |pattern,what,negate,grok,handler|
      match = grok.match(line)
      @logger.debug("Joinlines", :pattern => pattern, :text => line,
                    :match => (match != false), :negate => negate)

      # Add negate option
      match = (match and !negate) || (!match and negate)

      if match
        do_flush = (what == "next" and @matching != "next")
        matched = true
        @matching = what
        break
      end
    end

    if !matched
      do_flush = (@matching != "next")
      @matching = ""
    end

    if do_flush
      flush do |event|
        yield(event,@matching)
      end
      do_flush = false
    end

    auto_flush_runner.start
    buffer(line)
  end
end
merge_events() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 234
def merge_events
  event = LogStash::Event.new(LogStash::Event::TIMESTAMP => @time, "message" => @buffer.join(NL))
  event.tag @multiline_tag if !@multiline_tag.empty? && @buffer.size > 1
  event.tag "joinlines_codec_max_bytes_reached" if over_maximum_bytes?
  event.tag "joinlines_codec_max_lines_reached" if over_maximum_lines?
  event
end
over_maximum_bytes?() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 271
def over_maximum_bytes?
  @buffer_bytes >= @max_bytes
end
over_maximum_lines?() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 267
def over_maximum_lines?
  @buffer.size > @max_lines
end
register() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 91
def register
  require "grok-pure" # rubygem 'jls-grok'
  require 'logstash/patterns/core'

  @matching = ""

  # Detect if we are running from a jarfile, pick the right path.
  patterns_path = []
  patterns_path += [LogStash::Patterns::Core.path]

  @patterns_dir = patterns_path.to_a + @patterns_dir
  @groks = []
  @handlers = []

  @patterns.zip(@what).each do |pattern,what|
    grok = Grok.new

    @patterns_dir.each do |path|
      if ::File.directory?(path)
        path = ::File.join(path, "*")
      end

      Dir.glob(path).each do |file|
        @logger.debug("Grok loading patterns from file", :path => file)
        grok.add_patterns_from_file(file)
      end
    end

    grok.compile(pattern)
    handler = method("do_#{what}".to_sym)

    @groks.push(grok)
    @handlers.push(handler)
  end

  @logger.trace("Registered joinlines plugin", :type => @type, :config => @config)
  reset_buffer

  @converter = LogStash::Util::Charset.new(@charset)
  @converter.logger = @logger

  if @auto_flush_interval
    # will start on first decode
    @auto_flush_runner = AutoFlush.new(self, @auto_flush_interval)
  end
end
reset_buffer() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 242
def reset_buffer
  @buffer = []
  @buffer_bytes = 0
end
use_mapper_auto_flush() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 138
def use_mapper_auto_flush
  return unless auto_flush_active?
  @auto_flush_runner = AutoFlushUnset.new(nil, nil)
  @auto_flush_interval = @auto_flush_interval.to_f
end
what_based_listener(what) click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 251
def what_based_listener(what)
  doing_previous?(what) ? @previous_listener : @last_seen_listener
end
zip_config() click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 154
def zip_config
  @patterns.zip(@what, @negate, @groks, @handlers)
end