class Fluent::Plugin::Kafka2Output

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_kafka2.rb, line 91
def initialize
  super

  @kafka = nil
end

Public Instance Methods

close()
Calls superclass method
# File lib/fluent/plugin/out_kafka2.rb, line 191
def close
  super
  @kafka.close if @kafka
end
configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_kafka2.rb, line 129
def configure(conf)
  super

  if @brokers.size > 0
    @seed_brokers = @brokers
    log.info "brokers has been set: #{@seed_brokers}"
  else
    raise Fluent::ConfigError, 'No brokers specified. Need one broker at least.'
  end

  formatter_conf = conf.elements('format').first
  unless formatter_conf
    raise Fluent::ConfigError, "<format> section is required."
  end
  unless formatter_conf["@type"]
    raise Fluent::ConfigError, "format/@type is required."
  end
  @formatter_proc = setup_formatter(formatter_conf)

  if @default_topic.nil?
    if @use_default_for_unknown_topic
      raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic is true"
    end
    if @chunk_keys.include?('topic') && !@chunk_key_tag
      log.warn "Use 'topic' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer topic,tag>"
    end
  else
    if @chunk_key_tag
      log.warn "default_topic is set. Fluentd's event tag is not used for topic"
    end
  end

  @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks, idempotent: @idempotent}
  @producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout
  @producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec
  if @active_support_notification_regex
    require 'active_support/notifications'
    require 'active_support/core_ext/hash/keys'
    # Re-emit matching ruby-kafka instrumentation events (ActiveSupport
    # notifications) as fluent_kafka_stats.* records through the router.
    ActiveSupport::Notifications.subscribe(Regexp.new(@active_support_notification_regex)) do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      message = event.payload.respond_to?(:stringify_keys) ? event.payload.stringify_keys : event.payload
      @router.emit("fluent_kafka_stats.#{event.name}", Time.now.to_i, message)
    end
  end

  @topic_key_sym = @topic_key.to_sym

  # Pre-build record accessors so per-record header values can be extracted cheaply in #write.
  @headers_from_record_accessors = {}
  @headers_from_record.each do |key, value|
    @headers_from_record_accessors[key] = record_accessor_create(value)
  end
end
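A minimal configuration sketch that satisfies the checks above (at least one broker, a <format> section with @type, and a default_topic fallback). Match pattern, broker addresses, and topic name are placeholders; values are illustrative, not prescriptive:

<match app.**>
  @type kafka2
  brokers broker1:9092,broker2:9092   # at least one broker is required
  default_topic logs                  # fallback topic when the chunk carries none
  <format>
    @type json                        # <format> with @type is mandatory
  </format>
  <buffer topic>                      # 'topic' chunk key enables per-record topics
    flush_interval 5s
  </buffer>
</match>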
multi_workers_ready?()
# File lib/fluent/plugin/out_kafka2.rb, line 182
def multi_workers_ready?
  true
end
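Returning true declares that this output is safe to run under Fluentd's multi-worker mode; each worker process builds its own Kafka client via refresh_client.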
refresh_client(raise_error = true)
# File lib/fluent/plugin/out_kafka2.rb, line 97
def refresh_client(raise_error = true)
  begin
    logger = @get_kafka_client_log ? log : nil
    # Select the SASL mechanism from the configured credentials: SCRAM when
    # scram_mechanism, username and password are all set; PLAIN when only
    # username and password are set; otherwise GSSAPI (principal/keytab may be nil).
    if @scram_mechanism != nil && @username != nil && @password != nil
      @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                         ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
                         ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
                         sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname,
                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
    elsif @username != nil && @password != nil
      @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                         ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
                         ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
                         ssl_verify_hostname: @ssl_verify_hostname,
                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
    else
      @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
                         ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
                         ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,
                         ssl_verify_hostname: @ssl_verify_hostname,
                         partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
    end
    log.info "initialized kafka producer: #{@client_id}"
  rescue Exception => e
    if raise_error # During startup, raise the error to the engine so launch stops safely.
      raise e
    else
      log.error e
    end
  end
end
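A hypothetical credentials fragment that would select the SASL/SCRAM branch above. The parameter names mirror the instance variables used in this method, so verify them against the plugin's documented options:

  username alice
  password secret
  scram_mechanism sha256   # with username/password set, selects SASL/SCRAM
  sasl_over_ssl true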
setup_formatter(conf)
# File lib/fluent/plugin/out_kafka2.rb, line 201
def setup_formatter(conf)
  type = conf['@type']
  case type
  when 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |tag, time, record| Oj.dump(record) }
    rescue LoadError
      require 'yajl'
      Proc.new { |tag, time, record| Yajl::Encoder.encode(record) }
    end
  when 'ltsv'
    require 'ltsv'
    Proc.new { |tag, time, record| LTSV.dump(record) }
  else
    @formatter = formatter_create(usage: 'kafka-plugin', conf: conf)
    @formatter.method(:format)
  end
end
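The 'json' branch prefers Oj for speed and silently falls back to Yajl when Oj is not installed. A standalone sketch of that fallback pattern, assuming one of the oj or yajl gems is available (here mode: :compat stands in for Fluent::DEFAULT_OJ_OPTIONS):

# Prefer the faster Oj encoder; fall back to Yajl when Oj is absent.
encoder =
  begin
    require 'oj'
    proc { |record| Oj.dump(record, mode: :compat) }
  rescue LoadError
    require 'yajl'
    proc { |record| Yajl::Encoder.encode(record) }
  end

puts encoder.call('message' => 'hello')   # => {"message":"hello"}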
start()
# File lib/fluent/plugin/out_kafka2.rb, line 186
def start
  super
  refresh_client
end
terminate()
Calls superclass method
# File lib/fluent/plugin/out_kafka2.rb, line 196
def terminate
  super
  @kafka = nil
end
write(chunk)

TODO: optimize write performance

# File lib/fluent/plugin/out_kafka2.rb, line 223
def write(chunk)
  tag = chunk.metadata.tag
  # Resolve the destination topic: explicit 'topic' setting (placeholders allowed),
  # then the 'topic' chunk key variable, then default_topic, finally the event tag.
  topic = if @topic
            extract_placeholders(@topic, chunk)
          else
            (chunk.metadata.variables && chunk.metadata.variables[@topic_key_sym]) || @default_topic || tag
          end

  messages = 0
  record_buf = nil

  base_headers = @headers
  mutate_headers = !@headers_from_record_accessors.empty?

  begin
    producer = @kafka.topic_producer(topic, @producer_opts)
    chunk.msgpack_each { |time, record|
      begin
        record = inject_values_to_record(tag, time, record)
        record.delete(@topic_key) if @exclude_topic_key
        # Pull routing fields from the record; exclude_* options remove them from the payload.
        partition_key = (@exclude_partition_key ? record.delete(@partition_key_key) : record[@partition_key_key]) || @default_partition_key
        partition = (@exclude_partition ? record.delete(@partition_key) : record[@partition_key]) || @default_partition
        message_key = (@exclude_message_key ? record.delete(@message_key_key) : record[@message_key_key]) || @default_message_key

        if mutate_headers
          headers = base_headers.clone
          @headers_from_record_accessors.each do |key, header_accessor|
            headers[key] = header_accessor.call(record)
          end
        else
          headers = base_headers
        end

        record_buf = @formatter_proc.call(tag, time, record)
        record_buf_bytes = record_buf.bytesize
        if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes
          log.warn "record size exceeds max_send_limit_bytes. Skip event:", :time => time, :record => record
          next
        end
      rescue StandardError => e
        log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record
        next
      end

      log.trace { "message will send to #{topic} with partition_key: #{partition_key}, partition: #{partition}, message_key: #{message_key} and value: #{record_buf}." }
      messages += 1

      producer.produce(record_buf, key: message_key, partition_key: partition_key, partition: partition, headers: headers,
                       create_time: @use_event_time ? Time.at(time) : Time.now)
    }

    if messages > 0
      log.debug { "#{messages} messages send." }
      if @discard_kafka_delivery_failed
        begin
          producer.deliver_messages
        rescue Kafka::DeliveryFailed => e
          log.warn "DeliveryFailed occurred. Discard broken event:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
          producer.clear_buffer
        end
      else
        producer.deliver_messages
      end
    end
  rescue Kafka::UnknownTopicOrPartition
    if @use_default_for_unknown_topic && topic != @default_topic
      producer.shutdown if producer
      log.warn "'#{topic}' topic not found. Retry with '#{default_topic}' topic"
      topic = @default_topic
      retry
    end
    raise
  end
rescue Exception => e
  ignore = @ignore_exceptions.include?(e.class.name)

  log.warn "Send exception occurred: #{e}"
  log.warn "Exception Backtrace : #{e.backtrace.join("\n")}"
  log.warn "Exception ignored in tag : #{tag}" if ignore
  # For safety, refresh client and its producers
  refresh_client(false)
  # raise UnrecoverableError for backup ignored exception chunk
  raise Fluent::UnrecoverableError if ignore && exception_backup
  # Raise exception to retry sending messages
  raise e unless ignore
ensure
  producer.shutdown if producer
end
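topic_producer above is provided by this plugin's own producer extension rather than by stock ruby-kafka. A rough sketch of the same produce/deliver/shutdown lifecycle against the stock ruby-kafka producer API, with placeholder broker and topic names:

require 'kafka'

kafka = Kafka.new(seed_brokers: ['localhost:9092'], client_id: 'example')
producer = kafka.producer(max_retries: 2, required_acks: -1)
begin
  # Buffer a message locally; nothing is sent until deliver_messages.
  producer.produce('{"message":"hello"}', topic: 'logs')
  producer.deliver_messages          # flush the buffer to the brokers
rescue Kafka::DeliveryFailed
  producer.clear_buffer              # mirrors the discard_kafka_delivery_failed path
  raise
ensure
  producer.shutdown                  # always release sockets and threads
end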