class Fluent::KafkaOutputBuffered2

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rdkafka.rb, line 84
def initialize
  super
  @producers = {}
  @producers_mutex = Mutex.new
end

Public Instance Methods

add(level, &block) click to toggle source
# File lib/fluent/plugin/out_rdkafka.rb, line 93
def add(level, &block)
  if block
    self.info(block.call)
  end
end
build_config() click to toggle source
# File lib/fluent/plugin/out_rdkafka.rb, line 105
def build_config
  config = {
    :"bootstrap.servers" => @brokers,
  }

  if @ssl_ca_cert && @ssl_ca_cert[0]
    ssl = true
    config[:"ssl.ca.location"] = @ssl_ca_cert[0]
    config[:"ssl.certificate.location"] = @ssl_client_cert if @ssl_client_cert
    config[:"ssl.key.location"] = @ssl_client_cert_key if @ssl_client_cert_key
    config[:"ssl.key.password"] = @ssl_client_cert_key_password if @ssl_client_cert_key_password
  end

  if @principal
    sasl = true
    config[:"sasl.mechanisms"] = "GSSAPI"
    config[:"sasl.kerberos.principal"] = @principal
    config[:"sasl.kerberos.service.name"] = @service_name if @service_name
    config[:"sasl.kerberos.keytab"] = @keytab if @keytab
  end

  if ssl && sasl
    security_protocol = "SASL_SSL"
  elsif ssl && !sasl
    security_protocol = "SSL"
  elsif !ssl && sasl
    security_protocol = "SASL_PLAINTEXT"
  else
    security_protocol = "PLAINTEXT"
  end
  config[:"security.protocol"] = security_protocol

  config[:"compression.codec"] = @compression_codec if @compression_codec
  config[:"message.send.max.retries"] = @max_send_retries if @max_send_retries
  config[:"request.required.acks"] = @required_acks if @required_acks
  config[:"request.timeout.ms"] = @ack_timeout * 1000 if @ack_timeout
  config[:"queue.buffering.max.ms"] = @rdkafka_buffering_max_ms if @rdkafka_buffering_max_ms
  config[:"queue.buffering.max.messages"] = @rdkafka_buffering_max_messages if @rdkafka_buffering_max_messages
  config[:"message.max.bytes"] = @rdkafka_message_max_bytes if @rdkafka_message_max_bytes
  config[:"batch.num.messages"] = @rdkafka_message_max_num if @rdkafka_message_max_num

  @rdkafka_options.each { |k, v|
    config[k.to_sym] = v
  }

  config
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rdkafka.rb, line 90
def configure(conf)
  super
  log.instance_eval {
    def add(level, &block)
      if block
        self.info(block.call)
      end
    end
  }
  Rdkafka::Config.logger = log
  config = build_config
  @rdkafka = Rdkafka::Config.new(config)
  @formatter_proc = setup_formatter(conf)
end
emit(tag, es, chain) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rdkafka.rb, line 193
def emit(tag, es, chain)
  super(tag, es, chain, tag)
end
enqueue_with_retry(producer, topic, record_buf, message_key, partition) click to toggle source
# File lib/fluent/plugin/out_rdkafka.rb, line 280
def enqueue_with_retry(producer, topic, record_buf, message_key, partition)
  attempt = 0
  loop do
    begin
      handler = producer.produce(topic: topic, payload: record_buf, key: message_key, partition: partition)
      return handler
    rescue Exception => e
      if e.code == :queue_full
        if attempt <= @max_enqueue_retries
          log.warn "Failed to enqueue message; attempting retry #{attempt} of #{@max_enqueue_retries} after #{@enqueue_retry_backoff}s"
          sleep @enqueue_retry_backoff
          attempt += 1
        else
          raise "Failed to enqueue message although tried retry #{@max_enqueue_retries} times"
        end
      else
        raise e
      end
    end
  end
