class Fluent::KafkaInput::TopicWatcher

Public Class Methods

new(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, record_time_key, tag_source, record_tag_key, options={}) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka.rb, line 258
# Sets up a watcher for a single topic entry.
#
# Stores the collaborators and settings handed in by the caller, binds
# #consume as the callback, works out the offset the first fetch starts
# from, prepares the fetch arguments, and finally hands `interval` and
# `true` to the superclass initializer.
#
# The starting offset is `topic_entry.offset`, unless that value is -1 and
# an offset manager was supplied, in which case the manager's `next_offset`
# is used instead.
def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, record_time_key, tag_source, record_tag_key, options={})
  # Collaborators used on every fetch.
  @kafka = kafka
  @parser = parser
  @router = router
  @offset_manager = offset_manager
  @topic_entry = topic_entry
  @options = options

  # How the emitted tag is derived.
  @tag_source = tag_source
  @record_tag_key = record_tag_key
  @add_prefix = add_prefix
  @add_suffix = add_suffix

  # How the event time and message key are derived.
  @time_source = time_source
  @record_time_key = record_time_key
  @kafka_message_key = kafka_message_key

  @callback = method(:consume)

  configured_offset = @topic_entry.offset
  @next_offset =
    if configured_offset == -1 && offset_manager
      offset_manager.next_offset
    else
      configured_offset
    end

  # Entries in `options` are merged last, so they win on key collisions.
  base_args = { topic: @topic_entry.topic, partition: @topic_entry.partition }
  @fetch_args = base_args.merge(@options)

  super(interval, true)
end

Public Instance Methods

consume() click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 294
# Fetches the next batch of messages for this topic/partition, turns each
# message into an event, and emits the events through the router.
#
# Tag: the topic name, wrapped with the optional prefix/suffix. When
# `@tag_source` is `:record`, the tag is instead read from each record
# under `@record_tag_key` (and wrapped the same way). Because records in
# one batch may then carry different tags, events are grouped into one
# event stream per tag and each stream is emitted under its own tag.
#
# Time: chosen by `@time_source` (`:kafka` uses the message create time,
# `:now` the engine clock, `:record` the value under `@record_time_key`).
# NOTE(review): `@time_format` / `@time_parser` are not assigned in the
# constructor visible alongside this method, so the parsing branch looks
# unreachable from here; confirm where they are meant to come from.
#
# Errors: a message whose parsing or tag/time handling raises is logged
# as a warning and skipped. The offset is advanced past the whole batch
# whenever the batch is non-empty, including when every message in it was
# skipped; otherwise the same unparsable batch would be fetched again on
# every timer tick.
#
# Returns nil when the fetch yields no messages.
def consume
  @fetch_args[:offset] = @next_offset
  messages = @kafka.fetch_messages(**@fetch_args)

  return if messages.size.zero?

  topic_tag = @topic_entry.topic
  topic_tag = @add_prefix + "." + topic_tag if @add_prefix
  topic_tag = topic_tag + "." + @add_suffix if @add_suffix

  # One stream per tag; Hash keeps tags in order of first appearance.
  streams = Hash.new { |hash, key| hash[key] = Fluent::MultiEventStream.new }

  messages.each do |msg|
    begin
      record = @parser.call(msg, @topic_entry)

      tag = topic_tag
      if @tag_source == :record
        tag = record[@record_tag_key]
        tag = @add_prefix + "." + tag if @add_prefix
        tag = tag + "." + @add_suffix if @add_suffix
      end

      record_time = nil
      case @time_source
      when :kafka
        record_time = Fluent::EventTime.from_time(msg.create_time)
      when :now
        record_time = Fluent::Engine.now
      when :record
        if @time_format
          record_time = @time_parser.parse(record[@record_time_key])
        else
          record_time = record[@record_time_key]
        end
      else
        $log.fatal "BUG: invalid time_source: #{@time_source}"
      end

      record[@kafka_message_key] = msg.key if @kafka_message_key

      streams[tag].add(record_time, record)
    rescue => e
      $log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
      $log.debug_backtrace
    end
  end

  streams.each do |tag, es|
    next if es.empty?
    @router.emit_stream(tag, es)
  end

  # Move past the batch even if nothing was emitted, so messages that can
  # never be parsed do not pin the watcher to the same offset.
  offset = messages.last.offset + 1
  @offset_manager.save_offset(offset) if @offset_manager
  @next_offset = offset
end
on_timer() click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 286
# Timer hook: invokes the stored callback and returns its result.
#
# Any StandardError raised by the callback is logged (message first, then
# the backtrace) rather than being allowed to propagate out of the hook.
def on_timer
  begin
    @callback.call
  rescue StandardError => failure
    $log.error(failure.to_s)
    $log.error_backtrace
  end
end