class Fluent::Plugin::CatSweepInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cat_sweep.rb, line 30
# Validates and finalizes the plugin configuration.
#
# Runs compat_parameters_convert for the parser and buffer sections, lets the
# superclass apply +conf+, builds the parser, and then rejects empty values for
# processing_file_suffix, error_file_suffix and line_terminated_by.
#
# Unless files are removed after processing, it also checks the directory that
# files would be moved into: move_dirname of the first file matching
# @file_path_with_glob, or @move_to itself when nothing matches yet. That
# directory must be writable, or creatable with FileUtils.mkdir_p.
#
# @param conf plugin configuration, passed through to super
# @raise [Fluent::ConfigError] on an empty required string setting or an
#   unusable move_to directory
def configure(conf)
  compat_parameters_convert(conf, :parser, :buffer, default_chunk_key: "time")
  super

  configure_parser(conf)

  {
    'processing_file_suffix' => @processing_file_suffix,
    'error_file_suffix' => @error_file_suffix,
    'line_terminated_by' => @line_terminated_by,
  }.each do |param, value|
    raise Fluent::ConfigError, "in_cat_sweep: `#{param}` must has some letters." if value.empty?
  end

  unless remove_file?
    sample_file = Dir.glob(@file_path_with_glob).first
    target_dir = sample_file ? move_dirname(sample_file) : @move_to
    unwritable = "in_cat_sweep: `move_to` directory (#{target_dir}) must be writable."
    if Dir.exist?(target_dir)
      raise Fluent::ConfigError, unwritable unless File.writable?(target_dir)
    else
      begin
        FileUtils.mkdir_p(target_dir)
      rescue
        raise Fluent::ConfigError, unwritable
      end
    end
  end

  # 256 KB: the per-read limit handed to IO#each in read_each_line.
  @read_bytes_once = 262_144
end
run_periodic() click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 82
# Body of the polling thread started by #start.
#
# Every @run_interval seconds it globs @file_path_with_glob. For each file
# accepted by #will_process?, it claims the file with #lock_with_renaming,
# emits its contents via #process and disposes of it via #after_processing.
# If handling a claimed file raises, the error is logged, #safe_fail renames
# the processing file to an error file, and the loop moves on to the next file.
#
# The loop ends once #shutdown clears @processing. The flag is re-checked after
# the sleep and before each file, so a shutdown request does not trigger
# another pass over the matched files.
def run_periodic
  while @processing
    sleep @run_interval
    break unless @processing

    # `each`, not `map`: only the side effects matter here.
    Dir.glob(@file_path_with_glob).each do |filename|
      break unless @processing

      begin
        next unless will_process?(filename)
      rescue => e
        # e.g. the file vanished between the glob and the mtime check.
        # Letting the error escape would terminate this polling thread,
        # so log it and skip the file.
        log.warn "in_cat_sweep: skip #{filename}", :error => e, :error_class => e.class
        next
      end

      processing_filename = get_processing_filename(filename)
      begin
        lock_with_renaming(filename, processing_filename) do
          process(filename, processing_filename)
          after_processing(processing_filename)
        end
      rescue => e
        log.error "in_cat_sweep: processing: #{processing_filename}", :error => e, :error_class => e.class
        log.error_backtrace
        safe_fail(e, processing_filename)
      end
    end
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cat_sweep.rb, line 75
# Stops the polling thread started by #start, then delegates to super.
#
# Clearing @processing ends #run_periodic's loop. The thread is also woken
# (Thread#wakeup interrupts Kernel#sleep), so shutdown does not have to wait
# out the current `sleep @run_interval` before the thread notices.
# This is safe to call when #start never created the thread.
def shutdown
  @processing = false
  if @thread
    begin
      @thread.wakeup if @thread.alive?
    rescue ThreadError
      # The thread exited between alive? and wakeup; nothing to interrupt.
    end
    @thread.join
  end

  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cat_sweep.rb, line 68
# Starts the plugin: runs the superclass start, marks the plugin as
# processing, and spawns the background thread that runs #run_periodic.
def start
  super

  @processing = true
  @thread = Thread.new { run_periodic }
end

Private Instance Methods