end
format_stream(tag, es) click to toggle source
# File lib/fluent/plugin/out_rdkafka.rb, line 197
def format_stream(tag, es)
  es.to_msgpack_stream
end
get_producer() click to toggle source
# File lib/fluent/plugin/out_rdkafka.rb, line 182
def get_producer
  @producers_mutex.synchronize {
    producer = @producers[Thread.current.object_id]
    unless producer
      producer = @rdkafka.producer
      @producers[Thread.current.object_id] = producer
    end
    producer
  }
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_rdkafka.rb, line 157
def multi_workers_ready?
  true
end
setup_formatter(conf) click to toggle source
# File lib/fluent/plugin/out_rdkafka.rb, line 201
def setup_formatter(conf)
  if @output_data_type == 'json'
    begin
      require 'oj'
      Oj.default_options = Fluent::DEFAULT_OJ_OPTIONS
      Proc.new { |tag, time, record| Oj.dump(record) }
    rescue LoadError
      require 'yajl'
      Proc.new { |tag, time, record| Yajl::Encoder.encode(record) }
    end
  elsif @output_data_type == 'ltsv'
    require 'ltsv'
    Proc.new { |tag, time, record| LTSV.dump(record) }
  elsif @output_data_type == 'msgpack'
    require 'msgpack'
    Proc.new { |tag, time, record| record.to_msgpack }
  elsif @output_data_type =~ /^attr:(.*)$/
    @custom_attributes = $1.split(',').map(&:strip).reject(&:empty?)
    @custom_attributes.unshift('time') if @output_include_time
    @custom_attributes.unshift('tag') if @output_include_tag
    Proc.new { |tag, time, record|
      @custom_attributes.map { |attr|
        record[attr].nil? ? '' : record[attr].to_s
      }.join(@f_separator)
    }
  else
    @formatter = Fluent::Plugin.new_formatter(@output_data_type)
    @formatter.configure(conf)
    @formatter.method(:format)
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rdkafka.rb, line 161
def shutdown
  super
  shutdown_producers
end
shutdown_producers() click to toggle source
# File lib/fluent/plugin/out_rdkafka.rb, line 166
def shutdown_producers
  @producers_mutex.synchronize {
    shutdown_threads = @producers.map { |key, producer|
      th = Thread.new {
        unless producer.close(10)
          log.warn("Queue is forcefully closed after 10 seconds wait")
        end
      }
      th.abort_on_exception = true
      th
    }
    shutdown_threads.each { |th| th.join }
    @producers = {}
  }
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rdkafka.rb, line 153
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_rdkafka.rb, line 233
def write(chunk)
  tag = chunk.key
  def_topic = @default_topic || tag

  record_buf = nil
  record_buf_bytes = nil

  begin
    chunk.msgpack_each.map { |time, record|
      begin
        if @output_include_time
          if @time_format
            record['time'.freeze] = Time.at(time).strftime(@time_format)
          else
            record['time'.freeze] = time
          end
        end

        record['tag'] = tag if @output_include_tag
        topic = (@exclude_topic_key ? record.delete('topic'.freeze) : record['topic'.freeze]) || def_topic
        partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition
        message_key = (@exclude_message_key ? record.delete('message_key'.freeze) : record['message_key'.freeze]) || @default_message_key

        record_buf = @formatter_proc.call(tag, time, record)
        record_buf_bytes = record_buf.bytesize
        if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes
          log.warn "record size exceeds max_send_limit_bytes. Skip event:", :time => time, :record => record
          next
        end
      rescue StandardError => e
        log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record
        next
      end

      producer = get_producer
      handler = enqueue_with_retry(producer, topic, record_buf, message_key, partition)
      handler
    }.each { |handler|
      handler.wait(@rdkafka_delivery_handle_poll_timeout) if @rdkafka_delivery_handle_poll_timeout != 0
    }
  end
rescue Exception => e
  log.warn "Send exception occurred: #{e} at #{e.backtrace.first}"
  # Raise exception to retry sendind messages
  raise e
end