class Fluent::Plugin::RdKafkaGroupInput

Constants

BufferError

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rdkafka_group.rb, line 65
# Initializes plugin state that does not come from configuration.
def initialize
  super

  # Attempt counter that reconnect_consumer reports in its log line and increments.
  @retry_count = 1
  # Stays nil unless configure installs a time parser.
  @time_parser = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rdkafka_group.rb, line 86
# Validates the configuration and prepares topic list, message parser and
# time handling.
#
# @param conf configuration element; its 'parse' child section is read here
# @raise [Fluent::ConfigError] when there is no <parse> section or it lacks @type
def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  log.warn "The in_rdkafka_group consumer was not yet tested under heavy production load. Use it at your own risk!"

  # Logged before normalization, so @topics is still the raw configured value here.
  log.info "Will watch for topics #{@topics} at brokers " \
            "#{@kafka_configs["bootstrap.servers"]} and '#{@kafka_configs["group.id"]}' group"

  @topics = _config_to_array(@topics)

  parse_section = conf.elements('parse').first
  raise Fluent::ConfigError, "<parse> section or format parameter is required." unless parse_section
  raise Fluent::ConfigError, "parse/@type is required." unless parse_section["@type"]

  @parser_proc = setup_parser(parse_section)

  # use_record_time overrides whatever time_source was configured.
  @time_source = :record if @use_record_time

  # A time parser is only needed when record times come with an explicit format.
  @time_parser = Fluent::TimeParser.new(@time_format) if @time_source == :record && @time_format
end
each_batch() { |batch| ... } click to toggle source

Executes the passed code block on a batch of messages. It is guaranteed that every message in a given batch belongs to the same topic, because the tagging logic in :run expects that property. The maximum number of messages in a batch is capped by the :max_batch_size configuration value. It ensures that consuming from a single topic for a long time (e.g. with `auto.offset.reset` set to `earliest`) does not lead to memory exhaustion. Also, calling consumer.poll advances the consumer offset, so in case the process crashes we might lose at most :max_batch_size messages.

# File lib/fluent/plugin/in_rdkafka_group.rb, line 205
# Polls @consumer and yields the polled messages grouped into batches.
#
# A batch only ever holds messages of one topic and is flushed when
#   * a message of a different topic arrives,
#   * it already holds @max_batch_size messages, or
#   * a poll comes back without a message.
# The loop runs for as long as @consumer is set.
#
# @yield [batch] a Batch whose #messages all share batch.topic
def each_batch(&block)
  pending = nil

  while @consumer
    polled = @consumer.poll(@max_wait_time_ms)

    unless polled
      # Nothing arrived within the wait time: hand over what was collected.
      yield pending if pending
      pending = nil
      next
    end

    if pending && (pending.topic != polled.topic || pending.messages.size >= @max_batch_size)
      yield pending
      pending = nil
    end

    pending ||= Batch.new(polled.topic)
    pending.messages << polled
  end

  # The consumer went away mid-batch; flush the remainder.
  yield pending if pending
end
emit_events(tag, es) click to toggle source
# File lib/fluent/plugin/in_rdkafka_group.rb, line 284
# Emits an event stream through the router, retrying once per second while
# the router raises BufferError.
#
# @param tag tag to emit under
# @param es  event stream to emit
# @raise [ForShutdown]  when a BufferError occurs and @consumer is nil
# @raise [RuntimeError] when @retry_emit_limit is set and has been used up
def emit_events(tag, es)
  attempts = 0
  begin
    router.emit_stream(tag, es)
  rescue BufferError
    raise ForShutdown if @consumer.nil?

    # A nil limit means "retry forever"; otherwise count the attempts.
    unless @retry_emit_limit.nil?
      raise RuntimeError, "Exceeds retry_emit_limit" unless attempts < @retry_emit_limit
      attempts += 1
    end

    sleep 1
    retry
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_rdkafka_group.rb, line 80
# Always returns true.
# NOTE(review): presumably the Fluentd hook that declares this input usable
# under multiple workers — confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end
reconnect_consumer() click to toggle source
# File lib/fluent/plugin/in_rdkafka_group.rb, line 169
# Closes the current consumer (if any), waits @retry_wait_seconds and then
# installs a fresh consumer obtained from setup_consumer.
#
# When any step raises a StandardError, the error is logged and the whole
# sequence is attempted again for as long as @retry_count <= @retry_limit or
# disable_retry_limit is truthy. Once attempts are exhausted the method
# returns with @consumer left as nil.
#
# On success @retry_count is reset to 0.
def reconnect_consumer
  log.warn "Stopping Consumer"
  consumer = @consumer
  # Cleared before closing so the `while @consumer` loops stop using it.
  @consumer = nil
  consumer.close if consumer
  log.warn "Could not connect to broker. retry_time:#{@retry_count}. Next retry will be in #{@retry_wait_seconds} seconds"
  @retry_count += 1
  sleep @retry_wait_seconds
  @consumer = setup_consumer
  log.warn "Re-starting consumer #{Time.now.to_s}"
  @retry_count = 0
rescue => e
  log.error "unexpected error during re-starting consumer object access", :error => e.to_s
  log.error_backtrace
  # `retry` re-runs the method body in the same frame. Recursing from the
  # rescue handler instead would add a stack frame per failed attempt and
  # grow without bound when the retry limit is disabled.
  retry if @retry_count <= @retry_limit || disable_retry_limit
