class LogStash::Inputs::FileProgress
Public Instance Methods
begin_tailing()
click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 298
# Starts (or restarts) tailing of every configured path.
#
# Calls +stop+ first, then builds a fresh tail through
# FileWatch::Ext::Tail.new_observing_progress from +@tail_config+, gives it
# this input's logger and registers each entry of +@path+ with it.
#
# Returns +@path+ (the result of the final +each+).
def begin_tailing
  # If the pipeline restarts this input, make sure previously opened files
  # are closed before a new tail is created.
  stop

  # Use the observer/listener API.
  @tail = FileWatch::Ext::Tail.new_observing_progress(@tail_config)
  @tail.logger = @logger

  @path.each do |watched_path|
    @tail.tail(watched_path)
  end
end
listener_for(path)
click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 293
# Builds the listener object for a single tailed file.
#
# path - the file path; it doubles as the identity of the listener.
#
# Returns a new ListenerTail constructed with +path+ and this input.
def listener_for(path)
  # The path itself is used as the identity.
  ListenerTail.new(path, self)
end
log_line_received(path, line)
click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 321
# Emits a debug log entry for a line that was read from a file.
#
# path - path of the file the line came from.
# line - the text that was read.
#
# Does nothing (and returns nil) when debug logging is disabled; otherwise
# returns whatever +@logger.debug+ returns.
def log_line_received(path, line)
  @logger.debug("Received line", :path => path, :text => line) if @logger.debug?
end
post_process_this(event)
click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 315
# Finalises an event and hands it to the pipeline queue.
#
# event - the event to finish; must respond to +include?+ and +set+.
#
# Fills in the "host" field from +@host+ only when the event does not
# already carry one, passes the event to +decorate+, then appends it to
# +@queue+.
#
# Returns the result of +@queue << event+.
def post_process_this(event)
  host_missing = !event.include?("host")
  event.set("host", @host) if host_missing

  decorate(event)
  @queue << event
end
register()
click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 178
# Validates the plugin settings and prepares the state used for tailing.
#
# Steps, in order:
# 1. Loads the libraries this input needs and records the local host name.
# 2. Builds +@tail_config+ from the plugin settings.
# 3. Rejects relative entries in +@path+.
# 4. When +@sincedb_path+ is not set, derives one inside SINCEDB_DIR (or
#    HOME) from the MD5 of the joined paths, renaming a legacy ".sincedb"
#    file found in that directory to the new name.
# 5. Rejects a +@sincedb_path+ that is a directory.
# 6. Wraps +@codec+ in an IdentityMapCodec.
#
# Raises ArgumentError if a configured path is relative or if
# +@sincedb_path+ points to a directory.
# Raises RuntimeError if +@sincedb_path+ is unset and neither SINCEDB_DIR
# nor HOME is present in the environment.
def register
  require "addressable/uri"
  require "filewatch/ext/tail"
  require "digest/md5"

  @logger.info("Registering file input", :path => @path)
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)

  @tail_config = {
    :exclude => @exclude,
    :stat_interval => @stat_interval,
    :discover_interval => @discover_interval,
    :sincedb_write_interval => @sincedb_write_interval,
    :delimiter => @delimiter,
    :ignore_older => @ignore_older,
    :close_older => @close_older,
    :max_open_files => @max_open_files,
    :eof_close => true,
    :progress_write_interval => @progress_write_interval
  }

  @path.each do |path|
    if Pathname.new(path).relative?
      raise ArgumentError, "File paths must be absolute, relative path specified: #{path}"
    end
  end

  if @sincedb_path.nil?
    if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil?
      message = "No SINCEDB_DIR or HOME environment variable set, I don't know where " \
                "to keep track of the files I'm watching. Either set " \
                "HOME or SINCEDB_DIR in your environment, or set sincedb_path " \
                "in your Logstash config for the file input with " \
                "path '#{@path.inspect}'"
      @logger.error(message)
      # Raise with the explanation attached; a bare `raise` here would only
      # produce a RuntimeError saying "unhandled exception".
      raise message
    end

    # Pick SINCEDB_DIR if available, otherwise use HOME.
    sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]

    # Join by ',' to make it easy for folks to know their own sincedb
    # generated path (vs, say, inspecting the @path array).
    @sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@path.join(",")))

    # Migrate any old .sincedb to the new file (this is for version <=1.1.1
    # compatibility). File.exist? is used because File.exists? is no longer
    # available on current Ruby versions.
    old_sincedb = File.join(sincedb_dir, ".sincedb")
    if File.exist?(old_sincedb)
      @logger.info("Renaming old ~/.sincedb to new one", :old => old_sincedb,
                   :new => @sincedb_path)
      File.rename(old_sincedb, @sincedb_path)
    end

    @logger.info("No sincedb_path set, generating one based on the file path",
                 :sincedb_path => @sincedb_path, :path => @path)
  end

  if File.directory?(@sincedb_path)
    raise ArgumentError, "The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\""
  end

  @tail_config[:sincedb_path] = @sincedb_path

  if @start_position == "beginning"
    @tail_config[:start_new_files_at] = :beginning
  end

  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
end
run(queue)
click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 308
# Main loop entry point of the input.
#
# queue - the object events are pushed onto (stored in +@queue+).
#
# The statement order is significant: the tail is created first, the queue
# is stored next, then this input subscribes itself to the tail, and
# finally +exit_flush+ runs once +subscribe+ has returned.
#
# Returns the result of +exit_flush+.
def run(queue)
  begin_tailing
  @queue = queue

  @tail.subscribe(self)

  exit_flush
end
stop()
click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 326
# Shuts down tailing if a tail has been created.
#
# Closes +@codec+ and then tells +@tail+ to quit. Does nothing and returns
# nil when +@tail+ is not set; otherwise returns the result of +@tail.quit+.
def stop
  return unless @tail

  # In filewatch >= 0.6.7, quit closes and forgets all files, but it writes
  # their last read positions to the sincedb beforehand.
  @codec.close
  @tail.quit
end
Private Instance Methods
exit_flush()
click to toggle source
# File lib/logstash/inputs/file_progress.rb, line 338
# Flushes whatever the codec still holds once tailing has ended.
#
# A FlushableListener (identity "none") is created to receive the flushed
# events. When the identity-map codec tracks at least one identity, the
# listener is handed to +flush_mapped+. Otherwise the base codec is flushed
# directly and each event is passed to the listener; an error raised while
# processing an event is logged and does not abort the flush.
def exit_flush
  flush_listener = FlushableListener.new("none", self)

  unless @codec.identity_count.zero?
    return @codec.flush_mapped(flush_listener)
  end

  # Using the base codec without identity/path info.
  @codec.base_codec.flush do |event|
    begin
      flush_listener.process_event(event)
    rescue => error
      @logger.error("File Input: flush on exit downstream error", :exception => error)
    end
  end
end