class Fluent::KafkaOutputBuffered

Attributes

field_separator[RW]
output_data_type[RW]

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_kafka_buffered.rb, line 101
def initialize
  super

  require 'kafka'
  require 'fluent/plugin/kafka_producer_ext'

  @kafka = nil
  @producers = {}
  @producers_mutex = Mutex.new
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_kafka_buffered.rb, line 164
def configure(conf)
  super

  log.warn "Support of fluentd v0.12 has ended. Use kafka2 instead. kafka_buffered will be an alias of kafka2"

  if @zookeeper
    require 'zookeeper'
  else
    @seed_brokers = @brokers.split(",")
    log.info "brokers has been set directly: #{@seed_brokers}"
  end

  if conf['ack_timeout_ms']
    log.warn "'ack_timeout_ms' parameter is deprecated. Use second unit 'ack_timeout' instead"
    @ack_timeout = conf['ack_timeout_ms'].to_i / 1000
  end

  @f_separator = case @field_separator
                 when /SPACE/i then ' '
                 when /COMMA/i then ','
                 when /SOH/i then "\x01"
                 else "\t"
                 end

  @formatter_proc = setup_formatter(conf)

  @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks, idempotent: @idempotent}
  @producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout
  @producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec

  if @discard_kafka_delivery_failed
    log.warn "'discard_kafka_delivery_failed' option discards events which cause delivery failure, e.g. invalid topic or something."
    log.warn "If this is unexpected, you need to check your configuration or data."
  end

  if @active_support_notification_regex
    require 'active_support/notifications'
    require 'active_support/core_ext/hash/keys'
    ActiveSupport::Notifications.subscribe(Regexp.new(@active_support_notification_regex)) do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      message = event.payload.respond_to?(:stringify_keys) ? event.payload.stringify_keys : event.payload
      @router.emit("fluent_kafka_stats.#{event.name}", Time.now.to_i, message)
    end
  end

  @monitoring_list.each { |m|
    require "kafka/#{m}"
    log.info "#{m} monitoring started"
  }
end
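
configure resolves broker discovery (a direct brokers list or Zookeeper), converts the deprecated ack_timeout_ms unit, builds the producer options hash, and installs the record formatter. As a rough illustration, the sketch below drives configure through fluentd's classic v0.12 BufferedOutputTestDriver; the driver usage and parameter values are assumptions for illustration only, while the parameter names (brokers, default_topic, output_data_type, compression_codec, required_acks) correspond to instance variables referenced in this file.

require 'fluent/test'
require 'fluent/plugin/out_kafka_buffered'

# Hedged sketch: configure the plugin via the classic (v0.12) test driver.
Fluent::Test.setup

conf = %[
  brokers           localhost:9092
  default_topic     events
  output_data_type  json
  compression_codec gzip
  required_acks     1
]

driver = Fluent::Test::BufferedOutputTestDriver.new(Fluent::KafkaOutputBuffered, 'test.tag')
driver.configure(conf)
driver.instance.output_data_type  # => "json"
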
deliver_messages(producer, tag)
# File lib/fluent/plugin/out_kafka_buffered.rb, line 286
def deliver_messages(producer, tag)
  if @discard_kafka_delivery_failed
    begin
      producer.deliver_messages
    rescue Kafka::DeliveryFailed => e
      log.warn "DeliveryFailed occurred. Discard broken event:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
      producer.clear_buffer
    end
  else
    producer.deliver_messages
  end
end
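
deliver_messages is the single flush point for the buffered producer. With discard_kafka_delivery_failed enabled, a Kafka::DeliveryFailed raised by ruby-kafka is logged and the producer buffer is cleared, so the offending batch is dropped rather than retried; without it, the exception propagates and fluentd retries the chunk. The stand-in classes below are hypothetical and only mirror that rescue-and-clear flow:

# Hypothetical stand-ins for a ruby-kafka producer and Kafka::DeliveryFailed.
class FakeDeliveryFailed < StandardError; end

class FakeProducer
  attr_reader :cleared
  def deliver_messages; raise FakeDeliveryFailed, "leader not available"; end
  def clear_buffer; @cleared = true; end
end

producer = FakeProducer.new
begin
  producer.deliver_messages
rescue FakeDeliveryFailed
  # discard path: warn and drop the buffered batch, then continue
  producer.clear_buffer
end
producer.cleared  # => true
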
emit(tag, es, chain)
Calls superclass method
# File lib/fluent/plugin/out_kafka_buffered.rb, line 226
def emit(tag, es, chain)
  super(tag, es, chain, tag)
end
format_stream(tag, es)
# File lib/fluent/plugin/out_kafka_buffered.rb, line 230
def format_stream(tag, es)
  es.to_msgpack_stream
end
get_producer()
# File lib/fluent/plugin/out_kafka_buffered.rb, line 243
def get_producer
  @producers_mutex.synchronize {
    producer = @producers[Thread.current.object_id]
    unless producer
      producer = @kafka.producer(**@producer_opts)
      @producers[Thread.current.object_id] = producer
    end
    producer
  }
end
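
get_producer lazily creates one ruby-kafka producer per calling thread (keyed by Thread.current.object_id) under a mutex, so concurrent flush threads never share a producer. A self-contained sketch of the same pattern, with Object.new standing in for @kafka.producer(**@producer_opts):

cache = {}
mutex = Mutex.new

# One cached object per calling thread, created lazily under the mutex.
fetch = lambda do
  mutex.synchronize do
    cache[Thread.current.object_id] ||= Object.new
  end
end

a = Thread.new { fetch.call }.value
b = Thread.new { fetch.call }.value
a.equal?(b)  # => false: each thread got its own instance
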
multi_workers_ready?()
# File lib/fluent/plugin/out_kafka_buffered.rb, line 112
def multi_workers_ready?
  true
end
refresh_client(raise_error = true)
# File lib/fluent/plugin/out_kafka_buffered.rb, line 116
def refresh_client(raise_error = true)
  if @zookeeper
    @seed_brokers = []
    z = Zookeeper.new(@zookeeper)
    z.get_children(:path => @zookeeper_path)[:children].each do |id|
      broker = Yajl.load(z.get(:path => @zookeeper_path + "/#{id}")[:data])
      if @ssl_client_cert
        @seed_brokers.push(pickup_ssl_endpoint(broker))
      else
        @seed_brokers.push("#{broker['host']}:#{broker['port']}")
      end
    end
    z.close
    log.info "brokers has been refreshed via Zookeeper: #{@seed_brokers}"
  end
  begin
    if @seed_brokers.length > 0
      logger = @get_kafka_client_log ? log : nil
      if @scram_mechanism != nil && @username != nil && @password != nil
        @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                           ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
                           sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl,
                           ssl_verify_hostname: @ssl_verify_hostname,
                           partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
      elsif @username != nil && @password != nil
        @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                           ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
                           sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname,
                           partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
      else
        @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
                           ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
                           sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,  ssl_verify_hostname: @ssl_verify_hostname,
                           partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
      end
      log.info "initialized kafka producer: #{@client_id}"
    else
      log.warn "No brokers found on Zookeeper"
    end
  rescue Exception => e
    if raise_error # During startup, error should be reported to engine and stop its phase for safety.
      raise e
    else
      log.error e
    end
  end
end
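
When zookeeper is configured, refresh_client rebuilds the seed broker list from the broker registration znodes, taking only the 'host' and 'port' fields of each JSON payload (or the SSL endpoint when a client certificate is set); it then constructs a ruby-kafka client with SCRAM, PLAIN, or GSSAPI SASL options depending on which credentials are present. A small sketch of the znode parsing step; the sample payload is illustrative, and only 'host' and 'port' are actually read by the code above:

require 'yajl'

# Illustrative broker-registration payload; a real znode may carry more fields.
data = '{"host":"kafka-1.internal","port":9092}'
broker = Yajl.load(data)
seed_broker = "#{broker['host']}:#{broker['port']}"
# => "kafka-1.internal:9092"
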
setup_formatter(conf)
# File lib/fluent/plugin/out_kafka_buffered.rb, line 254
def setup_formatter(conf)
  if @output_data_type == 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |tag, time, record| Oj.dump(record) }
    rescue LoadError
      require 'yajl'
      Proc.new { |tag, time, record| Yajl::Encoder.encode(record) }
    end
  elsif @output_data_type == 'ltsv'
    require 'ltsv'
    Proc.new { |tag, time, record| LTSV.dump(record) }
  elsif @output_data_type == 'msgpack'
    require 'msgpack'
    Proc.new { |tag, time, record| record.to_msgpack }
  elsif @output_data_type =~ /^attr:(.*)$/
    @custom_attributes = $1.split(',').map(&:strip).reject(&:empty?)
    @custom_attributes.unshift('time') if @output_include_time
    @custom_attributes.unshift('tag') if @output_include_tag
    Proc.new { |tag, time, record|
      @custom_attributes.map { |attr|
        record[attr].nil? ? '' : record[attr].to_s
      }.join(@f_separator)
    }
  else
    @formatter = Fluent::Plugin.new_formatter(@output_data_type)
    @formatter.configure(conf)
    @formatter.method(:format)
  end
end
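
setup_formatter returns the proc used to serialize each record: Oj (falling back to Yajl) for json, LTSV or MessagePack for those types, a positional field join for the attr: form, or a standard fluentd formatter plugin otherwise. The sketch below replays what the attr: branch produces for an illustrative record; the field names and values are made up, and write has already injected record['tag'] when output_include_tag is set:

# Sketch of output_data_type "attr:user,action" with output_include_tag true,
# so custom_attributes == ['tag', 'user', 'action'] and f_separator == "\t".
custom_attributes = ['tag', 'user', 'action']
f_separator = "\t"
record = { 'tag' => 'app.events', 'user' => 'alice', 'action' => 'login' }

line = custom_attributes.map { |attr|
  record[attr].nil? ? '' : record[attr].to_s
}.join(f_separator)
# => "app.events\talice\tlogin"
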
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_kafka_buffered.rb, line 220
def shutdown
  super
  shutdown_producers
  @kafka = nil
end
shutdown_producers()
# File lib/fluent/plugin/out_kafka_buffered.rb, line 234
def shutdown_producers
  @producers_mutex.synchronize {
    @producers.each { |key, producer|
      producer.shutdown
    }
    @producers = {}
  }
end
start()
# File lib/fluent/plugin/out_kafka_buffered.rb, line 215
def start
  super
  refresh_client
end
write(chunk)
# File lib/fluent/plugin/out_kafka_buffered.rb, line 299
def write(chunk)
  tag = chunk.key
  def_topic = @default_topic || tag
  producer = get_producer

  records_by_topic = {}
  bytes_by_topic = {}
  messages = 0
  messages_bytes = 0
  record_buf = nil
  record_buf_bytes = nil

  begin
    chunk.msgpack_each { |time, record|
      begin
        if @output_include_time
          if @time_format
            record['time'.freeze] = Time.at(time).strftime(@time_format)
          else
            record['time'.freeze] = time
          end
        end

        record['tag'] = tag if @output_include_tag
        topic = (@exclude_topic_key ? record.delete(@topic_key) : record[@topic_key]) || def_topic
        partition_key = (@exclude_partition_key ? record.delete(@partition_key_key) : record[@partition_key_key]) || @default_partition_key
        partition = (@exclude_partition ? record.delete(@partition_key) : record[@partition_key]) || @default_partition
        message_key = (@exclude_message_key ? record.delete(@message_key_key) : record[@message_key_key]) || @default_message_key

        records_by_topic[topic] ||= 0
        bytes_by_topic[topic] ||= 0

        record_buf = @formatter_proc.call(tag, time, record)
        record_buf_bytes = record_buf.bytesize
        if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes
          log.warn "record size exceeds max_send_limit_bytes. Skip event:", :time => time, :record => record
          next
        end
      rescue StandardError => e
        log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record
        next
      end

      if (messages > 0) and (messages_bytes + record_buf_bytes > @kafka_agg_max_bytes) or (@kafka_agg_max_messages && messages >= @kafka_agg_max_messages)
        log.debug { "#{messages} messages send because reaches the limit of batch transmission." }
        deliver_messages(producer, tag)
        messages = 0
        messages_bytes = 0
      end
      log.trace { "message will send to #{topic} with partition_key: #{partition_key}, partition: #{partition}, message_key: #{message_key} and value: #{record_buf}." }
      messages += 1
      producer.produce_for_buffered(record_buf, topic: topic, key: message_key, partition_key: partition_key, partition: partition,
                                    create_time: @use_event_time ? Time.at(time) : Time.now)
      messages_bytes += record_buf_bytes

      records_by_topic[topic] += 1
      bytes_by_topic[topic] += record_buf_bytes
    }
    if messages > 0
      log.debug { "#{messages} messages send." }
      deliver_messages(producer, tag)
    end
    log.debug { "(records|bytes) (#{records_by_topic}|#{bytes_by_topic})" }
  end
rescue Exception => e
  log.warn "Send exception occurred: #{e}"
  log.warn "Exception Backtrace : #{e.backtrace.join("\n")}"
  # For safety, refresh client and its producers
  shutdown_producers
  refresh_client(false)
  # Raise exception to retry sending messages
  raise e
end
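
write unpacks the buffered chunk, resolves the destination topic, partition, partition key, and message key per record (optionally stripping those routing fields from the payload), formats the record, and hands batches to deliver_messages whenever kafka_agg_max_bytes or kafka_agg_max_messages is reached. The sketch below isolates the routing lookup for the topic field; the record contents are illustrative:

# Sketch of the per-record topic resolution used in write.
topic_key = 'topic'
exclude_topic_key = true
default_topic = 'events'

record = { 'topic' => 'audit', 'msg' => 'ok' }
topic = (exclude_topic_key ? record.delete(topic_key) : record[topic_key]) || default_topic
# topic  => "audit"
# record => {"msg"=>"ok"}  (routing field removed before formatting when excluded)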