class LogStash::Inputs::Nsq

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/nsq.rb, line 20
def register
  @logger.info('Registering nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
  if @tls_key and @tls_cert
    @consumer = Nsq::Consumer.new(
     :nsqlookupd => @nsqlookupd,
     :topic => @topic,
     :channel => @channel,
     :max_in_flight => @max_in_flight,
     :tls_v1 => @tls_v1,
     :tls_context => {
      key: @tls_key,
      certificate: @tls_cert
     }
    )
  else
    @consumer = Nsq::Consumer.new(
     :nsqlookupd => @nsqlookupd,
     :topic => @topic,
     :channel => @channel,
     :tls_v1 => @tls_v1,
     :max_in_flight => @max_in_flight
    )
  end
end
run(logstash_queue) click to toggle source
# File lib/logstash/inputs/nsq.rb, line 46
def run(logstash_queue)
  @logger.info('Running nsq', :channel => @channel, :topic => @topic, :nsqlookupd => @nsqlookupd)
  begin
     while !stop?
        #@logger.info('consuming...')
        event = @consumer.pop
        #@logger.info('processing:', :event => event)
        queue_event(event.body, logstash_queue)
        event.finish
     end
     @logger.warn('Done running nsq input')
  end
end
stop() click to toggle source
# File lib/logstash/inputs/nsq.rb, line 83
def stop
  @logger.warn('nsq got stop signal. terminate')
  @consumer.terminate
end

Private Instance Methods

queue_event(body, output_queue) click to toggle source
# File lib/logstash/inputs/nsq.rb, line 61
def queue_event(body, output_queue)
  begin
      if @multi_events
        events = body.split("\n")
        events.each do |ev|
          event = LogStash::Event.new("message" => ev)
          decorate(event)
          output_queue << event
        end
      else
        @codec.decode(body) do |event|
          decorate(event)
          output_queue << event
        end
      end
  rescue => e # parse or event creation error
     @logger.error('Failed to create event', :message => "#{body}", :exception => e, :multi_events => @multi_events,
              :backtrace => e.backtrace)
  end # begin
end