class Fluent::Plugin::NatsStreamingInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats-streaming.rb, line 33 def initialize super @sc = nil end
Public Instance Methods
close()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats-streaming.rb, line 89 def close super @sc.close if @sc end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats-streaming.rb, line 38 def configure(conf) super @sc_config = { servers: ["nats://#{server}"], reconnect_time_wait: @reconnect_time_wait, max_reconnect_attempts: @max_reconnect_attempts } @sub_opts = { queue: @queue, durable_name: @durable_name, start_at: :first, deliver_all_available: true, ack_wait: 10, # seconds connect_timeout: 2 # seconds } end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_nats-streaming.rb, line 29 def multi_workers_ready? true end
run()
click to toggle source
# File lib/fluent/plugin/in_nats-streaming.rb, line 62 def run @sc = STAN::Client.new log.info "connect nats server nats://#{server} #{cluster_id} #{client_id}" @sc.connect(@cluster_id, @client_id.gsub(/\./, '_'), nats: @sc_config) log.info "connected" log.info "subscribe #{channel} #{queue} #{durable_name}" @sc.subscribe(@channel, @sub_opts) do |msg| tag = @channel begin message = JSON.parse(msg.data) rescue JSON::ParserError => e log.error "Failed parsing JSON #{e.inspect}. Passing as a normal string" message = msg.data end time = Fluent::Engine.now router.emit(tag, time, message || {}) end while thread_current_running? log.trace "test connection" @sc.nats.flush(@reconnect_time_wait) sleep(5) end end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats-streaming.rb, line 57 def start super thread_create(:nats_streaming_input_main, &method(:run)) end
terminate()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats-streaming.rb, line 94 def terminate super @sc = nil end