class Fluent::KafkaInput::TopicWatcher
Public Class Methods
new(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, record_time_key, tag_source, record_tag_key, options={})
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka.rb, line 258
#
# Builds a watcher bound to a single topic entry.
#
# Every argument is stored on the instance for later use by #consume. The
# starting offset comes from the topic entry, unless the entry's offset is -1
# and an offset manager was supplied, in which case the manager's
# +next_offset+ is used instead.
#
# @param topic_entry      object responding to +topic+, +partition+ and +offset+
# @param kafka            client used by #consume to fetch messages
# @param interval         forwarded to the superclass initializer
# @param parser           callable invoked by #consume as +call(msg, topic_entry)+
# @param add_prefix       optional tag prefix (nil for none)
# @param add_suffix       optional tag suffix (nil for none)
# @param offset_manager   optional object responding to +next_offset+ (and
#                         +save_offset+, used by #consume)
# @param router           receiver of +emit_stream+ in #consume
# @param kafka_message_key record key under which the message key is stored
# @param time_source      :kafka, :now or :record (interpreted by #consume)
# @param record_time_key  record key read when time_source is :record
# @param tag_source       :record switches #consume to per-record tags
# @param record_tag_key   record key read when tag_source is :record
# @param options          extra fetch arguments merged over topic/partition
def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, record_time_key, tag_source, record_tag_key, options={})
  # Collaborators.
  @topic_entry = topic_entry
  @kafka = kafka
  @parser = parser
  @router = router
  @offset_manager = offset_manager
  @options = options

  # Tag, time and key settings consulted while consuming.
  @add_prefix = add_prefix
  @add_suffix = add_suffix
  @kafka_message_key = kafka_message_key
  @time_source = time_source
  @record_time_key = record_time_key
  @tag_source = tag_source
  @record_tag_key = record_tag_key

  # The timer hook invokes this bound method.
  @callback = method(:consume)

  # An entry offset of -1 defers to the offset manager when one is present.
  starting_offset = @topic_entry.offset
  if starting_offset == -1 && offset_manager
    starting_offset = offset_manager.next_offset
  end
  @next_offset = starting_offset

  # Caller-supplied options win over the topic/partition defaults on key clash.
  base_args = { topic: @topic_entry.topic, partition: @topic_entry.partition }
  @fetch_args = base_args.merge(@options)

  # NOTE(review): the second argument is presumably the timer's "repeating"
  # flag — confirm against the superclass.
  super(interval, true)
end
Public Instance Methods
consume()
click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 294
#
# Fetches one batch of messages starting at @next_offset, parses each message
# into a record and emits the resulting events through @router.
#
# Tagging:
# * by default every event is tagged with the topic name, wrapped in
#   @add_prefix / @add_suffix when those are set;
# * when @tag_source is :record the tag is read from record[@record_tag_key]
#   (with the same prefix/suffix) and events are grouped into one stream per
#   tag, so each record is emitted under its own tag rather than under the
#   tag of whichever record happened to be parsed last.
#
# A message whose parsing or tagging raises is logged at warn level and
# skipped. After the batch has been emitted, the offset just past the last
# fetched message is saved to @offset_manager (when present) and stored in
# @next_offset. This happens even if every message in the batch was skipped;
# otherwise the same unparseable batch would be fetched again on every tick.
#
# Exceptions raised by the fetch or by emit_stream propagate to the caller
# and leave @next_offset untouched.
#
# @return [nil] when the fetch returned no messages; otherwise the new
#   next offset
def consume
  @fetch_args[:offset] = @next_offset
  messages = @kafka.fetch_messages(**@fetch_args)
  return if messages.empty?

  # Wraps a base tag with the configured prefix/suffix. String#+ is used on
  # purpose: a nil tag combined with a prefix/suffix raises, which the
  # per-message rescue below reports instead of emitting a malformed tag.
  decorate = lambda do |base|
    name = base
    name = @add_prefix + "." + name if @add_prefix
    name = name + "." + @add_suffix if @add_suffix
    name
  end

  topic_tag = decorate.call(@topic_entry.topic)
  streams = {} # tag => Fluent::MultiEventStream, in first-seen order

  messages.each do |msg|
    begin
      record = @parser.call(msg, @topic_entry)
      tag = @tag_source == :record ? decorate.call(record[@record_tag_key]) : topic_tag

      record_time =
        case @time_source
        when :kafka
          Fluent::EventTime.from_time(msg.create_time)
        when :now
          Fluent::Engine.now
        when :record
          # NOTE(review): @time_format / @time_parser are not assigned in the
          # visible part of this class; while they are unset the raw record
          # value is used as the event time — confirm this is intended.
          raw_time = record[@record_time_key]
          @time_format ? @time_parser.parse(raw_time) : raw_time
        else
          $log.fatal "BUG: invalid time_source: #{@time_source}"
          nil
        end

      record[@kafka_message_key] = msg.key if @kafka_message_key

      (streams[tag] ||= Fluent::MultiEventStream.new).add(record_time, record)
    rescue => e
      $log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
      $log.debug_backtrace
    end
  end

  streams.each do |tag, es|
    @router.emit_stream(tag, es) unless es.empty?
  end

  # Always move past the fetched batch, including when nothing was emitted.
  offset = messages.last.offset + 1
  @offset_manager.save_offset(offset) if @offset_manager
  @next_offset = offset
end
on_timer()
click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 286
#
# Timer hook: invokes the stored callback (set to #consume by the
# constructor). Any StandardError raised by the callback is reported through
# $log at error level, together with its backtrace, and is not re-raised.
def on_timer
  begin
    @callback.call
  rescue StandardError => error
    # TODO: decide whether more context should be logged here.
    $log.error(error.to_s)
    $log.error_backtrace
  end
end