class LogStash::Inputs::FileProgress

Public Instance Methods

begin_tailing() click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 298
def begin_tailing
  # if the pipeline restarts this input,
  # make sure previous files are closed
  stop
  # use observer listener api
  @tail = FileWatch::Ext::Tail.new_observing_progress(@tail_config)
  @tail.logger = @logger
  @path.each { |path| @tail.tail(path) }
end
listener_for(path) click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 293
def listener_for(path)
  # path is the identity
  ListenerTail.new(path, self)
end
log_line_received(path, line) click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 321
def log_line_received(path, line)
  return if !@logger.debug?
  @logger.debug("Received line", :path => path, :text => line)
end
post_process_this(event) click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 315
def post_process_this(event)
  event.set("host", @host) if !event.include?("host")
  decorate(event)
  @queue << event
end
register() click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 178
def register
  require "addressable/uri"
  require "filewatch/ext/tail"
  require "digest/md5"
  @logger.info("Registering file input", :path => @path)
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)

  @tail_config = {
    :exclude => @exclude,
    :stat_interval => @stat_interval,
    :discover_interval => @discover_interval,
    :sincedb_write_interval => @sincedb_write_interval,
    :delimiter => @delimiter,
    :ignore_older => @ignore_older,
    :close_older => @close_older,
    :max_open_files => @max_open_files,
    :eof_close => true,
    :progress_write_interval => @progress_write_interval
  }

  @path.each do |path|
    if Pathname.new(path).relative?
      raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
    end
  end

  if @sincedb_path.nil?
    if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil?
      @logger.error("No SINCEDB_DIR or HOME environment variable set, I don't know where " \
                    "to keep track of the files I'm watching. Either set " \
                    "HOME or SINCEDB_DIR in your environment, or set sincedb_path in " \
                    "in your Logstash config for the file input with " \
                    "path '#{@path.inspect}'")
      raise # TODO(sissel): HOW DO I FAIL PROPERLY YO
    end

    #pick SINCEDB_DIR if available, otherwise use HOME
    sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]

    # Join by ',' to make it easy for folks to know their own sincedb
    # generated path (vs, say, inspecting the @path array)
    @sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@path.join(",")))

    # Migrate any old .sincedb to the new file (this is for version <=1.1.1 compatibility)
    old_sincedb = File.join(sincedb_dir, ".sincedb")
    if File.exists?(old_sincedb)
      @logger.info("Renaming old ~/.sincedb to new one", :old => old_sincedb,
                   :new => @sincedb_path)
      File.rename(old_sincedb, @sincedb_path)
    end

    @logger.info("No sincedb_path set, generating one based on the file path",
                 :sincedb_path => @sincedb_path, :path => @path)
  end

  if File.directory?(@sincedb_path)
    raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"")
  end

  @tail_config[:sincedb_path] = @sincedb_path

  if @start_position == "beginning"
    @tail_config[:start_new_files_at] = :beginning
  end

  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
end
run(queue) click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 308
def run(queue)
  begin_tailing
  @queue = queue
  @tail.subscribe(self)
  exit_flush
end
stop() click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 326
def stop
  # in filewatch >= 0.6.7, quit will closes and forget all files
  # but it will write their last read positions to since_db
  # beforehand
  if @tail
    @codec.close
    @tail.quit
  end
end

Private Instance Methods

exit_flush() click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 338
def exit_flush
  listener = FlushableListener.new("none", self)
  if @codec.identity_count.zero?
    # using the base codec without identity/path info
    @codec.base_codec.flush do |event|
      begin
        listener.process_event(event)
      rescue => e
        @logger.error("File Input: flush on exit downstream error", :exception => e)
      end
    end
  else
    @codec.flush_mapped(listener)
  end
end