class Fluent::Plugin::CatSweepInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cat_sweep.rb, line 30
# Validates the plugin configuration and prepares the parser.
#
# When processed files are moved rather than removed, the destination
# directory must already be writable or must be creatable.
#
# @param conf the configuration element, also passed on to the superclass
# @raise [Fluent::ConfigError] if `processing_file_suffix`,
#   `error_file_suffix` or `line_terminated_by` is empty, or if the
#   `move_to` directory is not writable / cannot be created
def configure(conf)
  compat_parameters_convert(conf, :parser, :buffer, default_chunk_key: "time")
  super
  configure_parser(conf)

  raise Fluent::ConfigError, "in_cat_sweep: `processing_file_suffix` must has some letters." if @processing_file_suffix.empty?
  raise Fluent::ConfigError, "in_cat_sweep: `error_file_suffix` must has some letters." if @error_file_suffix.empty?
  raise Fluent::ConfigError, "in_cat_sweep: `line_terminated_by` must has some letters." if @line_terminated_by.empty?

  unless remove_file?
    # Use the first matching file (if any) to derive the real destination.
    sample = Dir.glob(@file_path_with_glob).first
    dirname = sample ? move_dirname(sample) : @move_to
    not_writable = "in_cat_sweep: `move_to` directory (#{dirname}) must be writable."

    if Dir.exist?(dirname)
      raise Fluent::ConfigError, not_writable unless File.writable?(dirname)
    else
      begin
        FileUtils.mkdir_p(dirname)
      rescue
        raise Fluent::ConfigError, not_writable
      end
    end
  end

  @read_bytes_once = 262144 # 256 KB
end
run_periodic()
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 82
# Polling loop: while @processing is truthy, sleeps for @run_interval and
# then handles every file matching @file_path_with_glob.
#
# Each eligible file is renamed to its processing name under a lock,
# processed, then removed or moved. Any StandardError raised along the way
# is logged and the file is handed to safe_fail.
#
# NOTE(review): will_process? is evaluated outside the begin/rescue, so an
# exception raised there is not handled by this method.
def run_periodic
  while @processing
    sleep @run_interval

    Dir.glob(@file_path_with_glob).each do |filename|
      next unless will_process?(filename)

      processing_filename = get_processing_filename(filename)
      begin
        lock_with_renaming(filename, processing_filename) do
          process(filename, processing_filename)
          after_processing(processing_filename)
        end
      rescue => e
        log.error "in_cat_sweep: processing: #{processing_filename}", error: e, error_class: e.class
        log.error_backtrace
        safe_fail(e, processing_filename)
      end
    end
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cat_sweep.rb, line 75
# Signals the polling loop to stop, waits for the worker thread to finish,
# then delegates to the superclass.
def shutdown
  @processing = false
  # @thread is only assigned in `start`; skip the join when it was never
  # created so shutdown does not fail with NoMethodError on nil.
  @thread.join if @thread
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cat_sweep.rb, line 68
# Starts the plugin: after the superclass has started, sets the @processing
# flag and spawns the thread that runs run_periodic.
def start
  super
  @processing = true
  @thread = Thread.new { run_periodic }
