class Fluent::KafkaGroupInput

Constants

BufferError

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka_group.rb, line 89
def initialize
  super
  require 'kafka'

  @time_parser = nil
  @retry_count = 1
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka_group.rb, line 111
def configure(conf)
  super

  $log.info "Will watch for topics #{@topics} at brokers " \
            "#{@brokers} and '#{@consumer_group}' group"

  @topics = _config_to_array(@topics)

  if conf['max_wait_ms']
    log.warn "'max_wait_ms' parameter is deprecated. Use second unit 'max_wait_time' instead"
    @max_wait_time = conf['max_wait_ms'].to_i / 1000
  end

  @parser_proc = setup_parser(conf)

  @consumer_opts = {:group_id => @consumer_group}
  @consumer_opts[:session_timeout] = @session_timeout if @session_timeout
  @consumer_opts[:offset_commit_interval] = @offset_commit_interval if @offset_commit_interval
  @consumer_opts[:offset_commit_threshold] = @offset_commit_threshold if @offset_commit_threshold
  @consumer_opts[:fetcher_max_queue_size] = @fetcher_max_queue_size if @fetcher_max_queue_size

  @fetch_opts = {}
  @fetch_opts[:max_wait_time] = @max_wait_time if @max_wait_time
  @fetch_opts[:min_bytes] = @min_bytes if @min_bytes

  @time_source = :record if @use_record_time

  if @time_source == :record and @time_format
    if defined?(Fluent::TimeParser)
      @time_parser = Fluent::TimeParser.new(@time_format)
    else
      @time_parser = Fluent::TextParser::TimeParser.new(@time_format)
    end
  end

  if @time_source == :record && defined?(Fluent::NumericTimeParser)
    @float_numeric_parse = Fluent::NumericTimeParser.new(:float)
  end
end
emit_events(tag, es) click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 367
def emit_events(tag, es)
  retries = 0
  begin
    router.emit_stream(tag, es)
  rescue BufferError
    raise ForShutdown if @consumer.nil?

    if @retry_emit_limit.nil?
      sleep 1
      retry
    end

    if retries < @retry_emit_limit
      retries += 1
      sleep 1
      retry
    else
      raise RuntimeError, "Exceeds retry_emit_limit"
    end
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 105
def multi_workers_ready?
  true
end
process_batch(batch) click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 300
def process_batch(batch)
  es = Fluent::MultiEventStream.new
  tag = batch.topic
  tag = @add_prefix + "." + tag if @add_prefix
  tag = tag + "." + @add_suffix if @add_suffix

  batch.messages.each { |msg|
    begin
      record = @parser_proc.call(msg)
      case @time_source
      when :kafka
        record_time = Fluent::EventTime.from_time(msg.create_time)
      when :now
        record_time = Fluent::Engine.now
      when :record
        record_time = record[@record_time_key]

        if @time_format
          record_time = @time_parser.parse(record_time.to_s)
        elsif record_time.is_a?(Float) && @float_numeric_parse
          record_time = @float_numeric_parse.parse(record_time)
        end
      else
        log.fatal "BUG: invalid time_source: #{@time_source}"
      end
      if @kafka_message_key
        record[@kafka_message_key] = msg.key
      end
      if @add_headers
        msg.headers.each_pair { |k, v|
          record[k] = v
        }
      end
      es.add(record_time, record)
    rescue => e
      log.warn "parser error in #{batch.topic}/#{batch.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
      log.debug_backtrace
    end
  }

  unless es.empty?
    emit_events(tag, es)
  end
end
process_batch_with_record_tag(batch) click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 255
def process_batch_with_record_tag(batch)
  es = {} 
  batch.messages.each { |msg|
    begin
      record = @parser_proc.call(msg)
      tag = record[@record_tag_key]
      tag = @add_prefix + "." + tag if @add_prefix
      tag = tag + "." + @add_suffix if @add_suffix
      es[tag] ||= Fluent::MultiEventStream.new
      case @time_source
      when :kafka
        record_time = Fluent::EventTime.from_time(msg.create_time)
      when :now
        record_time = Fluent::Engine.now
      when :record
        if @time_format
          record_time = @time_parser.parse(record[@record_time_key].to_s)
        else
          record_time = record[@record_time_key]
        end
      else
        log.fatal "BUG: invalid time_source: #{@time_source}"
      end
      if @kafka_message_key
        record[@kafka_message_key] = msg.key
      end
      if @add_headers
        msg.headers.each_pair { |k, v|
          record[k] = v
        }
      end
      es[tag].add(record_time, record)
    rescue => e
      log.warn "parser error in #{batch.topic}/#{batch.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
      log.debug_backtrace
    end
  }

  unless es.empty?
    es.each { |tag,es|
      emit_events(tag, es)
    }
  end