end
run() click to toggle source
# File lib/fluent/plugin/in_rdkafka_group.rb, line 226
# Consumption loop: turns every batch from each_batch into an event stream
# and emits it, for as long as @consumer is set.
#
# Per batch the tag is the topic name, wrapped in @add_prefix / @add_suffix
# when those are set. Per message the record comes from @parser_proc and the
# event time is chosen by @time_source (:kafka, :now or :record). A message
# that fails to parse is logged and skipped; the rest of the batch goes on.
#
# Any other StandardError inside the loop triggers reconnect_consumer.
def run
  while @consumer
    begin
      each_batch do |batch|
        log.debug "A new batch for topic #{batch.topic} with #{batch.messages.size} messages"
        stream = Fluent::MultiEventStream.new
        tag = batch.topic
        tag = @add_prefix + "." + tag if @add_prefix
        tag = tag + "." + @add_suffix if @add_suffix

        batch.messages.each do |msg|
          begin
            record = @parser_proc.call(msg)

            record_time =
              case @time_source
              when :kafka
                Fluent::EventTime.from_time(msg.timestamp)
              when :now
                Fluent::Engine.now
              when :record
                raw_time = record[@record_time_key]
                @time_format ? @time_parser.parse(raw_time.to_s) : raw_time
              else
                log.fatal "BUG: invalid time_source: #{@time_source}"
                # The event is still added, with a nil time.
                nil
              end

            record[@kafka_message_key] = msg.key if @kafka_message_key
            msg.headers.each_pair { |name, value| record[name] = value } if @add_headers

            stream.add(record_time, record)
          rescue => e
            log.warn "parser error in #{msg.topic}/#{msg.partition}", :error => e.to_s, :value => msg.payload, :offset => msg.offset
            log.debug_backtrace
          end
        end

        emit_events(tag, stream) unless stream.empty?
      end
    rescue ForShutdown
      # Raised by emit_events once @consumer has been cleared; the loop
      # condition then ends the method.
    rescue => e
      log.error "unexpected error during consuming events from kafka. Re-fetch events.", :error => e.to_s
      log.error_backtrace
      reconnect_consumer
    end
  end
rescue => e
  log.error "unexpected error during consumer object access", :error => e.to_s
  log.error_backtrace
end
setup_consumer() click to toggle source
# File lib/fluent/plugin/in_rdkafka_group.rb, line 163
# Builds a consumer from @kafka_configs and subscribes it to @topics.
#
# @return the subscribed consumer
def setup_consumer
  Rdkafka::Config.new(@kafka_configs).consumer.tap do |kafka_consumer|
    kafka_consumer.subscribe(*@topics)
  end
end
setup_parser(parser_conf) click to toggle source
# File lib/fluent/plugin/in_rdkafka_group.rb, line 114
# Builds the callable that converts a consumed message into a record.
#
# The parse section's "@type" selects the implementation:
#   'json'    - Oj, falling back to Yajl when Oj cannot be loaded
#   'ltsv'    - first entry of LTSV.parse
#   'msgpack' - MessagePack.unpack
#   'text'    - a one-key hash { @message_key => payload }
#   otherwise - a parser obtained from parser_create for that section
#
# @param parser_conf the <parse> configuration section
# @return [Proc] called with a message; reads msg.payload and returns the record
def setup_parser(parser_conf)
  format = parser_conf["@type"]
  case format
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |msg| Oj.load(msg.payload) }
    rescue LoadError
      require 'yajl'
      Proc.new { |msg| Yajl::Parser.parse(msg.payload) }
    end
  when 'ltsv'
    require 'ltsv'
    Proc.new { |msg| LTSV.parse(msg.payload, {:symbolize_keys => false}).first }
  when 'msgpack'
    require 'msgpack'
    Proc.new { |msg| MessagePack.unpack(msg.payload) }
  when 'text'
    Proc.new { |msg| {@message_key => msg.payload} }
  else
    @custom_parser = parser_create(usage: 'in-rdkafka-plugin', conf: parser_conf)
    Proc.new { |msg|
      # The parser hands its result to the block. Capture the record there
      # instead of relying on whatever #parse itself happens to return.
      parsed = nil
      @custom_parser.parse(msg.payload) { |_time, record| parsed = record }
      parsed
    }
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_rdkafka_group.rb, line 152
# Detaches and closes the consumer, then continues with the superclass
# shutdown.
def shutdown
  # This nil assignment should be guarded by mutex in multithread programming manner.
  # But the situation is very low contention, so we don't use mutex for now.
  # If the problem happens, we will add a guard for consumer.
  consumer = @consumer
  @consumer = nil
  # @consumer may already be nil (reconnect_consumer clears it while
  # reconnecting and leaves it nil after giving up). Skip the close then, so
  # the superclass shutdown below is still reached.
  consumer.close if consumer

  super
end
start() click to toggle source
# File lib/fluent/plugin/in_rdkafka_group.rb, line 144
# Creates the consumer and hands the consumption loop (#run) to a thread
# created via thread_create.
def start
  super

  @consumer = setup_consumer

  consume_loop = method(:run)
  thread_create(:in_rdkafka_group, &consume_loop)
end

Private Instance Methods

_config_to_array(config) click to toggle source
# File lib/fluent/plugin/in_rdkafka_group.rb, line 72
# Splits a comma-separated configuration value into its entries.
#
# Surrounding whitespace is stripped and blank entries (as in "a,,b", a
# trailing comma, or a whitespace-only value) are dropped.
#
# @param config [String] comma-separated list
# @return [Array<String>] the non-blank, stripped entries
# @raise [Fluent::ConfigError] when no entry remains
def _config_to_array(config)
  config_array = config.split(',').map(&:strip).reject(&:empty?)
  if config_array.empty?
    raise Fluent::ConfigError, "kafka_group: '#{config}' is a required parameter"
  end
  config_array
end