class Fluent::NorikraFilterOutput

Attributes

execute_server[R]

<server>

execute_server_path[R]

<server>

Public Instance Methods

client(opts={}) click to toggle source
# File lib/fluent/plugin/out_norikra_filter.rb, line 90
def client(opts={})
  Norikra::Client.new(@host, @port, {
      :connect_timeout => opts[:connect_timeout] || @connect_timeout,
      :send_timeout    => opts[:send_timeout]    || @send_timeout,
      :receive_timeout => opts[:receive_timeout] || @receive_timeout,
    })
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_norikra_filter.rb, line 58
def configure(conf)
  super

  @host,@port = @norikra.split(':', 2)
  @port = @port.to_i

  if !@target_map_tag && @target_map_key.nil? && @target_string.nil?
    raise Fluent::ConfigError, 'target naming not specified (target_map_tag/target_map_key/target_string)'
  end

  @execute_server = false

  conf.elements.each do |element|
    case element.name
    when 'default', 'target'
      # ignore: processed in OutputMixin
    when 'fetch'
      # ignore: processed in InputMixin, and set @fetch_queue
    when 'server'
      @execute_server = true
      @execute_jruby_path = element['jruby']
      @execute_server_path = element['path']
      @execute_server_opts = element['opts']
    else
      raise Fluent::ConfigError, "unknown configuration section name for this plugin: #{element.name}"
    end
  end

  setup_output(conf, true) # <query> enabled in <default> and <target TARGET>
  setup_input(conf)
end
fetchable?() click to toggle source
# File lib/fluent/plugin/out_norikra_filter.rb, line 175
def fetchable?
  @norikra_started
end
server_starter() click to toggle source
# File lib/fluent/plugin/out_norikra_filter.rb, line 138
def server_starter
  log.info "starting Norikra server process #{@host}:#{@port}"
  base_options = [@execute_server_path, 'start', '-H', @host, '-P', @port.to_s]
  cmd,options = if @execute_jruby_path
                  [@execute_jruby_path, [@execute_server_path, 'start', '-H', @host, '-P', @port.to_s]]
                else
                  [@execute_server_path, ['start', '-H', @host, '-P', @port.to_s]]
                end
  if @execute_server_opts
    options += @execute_server_opts.split(/ +/)
  end
  @norikra_pid = fork do
    ENV.keys.select{|k| k =~ /^(RUBY|GEM|BUNDLE|RBENV|RVM|rvm)/}.each {|k| ENV.delete(k)}
    exec([cmd, 'norikra(fluentd)'], *options)
  end
  connecting = true
  log.info "trying to confirm norikra server status..."
  while connecting
    begin
      log.debug "start to connect norikra server #{@host}:#{@port}"
      client(:connect_timeout => 1, :send_timeout => 1, :receive_timeout => 1).targets
      # discard result: no exceptions is success
      connecting = false
      next
    rescue HTTPClient::TimeoutError
      log.debug "Norikra server test connection timeout. retrying..."
    rescue Errno::ECONNREFUSED
      log.debug "Norikra server test connection refused. retrying..."
    rescue => e
      log.error "unknown error in confirming norikra server, #{e.class}:#{e.message}"
    end
    sleep 3
  end
  log.info "confirmed that norikra server #{@host}:#{@port} started."
  @norikra_started = true
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_norikra_filter.rb, line 117
def shutdown
  stop_output
  stop_input
  Process.kill(:TERM, @norikra_pid) if @execute_server

  shutdown_output
  shutdown_input

  if @execute_server
    begin
      counter = 0
      while !Process.waitpid(@norikra_pid, Process::WNOHANG)
        sleep 1
        break if counter > 3
      end
    rescue Errno::ECHILD
      # norikra server process exited.
    end
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_norikra_filter.rb, line 98
def start
  super

  @norikra_started = false

  if @execute_server
    @norikra_pid = nil
    @norikra_thread = Thread.new(&method(:server_starter))
    # @norikra_started will be set in server_starter
  else
    @norikra_pid = nil
    @norikra_thread = nil
    @norikra_started = true
  end

  start_output
  start_input
end