class LogStash::Codecs::Joinlines
Public Instance Methods
accept(listener)
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 144
# Decodes the data held by +listener+ and hands every resulting event to the
# listener selected by what_based_listener.
#
# @param listener [#data, #process_event] source of the raw text; also a
#   candidate receiver for the decoded events
def accept(listener)
  # memoize references to listener that holds upstream state:
  # the "previous" listener is the one seen on the prior call, or this one
  # when there has been no prior call.
  @previous_listener = @last_seen_listener || listener
  @last_seen_listener = listener

  payload = listener.data
  internal_decode(payload) { |event, what| what_based_listener(what).process_event(event) }
end
auto_flush(listener = @last_seen_listener)
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 226
# Flushes the buffered lines and delivers the resulting event to +listener+.
# Does nothing (and returns nil) when no listener is available.
#
# @param listener [#process_event, nil] receiver of the flushed event;
#   defaults to the listener recorded by the most recent accept call
def auto_flush(listener = @last_seen_listener)
  unless listener.nil?
    flush { |event| listener.process_event(event) }
  end
end
auto_flush_active?()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 288
# Reports whether an auto-flush interval has been configured.
#
# @return [Boolean] true when @auto_flush_interval is anything other than nil
def auto_flush_active?
  interval_unset = @auto_flush_interval.nil?
  !interval_unset
end
auto_flush_runner()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 292
# Returns the configured auto-flush runner, or a fresh AutoFlushUnset
# stand-in when none has been set. The stand-in is not stored.
def auto_flush_runner
  return @auto_flush_runner if @auto_flush_runner

  AutoFlushUnset.new(nil, nil)
end
buffer(text)
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 204
# Appends +text+ to the pending-lines buffer and adds its size in bytes to
# the running byte count.
#
# @param text [String] line to store
# @return [Array<String>] the buffer after the append
def buffer(text)
  @buffer_bytes = @buffer_bytes + text.bytesize
  @buffer << text
end
buffer_over_limits?()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 275
# True when the buffer exceeds either the line limit or the byte limit.
# The byte check is only evaluated when the line check is falsy.
def buffer_over_limits?
  too_many_lines = over_maximum_lines?
  too_many_lines || over_maximum_bytes?
end
close()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 284
# Stops the auto-flush runner returned by auto_flush_runner.
def close
  runner = auto_flush_runner
  runner.stop
end
decode(text) { |event| ... }
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 198
# Decodes +text+ and yields each completed event to the caller's block.
# The second value produced by internal_decode is discarded here.
#
# @param text [String] raw input
# @yield [event] each event produced by internal_decode
def decode(text, &block)
  internal_decode(text) { |event, _what| yield(event) }
end
do_next(text, matched, &block)
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 255
# Buffers +text+ first, starts the auto-flush runner, and then flushes when
# the line did not match or the buffer is over its limits.
#
# @param text [String] the current line
# @param matched [Boolean] whether the line matched the pattern
# @param block receiver passed through to flush
def do_next(text, matched, &block)
  buffer(text)
  auto_flush_runner.start
  # Equivalent to "flush if !matched || buffer_over_limits?"
  flush(&block) unless matched && !buffer_over_limits?
end
do_previous(text, matched, &block)
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 261
# Flushes first when the line did not match or the buffer is over its
# limits, then starts the auto-flush runner and buffers +text+.
#
# @param text [String] the current line
# @param matched [Boolean] whether the line matched the pattern
# @param block receiver passed through to flush
def do_previous(text, matched, &block)
  # Equivalent to "flush if !matched || buffer_over_limits?"
  flush(&block) unless matched && !buffer_over_limits?
  auto_flush_runner.start
  buffer(text)
end
doing_previous?(what)
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 247
# True for every +what+ value other than the string "next".
#
# @param what [String]
# @return [Boolean]
def doing_previous?(what)
  !(what == "next")
end
encode(event)
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 279
# Passes +event+ to the @on_event callback unchanged, supplying it as both
# arguments.
#
# @param event the event to emit
def encode(event)
  # Nothing to do.
  @on_event.(event, event)
end
flush() { |events| ... }
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 209
# Merges the buffered lines into one event and yields it. The buffer is
# cleared only when the block returns without raising. If the block raises,
# the error is logged and the buffered data is kept for the next attempt.
#
# Does nothing when no block is given or the buffer is empty.
#
# @yield [events] the merged event built by merge_events
# @return the result of reset_buffer on success, nil otherwise
def flush(&block)
  return unless block_given? && @buffer.any?

  events = merge_events
  begin
    yield events
  rescue ::Exception => e
    # need to rescue everything
    # likeliest cause: backpressure or timeout by exception
    # can't really do anything but leave the data in the buffer for next time if there is one
    @logger.error("Joinlines: flush downstream error", :exception => e)
    nil
  else
    # Reached only when the downstream block completed without raising.
    reset_buffer
  end
end
initialize_copy(source)
click to toggle source
Calls superclass method
# File lib/logstash/codecs/joinlines.rb, line 296
# Copy hook: after the superclass copy logic has run, calls register again
# on the copy.
#
# @param source the object being copied from
def initialize_copy(source)
  super(source)
  register
