class Fluent::Plugin::NATSInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats.rb, line 31
def configure(conf)
  super

  @nats_config = {
    uri: "nats://#{@host}:#{@port}",
    ssl: @ssl,
    user: @user,
    pass: @password,
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts,
  }
end
run() click to toggle source
# File lib/fluent/plugin/in_nats.rb, line 62
def run
  EM.next_tick do
    @nats_conn = NATS.connect(@nats_config) do
      @queues.each do |queue|
        @nats_conn.subscribe(queue) do |msg, _reply, sub|
          tag = "#{@tag}.#{sub}"
          begin
            message = JSON.parse(msg)
          rescue JSON::ParserError => e
            log.error "Failed parsing JSON #{e.inspect}.  Passing as a normal string"
            message = msg
          end
          time = Engine.now
          router.emit(tag, time, message || {})
        end
      end
    end
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats.rb, line 56
def shutdown
  @nats_conn.close
  EM.stop if EM.reactor_running?
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats.rb, line 44
def start
  super
  NATS.on_error do |error|
    log.error "Server Error:", error: error
    # supervisor will restart worker
    exit!
  end
  run_reactor_thread
  thread_create(:nats_input_main, &method(:run))
  log.info "listening nats on #{@uri}/#{@queue}"
end

Private Instance Methods

run_reactor_thread() click to toggle source
# File lib/fluent/plugin/in_nats.rb, line 84
def run_reactor_thread
  return if EM.reactor_running?
  thread_create(:nats_input_reactor_thread) do
    EM.run
  end
end