after_processing(processing_filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 265
# Disposes of a successfully processed file.
#
# When remove_file? is true the file is deleted. Otherwise it is moved into
# move_dirname(processing_filename), which is created if missing, under the
# basename recovered by revert_processing_filename.
def after_processing(processing_filename)
  return FileUtils.rm(processing_filename) if remove_file?

  destination_dir = move_dirname(processing_filename)
  FileUtils.mkdir_p(destination_dir)
  original_name = revert_processing_filename(File.basename(processing_filename))
  FileUtils.mv(processing_filename, File.join(destination_dir, original_name))
end
buffer() click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 158
# Line-assembly buffer used by read_each_line. It is lazily created through
# buffer_clean! the first time it is needed.
def buffer
  @buffer ||= buffer_clean!
end
buffer_clean!() click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 154
# Replaces the line buffer with a fresh, empty, mutable ASCII-8BIT string and
# returns it.
def buffer_clean!
  @buffer = String.new(encoding: Encoding::ASCII_8BIT)
end
configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 106
# Builds the parser via parser_create and stores it in @parser.
# +conf+ is accepted for the caller's convenience but is not used here.
def configure_parser(conf)
  @parser = parser_create
end
emit_file(fp) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 199
# Parses every line of +fp+ and emits all parsed entries to @tag as a single
# Fluent::ArrayEventStream. Nil lines and lines for which parse_line returns
# nil are skipped. Nothing is emitted when no entry was collected. A parse
# error raised by parse_line propagates, so nothing is emitted in that case.
def emit_file(fp)
  collected = []
  read_each_line(fp) do |line|
    next unless line

    parsed = parse_line(line)
    collected << parsed if parsed
  end
  return if collected.empty?

  stream = Fluent::ArrayEventStream.new(collected)
  router.emit_stream(@tag, stream)
end
emit_line(line) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 190
# Parses one line and, if both a time and a record come back, emits that
# single event to @tag. A nil line is ignored.
def emit_line(line)
  return unless line

  time, record = parse_line(line)
  router.emit(@tag, time, record) if time && record
end
error_file?(filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 118
# True when +filename+ carries the error suffix appended by get_error_filename.
def error_file?(filename)
  suffix = @error_file_suffix
  filename.end_with?(suffix)
end
get_error_filename(e, filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 138
# Builds the name a failed file is renamed to:
# "<filename>.<exception class name><error_file_suffix>".
def get_error_filename(e, filename)
  [filename, ".", e.class.to_s, @error_file_suffix].inject(String.new, :<<)
end
get_processing_filename(filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 126
# Builds the temporary name a file is renamed to while it is processed:
# "<filename>.<pid>.<Fluent::EventTime.now.to_s><processing_file_suffix>".
# revert_processing_filename strips this stamp again.
def get_processing_filename(filename)
  parts = [filename, '.', Process.pid.to_s, '.', Fluent::EventTime.now.to_s, @processing_file_suffix]
  parts.inject(String.new, :<<)
end
lock_with_renaming(filename_from, filename_to) { ... } click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 246
# Claims +filename_from+ by taking a non-blocking exclusive flock on it and
# renaming it to +filename_to+. It then yields (if a block is given) while the
# lock is still held.
#
# If the lock cannot be acquired, a warning is logged and nothing is renamed
# or yielded. The ensure clause unlocks and closes the handle, including when
# the block raises.
#
# @return the block's value (nil without a block), or the logger's return
#   value when the lock was not obtained
def lock_with_renaming(filename_from, filename_to)
  handle = File.open(filename_from, open_mode_for_flock)
  begin
    acquired = handle.flock(File::LOCK_EX | File::LOCK_NB)
    return log.warn("in_cat_sweep: lock failed: skip #{filename_from}") unless acquired

    File.rename(filename_from, filename_to)
    block_given? ? yield : nil
  ensure
    handle.flock(File::LOCK_UN) # release the lock before closing
    handle.close
  end
end
move_dirname(filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 261
# Mirrors the absolute directory of +filename+ under @move_to. For example,
# @move_to = "/done" and "/var/log/a.log" give "/done/var/log".
def move_dirname(filename)
  absolute_dir = File.dirname(File.expand_path(filename))
  File.join(@move_to, absolute_dir)
end
open_mode_for_flock() click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 241
# File mode used when opening a file to flock it. This is "r+" when
# @flock_with_rw_mode is set and "r" otherwise; the first result is memoized.
def open_mode_for_flock
  # When flocking files on NFS, the files must be opened in a writable mode.
  return @open_mode_for_flock if @open_mode_for_flock

  @open_mode_for_flock = @flock_with_rw_mode ? "r+" : "r"
end
parse_line(line) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 213
# Runs @parser over one line and returns [time, record] from the last pair
# the parser yields, or nil if it yields nothing.
#
# @raise [FormatError] when the parser yields a missing time or record. Failing
# the whole file on a mismatch makes recovery by manual fixing easy.
def parse_line(line)
  parsed = nil
  @parser.parse(line) do |time, record|
    raise FormatError, "in_cat_sweep: pattern not match: #{line.inspect}" unless time && record

    parsed = [time, record]
  end
  parsed
end
process(original_filename, processing_filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 228
# Reads the claimed file and emits its contents. When @file_event_stream is
# set, the whole file goes out as one stream (emit_file); otherwise each line
# is emitted individually (emit_line). Afterwards it logs the original name and
# the file size at debug level.
def process(original_filename, processing_filename)
  File.open(processing_filename, 'r') do |io|
    if @file_event_stream
      emit_file(io)
    else
      read_each_line(io) { |line| emit_line(line) }
    end
    log.debug { %[in_cat_sweep: process: {filename:"#{original_filename}",size:#{io.size}}] }
  end
end
processing?(filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 114
# True when +filename+ carries the suffix appended by get_processing_filename,
# i.e. the file is currently claimed.
def processing?(filename)
  suffix = @processing_file_suffix
  filename.end_with?(suffix)
end
read_each_line(io) { |line| ... } click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 166
# Splits +io+ into lines terminated by @line_terminated_by and yields each
# line with the terminator removed.
#
# The IO is read in pieces of at most @read_bytes_once bytes, so one logical
# line may be assembled from several reads in #buffer. If the accumulated line
# grows beyond @oneline_max_bytes bytes, the buffer is discarded and
# OneLineMaxBytesOverError is raised, which aborts the whole read.
#
# A final line that is not followed by the terminator is yielded as-is.
# Calling `chomp!` on it would return nil, and its data would be lost even
# though the caller then disposes of the file as processed.
#
# @param io [IO] readable stream
# @yieldparam line [String] one line without its terminator
# @raise [OneLineMaxBytesOverError] when a line exceeds @oneline_max_bytes
def read_each_line(io)
  buffer_clean!

  io.each(@line_terminated_by, @read_bytes_once) do |like_line|
    buffer << like_line

    # Compare bytes, not characters. Appending a non-ASCII chunk to the empty
    # ASCII-8BIT buffer switches it to that chunk's (multibyte) encoding,
    # after which #length would count characters.
    if buffer.bytesize > @oneline_max_bytes
      begin
        raise OneLineMaxBytesOverError,
          "in_cat_sweep: buffer length is over #{@oneline_max_bytes} bytes. remove: #{buffer}"
      ensure
        buffer_clean!
      end
    end

    if buffer.end_with?(@line_terminated_by)
      yield(buffer.chomp!(@line_terminated_by))
      buffer_clean!
    end
  end

  # Flush a trailing unterminated line instead of dropping it.
  yield(buffer) unless buffer.empty?
  buffer_clean!
end
remove_file?() click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 162
# Whether processed files are deleted rather than moved under @move_to.
# Returns the @remove_after_processing setting unchanged (not coerced to
# a boolean).
def remove_file?
  @remove_after_processing
end
revert_processing_filename(processing_filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 132
# Recovers the original name from one produced by get_processing_filename.
#
# It strips @processing_file_suffix and then the trailing ".<pid>.<seconds>"
# stamp that get_processing_filename inserts before the suffix. For example,
# "access.log.1234.1700000000<suffix>" becomes "access.log".
#
# The result is always a String; the input is not mutated.
#
# @param processing_filename [String]
# @return [String]
def revert_processing_filename(processing_filename)
  reverted = processing_filename.chomp(@processing_file_suffix)
  # Use `\z` (not `$`) so only the real end of the name matches, and use `sub`
  # (not `gsub!`) so the result is never nil when there is no stamp to strip.
  reverted.sub(/\.\d+\.\d+\z/, '')
end
safe_fail(e, filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 143
# Best-effort quarantine of a file whose processing failed: renames +filename+
# to get_error_filename(e, filename) via lock_with_renaming.
#
# Any error raised while doing so is logged (with backtrace) instead of being
# propagated. +e+ is the original processing error and only determines the
# error filename.
def safe_fail(e, filename)
  error_filename = get_error_filename(e, filename)
  lock_with_renaming(filename, error_filename)
rescue => rename_error
  log.error "in_cat_sweep: rename #{filename} to error filename #{error_filename}",
    :error => rename_error, :error_class => rename_error.class
  log.error_backtrace
end
sufficient_waiting?(filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 122
# Returns true while +filename+ is still too recent to pick up, i.e. fewer
# than @waiting_seconds whole seconds have passed since its mtime. Despite the
# name, a true result means "keep waiting": will_process? skips files for
# which this returns true.
#
# A file that disappears between the directory glob and this check (e.g.
# renamed by a concurrent process) is reported as "keep waiting". It is then
# simply skipped instead of Errno::ENOENT escaping into the polling loop.
def sufficient_waiting?(filename)
  (Time.at(Fluent::EventTime.now.to_r) - File.mtime(filename)).to_i < @waiting_seconds
rescue Errno::ENOENT
  true
end
will_process?(filename) click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 110
# A globbed file is picked up only if it is not already being processed, is
# not an error file, and has been left alone for long enough (see
# sufficient_waiting?). The checks short-circuit in that order.
def will_process?(filename)
  !processing?(filename) && !error_file?(filename) && !sufficient_waiting?(filename)
end