class Fluent::TailMultilineInput_EX
Constants
- FORMAT_MAX_NUMS
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 64 def initialize super @locker = Monitor.new @logbuf = nil @logbuf_flusher = CallLater_EX::new @ready = false end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 72 def configure(conf) if conf['format'].nil? invalids = conf.keys.select{|k| k =~ /^format(\d+)$/ and not (1..FORMAT_MAX_NUMS).include?($1.to_i)} if invalids.size > 0 raise ConfigError, "invalid number formats (valid format number:1-#{FORMAT_MAX_NUMS}):" + invalids.join(",") end format_index_list = conf.keys.select{|s| s =~ /^format\d+$/}.map{|v| (/^format(\d+)$/.match(v))[1].to_i} if (1..format_index_list.max).map{|i| conf["format#{i}"]}.include?(nil) raise Fluent::ConfigError, "jump of format index found" end formats = (1..FORMAT_MAX_NUMS).map {|i| conf["format#{i}"] }.delete_if {|format| format.nil? }.map {|format| format[1..-2] }.join conf['format'] = '/' + formats + '/' end super if @tag.index('*') @tag_prefix, @tag_suffix = @tag.split('*') @tag_suffix ||= '' else @tag_prefix = nil @tag_suffix = nil end @watchers = {} @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, &method(:refresh_watchers)) if read_newfile_from_head and @pf # Tread new file as rotated file # Use temp file inode number as previos logfile @paths.map {|path| pe = @pf[path] if pe.read_inode == 0 require 'tempfile' tmpfile = Tempfile.new('gettempinode') pe.update(File.stat(tmpfile).ino, 0) tmpfile.unlink end } end end
configure_parser(conf)
click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 182 def configure_parser(conf) @parser = MultilineTextParser_EX.new @parser.configure(conf) end
expand_paths()
click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 116 def expand_paths date = Time.now paths = [] for path in @paths if @expand_date path = date.strftime(path) end paths += Dir.glob(path) end paths end
flush_logbuf()
click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 228 def flush_logbuf time, record = nil,nil @locker.synchronize do time, record = parse_logbuf(@logbuf) @logbuf = nil end if time && record Engine.emit(@tag, time, record) end end
parse_logbuf(buf)
click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 239 def parse_logbuf(buf) return nil,nil unless buf buf.chomp! begin time, record = @parser.parse(buf) rescue $log.warn buf.dump, :error=>$!.to_s $log.debug_backtrace end return nil,nil unless time && record record[@rawdata_key] = buf if @rawdata_key return time, record end
receive_lines(lines,tag)
click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 187 def receive_lines(lines,tag) if @tag_prefix || @tag_suffix @tag = @tag_prefix + tag + @tag_suffix end @logbuf_flusher.cancel() es = MultiEventStream.new @locker.synchronize do lines.each {|line| if @parser.match_firstline(line) time, record = parse_logbuf(@logbuf) if time && record es.add(time, record) end @logbuf = line else @logbuf += line if(@logbuf) end } end unless es.empty? begin Engine.emit_stream(@tag, es) rescue # ignore errors. Engine shows logs and backtraces. end end @logbuf_flusher.call_later(@auto_flush_sec) do flush_logbuf() end end
refresh_watchers()
click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 128 def refresh_watchers paths = expand_paths missing = @watchers.keys - paths added = paths - @watchers.keys stop_watch(missing) unless missing.empty? start_watch(added) unless added.empty? end
run()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 175 def run # don't run unless ready to avoid coolio error if @ready super end end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 218 def shutdown @refresh_trigger.detach stop_watch(@watchers.keys, true) @loop.stop @thread.join @pf_file.close if @pf_file flush_logbuf() @logbuf_flusher.shutdown() end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 164 def start paths, @paths = @paths, [] super @thread.join @paths = paths refresh_watchers @refresh_trigger.attach(@loop) @ready = true @thread = Thread.new(&method(:run)) end
start_watch(paths)
click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 137 def start_watch(paths) paths.each do |path| if @pf pe = @pf[path] if @read_all && pe.read_inode == 0 inode = File::Stat.new(path).ino pe.update(inode, 0) end else pe = nil end watcher = TailExWatcher_EX.new(path, @rotate_wait, pe, &method(:receive_lines)) watcher.attach(@loop) @watchers[path] = watcher end end
stop_watch(paths, immediate=false)
click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 155 def stop_watch(paths, immediate=false) paths.each do |path| watcher = @watchers.delete(path) if watcher watcher.close(immediate ? nil : @loop) end end end