class Fluent::ExecCronInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_exec_cron.rb, line 5
# Runs the superclass initializer, then loads the libraries this plugin
# relies on. The requires are issued at instantiation time, in the order
# listed below.
def initialize
  super
  %w[fluent/plugin/exec_util fluent/timezone parse-cron erb].each do |library|
    require library
  end
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_exec_cron.rb, line 30
# Validates the plugin configuration and prepares the state used at run time:
# the time-parsing proc, the output parser, the cron schedule and the command
# template.
#
# conf - configuration element; read here through the string keys
#        'localtime', 'utc' and 'timezone', and handed on to setup_parser.
#
# Raises ConfigError when neither a tag nor a tag_key is set, or when the cron
# expression cannot be parsed. Fluent::Timezone.validate! presumably raises as
# well for an unknown timezone -- confirm against that helper.
def configure(conf)
  super

  # 'localtime' wins over 'utc'; when neither key is present @localtime is
  # left as it was. Only the presence of a truthy value is checked.
  if conf['localtime']
    @localtime = true
  elsif conf['utc']
    @localtime = false
  end

  if conf['timezone']
    @timezone = conf['timezone']
    Fluent::Timezone.validate!(@timezone)
  end

  if !@tag && !@tag_key
    raise ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end

  if @time_key
    if @time_format
      # Copy the format into a local so the proc closes over a fixed value.
      f = @time_format
      @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
    else
      @time_parse_proc = Proc.new {|str| str.to_i }
    end
  end

  @parser = setup_parser(conf)

  begin
    @cron_parser = CronParser.new(@cron)
  rescue
    # No explicit binding is needed: Ruby records the rescued exception as
    # the `cause` of the ConfigError raised here.
    raise ConfigError, "invalid cron expression. [#{@cron}]"
  end

  # Turn every `${expr}` placeholder into an ERB output tag so the command
  # line is re-rendered each time it is run.
  @command = ERB.new(@command.gsub(/\$\{([^}]+)\}/, '<%= \1 %>'))
end
run_periodic()
click to toggle source
# File lib/fluent/plugin/in_exec_cron.rb, line 97
# Worker loop: sleeps until the next time produced by @cron_parser, renders
# and runs the command, and feeds the command's output stream to @parser.
# Repeats until @finished is set. Any error raised during one iteration is
# logged and the loop carries on with the next iteration.
def run_periodic
  until @finished
    begin
      # Read the clock once so the delay is measured from the same instant
      # the schedule was computed for; clamp at zero because sleep rejects
      # negative intervals.
      now = Time.now
      secs = @cron_parser.next(now) - now
      sleep(secs < 0 ? 0 : secs)

      # The block form of IO.popen closes the pipe and waits for the child
      # when the block exits, including when the parser raises, so neither
      # the file descriptor nor the child process is left behind.
      IO.popen(@command.result(binding), "r") do |io|
        @parser.call(io)
      end
    rescue => e
      log.error "exec failed to run or shutdown child process", :error => e.to_s, :error_class => e.class.to_s
      log.warn_backtrace e.backtrace
    end
  end
end
setup_parser(conf)
click to toggle source
# File lib/fluent/plugin/in_exec_cron.rb, line 67
# Builds the parser object matching @format. Every parser is handed the
# on_message method as its callback.
#
# conf - configuration element, forwarded only to the fallback text parser.
#
# Raises ConfigError when the format is 'tsv' and @keys is empty.
def setup_parser(conf)
  if 'tsv' == @format && @keys.empty?
    raise ConfigError, "keys option is required on exec input for tsv format"
  end

  callback = method(:on_message)
  if 'tsv' == @format
    ExecUtil::TSVParser.new(@keys, callback)
  elsif 'json' == @format
    ExecUtil::JSONParser.new(callback)
  elsif 'msgpack' == @format
    ExecUtil::MessagePackParser.new(callback)
  else
    # Any other format value goes through the generic text-parser wrapper.
    ExecUtil::TextParserWrapperParser.new(conf, callback)
  end
end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_exec_cron.rb, line 88
# Signals the worker loop to stop and then either waits for the worker thread
# (when @graceful_shutdown is set) or kills it outright.
def shutdown
  @finished = true

  # Nothing to stop if the worker was never started; without this guard a
  # shutdown that precedes start would raise on the nil thread.
  return unless @thread

  if @graceful_shutdown
    @thread.join
  else
    Thread.kill(@thread)
  end
end
start()
click to toggle source
# File lib/fluent/plugin/in_exec_cron.rb, line 83
# Clears the stop flag and launches the worker thread that executes
# run_periodic. The thread is kept in @thread.
def start
  @finished = false
  worker = Thread.new { run_periodic }
  @thread = worker
end
Private Instance Methods
on_message(record, parsed_time = nil)
click to toggle source
# File lib/fluent/plugin/in_exec_cron.rb, line 114
# Callback handed to the parsers: emits one parsed record through the router.
#
# record      - the parsed record; the @tag_key entry (and, when consulted,
#               the @time_key entry) is removed from it before it is emitted.
# parsed_time - optional event time; when given it is used as-is and the
#               record's time field is not touched.
#
# Errors raised while resolving or emitting are logged rather than propagated.
def on_message(record, parsed_time = nil)
  # A tag carried in the record takes precedence over the configured one.
  tag = record.delete(@tag_key) || @tag

  time = parsed_time || begin
    raw_time = record.delete(@time_key)
    raw_time ? @time_parse_proc.call(raw_time) : Engine.now
  end

  router.emit(tag, time, record)
rescue => e
  log.error "exec failed to emit", :error => e.to_s, :error_class => e.class.to_s, :tag => tag, :record => Yajl.dump(record)
end