class Fluent::Plugin::NATSOutput

Constants

DEFAULT_FORMAT_TYPE

Attributes

formatter[RW]

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_nats.rb, line 39
# Configures the plugin: delegates to the superclass, assembles the option
# hash later handed to NATS.start, and creates the record formatter.
#
# The instance variables read here (@host, @port, @ssl, @user, @password,
# @reconnect_time_wait, @max_reconnect_attempts) are presumably populated by
# the superclass from config parameters declared elsewhere -- not visible here.
#
# @param conf the plugin configuration, forwarded unchanged to the superclass
# @return the formatter produced by formatter_create
def configure(conf)
  super

  endpoint = "nats://#{@host}:#{@port}"
  @nats_config = {
    uri: endpoint,
    ssl: @ssl,
    user: @user,
    pass: @password,
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts,
  }

  @formatter = formatter_create
end
format(tag, time, record)
# File lib/fluent/plugin/out_nats.rb, line 85
# Formats one event by delegating to the formatter stored in @formatter
# (assigned in #configure).
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever the formatter's #format returns for these arguments
def format(tag, time, record)
  @formatter.format(tag, time, record)
end
multi_workers_ready?()
# File lib/fluent/plugin/out_nats.rb, line 33
# Reports whether this plugin may run under multiple workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
process(tag, es)
# File lib/fluent/plugin/out_nats.rb, line 76
# Publishes every event of the stream to NATS, using the tag as the subject.
#
# The stream is first passed through inject_values_to_event_stream. Each
# event is then scheduled with EM.next_tick, so formatting and publishing
# happen inside the scheduled block rather than directly in this method.
#
# @param tag the event tag, used as the NATS subject
# @param es the event stream, yielding (time, record) pairs
# @return the result of calling #each on the injected stream
def process(tag, es)
  stream = inject_values_to_event_stream(tag, es)
  stream.each do |time, record|
    EM.next_tick { NATS.publish(tag, format(tag, time, record)) }
  end
end
run()
# File lib/fluent/plugin/out_nats.rb, line 65
# Thread body for the NATS client: registers an error handler, then starts
# the client with the options assembled in #configure.
#
# The error handler must be registered before NATS.start is called, so the
# statement order here is significant.
def run
  NATS.on_error do |err|
    log.error "Server Error:", error: err
    # supervisor will restart worker
    exit!
  end

  NATS.start(@nats_config) { log.info("nats client is running for #{@nats_config[:uri]}") }
end
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_nats.rb, line 58
# Stops the NATS client and then continues with the superclass shutdown.
#
# NATS.stop is scheduled through EM.next_tick rather than called directly,
# so it runs wherever EventMachine executes scheduled blocks.
#
# @return whatever the superclass #shutdown returns
def shutdown
  EM.next_tick { NATS.stop }
  super
end
start()
Calls superclass method
# File lib/fluent/plugin/out_nats.rb, line 53
# Starts the plugin: runs the superclass start, then launches #run on a
# thread created via thread_create under the title :nats_output_main.
#
# @return whatever thread_create returns
def start
  super
  thread_create(:nats_output_main) { run }
end