module LogProcessor

Public Class Methods

included(base)
# File lib/logstash/inputs/s3snssqs/log_processor.rb, line 9
# Hook run by Ruby when this module is mixed into +base+ with `include`.
# In addition to the normal instance-method mix-in, +base+ is also extended
# with this module, so the same methods become callable on +base+ itself.
# Returns +base+ (the value of Object#extend).
def self.included(base)
  mixin = self
  base.extend(mixin)
end

Public Instance Methods

process(record, logstash_event_queue)
# File lib/logstash/inputs/s3snssqs/log_processor.rb, line 13
# Reads the downloaded S3 object referenced by +record+, decodes it line by
# line with the codec obtained for this record, decorates every resulting
# event and pushes it onto +logstash_event_queue+.
#
# @param record [Hash] read here for :local_file, :folder, :bucket, :key and
#   :s3_data
# @param logstash_event_queue [#<<] receives each decorated event
# @return [true] after the whole file was read and the codec was flushed
#
# When stop? becomes true mid-file this throws :skip_delete (read_file also
# throws it when the file is not read to the end); presumably the caller
# catches it so the message is retried later — TODO confirm against caller.
def process(record, logstash_event_queue)
  file = record[:local_file]
  key = record[:key]
  bucket = record[:bucket]
  s3_data = record[:s3_data]
  codec = @codec_factory.get_codec(record)
  type = @type_by_folder.fetch(bucket, {})[record[:folder]]
  # One hash per file: CloudFront header events store their values here so
  # that the data events decoded after them can be decorated with them.
  metadata = {}

  read_file(file) do |line|
    if stop?
      @logger.warn("[#{Thread.current[:name]}] Abort reading in the middle of the file, we will read it again when logstash is started")
      throw :skip_delete
    end
    begin
      codec.decode(line) do |event|
        decorate_event(event, metadata, type, key, bucket, s3_data)
        logstash_event_queue << event
      end
    rescue Exception => e
      # NOTE(review): deliberately broad — any failure while decoding,
      # decorating or enqueuing this line is logged and the line is skipped
      # instead of failing the whole file. Presumably Exception (not
      # StandardError) is used so Java exceptions raised by JRuby codecs are
      # caught too; confirm before narrowing.
      @logger.error("[#{Thread.current[:name]}] Unable to decode line", :line => line, :error => e)
    end
  end

  # Ensure any stateful codecs (such as multiline) are flushed to the queue.
  codec.flush do |event|
    decorate_event(event, metadata, type, key, bucket, s3_data)
    @logger.debug("[#{Thread.current[:name]}] Flushing an incomplete event", :event => event.to_s)
    logstash_event_queue << event
  end

  # Signal completion.
  true
end

Private Instance Methods

decorate_event(event, metadata, type, key, bucket, s3_data)
# File lib/logstash/inputs/s3snssqs/log_processor.rb, line 59
# Decorates a single decoded +event+ for the S3 object +key+ in +bucket+.
#
# Header events recognised by event_is_metadata? are not decorated: they
# only update +metadata+ (the result of update_metadata is returned).
# Any other event gets, in this order: the folder-derived +type+ (only when
# +type+ is set and the event has no 'type' yet), decorate(), the remembered
# CloudFront version/fields (when present in +metadata+), and the S3 details
# under [@metadata][s3].
def decorate_event(event, metadata, type, key, bucket, s3_data)
  if event_is_metadata?(event)
    @logger.debug('Updating the current cloudfront metadata', :event => event)
    return update_metadata(metadata, event)
  end

  # type by folder - set before "decorate()" enforces default
  event.set('type', type) if type && !event.include?('type')
  decorate(event)

  version = metadata[:cloudfront_version]
  event.set("cloudfront_version", version) unless version.nil?
  fields = metadata[:cloudfront_fields]
  event.set("cloudfront_fields", fields) unless fields.nil?

  # The whole s3_data value is set first, then the individual sub-fields.
  event.set("[@metadata][s3]", s3_data)
  event.set("[@metadata][s3][object_key]", key)
  event.set("[@metadata][s3][bucket_name]", bucket)
  event.set("[@metadata][s3][object_folder]", get_object_folder(key))
end
event_is_metadata?(event)
# File lib/logstash/inputs/s3snssqs/log_processor.rb, line 118
# True when the event's "message" is exactly a String that starts with a
# CloudFront "#Version: " or "#Fields: " header prefix; false for any
# non-String message (including nil).
def event_is_metadata?(event)
  message = event.get("message")
  return false unless message.instance_of?(String)
  version_metadata?(message) || fields_metadata?(message)
