class Fluent::ExecCronInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_exec_cron.rb, line 5
# Sets up the plugin instance and loads the libraries the rest of the
# plugin relies on. The libraries are required here, in this order, rather
# than at file load time.
def initialize
  super
  %w[
    fluent/plugin/exec_util
    fluent/timezone
    parse-cron
    erb
  ].each { |library| require library }
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_exec_cron.rb, line 30
# Validates the configuration and prepares the state used while running.
#
# After delegating to the superclass this method:
# * derives @localtime from the 'localtime' / 'utc' entries of +conf+,
# * stores and validates 'timezone' when one is given,
# * insists that a tag or a tag key is configured,
# * builds the proc that turns a time field into an integer timestamp,
# * builds the output parser, the cron schedule and the command template.
#
# @param conf configuration element, read here through string keys
# @raise [ConfigError] when neither @tag nor @tag_key is set, or when the
#   cron expression cannot be parsed
def configure(conf)
  super

  # 'localtime' takes precedence over 'utc'; when neither entry is present
  # @localtime keeps whatever value it already had.
  if conf['localtime']
    @localtime = true
  elsif conf['utc']
    @localtime = false
  end

  if conf['timezone']
    @timezone = conf['timezone']
    Fluent::Timezone.validate!(@timezone)
  end

  if !@tag && !@tag_key
    raise ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end

  if @time_key
    if @time_format
      # Capture the format in a local so the proc does not depend on later
      # changes to @time_format.
      f = @time_format
      @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
    else
      @time_parse_proc = Proc.new {|str| str.to_i }
    end
  end

  @parser = setup_parser(conf)

  begin
    @cron_parser = CronParser.new(@cron)
  rescue
    # Any StandardError from the cron library is reported as a
    # configuration problem naming the offending expression.
    raise ConfigError, "invalid cron expression. [#{@cron}]"
  end

  # Rewrite "${expr}" placeholders into ERB output tags so the command line
  # can be rendered anew for every execution.
  @command = ERB.new(@command.gsub(/\$\{([^}]+)\}/, '<%= \1 %>'))
end
run_periodic() click to toggle source
# File lib/fluent/plugin/in_exec_cron.rb, line 97
# Body of the worker thread: waits for the next cron tick, runs the rendered
# command, and hands the command's output stream to the parser. Repeats
# until @finished becomes truthy.
#
# Any StandardError raised while waiting, spawning the command or parsing
# its output is logged, after which the loop continues.
def run_periodic
  until @finished
    begin
      # Read the clock once: with two separate Time.now calls the second
      # reading could already be past the computed tick, producing a
      # negative interval that Kernel#sleep rejects with ArgumentError.
      now = Time.now
      secs = @cron_parser.next(now) - now
      sleep secs if secs > 0

      # The command template is rendered against this method's binding.
      # The block form of IO.popen closes the pipe and waits for the child
      # process when the block exits, including when the parser raises, so
      # neither the file descriptor nor the child is left behind.
      IO.popen(@command.result(binding), "r") { |io| @parser.call(io) }
    rescue => e
      log.error "exec failed to run or shutdown child process", :error => e.to_s, :error_class => e.class.to_s
      log.warn_backtrace e.backtrace
    end
  end
end
setup_parser(conf) click to toggle source
# File lib/fluent/plugin/in_exec_cron.rb, line 67
# Picks the parser object matching @format and wires it to on_message.
#
# 'tsv' needs a non-empty @keys list; 'json' and 'msgpack' take only the
# callback; every other format is delegated to the text parser wrapper,
# which also receives +conf+.
#
# @param conf configuration element, passed through for non-built-in formats
# @return the parser instance
# @raise [ConfigError] when the format is 'tsv' and @keys is empty
def setup_parser(conf)
  callback = method(:on_message)

  case @format
  when 'json'    then ExecUtil::JSONParser.new(callback)
  when 'msgpack' then ExecUtil::MessagePackParser.new(callback)
  when 'tsv'
    raise ConfigError, "keys option is required on exec input for tsv format" if @keys.empty?

    ExecUtil::TSVParser.new(@keys, callback)
  else
    ExecUtil::TextParserWrapperParser.new(conf, callback)
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_exec_cron.rb, line 88
# Signals the worker loop to stop and then deals with the worker thread:
# with @graceful_shutdown set it waits for the thread to end on its own,
# otherwise it kills the thread right away.
def shutdown
  @finished = true
  return @thread.join if @graceful_shutdown

  Thread.kill(@thread)
end
start() click to toggle source
# File lib/fluent/plugin/in_exec_cron.rb, line 83
# Clears the stop flag and launches the worker thread that executes
# run_periodic; the thread is kept in @thread.
def start
  @finished = false
  @thread = Thread.new { run_periodic }
end

Private Instance Methods

on_message(record, parsed_time = nil) click to toggle source
# File lib/fluent/plugin/in_exec_cron.rb, line 114
# Callback invoked by the parser for every parsed record; emits the record
# through the router.
#
# The tag is taken from the record's @tag_key entry (which is removed from
# the record) and falls back to @tag. The time is +parsed_time+ when given;
# otherwise the record's @time_key entry is removed and converted with
# @time_parse_proc, falling back to Engine.now when the entry is absent.
#
# Any StandardError raised along the way is logged instead of propagated.
#
# @param record [Hash] the parsed record; tag/time entries may be deleted
# @param parsed_time time already extracted by the parser, if any
def on_message(record, parsed_time = nil)
  tag = record.delete(@tag_key) || @tag

  time = parsed_time || begin
    raw_time = record.delete(@time_key)
    raw_time ? @time_parse_proc.call(raw_time) : Engine.now
  end

  router.emit(tag, time, record)
rescue => e
  log.error "exec failed to emit", :error => e.to_s, :error_class => e.class.to_s, :tag => tag, :record => Yajl.dump(record)
end