class Fluent::Plugin::NSQOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 23
# Validates the plugin settings once the superclass has handled +conf+.
#
# Raises Fluent::ConfigError when:
# * neither @nsqlookupd nor @nsqd is set,
# * @topic is not set,
# * @enable_tls is set but any of @key, @certificate or @ca_certificate
#   is missing.
#
# NOTE(review): the instance variables are presumably populated by the
# superclass from config params declared elsewhere in the file — confirm.
def configure(conf)
  super

  has_endpoint = @nsqlookupd || @nsqd
  raise Fluent::ConfigError, 'Missing nsqlookupd or nsqd' unless has_endpoint
  raise Fluent::ConfigError, 'Missing topic' unless @topic

  # TLS material is only required when TLS has been switched on.
  return unless @enable_tls

  tls_complete = @key && @certificate && @ca_certificate
  raise Fluent::ConfigError, 'Missing tls config params' unless tls_complete
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 56
# Terminates the NSQ producer, if one exists, and then runs the superclass
# shutdown.
#
# @producer is only assigned as the final step of #start, so it is still nil
# when #start never ran or raised before reaching that assignment. The guard
# keeps shutdown from failing with NoMethodError in that case, which would
# also skip the superclass shutdown.
def shutdown
  @producer.terminate if @producer
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_nsq.rb, line 33
# Runs the superclass start, then builds the producer options from the
# plugin's instance variables and stores a new Nsq::Producer in @producer.
#
# The options always carry the topic. @nsqlookupd is used when set; @nsqd is
# only used when @nsqlookupd is not. With @enable_tls set, TLS is enabled and
# the key / certificate / CA certificate are passed along with peer
# verification.
def start
  super

  settings = { topic: @topic }

  # nsqlookupd takes precedence over a direct nsqd address.
  settings[:nsqlookupd] = @nsqlookupd if @nsqlookupd
  settings[:nsqd] = @nsqd if @nsqd && !@nsqlookupd

  if @enable_tls
    settings.update(
      tls_v1: true,
      tls_options: {
        key: @key,
        certificate: @certificate,
        ca_certificate: @ca_certificate,
        verify_mode: OpenSSL::SSL::VERIFY_PEER
      }
    )
  end

  @producer = Nsq::Producer.new(settings)
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_nsq.rb, line 61
# Publishes every event in +chunk+ through @producer as a JSON document.
#
# Each record is merged with three extra fields before serialization:
# * :_key          - the tag taken from the chunk metadata
# * :_ts           - the event time as a float
# * :'@timestamp'  - the event time as ISO 8601 with millisecond precision
#
# Does nothing for an empty chunk. An error raised while serializing or
# publishing a single event is logged as a warning and iteration continues
# with the next event.
def write(chunk)
  return if chunk.empty?

  event_tag = chunk.metadata.tag

  chunk.each do |time, record|
    metadata_fields = {
      :_key => event_tag,
      :_ts => time.to_f,
      :'@timestamp' => Time.at(time).iso8601(3) # kibana/elasticsearch friendly
    }
    payload = record.merge(metadata_fields)

    begin
      json = Yajl.dump(payload)
      @producer.write(json)
    rescue => e
      log.warn("nsq: #{e}")
    end
  end
end