class Fluent::Plugin::NATSOutput
Constants
- DEFAULT_FORMAT_TYPE
Attributes
formatter[RW]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats.rb, line 39 def configure(conf) super @nats_config = { uri: "nats://#{@host}:#{@port}", ssl: @ssl, user: @user, pass: @password, reconnect_time_wait: @reconnect_time_wait, max_reconnect_attempts: @max_reconnect_attempts, } @formatter = formatter_create end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_nats.rb, line 85 def format(tag, time, record) @formatter.format(tag, time, record) end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_nats.rb, line 33 def multi_workers_ready? true end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_nats.rb, line 76 def process(tag, es) es = inject_values_to_event_stream(tag, es) es.each do |time,record| EM.next_tick do NATS.publish(tag, format(tag, time, record)) end end end
run()
click to toggle source
# File lib/fluent/plugin/out_nats.rb, line 65 def run NATS.on_error do |error| log.error "Server Error:", error: error # supervisor will restart worker exit! end NATS.start(@nats_config) do log.info "nats client is running for #{@nats_config[:uri]}" end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats.rb, line 58 def shutdown EM.next_tick do NATS.stop end super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats.rb, line 53 def start super thread_create(:nats_output_main, &method(:run)) end