class Fluent::Plugin::ExecInput

Public Class Methods

new()
Calls superclass method Fluent::PluginLoggerMixin.new
# File lib/fluent/plugin/in_exec.rb, line 31
# Runs the superclass initializer, then loads the exec_util helpers.
# The require happens at instantiation time rather than at file load.
def initialize
  super
  require('fluent/plugin/exec_util')
end

Public Instance Methods

configure(conf)
Calls superclass method Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/in_exec.rb, line 55
# Validates the configuration and prepares the time parser and record parser.
#
# After delegating to the superclass, this reads 'localtime', 'utc' and
# 'timezone' directly from +conf+, checks that a tag source is available,
# builds @time_parse_proc when @time_key is set, and stores the result of
# setup_parser(conf) in @parser.
#
# conf - configuration object supporting string-keyed [] lookup.
#
# Raises Fluent::ConfigError when neither @tag nor @tag_key is set.
def configure(conf)
  super

  # 'localtime' takes precedence over 'utc'; with neither present,
  # @localtime is left untouched.
  if conf['localtime']
    @localtime = true
  elsif conf['utc']
    @localtime = false
  end

  if conf['timezone']
    @timezone = conf['timezone']
    Fluent::Timezone.validate!(@timezone)
  end

  if !@tag && !@tag_key
    # Fixed: the namespace was previously misspelled, which raised NameError
    # instead of the intended configuration error.
    raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end

  if @time_key
    if @time_format
      f = @time_format
      @time_parse_proc =
        begin
          strptime = Strptime.new(f)
          Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) }
        rescue
          # Building the Strptime parser failed; fall back to Time.strptime
          # with the same format string.
          Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) }
        end
    else
      # No format given: the value is converted with to_f and passed to Time.at.
      @time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) }
    end
  end

  @parser = setup_parser(conf)
end
run(io)
# File lib/fluent/plugin/in_exec.rb, line 121
# Hands the given IO to the parser stored in @parser and returns
# whatever the parser's #call returns.
def run(io)
  parser = @parser
  parser.call(io)
end
setup_parser(conf)
# File lib/fluent/plugin/in_exec.rb, line 91
# Builds the parser object matching @format. Every parser is given
# method(:on_message) as its callback.
#
# conf - configuration object; only passed on to the fallback text parser.
#
# Returns a TSV, JSON or MessagePack parser for the 'tsv', 'json' and
# 'msgpack' formats, and a TextParserWrapperParser for any other @format.
# Raises Fluent::ConfigError when @format is 'tsv' and @keys is empty.
def setup_parser(conf)
  case @format
  when 'tsv'
    # @keys is validated before the callback is resolved or the parser built.
    raise Fluent::ConfigError, "keys option is required on exec input for tsv format" if @keys.empty?

    Fluent::ExecUtil::TSVParser.new(@keys, method(:on_message))
  when 'json' then Fluent::ExecUtil::JSONParser.new(method(:on_message))
  when 'msgpack' then Fluent::ExecUtil::MessagePackParser.new(method(:on_message))
  else Fluent::ExecUtil::TextParserWrapperParser.new(conf, method(:on_message))
  end
end
start()
Calls superclass method Fluent::PluginLoggerMixin#start
# File lib/fluent/plugin/in_exec.rb, line 107
# Starts the plugin: after the superclass start, registers @command with
# child_process_execute in read mode and feeds the child's IO to #run.
#
# When @run_interval is set it is passed as interval:; otherwise the
# command is registered with immediate: true.
def start
  super

  schedule = @run_interval ? { interval: @run_interval } : { immediate: true }
  child_process_execute(:exec_input, @command, **schedule, mode: [:read]) { |io| run(io) }
end

Private Instance Methods

on_message(record, parsed_time = nil)
# File lib/fluent/plugin/in_exec.rb, line 127
# Parser callback: works out the tag and time for one record and emits it
# through the router.
#
# record      - hash-like record; the @tag_key entry is always removed, the
#               @time_key entry only when parsed_time is not given.
# parsed_time - optional time already determined by the parser; when given
#               it is used as-is.
#
# Any StandardError raised while processing is logged and not re-raised.
def on_message(record, parsed_time = nil)
  # Use the record's own tag when present, otherwise the configured @tag.
  tag = record.delete(@tag_key) || @tag

  time = parsed_time || begin
    raw_time = record.delete(@time_key)
    raw_time ? @time_parse_proc.call(raw_time) : Fluent::EventTime.now
  end

  router.emit(tag, time, record)
rescue => e
  log.error "exec failed to emit", error: e, tag: tag, record: Yajl.dump(record)
end