end
internal_decode(text) { |event,matching| ... }
click to toggle source
private
# File lib/logstash/codecs/joinlines.rb, line 159
# Converts +text+ with the charset converter, splits it on "\n", and buffers
# each line. Before a line is buffered, the pending buffer is flushed when:
#
# * the line matches a rule whose +what+ is "next" while the previous state
#   was not "next", or
# * the line matches no rule and the previous state was not "next".
#
# Rules are tried in zip_config order and the first match wins. A rule's
# +negate+ flag inverts its match result.
#
# @param text [String] raw input, possibly several lines
# @yield [event, matching] each flushed event together with the value of
#   @matching at flush time, which has already been updated for the current
#   line
def internal_decode(text, &block)
  needs_flush = false
  converted = @converter.convert(text)

  converted.split("\n").each do |line|
    line_matched = false

    zip_config.each do |pattern, what, negate, grok, _handler|
      hit = grok.match(line)
      @logger.debug("Joinlines", :pattern => pattern, :text => line,
                    :match => (hit != false), :negate => negate)

      # Add negate option
      hit = negate ? !hit : hit
      next unless hit

      needs_flush = (what == "next" && @matching != "next")
      line_matched = true
      @matching = what
      break
    end

    unless line_matched
      needs_flush = (@matching != "next")
      @matching = ""
    end

    if needs_flush
      flush { |event| yield(event, @matching) }
      needs_flush = false
    end

    auto_flush_runner.start
    buffer(line)
  end
end
merge_events()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 234
# Builds a single LogStash::Event whose "message" is the buffered lines
# joined with NL, and tags it according to buffer state:
#
# * @multiline_tag, when that tag is non-empty and more than one line was
#   buffered
# * "joinlines_codec_max_bytes_reached" when over_maximum_bytes? is true
# * "joinlines_codec_max_lines_reached" when over_maximum_lines? is true
#
# NOTE(review): @time is used as the event timestamp but is not assigned
# anywhere in the visible source; confirm where it is set.
#
# @return [LogStash::Event] the merged event
def merge_events
  joined = @buffer.join(NL)
  LogStash::Event.new(LogStash::Event::TIMESTAMP => @time, "message" => joined).tap do |event|
    event.tag @multiline_tag unless @multiline_tag.empty? || @buffer.size <= 1
    event.tag "joinlines_codec_max_bytes_reached" if over_maximum_bytes?
    event.tag "joinlines_codec_max_lines_reached" if over_maximum_lines?
  end
end
over_maximum_bytes?()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 271
# True once the buffered byte count has reached or passed @max_bytes.
#
# @return [Boolean]
def over_maximum_bytes?
  byte_count = @buffer_bytes
  byte_count >= @max_bytes
end
over_maximum_lines?()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 267
# True when the number of buffered lines is strictly greater than
# @max_lines.
#
# @return [Boolean]
def over_maximum_lines?
  line_count = @buffer.size
  line_count > @max_lines
end
register()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 91
# Prepares the codec for use:
#
# * resets the matching state and the line buffer,
# * compiles one Grok per configured pattern, loading pattern definitions
#   from the core patterns path plus every entry in @patterns_dir,
# * resolves the do_<what> handler method for each pattern,
# * creates the charset converter,
# * creates the auto-flush runner when an interval is configured.
#
# This method runs more than once per configuration: initialize_copy calls
# it again on every copy.
#
# @raise [NameError] when a +what+ value has no matching do_<what> method
def register
  require "grok-pure" # rubygem 'jls-grok'
  require 'logstash/patterns/core'

  @matching = ""

  # Union instead of concatenation: a copy inherits a @patterns_dir that
  # already starts with the core path, and concatenating would add it again
  # on every register call, so each pattern file would be globbed and loaded
  # repeatedly. Array#| keeps first-occurrence order and drops duplicates.
  @patterns_dir = [LogStash::Patterns::Core.path] | @patterns_dir

  @groks = []
  @handlers = []

  @patterns.zip(@what).each do |pattern, what|
    grok = Grok.new
    @patterns_dir.each do |path|
      # A directory entry means "every file inside it".
      path = ::File.join(path, "*") if ::File.directory?(path)
      Dir.glob(path).each do |file|
        @logger.debug("Grok loading patterns from file", :path => file)
        grok.add_patterns_from_file(file)
      end
    end
    grok.compile(pattern)
    handler = method("do_#{what}".to_sym)
    @groks.push(grok)
    @handlers.push(handler)
  end

  @logger.trace("Registered joinlines plugin", :type => @type, :config => @config)
  reset_buffer

  @converter = LogStash::Util::Charset.new(@charset)
  @converter.logger = @logger

  if @auto_flush_interval
    # will start on first decode
    @auto_flush_runner = AutoFlush.new(self, @auto_flush_interval)
  end
end
reset_buffer()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 242
# Replaces the line buffer with a new empty array and zeroes the byte count.
#
# @return [Integer] 0
def reset_buffer
  @buffer = Array.new
  @buffer_bytes = 0
end
use_mapper_auto_flush()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 138
# When auto-flush is active, replaces the runner with an AutoFlushUnset
# stand-in and converts the interval to a Float. Does nothing otherwise.
#
# @return [Float, nil] the converted interval, or nil when auto-flush is
#   not active
def use_mapper_auto_flush
  if auto_flush_active?
    @auto_flush_runner = AutoFlushUnset.new(nil, nil)
    @auto_flush_interval = @auto_flush_interval.to_f
  end
end
what_based_listener(what)
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 251
# Chooses the listener that should receive an event: the previous listener
# when doing_previous?(what) is true, otherwise the most recently seen one.
#
# @param what [String]
def what_based_listener(what)
  if doing_previous?(what)
    @previous_listener
  else
    @last_seen_listener
  end
end
zip_config()
click to toggle source
# File lib/logstash/codecs/joinlines.rb, line 154
# Combines the parallel configuration arrays into one row per pattern:
# [pattern, what, negate, grok, handler].
#
# @return [Array<Array>] one five-element row per entry in @patterns
def zip_config
  columns = [@what, @negate, @groks, @handlers]
  @patterns.zip(*columns)
end