class Fluent::KafkaOutBuffered
encode: utf-8
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/kafka_out_buffered.rb, line 36 def initialize super require 'kafka' require "active_support/notifications" require 'Yajl' end
Public Instance Methods
build_producer()
click to toggle source
# File lib/fluent/plugin/kafka_out_buffered.rb, line 62 def build_producer() @kafka = Kafka.new( seed_brokers: @brokers.split(','), ssl_ca_cert: @ca_cert, ssl_client_cert: @client_cert, ssl_client_cert_key: @client_cert_key ) if @producer_type == 'sync' @producer = @kafka.producer( required_acks: @required_acks, ack_timeout: @ack_timeout, compression_codec: @compression_codec, compression_threshold: @compression_threshold, max_retries: @max_retries, retry_backoff: @retry_backoff, max_buffer_size: @max_buffer_size ) elsif @producer_type == 'async' @producer = @kafka.async_producer( max_queue_size: @max_queue_size, delivery_threshold: @delivery_threshold, delivery_interval: @delivery_interval ) else raise Fluent::ConfigError, "Producer type parameter, #{@producer_type}, is invalid" $log.info "produer type is #{@producer_type}" end end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/kafka_out_buffered.rb, line 43 def configure(conf) super if @encryption raise Fluent::ConfigError, "CA cert file is not found or invalid" unless File.readable?(@ca_cert_path) @ca_cert = File.read(@ca_cert_path) $log.info "ca_cert is valid" end if @authentication raise Fluent::ConfigError, "Client cert file is not found or invalid" unless File.readable?(@client_cert_path) raise Fluent::ConfigError, "Client cert key is not found or invalid" unless File.readable?(@client_cert_key_path) @client_cert = File.read(@client_cert_path) @client_cert_key = File.read(@client_cert_key_path) $log.info "client cert and client cert key is valid" end end
encode(record)
click to toggle source
# File lib/fluent/plugin/kafka_out_buffered.rb, line 108 def encode(record) if @output_data_type == 'msgpack' record.to_msgpack elsif @output_data_type == 'json' Yajl::Encoder.encode(record) elsif none record end end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/kafka_out_buffered.rb, line 104 def format(tag, time, record) [tag, time, record].to_msgpack end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/kafka_out_buffered.rb, line 99 def shutdown super @producer.shutdown end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/kafka_out_buffered.rb, line 94 def start super build_producer() end
write(chunk)
click to toggle source
# File lib/fluent/plugin/kafka_out_buffered.rb, line 118 def write(chunk) chunk.msgpack_each { |(tag, time, record)| record['tag'] = tag if @output_include_tag record['time'] = time if @output_include_time encoded_record=encode(record) @producer.produce( encoded_record, topic: @topic, partition_key: @partition_key ) @producer.deliver_messages } end