class Fluent::Plugin::Fluent::Plugin::Fluent::Kafka2Output
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kafka2.rb, line 64
# Sets up a new output plugin instance.
#
# Delegates to the superclass initializer first, then clears the @kafka
# client slot; refresh_client is what later assigns a client to it.
def initialize
  super

  @kafka = nil
end
Public Instance Methods
close()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kafka2.rb, line 147
# Runs the superclass close hook, then closes the Kafka client when one is set.
#
# Nothing is closed if @kafka is nil (or false).
def close
  super
  return unless @kafka

  @kafka.close
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kafka2.rb, line 96
# Validates the plugin configuration and derives the formatter and the
# producer options from it.
#
# conf - configuration element; conf.elements('format') must yield a section
#        that has an '@type' entry.
#
# Sets @formatter_proc and @producer_opts. When
# @active_support_notification_regex is set, also subscribes to matching
# ActiveSupport notifications and re-emits them through @router.
#
# Raises Fluent::ConfigError when @brokers is empty, when the <format>
# section is missing, or when that section has no @type.
def configure(conf)
  super

  # Must be ConfigError: Fluent::Config is not an exception class, so raising
  # it would surface as a TypeError instead of a configuration error.
  unless @brokers.size > 0
    raise Fluent::ConfigError, 'No brokers specified. Need one broker at least.'
  end
  log.info "brokers has been set: #{@brokers}"

  formatter_conf = conf.elements('format').first
  raise Fluent::ConfigError, "<format> section is required." unless formatter_conf
  raise Fluent::ConfigError, "format/@type is required." unless formatter_conf["@type"]
  @formatter_proc = setup_formatter(formatter_conf)

  # Warn about topic-selection setups that are likely not what the user meant.
  if @default_topic.nil?
    if @chunk_keys.include?('topic') && !@chunk_keys.include?('tag')
      log.warn "Use 'topic' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer topic,tag>"
    end
  else
    if @chunk_keys.include?('tag')
      log.warn "default_topic is set. Fluentd's event tag is not used for topic"
    end
  end

  # Optional producer settings are only added when they are configured.
  @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks}
  @producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout
  @producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec

  if @active_support_notification_regex
    # Loaded lazily so ActiveSupport is only required when this feature is on.
    require 'active_support/notifications'
    require 'active_support/core_ext/hash/keys'
    ActiveSupport::Notifications.subscribe(Regexp.new(@active_support_notification_regex)) do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      message = event.payload.respond_to?(:stringify_keys) ? event.payload.stringify_keys : event.payload
      @router.emit("fluent_kafka_stats.#{event.name}", Time.now.to_i, message)
    end
  end
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_kafka2.rb, line 138
# Unconditionally answers true.
#
# NOTE(review): presumably the hook Fluentd consults to decide whether this
# plugin may run under multiple workers - confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end
refresh_client(raise_error = true)
click to toggle source
# File lib/fluent/plugin/out_kafka2.rb, line 70
# (Re)creates the Kafka client and stores it in @kafka.
#
# The authentication options are chosen from the configured credentials:
#   * SCRAM  - when @scram_mechanism, @username and @password are all set
#   * PLAIN  - when only @username and @password are set
#   * GSSAPI - otherwise (@principal / @keytab are passed through as-is)
#
# raise_error - when true (the default) any failure is re-raised to the caller;
#               when false the failure is only written to the error log.
#
# NOTE(review): @seed_brokers is not assigned by any method visible here
# (configure only checks @brokers) - confirm it is populated elsewhere.
def refresh_client(raise_error = true)
  # Options shared by every authentication mode. The plugin logger is handed to
  # the client only when @get_kafka_client_log is enabled; previously it was
  # computed but never passed on, so that setting had no effect.
  client_opts = {
    seed_brokers: @seed_brokers,
    client_id: @client_id,
    logger: @get_kafka_client_log ? log : nil,
    ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
    ssl_client_cert: read_ssl_file(@ssl_client_cert),
    ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key)
  }

  has_credentials = !@username.nil? && !@password.nil?
  if has_credentials && !@scram_mechanism.nil?
    client_opts[:sasl_scram_username] = @username
    client_opts[:sasl_scram_password] = @password
    client_opts[:sasl_scram_mechanism] = @scram_mechanism
  elsif has_credentials
    client_opts[:sasl_plain_username] = @username
    client_opts[:sasl_plain_password] = @password
  else
    client_opts[:sasl_gssapi_principal] = @principal
    client_opts[:sasl_gssapi_keytab] = @keytab
  end

  @kafka = Kafka.new(**client_opts)
  log.info "initialized kafka producer: #{@client_id}"
