class Fluent::Plugin::NSQInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nsq.rb, line 30
# Validates the plugin settings once the superclass has processed +conf+.
#
# @param conf the configuration object, forwarded unchanged to +super+
# @raise [Fluent::ConfigError] when nsqlookupd, topic or channel is unset
#   (nil/false), or when in_flight is not greater than zero
def configure(conf)
  super

  # Checked in this order so the first missing setting is the one reported.
  required = [
    [@nsqlookupd, 'Missing nsqlookupd'],
    [@topic, 'Missing topic'],
    [@channel, 'Missing channel']
  ]
  required.each do |value, message|
    raise Fluent::ConfigError, message unless value
  end

  raise Fluent::ConfigError, 'in_flight needs to be bigger than 0' unless @in_flight > 0
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nsq.rb, line 51
# Stops consuming: runs the superclass shutdown, clears the running flag
# and terminates the NSQ consumer if one was created.
def shutdown
  super
  # The consume loop is guarded by `while @running`, so clearing the flag
  # lets it exit after the current iteration.
  @running = false
  # @consumer is only assigned in #start; guard against shutdown being
  # invoked when start never ran or failed before creating the consumer.
  @consumer.terminate if @consumer
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nsq.rb, line 39
# Starts the input: after the superclass start, builds the NSQ consumer
# from the configured settings, marks the plugin as running and spawns the
# thread that executes #consume.
#
# @return [Thread] the thread running the consume loop
def start
  super

  consumer_options = {
    nsqlookupd: @nsqlookupd,
    topic: @topic,
    channel: @channel,
    max_in_flight: @in_flight
  }
  @consumer = Nsq::Consumer.new(**consumer_options)

  # The flag must be set before the thread starts, because #consume loops
  # only while @running is truthy.
  @running = true
  @thread = Thread.new { consume }
end

Private Instance Methods

consume() click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 58
# Consume loop: keeps handling one message at a time for as long as the
# running flag is set. The flag is re-checked before every iteration.
def consume
  consume_one while @running
end
consume_one() click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 64
# Handles a single message: pops it from the consumer, parses its body,
# emits the resulting record through the router and marks the message as
# finished. Any StandardError is logged as a warning and, if a message was
# obtained, the message is requeued.
def consume_one
  message = @consumer.pop
  # force_encoding relabels the body string in place as UTF-8 before parsing.
  payload = message.body.force_encoding('UTF-8')
  record = Yajl.load(payload)
  event_tag = tag_for_record(record)
  event_time = time_for_record(record, message)
  router.emit(event_tag, event_time, record)
  message.finish
rescue StandardError => error
  log.warn("nsq: #{error}")
  # message is nil when the failure happened in @consumer.pop itself.
  message.requeue if message
end
tag_for_record(record) click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 76
# Picks the tag for a record according to @tag_source:
#   :static - the configured @tag itself
#   :key    - the value stored in the record under the key named by @tag
#   :topic  - the configured @topic
# Any other @tag_source yields nil.
#
# @param record the parsed record; indexed with @tag when @tag_source is :key
# @return the selected tag, or nil when @tag_source matches no branch
def tag_for_record(record)
  case @tag_source
  when :topic then @topic
  when :key then record[@tag]
  when :static then @tag
  end
end
time_for_record(record, msg) click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 87
# Determines the event time for a record.
#
# When @time_key is set, the value stored in the record under that key is
# returned as-is (no conversion is applied here). Otherwise a
# Fluent::EventTime is built from the message timestamp's whole seconds
# (to_i) and nanoseconds (nsec).
#
# @param record the parsed record; indexed with @time_key when it is set
# @param msg the message; its timestamp is read only when @time_key is unset
def time_for_record(record, msg)
  return record[@time_key] if @time_key

  stamp = msg.timestamp
  Fluent::EventTime.new(stamp.to_i, stamp.nsec)
end