class Fluent::Plugin::NSQInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nsq.rb, line 30
# Validates the plugin settings once the superclass has processed +conf+.
#
# @param conf the configuration object; forwarded unchanged to +super+
# @return [nil]
# @raise [Fluent::ConfigError] when +@nsqlookupd+, +@topic+ or +@channel+ is
#   unset, or when +@in_flight+ is unset or not greater than zero
def configure(conf)
  super

  raise Fluent::ConfigError, 'Missing nsqlookupd' unless @nsqlookupd
  raise Fluent::ConfigError, 'Missing topic' unless @topic
  raise Fluent::ConfigError, 'Missing channel' unless @channel

  # Check for nil before comparing: a bare `@in_flight > 0` would raise
  # NoMethodError on an unset value instead of the intended ConfigError.
  in_flight_valid = @in_flight && @in_flight > 0
  raise Fluent::ConfigError, 'in_flight needs to be bigger than 0' unless in_flight_valid
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nsq.rb, line 51
# Stops the plugin: runs the superclass shutdown, clears the +@running+ flag
# that the consume loop checks, and terminates the NSQ consumer.
#
# The consumer is only terminated when one exists, so calling this after a
# +start+ that never ran (or failed before assigning +@consumer+) does not
# raise NoMethodError on nil.
#
# NOTE(review): the thread stored in +@thread+ by +start+ is not joined here;
# confirm whether it can stay blocked in +@consumer.pop+ after termination.
def shutdown
  super
  @running = false
  @consumer.terminate if @consumer
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_nsq.rb, line 39
# Starts the plugin: runs the superclass start, builds the NSQ consumer from
# the configured lookupd address, topic, channel and in-flight limit, marks
# the plugin as running, and spawns the thread that executes +consume+.
#
# @return [Thread] the thread stored in +@thread+
def start
  super

  consumer_options = {
    nsqlookupd: @nsqlookupd,
    topic: @topic,
    channel: @channel,
    max_in_flight: @in_flight
  }
  # Splat as keywords so the call is the same as writing them inline.
  @consumer = Nsq::Consumer.new(**consumer_options)

  # The flag must be set before the thread starts; +consume+ loops on it.
  @running = true
  @thread = Thread.new { consume }
end
Private Instance Methods
consume()
click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 58
# Worker loop: handles one message at a time for as long as +@running+ is
# truthy. The flag is checked before every iteration.
#
# @return [nil]
def consume
  consume_one while @running
end
consume_one()
click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 64
# Pops a single message from the consumer, parses its body with Yajl, emits
# the parsed record through +router+ with the tag and time derived for it,
# and then marks the message as finished.
#
# Any StandardError raised along the way is logged as a warning and the
# message, if one was obtained, is requeued.
def consume_one
  message = @consumer.pop
  # force_encoding relabels the body string in place as UTF-8 before parsing.
  event = Yajl.load(message.body.force_encoding('UTF-8'))
  router.emit(
    tag_for_record(event),
    time_for_record(event, message),
    event
  )
  message.finish
rescue => error
  log.warn("nsq: #{error}")
  # message is nil when the failure happened inside @consumer.pop itself.
  message.requeue if message
end
tag_for_record(record)
click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 76
# Picks the tag for a record according to +@tag_source+:
# * +:static+ - the configured +@tag+ itself
# * +:key+    - the value stored in the record under the key named by +@tag+
# * +:topic+  - the configured +@topic+
#
# @param record the parsed record; only indexed with +[]+ in +:key+ mode
# @return the selected tag, or nil when +@tag_source+ is none of the above
def tag_for_record(record)
  source = @tag_source
  return @tag if source == :static
  return record[@tag] if source == :key
  return @topic if source == :topic

  nil
end
time_for_record(record, msg)
click to toggle source
# File lib/fluent/plugin/in_nsq.rb, line 87
# Determines the event time for a record.
#
# When +@time_key+ is set and the record holds a non-nil value under that
# key, that value is returned as-is. Otherwise the time is built from the NSQ
# message timestamp. Falling back for a missing key keeps a nil from being
# handed on as the event time.
#
# @param record the parsed record; indexed with +[]+ when +@time_key+ is set
# @param msg the NSQ message; its +timestamp+ must respond to +to_i+ and +nsec+
# @return the record's own time value, or a Fluent::EventTime built from the
#   message timestamp
def time_for_record(record, msg)
  if @time_key
    keyed_time = record[@time_key]
    # Only nil counts as "missing"; any other stored value is passed through.
    return keyed_time unless keyed_time.nil?
  end

  stamp = msg.timestamp
  Fluent::EventTime.new(stamp.to_i, stamp.nsec)
end