class Fluent::Plugin::NatsStreamingOutput

Constants

DEFAULT_FORMAT_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats-streaming.rb, line 53
# Creates the plugin instance.
#
# Delegates to the superclass initializer, then starts out with no NATS
# Streaming client: @sc stays nil until #run assigns a STAN::Client to it.
def initialize
  super
  @sc = nil
end

Public Instance Methods

close() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats-streaming.rb, line 141
# Runs the superclass close step and then closes the NATS Streaming client.
#
# @sc is nil until #run has created the client, so the close call is skipped
# when no client exists.
def close
  super
  @sc&.close
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats-streaming.rb, line 58
# Applies the plugin configuration.
#
# After delegating to the superclass, this stores the options later handed to
# the client in #run (@sc_config) and builds the record serializer
# (@formatter_proc) from the first <format> section of +conf+.
#
# Raises Fluent::ConfigError when +conf+ has no <format> section, or when that
# section has no "@type" entry.
def configure(conf)
  super

  nats_url = "nats://#{server}"
  @sc_config = {
    servers: [nats_url],
    reconnect_time_wait: @reconnect_time_wait,
    max_reconnect_attempts: @max_reconnect_attempts,
    connect_timeout: @connect_timeout
  }

  # Only the first <format> section is considered.
  format_section = conf.elements('format').first
  raise Fluent::ConfigError, "<format> section is required." unless format_section
  raise Fluent::ConfigError, "format/@type is required." unless format_section["@type"]

  @formatter_proc = setup_formatter(format_section)
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 49
# Always returns true.
#
# NOTE(review): presumably a Fluentd output-plugin hook stating that buffered
# chunks hold msgpack data, which is what lets #write iterate a chunk as
# (time, record) pairs - confirm against the Fluentd plugin API.
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 45
# Always returns true.
#
# NOTE(review): presumably tells Fluentd that this plugin may run when several
# workers are configured - confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 118
# Publishes every event of an event stream to NATS Streaming.
#
# The stream is first passed through inject_values_to_event_stream; each
# resulting (time, record) pair is then serialized and published on a subject
# equal to +tag+.
#
# tag:: event tag, used as the publish subject
# es::  event stream; must yield (time, record) pairs from #each
def process(tag, es)
  es = inject_values_to_event_stream(tag, es)
  es.each do |time, record|
    # Serialize through @formatter_proc, exactly as #write does. No #format
    # method is defined among this class's visible methods, so the previous
    # format(tag, time, record) call never used the configured <format>
    # section.
    # NOTE(review): the inherited Output#format is expected to raise
    # NotImplementedError when a plugin does not override it - confirm.
    @sc.publish(tag, @formatter_proc.call(tag, time, record))
  end
end
run() click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 83
# Body of the thread created in #start: connects to NATS Streaming and then
# keeps probing the connection.
#
# A new STAN::Client is stored in @sc and connected with @cluster_id, the
# client id with every "." replaced by "_", and the options prepared in
# #configure. While thread_current_running? stays true, the underlying NATS
# connection is flushed and the loop then sleeps for five seconds.
def run
  @sc = STAN::Client.new

  log.info "connect nats server nats://#{server} #{cluster_id} #{client_id}"
  # Dots in the client id are replaced with underscores before connecting.
  sanitized_client_id = @client_id.tr('.', '_')
  @sc.connect(@cluster_id, sanitized_client_id, nats: @sc_config)
  log.info "connected"

  probe_interval = 5
  while thread_current_running?
    log.trace "test connection"
    # NOTE(review): @reconnect_time_wait is presumably used as the flush timeout - confirm.
    @sc.nats.flush(@reconnect_time_wait)
    sleep(probe_interval)
  end
end
setup_formatter(conf) click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 97
# Builds the callable that serializes one event for publishing.
#
# +conf+ is the <format> section; its "@type" entry selects the serializer:
# * "json" - Oj when it can be required, otherwise Yajl
# * "ltsv" - LTSV.dump
# * anything else - a formatter obtained from formatter_create, which is also
#   kept in @formatter
#
# Returns an object that answers call(tag, time, record). The json and ltsv
# serializers use only the record.
def setup_formatter(conf)
  format_type = conf['@type']

  if format_type == 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      proc { |_tag, _time, record| Oj.dump(record) }
    rescue LoadError
      # Oj could not be loaded; serialize with Yajl instead.
      require 'yajl'
      proc { |_tag, _time, record| Yajl::Encoder.encode(record) }
    end
  elsif format_type == 'ltsv'
    require 'ltsv'
    proc { |_tag, _time, record| LTSV.dump(record) }
  else
    # NOTE(review): the 'kafka-plugin' usage label looks carried over from
    # another plugin; it is kept unchanged here.
    @formatter = formatter_create(usage: 'kafka-plugin', conf: conf)
    @formatter.method(:format)
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats-streaming.rb, line 78
# Starts the plugin: runs the superclass start step, then hands #run to
# thread_create under the name :nats_streaming_output_main.
def start
  super
  main_loop = method(:run)
  thread_create(:nats_streaming_output_main, &main_loop)
end
terminate() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nats-streaming.rb, line 146
# Calls the superclass implementation, then drops the client reference by
# resetting @sc to nil. The client itself is closed in #close, not here.
def terminate
  super
  @sc = nil
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_nats-streaming.rb, line 125
# Publishes every record of a buffer chunk to NATS Streaming.
#
# The chunk's metadata tag is used as the subject. Each (time, record) pair is
# serialized with @formatter_proc and published with a timeout option taken
# from @timeout. Returns immediately for an empty chunk; otherwise logs the
# number of published messages at debug level when at least one was sent.
def write(chunk)
  return if chunk.empty?

  subject = chunk.metadata.tag
  published = 0

  chunk.each do |time, record|
    payload = @formatter_proc.call(subject, time, record)
    log.trace "Send record: #{payload}"
    @sc.publish(subject, payload, {timeout: @timeout} )
    published += 1
  end

  log.debug { "#{published} messages send." } unless published.zero?
end