end
Private Instance Methods
after_processing(processing_filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 265
# Disposes of a successfully processed file: deletes it when remove_file?
# is truthy, otherwise moves it under the directory given by move_dirname,
# restoring its original base name.
#
# @param processing_filename [String] path of the file in its processing state
def after_processing(processing_filename)
  return FileUtils.rm(processing_filename) if remove_file?

  destination_dir = move_dirname(processing_filename)
  FileUtils.mkdir_p(destination_dir)
  restored_name = revert_processing_filename(File.basename(processing_filename))
  FileUtils.mv(processing_filename, File.join(destination_dir, restored_name))
end
buffer()
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 158
# Returns the current line buffer, creating a fresh one via buffer_clean!
# when none is set yet.
def buffer
  return @buffer if @buffer

  buffer_clean!
end
buffer_clean!()
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 154
# Replaces the line buffer with a new, empty, ASCII-8BIT (binary) string
# and returns it.
def buffer_clean!
  @buffer = String.new.force_encoding(Encoding::ASCII_8BIT)
end
configure_parser(conf)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 106
# Creates the parser through parser_create and stores it in @parser.
#
# @param conf unused by this method
def configure_parser(conf)
  @parser = parser_create
end
emit_file(fp)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 199
# Parses every line of the given IO and emits all resulting entries as a
# single event stream under @tag. Nothing is emitted when no entry was
# produced.
#
# @param fp an IO-like object passed to read_each_line
def emit_file(fp)
  entries = []

  read_each_line(fp) do |line|
    next unless line # read_each_line may yield nil

    entry = parse_line(line)
    entries << entry if entry
  end

  return if entries.empty?

  router.emit_stream(@tag, Fluent::ArrayEventStream.new(entries))
end
emit_line(line)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 190
# Parses one line and emits it as a single event under @tag.
# A nil line, or a parse result without both time and record, is ignored.
#
# @param line [String, nil] one line of input
def emit_line(line)
  return unless line

  time, record = parse_line(line)
  router.emit(@tag, time, record) if time && record
end
error_file?(filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 118
# Tells whether the file name ends with @error_file_suffix.
#
# @param filename [String]
# @return [Boolean]
def error_file?(filename)
  suffix = @error_file_suffix
  filename.end_with?(suffix)
end
get_error_filename(e, filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 138
# Builds the name a failed file is renamed to:
# "<filename>.<exception class><error_file_suffix>".
#
# @param e [Exception] the error that caused the failure
# @param filename [String]
# @return [String] a newly allocated string
def get_error_filename(e, filename)
  pieces = [filename, ".", e.class.to_s, @error_file_suffix]
  pieces.inject(String.new) { |path, piece| path << piece }
end
get_processing_filename(filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 126
# Builds the name a file carries while being processed:
# "<filename>.<pid>.<current event time><processing_file_suffix>".
#
# @param filename [String]
# @return [String] a newly allocated string
def get_processing_filename(filename)
  pieces = [
    filename, '.', Process.pid.to_s, '.',
    Fluent::EventTime.now.to_s, @processing_file_suffix
  ]
  pieces.inject(String.new) { |path, piece| path << piece }
end
lock_with_renaming(filename_from, filename_to) { || ... }
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 246
# Opens filename_from, tries to take a non-blocking exclusive lock on it
# and, on success, renames it to filename_to and runs the given block (if
# any). When the lock cannot be taken a warning is logged and nothing else
# happens. The lock is released and the file closed in every case.
#
# @param filename_from [String] current path
# @param filename_to [String] path to rename to
# @yield runs after the rename while the lock is still held
def lock_with_renaming(filename_from, filename_to)
  File.open(filename_from, open_mode_for_flock) do |file|
    begin
      if file.flock(File::LOCK_EX | File::LOCK_NB)
        File.rename(filename_from, filename_to)
        yield if block_given?
      else
        log.warn "in_cat_sweep: lock failed: skip #{filename_from}"
      end
    ensure
      file.flock(File::LOCK_UN) # release the lock
    end
  end
end
move_dirname(filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 261
# Returns the destination directory for a processed file: the absolute
# directory of the file, re-rooted under @move_to.
#
# @param filename [String]
# @return [String]
def move_dirname(filename)
  absolute_dir = File.dirname(File.expand_path(filename))
  File.join(@move_to, absolute_dir)
end
open_mode_for_flock()
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 241
# Returns the mode used to open files that are about to be flock'ed:
# "r+" when @flock_with_rw_mode is set, otherwise "r". The value is
# computed once and memoized.
def open_mode_for_flock
  return @open_mode_for_flock if @open_mode_for_flock

  # When doing flock on files on NFS, these files must be opened in writable mode.
  @open_mode_for_flock = if @flock_with_rw_mode
                           "r+"
                         else
                           "r"
                         end
end
parse_line(line)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 213
# Runs one line through @parser.
#
# @param line [String]
# @return [Array, nil] `[time, record]` from the parser, or nil when the
#   parser never yields
# @raise [FormatError] when the parser yields without both time and record
def parse_line(line)
  parsed = nil

  @parser.parse(line) do |time, record|
    unless time && record
      # We want to fail an entire file on `pattern not match`.
      # This behavior makes it easy to recover with a manual fix operation.
      raise FormatError, "in_cat_sweep: pattern not match: #{line.inspect}"
    end

    parsed = [time, record]
  end

  parsed
end
process(original_filename, processing_filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 228
# Reads the processing file and emits its content: as one event stream
# when @file_event_stream is set, otherwise line by line. Logs the original
# file name and the file size at debug level afterwards.
#
# @param original_filename [String] name used only in the debug log
# @param processing_filename [String] path that is actually opened
def process(original_filename, processing_filename)
  File.open(processing_filename, 'r') do |tfile|
    if @file_event_stream
      emit_file(tfile)
    else
      read_each_line(tfile) { |line| emit_line(line) }
    end

    log.debug { %[in_cat_sweep: process: {filename:"#{original_filename}",size:#{tfile.size}}] }
  end
end
processing?(filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 114
# Tells whether the file name ends with @processing_file_suffix.
#
# @param filename [String]
# @return [Boolean]
def processing?(filename)
  suffix = @processing_file_suffix
  filename.end_with?(suffix)
end
read_each_line(io) { |chomp!(line_terminated_by)| ... }
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 166
# Reads the IO in chunks of at most @read_bytes_once bytes, split on
# @line_terminated_by, and yields each complete line without its
# terminator. Chunks are accumulated in the line buffer until a terminator
# is seen, so lines longer than one chunk are reassembled.
#
# Data left after the last terminator (a final line without a terminator)
# is yielded as well. Previously it was passed through `chomp!`, which
# returns nil when nothing is removed, so that data was silently dropped.
#
# The size limit is checked with `bytesize`, matching the
# `oneline_max_bytes` name and the error message; `length` counts
# characters once multibyte data has been appended.
#
# @param io an IO-like object responding to `each(separator, limit)`
# @yieldparam line [String] one line without the terminator
# @raise [OneLineMaxBytesOverError] when the accumulated line exceeds
#   @oneline_max_bytes; the buffer is reset before the error propagates
def read_each_line(io)
  buffer_clean!

  io.each(@line_terminated_by, @read_bytes_once) do |like_line|
    buffer << like_line

    if buffer.bytesize > @oneline_max_bytes
      begin
        raise OneLineMaxBytesOverError, "in_cat_sweep: buffer length is over #{@oneline_max_bytes} bytes. remove: #{buffer}"
      ensure
        buffer_clean!
      end
    end

    # Keep accumulating until the chunk completes a line.
    next unless buffer.end_with?(@line_terminated_by)

    yield(buffer.chomp!(@line_terminated_by))
    buffer_clean!
  end

  # Whatever remains never ended with the terminator, so it is yielded as-is.
  yield(buffer) unless buffer.empty?
  buffer_clean!
end
remove_file?()
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 162
# Returns the value of @remove_after_processing; a truthy value means
# processed files are deleted rather than moved.
def remove_file?
  @remove_after_processing
end
revert_processing_filename(processing_filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 132
# Recovers the original file name from a processing file name by removing
# @processing_file_suffix and the trailing ".<digits>.<digits>" stamp.
#
# Always returns a String. The previous implementation returned the result
# of `gsub!`, which is nil when nothing was substituted, so a name without
# the stamp produced nil.
#
# @param processing_filename [String] not modified
# @return [String] the name with suffix and stamp removed (unchanged parts
#   are kept when either is absent)
def revert_processing_filename(processing_filename)
  processing_filename
    .chomp(@processing_file_suffix)
    .sub(/\.\d+\.\d+\z/, '')
end
safe_fail(e, filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 143
# Marks a file as failed by renaming it to its error file name. A failure
# of the rename itself is logged and swallowed, so this method does not
# raise StandardError.
#
# @param e [Exception] the error that made processing fail
# @param filename [String] path of the file to rename
def safe_fail(e, filename)
  error_filename = get_error_filename(e, filename)
  lock_with_renaming(filename, error_filename)
rescue => rename_error
  log.error "in_cat_sweep: rename #{filename} to error filename #{error_filename}", :error => rename_error, :error_class => rename_error.class
  log.error_backtrace
end
sufficient_waiting?(filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 122
# Despite its name, returns true while the file is still too fresh: the
# whole seconds elapsed since the file's mtime are fewer than
# @waiting_seconds. will_process? skips files for which this is true.
#
# @param filename [String]
# @return [Boolean]
def sufficient_waiting?(filename)
  now = Time.at(Fluent::EventTime.now.to_r)
  elapsed_seconds = (now - File.mtime(filename)).to_i
  elapsed_seconds < @waiting_seconds
end
will_process?(filename)
click to toggle source
# File lib/fluent/plugin/in_cat_sweep.rb, line 110
# Decides whether a file should be picked up now: it must not be a
# processing file, not an error file, and sufficient_waiting? must be
# false for it. The checks run in that order and stop at the first hit.
#
# @param filename [String]
# @return [Boolean]
def will_process?(filename)
  return false if processing?(filename)
  return false if error_file?(filename)

  !sufficient_waiting?(filename)
end