class LogStash::Outputs::Progress

File output.

Writes a sincedb file that records progress.

Public Instance Methods

receive(event) click to toggle source
# File lib/logstash/outputs/progress.rb, line 39
# Records (or forgets) one progress entry and rewrites the progress file.
#
# The stored line is the event rendered through @message_format when one is
# set, otherwise the event serialized as JSON. The entry key is built from the
# first four space-separated fields of event["message"], each converted with
# to_i (absent fields therefore become 0). Events whose "tags" contain "del"
# remove the entry for that key instead of storing it.
#
# event - the event to process; must answer sprintf, to_json, include? and [].
#
# Returns nil when output?(event) is false, otherwise whatever
# _sincedb_write returns.
def receive(event)
  return unless output?(event)

  line = @message_format ? event.sprintf(@message_format) : event.to_json

  # The fifth field is split off so it cannot bleed into the fourth one,
  # but it takes no part in the key.
  ino, dev_major, dev_minor, size, _pos = event["message"].split(" ", 5)
  key = [ino, dev_major, dev_minor, size].map(&:to_i)

  flagged_deleted = event.include?("tags") && event["tags"].include?("del")
  if flagged_deleted
    @sincedb.delete(key)
  else
    @sincedb[key] = line
  end

  _sincedb_write(event["path"])
end
register() click to toggle source
# File lib/logstash/outputs/progress.rb, line 31
# Prepares the output for use: loads FileUtils, calls workers_not_supported
# and starts with an empty in-memory progress table.
#
# NOTE(review): workers_not_supported is defined elsewhere; presumably it
# marks this output as unable to run with multiple workers - confirm.
#
# Returns the new, empty progress table.
def register
  # For mkdir_p (not referenced by the methods visible next to this one).
  require "fileutils"

  workers_not_supported
  @sincedb = Hash.new
end

Private Instance Methods

_sincedb_write(event_path) click to toggle source
# File lib/logstash/outputs/progress.rb, line 62
# Writes the in-memory progress table to the file named by @progressdb_path.
#
# Each stored message becomes one line (array messages are flattened and
# joined with single spaces). The lines go to "<path>.new" first, and that
# file is then renamed onto the real path, so the real file is never
# rewritten in place.
#
# Failing to open the temporary file, or to rename it, is logged as a warning
# and otherwise ignored. Errors raised while writing propagate to the caller,
# but the temporary file handle is always closed first.
#
# event_path - accepted for call compatibility; not used by this method.
#
# Returns nil when the temporary file cannot be opened; the return value is
# otherwise not meaningful.
def _sincedb_write(event_path)
  path = @progressdb_path
  tmp = "#{path}.new"
  begin
    db = File.open(tmp, "w")
  rescue => e
    @logger.warn("_sincedb_write failed: #{tmp}: #{e}")
    return
  end

  begin
    # Only the stored messages are written; the keys are not persisted.
    @sincedb.each_value do |message|
      db.puts([message].flatten.join(" "))
    end
  ensure
    # Close even when a write fails so the handle is not leaked.
    db.close
  end

  begin
    File.rename(tmp, path)
  rescue => e
    @logger.warn("_sincedb_write rename/sync failed: #{tmp} -> #{path}: #{e}")
  end
end
teardown() click to toggle source
# File lib/logstash/outputs/progress.rb, line 84
# Shuts the output down: emits a debug log line, then calls finished.
#
# NOTE(review): finished is defined elsewhere; presumably it signals the
# pipeline that this plugin is done - confirm.
#
# Returns whatever finished returns.
def teardown
  notice = "Teardown: closing files"
  @logger.debug(notice)
  finished
end