class Fluent::KafkaInput::TopicWatcher
Public Class Methods
new(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, options={})
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka.rb, line 229
#
# Sets up a watcher for a single topic entry.
#
# topic_entry       - object responding to #topic, #partition and #offset
# kafka             - client that #consume fetches messages from
# interval          - forwarded to the superclass constructor
# parser            - callable invoked by #consume as parser.call(msg, topic_entry)
# add_prefix        - optional tag prefix (nil to disable)
# add_suffix        - optional tag suffix (nil to disable)
# offset_manager    - optional object responding to #next_offset (and
#                     #save_offset, used by #consume); may be nil
# router            - object whose #emit_stream receives the parsed events
# kafka_message_key - optional record key under which the message key is stored
# options           - extra fetch arguments merged over topic/partition
#
# NOTE(review): the superclass is not visible here; `super(interval, true)`
# presumably configures a repeating timer — confirm against the parent class.
def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, options={})
  # Collaborators.
  @kafka = kafka
  @parser = parser
  @router = router
  @offset_manager = offset_manager

  # Per-topic configuration.
  @topic_entry = topic_entry
  @add_prefix = add_prefix
  @add_suffix = add_suffix
  @kafka_message_key = kafka_message_key
  @options = options

  # on_timer dispatches through this bound method.
  @callback = method(:consume)

  # Start from the configured offset; an offset of -1 defers to the offset
  # manager when one was supplied.
  @next_offset = @topic_entry.offset
  @next_offset = offset_manager.next_offset if @topic_entry.offset == -1 && offset_manager

  # Caller-supplied options take precedence over the topic/partition defaults.
  topic_and_partition = {
    topic: @topic_entry.topic,
    partition: @topic_entry.partition,
  }
  @fetch_args = topic_and_partition.merge(@options)

  super(interval, true)
end
Public Instance Methods
consume()
click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 261
#
# Fetches one batch of messages starting at @next_offset, parses each message
# into a record, emits the parsed records as one event stream, and then moves
# the offset past the batch.
#
# The emitted tag is the topic name, wrapped with @add_prefix / @add_suffix
# (joined by ".") when those are set. When @kafka_message_key is set, the
# message key is stored in each record under that key.
#
# Messages whose parsing raises are logged via $log.warn and skipped. The
# offset is advanced (and saved through @offset_manager when one is set) even
# when every message in the batch failed to parse; otherwise the same
# unparsable batch would be fetched and logged again on every call.
#
# If @router.emit_stream raises, the exception propagates and the offset is
# left untouched, so the same batch is fetched again by the next call.
#
# Returns nil when the fetch yields no messages.
#
# NOTE(review): @use_record_time, @time_format and @time_parser are not
# assigned by the visible initializer; unless they are set elsewhere, every
# event is stamped with Fluent::Engine.now — confirm.
def consume
  @fetch_args[:offset] = @next_offset

  # Splat the arguments so they arrive as keywords: a client that declares
  # keyword parameters rejects a positional hash under Ruby 3.
  messages = @kafka.fetch_messages(**@fetch_args)
  return if messages.size.zero?

  tag = @topic_entry.topic
  tag = "#{@add_prefix}.#{tag}" if @add_prefix
  tag = "#{tag}.#{@add_suffix}" if @add_suffix

  es = Fluent::MultiEventStream.new
  messages.each do |msg|
    begin
      record = @parser.call(msg, @topic_entry)
      record_time =
        if !@use_record_time
          Fluent::Engine.now
        elsif @time_format
          @time_parser.parse(record['time'])
        else
          record['time']
        end
      record[@kafka_message_key] = msg.key if @kafka_message_key
      es.add(record_time, record)
    rescue => e
      # A bad message must not abort the rest of the batch.
      $log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
      $log.debug_backtrace
    end
  end

  # Emit before touching the offset so a failed emit leaves the batch to be
  # fetched again.
  @router.emit_stream(tag, es) unless es.empty?

  # Advance past the whole batch, including messages that failed to parse.
  following_offset = messages.last.offset + 1
  @offset_manager.save_offset(following_offset) if @offset_manager
  @next_offset = following_offset
end
on_timer()
click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 253
#
# Timer hook: invokes the stored callback and logs, rather than propagates,
# any StandardError it raises. The message goes to $log.error, followed by
# the backtrace via $log.error_backtrace.
def on_timer
  begin
    @callback.call
  rescue StandardError => failure
    $log.error(failure.to_s)
    $log.error_backtrace
  end
end