rescue Exception => e
  # Deliberately broad, as in the original: with raise_error false every
  # failure is logged rather than propagated.
  # During startup, error should be reported to engine and stop its phase for safety.
  raise if raise_error

  log.error e
end
setup_formatter(conf)
click to toggle source
# File lib/fluent/plugin/out_kafka2.rb, line 157
# Builds the callable that turns (tag, time, record) into a message payload.
#
# conf - format section; conf['@type'] selects the formatter:
#   'json' - Oj when it can be loaded, otherwise Yajl
#   'ltsv' - LTSV.dump
#   other  - a formatter from formatter_create, kept in @formatter
#
# Returns an object responding to call(tag, time, record).
def setup_formatter(conf)
  case conf['@type']
  when 'json'
    begin
      # Prefer Oj; a LoadError here falls through to the Yajl encoder below.
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      proc { |_tag, _time, record| Oj.dump(record) }
    rescue LoadError
      require 'yajl'
      proc { |_tag, _time, record| Yajl::Encoder.encode(record) }
    end
  when 'ltsv'
    require 'ltsv'
    proc { |_tag, _time, record| LTSV.dump(record) }
  else
    @formatter = formatter_create(usage: 'kafka-plugin', conf: conf)
    @formatter.method(:format)
  end
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kafka2.rb, line 142
# Runs the superclass start hook, then creates the Kafka client.
#
# refresh_client is called with its default argument, so a failure to create
# the client is raised to the caller instead of only being logged.
def start
  super

  refresh_client
end
terminate()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kafka2.rb, line 152
# Runs the superclass terminate hook, then drops the reference to the Kafka
# client by resetting @kafka to nil.
def terminate
  super

  @kafka = nil
end
write(chunk)
click to toggle source
TODO: optimize write performance
# File lib/fluent/plugin/out_kafka2.rb, line 179
# Formats every event of a buffer chunk and delivers the batch to Kafka.
#
# The destination topic is, in order of preference: the chunk's :topic
# variable, @default_topic, then the chunk's tag.
#
# chunk - buffer chunk; must respond to metadata (with tag and variables) and
#         msgpack_each, which yields (time, record) pairs.
#
# Events that fail while being prepared or formatted are logged and skipped.
# Any other failure is logged, the Kafka client is rebuilt, and the exception
# is re-raised so the caller can retry the chunk. The topic producer is shut
# down on every exit path.
def write(chunk)
  tag = chunk.metadata.tag
  # Guard against a chunk whose metadata carries no variables at all; indexing
  # nil would raise before the @default_topic / tag fallbacks were tried.
  # NOTE(review): presumably variables is nil when no variable chunk keys are
  # configured - confirm against Fluentd's buffer metadata.
  chunk_vars = chunk.metadata.variables
  topic = (chunk_vars && chunk_vars[:topic]) || @default_topic || tag
  producer = @kafka.topic_producer(topic, @producer_opts)

  messages = 0
  record_buf = nil

  chunk.msgpack_each { |time, record|
    begin
      record = inject_values_to_record(tag, time, record)
      record.delete('topic'.freeze) if @exclude_topic_key
      # Each routing field comes from the record (optionally removed from it)
      # and falls back to the configured default.
      partition_key = (@exclude_partition_key ? record.delete('partition_key'.freeze) : record['partition_key'.freeze]) || @default_partition_key
      partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition
      message_key = (@exclude_message_key ? record.delete('message_key'.freeze) : record['message_key'.freeze]) || @default_message_key

      record_buf = @formatter_proc.call(tag, time, record)
    rescue StandardError => e
      log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record
      next
    end

    log.trace { "message will send to #{topic} with partition_key: #{partition_key}, partition: #{partition}, message_key: #{message_key} and value: #{record_buf}." }
    messages += 1

    producer.produce(record_buf, message_key, partition, partition_key)
  }

  if messages > 0
    log.debug { "#{messages} messages send." }
    producer.deliver_messages
  end
rescue Exception => e
  log.warn "Send exception occurred: #{e}"
  log.warn "Exception Backtrace : #{e.backtrace.join("\n")}"
  # For safety, refresh client and its producers
  refresh_client(false)
  # Raise exception to retry sending messages
  raise e
ensure
  producer.shutdown if producer
end