class Fluent::Plugin::NSQOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 23
def configure(conf)
  super

  fail Fluent::ConfigError, 'Missing nsqlookupd or nsqd' unless @nsqlookupd || @nsqd
  fail Fluent::ConfigError, 'Missing topic' unless @topic
  if @enable_tls
    fail Fluent::ConfigError, 'Missing tls config params' unless @key && @certificate && @ca_certificate
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 56
def shutdown
  @producer.terminate
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 33
def start
  super
  producer_config = {topic: @topic}

  if  @nsqlookupd
    producer_config[:nsqlookupd] = @nsqlookupd
  elsif @nsqd
    producer_config[:nsqd] = @nsqd
  end

  if @enable_tls
    producer_config[:tls_v1] = true
    producer_config[:tls_options] = {
          key: @key,
          certificate: @certificate,
          ca_certificate: @ca_certificate,
          verify_mode: OpenSSL::SSL::VERIFY_PEER
      }
  end
  
  @producer = Nsq::Producer.new(producer_config)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_nsq.rb, line 61
def write(chunk)
  return if chunk.empty?

  tag = chunk.metadata.tag
  chunk.each do |time, record|
    tagged_record = record.merge(
      :_key => tag,
      :_ts => time.to_f,
      :'@timestamp' => Time.at(time).iso8601(3) # kibana/elasticsearch friendly
    )
    begin
      @producer.write(Yajl.dump(tagged_record))
    rescue => e
      log.warn("nsq: #{e}")
    end
  end
end