class Fluent::KafkaOutBuffered

encode: utf-8

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/kafka_out_buffered.rb, line 36
# Sets up the plugin instance and loads the libraries it uses.
#
# The requires are kept inside the constructor so the libraries are only
# loaded when the plugin is actually instantiated.
def initialize
  super
  require 'kafka'
  require 'active_support/notifications'
  # Feature names are matched case-sensitively on case-sensitive file
  # systems, so the library must be required under its lowercase name.
  require 'yajl'
end

Public Instance Methods

build_producer() click to toggle source
# File lib/fluent/plugin/kafka_out_buffered.rb, line 62
# Creates the Kafka client and the producer selected by @producer_type.
#
# The client is built from the comma-separated @brokers list plus the SSL
# material held in @ca_cert, @client_cert and @client_cert_key, and is stored
# in @kafka. The resulting producer is stored in @producer.
#
# Raises Fluent::ConfigError when @producer_type is neither 'sync' nor
# 'async'; in that case @producer is left untouched.
def build_producer
  @kafka = Kafka.new(
    seed_brokers: @brokers.split(','),
    ssl_ca_cert: @ca_cert,
    ssl_client_cert: @client_cert,
    ssl_client_cert_key: @client_cert_key
  )

  @producer =
    case @producer_type
    when 'sync'
      @kafka.producer(
        required_acks: @required_acks,
        ack_timeout: @ack_timeout,
        compression_codec: @compression_codec,
        compression_threshold: @compression_threshold,
        max_retries: @max_retries,
        retry_backoff: @retry_backoff,
        max_buffer_size: @max_buffer_size
      )
    when 'async'
      @kafka.async_producer(
        max_queue_size: @max_queue_size,
        delivery_threshold: @delivery_threshold,
        delivery_interval: @delivery_interval
      )
    else
      raise Fluent::ConfigError, "Producer type parameter, #{@producer_type}, is invalid"
    end

  # Logged only once a producer has actually been built; previously this line
  # sat after the raise in the else branch and could never run.
  $log.info "producer type is #{@producer_type}"
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/kafka_out_buffered.rb, line 43
# Validates the plugin configuration and loads the SSL material.
#
# conf - the configuration object, passed through to the superclass.
#
# When @encryption is set, the file at @ca_cert_path is read into @ca_cert.
# When @authentication is set, the files at @client_cert_path and
# @client_cert_key_path are read into @client_cert and @client_cert_key.
#
# Raises Fluent::ConfigError when a required file path is unset or the file
# is not readable.
def configure(conf)
  super

  if @encryption
    # A nil path is rejected here as well: File.readable?(nil) would raise a
    # TypeError instead of the intended configuration error.
    unless @ca_cert_path && File.readable?(@ca_cert_path)
      raise Fluent::ConfigError, "CA cert file is not found or invalid"
    end
    @ca_cert = File.read(@ca_cert_path)
    $log.info "ca_cert is valid"
  end

  if @authentication
    unless @client_cert_path && File.readable?(@client_cert_path)
      raise Fluent::ConfigError, "Client cert file is not found or invalid"
    end
    unless @client_cert_key_path && File.readable?(@client_cert_key_path)
      raise Fluent::ConfigError, "Client cert key is not found or invalid"
    end
    @client_cert = File.read(@client_cert_path)
    @client_cert_key = File.read(@client_cert_key_path)
    $log.info "client cert and client cert key is valid"
  end
end
encode(record) click to toggle source
# File lib/fluent/plugin/kafka_out_buffered.rb, line 108
# Serializes a record according to @output_data_type.
#
# record - the event record to serialize.
#
# Returns the MessagePack form for 'msgpack', the JSON form for 'json', and
# the record itself, unchanged, for any other setting. The fall-through used
# to be written as `elsif none`, which referenced an undefined name and
# raised NameError instead of passing the record through.
def encode(record)
  case @output_data_type
  when 'msgpack'
    record.to_msgpack
  when 'json'
    Yajl::Encoder.encode(record)
  else
    record
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/kafka_out_buffered.rb, line 104
# Packs one event into the representation stored in the buffer chunk.
#
# tag    - the event tag.
# time   - the event time.
# record - the event record.
#
# Returns the result of calling to_msgpack on the [tag, time, record] triple.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/kafka_out_buffered.rb, line 99
# Stops the plugin and shuts down the Kafka producer.
#
# The producer is only shut down when one exists, so calling this before a
# producer has been built does not raise NoMethodError on nil.
def shutdown
  super
  @producer.shutdown if @producer
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/kafka_out_buffered.rb, line 94
# Starts the plugin: runs the superclass start logic first, then builds the
# Kafka producer via build_producer.
def start
  super
  build_producer
end
write(chunk) click to toggle source
# File lib/fluent/plugin/kafka_out_buffered.rb, line 118
# Sends every event in a buffer chunk to Kafka.
#
# chunk - an object responding to msgpack_each, which yields the
#         [tag, time, record] triples of the buffered events.
#
# For each event the tag and time are copied into the record when
# @output_include_tag / @output_include_time are set, the record is encoded
# with encode, and the result is produced to @topic using @partition_key.
# Delivery is triggered after every single message.
def write(chunk)
  chunk.msgpack_each do |event|
    tag, time, record = event

    record['tag'] = tag if @output_include_tag
    record['time'] = time if @output_include_time

    payload = encode(record)
    @producer.produce(payload, topic: @topic, partition_key: @partition_key)
    @producer.deliver_messages
  end
end