class Fluent::KafkaGroupInput
Constants
- BufferError
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka_group.rb, line 71 def initialize super require 'kafka' @time_parser = nil @retry_count = 1 end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka_group.rb, line 93 def configure(conf) super $log.info "Will watch for topics #{@topics} at brokers " \ "#{@brokers} and '#{@consumer_group}' group" @topics = _config_to_array(@topics) if conf['max_wait_ms'] log.warn "'max_wait_ms' parameter is deprecated. Use second unit 'max_wait_time' instead" @max_wait_time = conf['max_wait_ms'].to_i / 1000 end @parser_proc = setup_parser @consumer_opts = {:group_id => @consumer_group} @consumer_opts[:session_timeout] = @session_timeout if @session_timeout @consumer_opts[:offset_commit_interval] = @offset_commit_interval if @offset_commit_interval @consumer_opts[:offset_commit_threshold] = @offset_commit_threshold if @offset_commit_threshold @consumer_opts[:fetcher_max_queue_size] = @fetcher_max_queue_size if @fetcher_max_queue_size @fetch_opts = {} @fetch_opts[:max_wait_time] = @max_wait_time if @max_wait_time @fetch_opts[:min_bytes] = @min_bytes if @min_bytes if @use_record_time and @time_format if defined?(Fluent::TimeParser) @time_parser = Fluent::TimeParser.new(@time_format) else @time_parser = Fluent::TextParser::TimeParser.new(@time_format) end end end
emit_events(tag, es)
click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 260 def emit_events(tag, es) retries = 0 begin router.emit_stream(tag, es) rescue BufferError raise ForShutdown if @consumer.nil? if @retry_emit_limit.nil? sleep 1 retry end if retries < @retry_emit_limit retries += 1 sleep 1 retry else raise RuntimeError, "Exceeds retry_emit_limit" end end end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 87 def multi_workers_ready? true end
reconnect_consumer()
click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 192 def reconnect_consumer log.warn "Stopping Consumer" consumer = @consumer @consumer = nil if consumer consumer.stop end log.warn "Could not connect to broker. retry_time:#{@retry_count}. Next retry will be in #{@retry_wait_seconds} seconds" @retry_count = @retry_count + 1 sleep @retry_wait_seconds @consumer = setup_consumer log.warn "Re-starting consumer #{Time.now.to_s}" @retry_count = 0 rescue =>e log.error "unexpected error during re-starting consumer object access", :error => e.to_s log.error_backtrace if @retry_count <= @retry_limit or disable_retry_limit reconnect_consumer end end
run()
click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 213 def run while @consumer begin @consumer.each_batch(@fetch_opts) { |batch| es = Fluent::MultiEventStream.new tag = batch.topic tag = @add_prefix + "." + tag if @add_prefix tag = tag + "." + @add_suffix if @add_suffix batch.messages.each { |msg| begin record = @parser_proc.call(msg) if @use_record_time if @time_format record_time = @time_parser.parse(record['time']) else record_time = record['time'] end else record_time = Fluent::Engine.now end if @kafka_message_key record[@kafka_message_key] = msg.key end es.add(record_time, record) rescue => e log.warn "parser error in #{batch.topic}/#{batch.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset log.debug_backtrace end } unless es.empty? emit_events(tag, es) end } rescue ForShutdown rescue => e log.error "unexpected error during consuming events from kafka. Re-fetch events.", :error => e.to_s log.error_backtrace reconnect_consumer end end rescue => e log.error "unexpected error during consumer object access", :error => e.to_s log.error_backtrace end
setup_consumer()
click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 184 def setup_consumer consumer = @kafka.consumer(@consumer_opts) @topics.each { |topic| consumer.subscribe(topic, start_from_beginning: @start_from_beginning, max_bytes_per_partition: @max_bytes) } consumer end
setup_parser()
click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 127 def setup_parser case @format when 'json' begin require 'oj' Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS Proc.new { |msg| Oj.load(msg.value) } rescue LoadError require 'yajl' Proc.new { |msg| Yajl::Parser.parse(msg.value) } end when 'ltsv' require 'ltsv' Proc.new { |msg| LTSV.parse(msg.value, {:symbolize_keys => false}).first } when 'msgpack' require 'msgpack' Proc.new { |msg| MessagePack.unpack(msg.value) } when 'text' Proc.new { |msg| {@message_key => msg.value} } end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka_group.rb, line 171 def shutdown # This nil assignment should be guarded by mutex in multithread programming manner. # But the situation is very low contention, so we don't use mutex for now. # If the problem happens, we will add a guard for consumer. consumer = @consumer @consumer = nil consumer.stop @thread.join @kafka.close super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_kafka_group.rb, line 149 def start super if @scram_mechanism != nil && @username != nil && @password != nil @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: log, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism) elsif @username != nil && @password != nil @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: log, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password) else @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: log, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab) end @consumer = setup_consumer @thread = Thread.new(&method(:run)) end
Private Instance Methods
_config_to_array(config)
click to toggle source
# File lib/fluent/plugin/in_kafka_group.rb, line 79 def _config_to_array(config) config_array = config.split(',').map {|k| k.strip } if config_array.empty? raise Fluent::ConfigError, "kafka_group: '#{config}' is a required parameter" end config_array end