end
fields_metadata?(line)
# File lib/logstash/inputs/s3snssqs/log_processor.rb, line 128
# True when +line+ begins with the CloudFront "#Fields: " header prefix
# (case-sensitive; the trailing space is part of the prefix).
def fields_metadata?(line)
  header_prefix = '#Fields: '
  line.start_with?(header_prefix)
end
gzip?(filename)
# File lib/logstash/inputs/s3snssqs/log_processor.rb, line 79
# Decides whether +filename+ must be read through a GZIP stream.
#
# Returns true when the name ends in ".gz" or ".gzip" (case-sensitive);
# otherwise returns what MagicGzipValidator#valid? reports for the file's
# contents. Any error during detection is logged and the file is treated as
# not gzipped (false), so this predicate always yields a definite answer.
def gzip?(filename)
  return true if filename.end_with?('.gz', '.gzip')
  # Block form guarantees the handle is closed even if validation raises;
  # a bare File.new here would leave it open until garbage collection.
  File.open(filename, 'rb') { |file| MagicGzipValidator.new(file).valid? }
rescue StandardError => e
  @logger.warn("Problem while gzip detection", :error => e)
  false
end
read_file(filename) { |line| ... }
# File lib/logstash/inputs/s3snssqs/log_processor.rb, line 86
# Streams +filename+ line by line and yields each line (decoded as UTF-8)
# with a trailing "\n" appended. The file is read through GZIPInputStream
# when gzip?(filename) is truthy, otherwise directly.
#
# FileInputStream, GZIPInputStream, InputStreamReader, BufferedReader,
# StringBuilder and ZipException are Java classes used via JRuby; presumably
# they are imported elsewhere in this file — confirm.
#
# Unless the whole file was read (+completed+), the ensure clause logs a
# warning and throws :skip_delete. This happens after a ZipException (which
# is logged first), after any other error, and when the caller's block
# exits early (e.g. process's own `throw :skip_delete`).
def read_file(filename)
  zipped = gzip?(filename)
  completed = false
  file_stream = FileInputStream.new(filename)
  if zipped
    gzip_stream = GZIPInputStream.new(file_stream)
    decoder = InputStreamReader.new(gzip_stream, 'UTF-8')
  else
    decoder = InputStreamReader.new(file_stream, 'UTF-8')
  end
  buffered = BufferedReader.new(decoder)

  # readLine strips the line terminator and returns nil at end of stream.
  while (data = buffered.readLine())
    line = StringBuilder.new(data).append("\n")
    yield(line.toString())
  end
  completed = true
rescue ZipException => e
  @logger.error("Gzip codec: We cannot uncompress the gzip file", :filename => filename, :error => e)
ensure
  # Close from the outermost wrapper inwards; any stream that was never
  # created (because an earlier constructor raised) is still nil here.
  buffered.close unless buffered.nil?
  decoder.close unless decoder.nil?
  gzip_stream.close unless gzip_stream.nil?
  file_stream.close unless file_stream.nil?

  unless completed
    # NOTE(review): throwing from inside ensure replaces any exception that is
    # still propagating (anything other than the rescued ZipException), so
    # such errors are discarded without being logged — consider logging it.
    @logger.warn("[#{Thread.current[:name]}] Incomplete message in read_file. We´ll throw skip_delete.", :filename => filename)
    throw :skip_delete
  end

end
update_metadata(metadata, event)
# File lib/logstash/inputs/s3snssqs/log_processor.rb, line 132
# Records the value of a CloudFront header event in +metadata+.
# The stripped "message" is checked for the "#Version: " and "#Fields: "
# prefixes, and the text after the prefix is stored under
# :cloudfront_version or :cloudfront_fields respectively.
# Returns the stored fields value, or nil when the line is not a "#Fields: "
# header (the value of the last conditional assignment).
def update_metadata(metadata, event)
  header = event.get('message').strip
  metadata[:cloudfront_version] = header.split(/#Version: (.+)/).last if version_metadata?(header)
  metadata[:cloudfront_fields] = header.split(/#Fields: (.+)/).last if fields_metadata?(header)
end
version_metadata?(line)
# File lib/logstash/inputs/s3snssqs/log_processor.rb, line 124
# True when +line+ begins with the CloudFront "#Version: " header prefix
# (case-sensitive; the trailing space is part of the prefix).
def version_metadata?(line)
  header_prefix = '#Version: '
  line.start_with?(header_prefix)
end