class Fluent::Plugin::RdKafkaGroupInput
Constants
- BufferError
Public Class Methods
new()
Calls superclass method
# File lib/fluent/plugin/in_rdkafka_group.rb, line 65
def initialize
  super

  @time_parser = nil
  @retry_count = 1
end
Public Instance Methods
configure(conf)
Calls superclass method
# File lib/fluent/plugin/in_rdkafka_group.rb, line 86
def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  log.warn "The in_rdkafka_group consumer was not yet tested under heavy production load. Use it at your own risk!"

  log.info "Will watch for topics #{@topics} at brokers " \
           "#{@kafka_configs["bootstrap.servers"]} and '#{@kafka_configs["group.id"]}' group"

  @topics = _config_to_array(@topics)

  parser_conf = conf.elements('parse').first
  unless parser_conf
    raise Fluent::ConfigError, "<parse> section or format parameter is required."
  end
  unless parser_conf["@type"]
    raise Fluent::ConfigError, "parse/@type is required."
  end
  @parser_proc = setup_parser(parser_conf)

  @time_source = :record if @use_record_time

  if @time_source == :record and @time_format
    @time_parser = Fluent::TimeParser.new(@time_format)
  end
end
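When time_source resolves to :record and a time_format is given, the parser built here is a plain Fluent::TimeParser. A minimal sketch of that path, with a hypothetical format string:

require 'fluent/time'

# Hypothetical time_format; configure builds this parser once and
# run later applies it to record[@record_time_key].
time_parser = Fluent::TimeParser.new("%Y-%m-%dT%H:%M:%S%z")
time_parser.parse("2024-05-01T12:34:56+0000") # => Fluent::EventTime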
each_batch() { |batch| ... }
Executes the passed code block on a batch of messages. It is guaranteed that every message in a given batch belongs to the same topic, because the tagging logic in :run expects that property. The maximum number of messages in a batch is capped by the :max_batch_size configuration value, which ensures that consuming from a single topic for a long time (e.g. with `auto.offset.reset` set to `earliest`) does not lead to memory exhaustion. Also, calling consumer.poll advances the consumer offset, so if the process crashes we may lose at most :max_batch_size messages.
# File lib/fluent/plugin/in_rdkafka_group.rb, line 205
def each_batch(&block)
  batch = nil
  message = nil
  while @consumer
    message = @consumer.poll(@max_wait_time_ms)
    if message
      if not batch
        batch = Batch.new(message.topic)
      elsif batch.topic != message.topic || batch.messages.size >= @max_batch_size
        yield batch
        batch = Batch.new(message.topic)
      end
      batch.messages << message
    else
      yield batch if batch
      batch = nil
    end
  end
  yield batch if batch
end
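A standalone sketch of the batching rule above, using stand-in structs rather than real Rdkafka messages (all names below are illustrative only): a batch is flushed whenever the topic changes or the batch reaches :max_batch_size.

# Stand-ins for illustration; the plugin uses Rdkafka messages and its own Batch class.
StubMsg   = Struct.new(:topic, :payload)
StubBatch = Struct.new(:topic, :messages)

def batches(msgs, max_batch_size)
  out = []
  batch = nil
  msgs.each do |m|
    if batch.nil?
      batch = StubBatch.new(m.topic, [])
    elsif batch.topic != m.topic || batch.messages.size >= max_batch_size
      out << batch
      batch = StubBatch.new(m.topic, [])
    end
    batch.messages << m
  end
  out << batch if batch
  out
end

msgs = [StubMsg.new("a", 1), StubMsg.new("a", 2), StubMsg.new("b", 3),
        StubMsg.new("b", 4), StubMsg.new("b", 5)]
batches(msgs, 2).map { |b| [b.topic, b.messages.size] }
# => [["a", 2], ["b", 2], ["b", 1]]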
emit_events(tag, es)
# File lib/fluent/plugin/in_rdkafka_group.rb, line 284
def emit_events(tag, es)
  retries = 0
  begin
    router.emit_stream(tag, es)
  rescue BufferError
    raise ForShutdown if @consumer.nil?

    if @retry_emit_limit.nil?
      sleep 1
      retry
    end

    if retries < @retry_emit_limit
      retries += 1
      sleep 1
      retry
    else
      raise RuntimeError, "Exceeds retry_emit_limit"
    end
  end
end
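For reference, the retry policy restated: with retry_emit_limit unset, the emit is retried indefinitely at one-second intervals; otherwise it raises after retry_emit_limit attempts. A generic, self-contained sketch of the same bounded-retry shape (not the plugin's code; names are illustrative):

def with_retries(retry_limit, wait: 1)
  retries = 0
  begin
    yield
  rescue => e
    raise e if retry_limit && retries >= retry_limit
    retries += 1
    sleep wait
    retry
  end
end

attempts = 0
with_retries(3) do
  attempts += 1
  raise "buffer full" if attempts < 3 # simulate a transient BufferError
  "emitted"
end
# => "emitted" (after two one-second waits)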
multi_workers_ready?()
# File lib/fluent/plugin/in_rdkafka_group.rb, line 80
def multi_workers_ready?
  true
end
reconnect_consumer()
# File lib/fluent/plugin/in_rdkafka_group.rb, line 169
def reconnect_consumer
  log.warn "Stopping Consumer"
  consumer = @consumer
  @consumer = nil
  if consumer
    consumer.close
  end
  log.warn "Could not connect to broker. retry_time:#{@retry_count}. Next retry will be in #{@retry_wait_seconds} seconds"
  @retry_count = @retry_count + 1
  sleep @retry_wait_seconds
  @consumer = setup_consumer
  log.warn "Re-starting consumer #{Time.now.to_s}"
  @retry_count = 0
rescue => e
  log.error "unexpected error during re-starting consumer object access", :error => e.to_s
  log.error_backtrace
  if @retry_count <= @retry_limit or disable_retry_limit
    reconnect_consumer
  end
end
run()
# File lib/fluent/plugin/in_rdkafka_group.rb, line 226
def run
  while @consumer
    begin
      each_batch { |batch|
        log.debug "A new batch for topic #{batch.topic} with #{batch.messages.size} messages"
        es = Fluent::MultiEventStream.new
        tag = batch.topic
        tag = @add_prefix + "." + tag if @add_prefix
        tag = tag + "." + @add_suffix if @add_suffix

        batch.messages.each { |msg|
          begin
            record = @parser_proc.call(msg)
            case @time_source
            when :kafka
              record_time = Fluent::EventTime.from_time(msg.timestamp)
            when :now
              record_time = Fluent::Engine.now
            when :record
              if @time_format
                record_time = @time_parser.parse(record[@record_time_key].to_s)
              else
                record_time = record[@record_time_key]
              end
            else
              log.fatal "BUG: invalid time_source: #{@time_source}"
            end
            if @kafka_message_key
              record[@kafka_message_key] = msg.key
            end
            if @add_headers
              msg.headers.each_pair { |k, v|
                record[k] = v
              }
            end
            es.add(record_time, record)
          rescue => e
            log.warn "parser error in #{msg.topic}/#{msg.partition}", :error => e.to_s, :value => msg.payload, :offset => msg.offset
            log.debug_backtrace
          end
        }

        unless es.empty?
          emit_events(tag, es)
        end
      }
    rescue ForShutdown
    rescue => e
      log.error "unexpected error during consuming events from kafka. Re-fetch events.", :error => e.to_s
      log.error_backtrace
      reconnect_consumer
    end
  end
rescue => e
  log.error "unexpected error during consumer object access", :error => e.to_s
  log.error_backtrace
end
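The tag derivation at the top of the batch block is easiest to see with concrete values (the prefix, suffix, and topic below are hypothetical):

# Hypothetical settings: add_prefix "kafka", add_suffix "raw", topic "app.events".
topic, add_prefix, add_suffix = "app.events", "kafka", "raw"
tag = topic
tag = add_prefix + "." + tag if add_prefix
tag = tag + "." + add_suffix if add_suffix
tag # => "kafka.app.events.raw"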
setup_consumer()
# File lib/fluent/plugin/in_rdkafka_group.rb, line 163
def setup_consumer
  consumer = Rdkafka::Config.new(@kafka_configs).consumer
  consumer.subscribe(*@topics)
  consumer
end
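A sketch of what @kafka_configs typically holds; the keys are standard librdkafka configuration names, while the broker list, group id, and topics below are placeholders:

require 'rdkafka'

kafka_configs = {
  "bootstrap.servers" => "kafka1:9092,kafka2:9092", # placeholder brokers
  "group.id"          => "fluentd"                  # placeholder consumer group
}
consumer = Rdkafka::Config.new(kafka_configs).consumer
consumer.subscribe("app.events", "app.logs")        # placeholder topics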
setup_parser(parser_conf)
# File lib/fluent/plugin/in_rdkafka_group.rb, line 114
def setup_parser(parser_conf)
  format = parser_conf["@type"]
  case format
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |msg| Oj.load(msg.payload) }
    rescue LoadError
      require 'yajl'
      Proc.new { |msg| Yajl::Parser.parse(msg.payload) }
    end
  when 'ltsv'
    require 'ltsv'
    Proc.new { |msg| LTSV.parse(msg.payload, {:symbolize_keys => false}).first }
  when 'msgpack'
    require 'msgpack'
    Proc.new { |msg| MessagePack.unpack(msg.payload) }
  when 'text'
    Proc.new { |msg| {@message_key => msg.payload} }
  else
    @custom_parser = parser_create(usage: 'in-rdkafka-plugin', conf: parser_conf)
    Proc.new { |msg|
      @custom_parser.parse(msg.payload) { |_time, record| record }
    }
  end
end
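Whichever branch is taken, the return value is a one-argument proc mapping a Kafka message to a record hash. A sketch with a stubbed message, using stdlib JSON in place of the Oj/Yajl branch above:

require 'json'

# Stand-in for an Rdkafka message; the parser procs only need #payload.
StubParserMsg = Struct.new(:payload)

parser_proc = Proc.new { |msg| JSON.parse(msg.payload) }
parser_proc.call(StubParserMsg.new('{"user":"alice","level":"info"}'))
# => {"user"=>"alice", "level"=>"info"}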
shutdown()
Calls superclass method
# File lib/fluent/plugin/in_rdkafka_group.rb, line 152
def shutdown
  # This nil assignment should be guarded by mutex in multithread programming manner.
  # But the situation is very low contention, so we don't use mutex for now.
  # If the problem happens, we will add a guard for consumer.
  consumer = @consumer
  @consumer = nil
  consumer.close

  super
end
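The comment above anticipates a mutex guard. Should contention ever become a problem, one hypothetical shape for it (not part of the plugin; @consumer_mutex would also need to wrap the reads in run and each_batch to be meaningful):

def shutdown
  consumer = nil
  @consumer_mutex.synchronize do # hypothetical mutex, created in initialize
    consumer = @consumer
    @consumer = nil
  end
  consumer.close if consumer

  super
end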
start()
Calls superclass method
Fluent::KafkaPluginUtil::SSLSettings#start
# File lib/fluent/plugin/in_rdkafka_group.rb, line 144
def start
  super

  @consumer = setup_consumer

  thread_create(:in_rdkafka_group, &method(:run))
end
Private Instance Methods
_config_to_array(config)
# File lib/fluent/plugin/in_rdkafka_group.rb, line 72
def _config_to_array(config)
  config_array = config.split(',').map { |k| k.strip }
  if config_array.empty?
    raise Fluent::ConfigError, "kafka_group: '#{config}' is a required parameter"
  end
  config_array
end
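For example, a comma-separated topics value splits as follows:

"app.events, app.logs".split(',').map { |k| k.strip }
# => ["app.events", "app.logs"]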