end
reconnect_consumer() click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 234
def reconnect_consumer
  log.warn "Stopping Consumer"
  consumer = @consumer
  @consumer = nil
  if consumer
    consumer.stop
  end
  log.warn "Could not connect to broker. retry_time:#{@retry_count}. Next retry will be in #{@retry_wait_seconds} seconds"
  @retry_count = @retry_count + 1
  sleep @retry_wait_seconds
  @consumer = setup_consumer
  log.warn "Re-starting consumer #{Time.now.to_s}"
  @retry_count = 0
rescue =>e
  log.error "unexpected error during re-starting consumer object access", :error => e.to_s
  log.error_backtrace
  if @retry_count <= @retry_limit or disable_retry_limit
    reconnect_consumer
  end
end
run() click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 345
def run
  while @consumer
    begin
      @consumer.each_batch(**@fetch_opts) { |batch|
        if @tag_source == :record
          process_batch_with_record_tag(batch)
        else
          process_batch(batch) 
        end
      }
    rescue ForShutdown
    rescue => e
      log.error "unexpected error during consuming events from kafka. Re-fetch events.", :error => e.to_s
      log.error_backtrace
      reconnect_consumer
    end
  end
rescue => e
  log.error "unexpected error during consumer object access", :error => e.to_s
  log.error_backtrace
end
setup_consumer() click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 219
def setup_consumer
  consumer = @kafka.consumer(**@consumer_opts)
  @topics.each { |topic|
    if m = /^\/(.+)\/$/.match(topic)
      topic_or_regex = Regexp.new(m[1])
      $log.info "Subscribe to topics matching the regex #{topic}"
    else
      topic_or_regex = topic
      $log.info "Subscribe to topic #{topic}"
    end
    consumer.subscribe(topic_or_regex, start_from_beginning: @start_from_beginning, max_bytes_per_partition: @max_bytes)
  }
  consumer
end
setup_parser(conf) click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 151
def setup_parser(conf)
  case @format
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |msg| Oj.load(msg.value) }
    rescue LoadError
      require 'yajl'
      Proc.new { |msg| Yajl::Parser.parse(msg.value) }
    end
  when 'ltsv'
    require 'ltsv'
    Proc.new { |msg| LTSV.parse(msg.value, {:symbolize_keys => false}).first }
  when 'msgpack'
    require 'msgpack'
    Proc.new { |msg| MessagePack.unpack(msg.value) }
  when 'text'
    Proc.new { |msg| {@message_key => msg.value} }
  else
    @custom_parser = Fluent::Plugin.new_parser(conf['format'])
    @custom_parser.configure(conf)
    Proc.new { |msg|
      @custom_parser.parse(msg.value) {|_time, record|
        record
      }
    }
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka_group.rb, line 206
def shutdown
  # This nil assignment should be guarded by mutex in multithread programming manner.
  # But the situation is very low contention, so we don't use mutex for now.
  # If the problem happens, we will add a guard for consumer.
  consumer = @consumer
  @consumer = nil
  consumer.stop

  @thread.join
  @kafka.close
  super
end
start() click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 181
def start
  super

  logger = @get_kafka_client_log ? log : nil
  if @scram_mechanism != nil && @username != nil && @password != nil
    @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                       ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                       ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
                       sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
  elsif @username != nil && @password != nil
    @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                       ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                       ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password,
                       sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
  else
    @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                       ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                       ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
                       ssl_verify_hostname: @ssl_verify_hostname)
  end

  @consumer = setup_consumer
  @thread = Thread.new(&method(:run))
end

Private Instance Methods

_config_to_array(config) click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 97
def _config_to_array(config)
  config_array = config.split(',').map {|k| k.strip }
  if config_array.empty?
    raise Fluent::ConfigError, "kafka_group: '#{config}' is a required parameter"
  end
  config_array
end