class Fluent::KafkaInput::TopicWatcher

Public Class Methods

new(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, options={})
Calls the superclass method.
# File lib/fluent/plugin/in_kafka.rb, line 202
# Sets up a watcher for one topic entry and registers it with the superclass
# as a repeating timer (super is called with +interval+ and +true+).
#
# topic_entry    - object responding to #topic, #partition and #offset.
# kafka          - client used by #consume to fetch messages.
# interval       - passed through to the superclass constructor.
# parser         - callable invoked by #consume as parser.call(msg, topic_entry).
# add_prefix     - optional tag prefix (nil for none).
# add_suffix     - optional tag suffix (nil for none).
# offset_manager - optional object responding to #next_offset; #consume also
#                  calls #save_offset on it.
# router         - receives the event streams emitted by #consume.
# options        - extra fetch arguments, merged over the topic/partition pair.
def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, options={})
  @topic_entry, @kafka, @parser = topic_entry, kafka, parser
  @add_prefix, @add_suffix = add_prefix, add_suffix
  @offset_manager, @router, @options = offset_manager, router, options
  @callback = method(:consume)

  # A topic entry offset of -1 combined with an offset manager means the
  # starting position is taken from the manager instead of the entry.
  use_managed_offset = @topic_entry.offset == -1 && offset_manager
  @next_offset = use_managed_offset ? offset_manager.next_offset : @topic_entry.offset

  base_args = { topic: @topic_entry.topic, partition: @topic_entry.partition }
  @fetch_args = base_args.merge(@options)

  super(interval, true)
end

Public Instance Methods

consume()
# File lib/fluent/plugin/in_kafka.rb, line 233
# Fetches the next batch of messages starting at @next_offset, parses each
# message into a record and emits the successfully parsed records to the
# router as one event stream under the (optionally prefixed/suffixed) topic tag.
#
# A message whose parsing raises is logged at warn level and skipped. After
# the batch has been handled, the offset just past its last message is stored
# in @next_offset and handed to @offset_manager (when one is set) -- even when
# no message could be parsed. Without that, a batch consisting only of
# unparseable messages would be fetched and logged again on every call and
# the partition would never make progress.
#
# If emitting the stream raises, the exception propagates and the offset is
# left untouched, so the same batch is fetched again on the next call.
#
# Returns nil when the fetch yields no messages.
def consume
  @fetch_args[:offset] = @next_offset
  messages = @kafka.fetch_messages(@fetch_args)

  return if messages.empty?

  es = Fluent::MultiEventStream.new
  tag = @topic_entry.topic
  tag = "#{@add_prefix}.#{tag}" if @add_prefix
  tag = "#{tag}.#{@add_suffix}" if @add_suffix

  messages.each do |msg|
    begin
      record = @parser.call(msg, @topic_entry)
      # NOTE(review): @use_record_time, @time_format and @time_parser are not
      # assigned in this class's initialize -- confirm they are set elsewhere.
      # While they are unset every record is stamped with Fluent::Engine.now.
      record_time =
        if @use_record_time
          @time_format ? @time_parser.parse(record['time']) : record['time']
        else
          Fluent::Engine.now
        end
      es.add(record_time, record)
    rescue => e
      $log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
      $log.debug_backtrace
    end
  end

  @router.emit_stream(tag, es) unless es.empty?

  # Advance past the whole batch, including messages skipped as unparseable.
  offset = messages.last.offset + 1
  @offset_manager.save_offset(offset) if @offset_manager
  @next_offset = offset
end
on_timer()
# File lib/fluent/plugin/in_kafka.rb, line 225
# Timer hook: invokes the callback stored in @callback. Any StandardError it
# raises is written to the error log (message, then backtrace) and is not
# re-raised.
def on_timer
  begin
    @callback.call
  rescue => error
    # TODO log?
    $log.error(error.to_s)
    $log.error_backtrace
  end
end