class Fluent::TailMultilineInput_EX

Constants

FORMAT_MAX_NUMS

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 64
def initialize
  super
  @locker = Monitor.new
  @logbuf = nil
  @logbuf_flusher = CallLater_EX::new
  @ready = false
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 72
def configure(conf)
  if conf['format'].nil?
    invalids = conf.keys.select{|k| k =~ /^format(\d+)$/ and not (1..FORMAT_MAX_NUMS).include?($1.to_i)}
    if invalids.size > 0
      raise ConfigError, "invalid number formats (valid format number:1-#{FORMAT_MAX_NUMS}):" + invalids.join(",")
    end
    format_index_list = conf.keys.select{|s| s =~ /^format\d+$/}.map{|v| (/^format(\d+)$/.match(v))[1].to_i}
    if (1..format_index_list.max).map{|i| conf["format#{i}"]}.include?(nil)
      raise Fluent::ConfigError, "jump of format index found"
    end
    formats = (1..FORMAT_MAX_NUMS).map {|i|
      conf["format#{i}"]
    }.delete_if {|format|
      format.nil?
    }.map {|format|
      format[1..-2]
    }.join
    conf['format'] = '/' + formats + '/'
  end
  super
  if @tag.index('*')
    @tag_prefix, @tag_suffix = @tag.split('*')
    @tag_suffix ||= ''
  else
    @tag_prefix = nil
    @tag_suffix = nil
  end
  @watchers = {}
  @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, &method(:refresh_watchers))
  if read_newfile_from_head and @pf
    # Tread new file as rotated file
    # Use temp file inode number as previos logfile
    @paths.map {|path|
      pe = @pf[path]
      if pe.read_inode == 0
        require 'tempfile'
        tmpfile = Tempfile.new('gettempinode')
        pe.update(File.stat(tmpfile).ino, 0)
        tmpfile.unlink
      end
    }
  end
end
configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 182
def configure_parser(conf)
  @parser = MultilineTextParser_EX.new
  @parser.configure(conf)
end
expand_paths() click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 116
def expand_paths
  date = Time.now
  paths = []
  for path in @paths
    if @expand_date
      path = date.strftime(path)
    end
    paths += Dir.glob(path)
  end
  paths
end
flush_logbuf() click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 228
def flush_logbuf
  time, record = nil,nil
  @locker.synchronize do
    time, record = parse_logbuf(@logbuf)
    @logbuf = nil
  end
  if time && record
    Engine.emit(@tag, time, record)
  end
end
parse_logbuf(buf) click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 239
def parse_logbuf(buf)
  return nil,nil unless buf
  buf.chomp!
  begin
    time, record = @parser.parse(buf)
  rescue
    $log.warn buf.dump, :error=>$!.to_s
    $log.debug_backtrace
  end
  return nil,nil unless time && record
  record[@rawdata_key] = buf if @rawdata_key
  return time, record
end
receive_lines(lines,tag) click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 187
def receive_lines(lines,tag)
  if @tag_prefix || @tag_suffix
    @tag = @tag_prefix + tag + @tag_suffix
  end
  @logbuf_flusher.cancel()
  es = MultiEventStream.new
  @locker.synchronize do
    lines.each {|line|
        if @parser.match_firstline(line)
          time, record = parse_logbuf(@logbuf)
          if time && record
            es.add(time, record)
          end
          @logbuf = line
        else
          @logbuf += line if(@logbuf)
        end
    }
  end
  unless es.empty?
    begin
      Engine.emit_stream(@tag, es)
    rescue
      # ignore errors. Engine shows logs and backtraces.
    end
  end
  @logbuf_flusher.call_later(@auto_flush_sec) do
    flush_logbuf()
  end
end
refresh_watchers() click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 128
def refresh_watchers
  paths = expand_paths
  missing = @watchers.keys - paths
  added = paths - @watchers.keys

  stop_watch(missing) unless missing.empty?
  start_watch(added) unless added.empty?
end
run() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 175
def run
  # don't run unless ready to avoid coolio error
  if @ready
    super
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 218
def shutdown
  @refresh_trigger.detach
  stop_watch(@watchers.keys, true)
  @loop.stop
  @thread.join
  @pf_file.close if @pf_file
  flush_logbuf()
  @logbuf_flusher.shutdown()
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 164
def start
  paths, @paths = @paths, []
  super
  @thread.join
  @paths = paths
  refresh_watchers
  @refresh_trigger.attach(@loop)
  @ready = true
  @thread = Thread.new(&method(:run))
end
start_watch(paths) click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 137
def start_watch(paths)
  paths.each do |path|
    if @pf
      pe = @pf[path]
      if @read_all && pe.read_inode == 0
        inode = File::Stat.new(path).ino
        pe.update(inode, 0)
      end
    else
      pe = nil
    end

    watcher = TailExWatcher_EX.new(path, @rotate_wait, pe, &method(:receive_lines))
    watcher.attach(@loop)
    @watchers[path] = watcher
  end
end
stop_watch(paths, immediate=false) click to toggle source
# File lib/fluent/plugin/in_tail_multiline_ex.rb, line 155
def stop_watch(paths, immediate=false)
  paths.each do |path|
    watcher = @watchers.delete(path)
    if watcher
      watcher.close(immediate ? nil : @loop)
    end
  end
end