class Fluent::Plugin::NATSInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats.rb, line 31
# Lets the superclass process +conf+, then assembles the option hash that
# is later handed to NATS.connect and stores it in @nats_config.
def configure(conf)
  super

  server_uri = "nats://#{@host}:#{@port}"
  @nats_config = {
    uri: server_uri,
    ssl: @ssl,
    user: @user,
    pass: @password,
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts
  }
end
run()
click to toggle source
# File lib/fluent/plugin/in_nats.rb, line 62
# Schedules the NATS connection on the EventMachine reactor. Inside the
# connect block, every entry of @queues is subscribed; each received
# message is emitted through the router under the tag "<@tag>.<subject>".
def run
  EM.next_tick do
    @nats_conn = NATS.connect(@nats_config) do
      @queues.each do |subject_pattern|
        @nats_conn.subscribe(subject_pattern) do |payload, _reply, subject|
          event_tag = "#{@tag}.#{subject}"
          # Payloads that are not valid JSON are forwarded unchanged.
          record =
            begin
              JSON.parse(payload)
            rescue JSON::ParserError => e
              log.error "Failed parsing JSON #{e.inspect}. Passing as a normal string"
              payload
            end
          event_time = Engine.now
          # A nil/false parse result is replaced by an empty hash.
          router.emit(event_tag, event_time, record || {})
        end
      end
    end
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats.rb, line 56
# Closes the NATS connection if one exists, stops the EventMachine reactor
# when it is running, and then calls the superclass method.
def shutdown
  # @nats_conn is only assigned from within the EM.next_tick callback in
  # #run, so it can still be nil here; safe navigation keeps shutdown from
  # raising NoMethodError and skipping the remaining cleanup.
  @nats_conn&.close
  EM.stop if EM.reactor_running?
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats.rb, line 44
# Calls the superclass method, registers a NATS error handler, ensures an
# EventMachine reactor thread exists via #run_reactor_thread, and creates
# the :nats_input_main thread that executes #run.
def start
  super

  NATS.on_error do |error|
    log.error "Server Error:", error: error
    # supervisor will restart worker
    exit!
  end

  run_reactor_thread
  thread_create(:nats_input_main, &method(:run))

  # Report the URI built in #configure and the subjects #run subscribes to;
  # the previous message interpolated @uri and @queue, which neither
  # #configure nor #run assigns.
  log.info "listening nats on #{@nats_config[:uri]}/#{Array(@queues).join(',')}"
end
Private Instance Methods
run_reactor_thread()
click to toggle source
# File lib/fluent/plugin/in_nats.rb, line 84
# Creates the :nats_input_reactor_thread thread running EM.run, unless an
# EventMachine reactor is already running.
def run_reactor_thread
  unless EM.reactor_running?
    thread_create(:nats_input_reactor_thread) { EM.run }
  end
end