class Fluent::Plugin::NatsStreamingInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats-streaming.rb, line 33
def initialize
  super
  @sc = nil
end

Public Instance Methods

close() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats-streaming.rb, line 89
def close
  super
  @sc.close if @sc
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats-streaming.rb, line 38
def configure(conf)
  super

  @sc_config = {
    servers: ["nats://#{server}"],
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts
  }

  @sub_opts = {
    queue: @queue,
    durable_name: @durable_name,
    start_at: :first,
    deliver_all_available: true,
    ack_wait: 10,  # seconds
    connect_timeout: 2 # seconds
  }
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_nats-streaming.rb, line 29
def multi_workers_ready?
  true
end
run() click to toggle source
# File lib/fluent/plugin/in_nats-streaming.rb, line 62
def run
  @sc = STAN::Client.new

  log.info "connect nats server nats://#{server} #{cluster_id} #{client_id}"
  @sc.connect(@cluster_id, @client_id.gsub(/\./, '_'), nats: @sc_config)
  log.info "connected"

  log.info "subscribe #{channel} #{queue} #{durable_name}"
  @sc.subscribe(@channel, @sub_opts) do |msg|
    tag = @channel
    begin
      message = JSON.parse(msg.data)
    rescue JSON::ParserError => e
      log.error "Failed parsing JSON #{e.inspect}.  Passing as a normal string"
      message = msg.data
    end
    time = Fluent::Engine.now
    router.emit(tag, time, message || {})
  end

  while thread_current_running?
    log.trace "test connection"
    @sc.nats.flush(@reconnect_time_wait)
    sleep(5)
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats-streaming.rb, line 57
def start
  super
  thread_create(:nats_streaming_input_main, &method(:run))
end
terminate() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nats-streaming.rb, line 94
def terminate
  super
  @sc = nil
end