class Fluent::KafkaOutput

Attributes

field_separator[RW]
output_data_type[RW]

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_kafka.rb, line 79
def initialize
  super

  require 'kafka'

  @kafka = nil
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_kafka.rb, line 122
def configure(conf)
  super

  # With Zookeeper configured, brokers are discovered at runtime in
  # refresh_client; otherwise parse the static broker list now.
  if @zookeeper
    require 'zookeeper'
  else
    @seed_brokers = @brokers.match(",").nil? ? [@brokers] : @brokers.split(",")
    log.info "brokers has been set directly: #{@seed_brokers}"
  end

  if conf['ack_timeout_ms']
    log.warn "'ack_timeout_ms' parameter is deprecated. Use second unit 'ack_timeout' instead"
    @ack_timeout = conf['ack_timeout_ms'].to_i / 1000
  end

  @f_separator = case @field_separator
                 when /SPACE/i then ' '
                 when /COMMA/i then ','
                 when /SOH/i then "\x01"
                 else "\t"
                 end

  @formatter_proc = setup_formatter(conf)

  @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks}
  @producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout
  @producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec
  @producer_opts[:max_buffer_size] = @max_buffer_size if @max_buffer_size
  @producer_opts[:max_buffer_bytesize] = @max_buffer_bytesize if @max_buffer_bytesize
  if @active_support_notification_regex
    require 'active_support/notifications'
    require 'active_support/core_ext/hash/keys'
    ActiveSupport::Notifications.subscribe(Regexp.new(@active_support_notification_regex)) do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      message = event.payload.respond_to?(:stringify_keys) ? event.payload.stringify_keys : event.payload
      @router.emit("fluent_kafka_stats.#{event.name}", Time.now.to_i, message)
    end
  end
end
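
Note that the deprecated ack_timeout_ms value is converted with integer division, so sub-second timeouts silently round down to 0. A minimal standalone sketch of that conversion (not part of the plugin):

# Milliseconds -> seconds via integer division, as in configure above.
'1500'.to_i / 1000  # => 1
'500'.to_i / 1000   # => 0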
emit(tag, es, chain)
# File lib/fluent/plugin/out_kafka.rb, line 198
def emit(tag, es, chain)
  begin
    chain.next

    # out_kafka is mainly for testing, so it doesn't need the performance of out_kafka_buffered.
    producer = @kafka.producer(@producer_opts)

    es.each do |time, record|
      if @output_include_time
        if @time_format
          record['time'] = Time.at(time).strftime(@time_format)
        else
          record['time'] = time
        end
      end
      record['tag'] = tag if @output_include_tag
      topic = (@exclude_topic_key ? record.delete('topic') : record['topic']) || @default_topic || tag
      partition_key = (@exclude_partition_key ? record.delete('partition_key') : record['partition_key']) || @default_partition_key
      partition = (@exclude_partition ? record.delete('partition'.freeze) : record['partition'.freeze]) || @default_partition
      message_key = (@exclude_message_key ? record.delete('message_key') : record['message_key']) || @default_message_key

      value = @formatter_proc.call(tag, time, record)

      log.trace { "message will send to #{topic} with partition_key: #{partition_key}, partition: #{partition}, message_key: #{message_key} and value: #{value}." }
      begin
        producer.produce(value, topic: topic, key: message_key, partition: partition, partition_key: partition_key)
      rescue Kafka::BufferOverflow => e
        log.warn "BufferOverflow occurred: #{e}"
        log.info "Trying to deliver the messages to prevent the buffer from overflowing again."
        producer.deliver_messages
        log.info "Recovered from BufferOverflow successfully`"
      end
    end

    producer.deliver_messages
    producer.shutdown
  rescue Exception => e
    log.warn "Send exception occurred: #{e}"
    producer.shutdown if producer
    refresh_client
    raise e
  end
end
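
Per event, the target topic comes from the record's 'topic' field first, then default_topic, then the Fluentd tag; exclude_topic_key decides whether the routing field is stripped before formatting. A standalone sketch of that precedence (local variables stand in for the plugin's settings):

record = { 'topic' => 'orders', 'msg' => 'hello' }
tag = 'app.events'
default_topic = nil
exclude_topic_key = true

# Same expression as in emit: delete() both reads and removes the field.
topic = (exclude_topic_key ? record.delete('topic') : record['topic']) || default_topic || tag
topic   # => "orders"
record  # => { "msg" => "hello" }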
refresh_client()
# File lib/fluent/plugin/out_kafka.rb, line 87
def refresh_client
  # Rediscover the broker list from Zookeeper when it is configured;
  # otherwise keep the brokers parsed in configure.
  if @zookeeper
    @seed_brokers = []
    z = Zookeeper.new(@zookeeper)
    z.get_children(:path => @zookeeper_path)[:children].each do |id|
      broker = Yajl.load(z.get(:path => @zookeeper_path + "/#{id}")[:data])
      @seed_brokers.push("#{broker['host']}:#{broker['port']}")
    end
    z.close
    log.info "brokers has been refreshed via Zookeeper: #{@seed_brokers}"
  end
  begin
    if @seed_brokers.length > 0
      if @scram_mechanism != nil && @username != nil && @password != nil
        @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
                           ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                           sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism)
      elsif @username != nil && @password != nil
        @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
                           ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                           sasl_plain_username: @username, sasl_plain_password: @password)
      else
        @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
                           ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
                           sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
      end
      log.info "initialized kafka producer: #{@client_id}"
    else
      log.warn "No brokers found on Zookeeper"
    end
  rescue Exception => e
    log.error e
  end
end
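
The three Kafka.new calls above differ only in their SASL options, chosen by a simple precedence. A hedged sketch of just that selection (sasl_opts is a local illustration, not plugin API):

# SCRAM wins when a mechanism plus credentials are set, plain SASL when
# only credentials are set, and GSSAPI (principal/keytab) otherwise.
def sasl_opts(username, password, scram_mechanism, principal, keytab)
  if scram_mechanism && username && password
    { sasl_scram_username: username, sasl_scram_password: password,
      sasl_scram_mechanism: scram_mechanism }
  elsif username && password
    { sasl_plain_username: username, sasl_plain_password: password }
  else
    { sasl_gssapi_principal: principal, sasl_gssapi_keytab: keytab }
  end
end

sasl_opts('user', 'secret', 'sha256', nil, nil)[:sasl_scram_mechanism]  # => "sha256"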
setup_formatter(conf)
# File lib/fluent/plugin/out_kafka.rb, line 172
def setup_formatter(conf)
  if @output_data_type == 'json'
    require 'yajl'
    Proc.new { |tag, time, record| Yajl::Encoder.encode(record) }
  elsif @output_data_type == 'ltsv'
    require 'ltsv'
    Proc.new { |tag, time, record| LTSV.dump(record) }
  elsif @output_data_type == 'msgpack'
    require 'msgpack'
    Proc.new { |tag, time, record| record.to_msgpack }
  elsif @output_data_type =~ /^attr:(.*)$/
    @custom_attributes = $1.split(',').map(&:strip).reject(&:empty?)
    @custom_attributes.unshift('time') if @output_include_time
    @custom_attributes.unshift('tag') if @output_include_tag
    Proc.new { |tag, time, record|
      @custom_attributes.map { |attr|
        record[attr].nil? ? '' : record[attr].to_s
      }.join(@f_separator)
    }
  else
    @formatter = Fluent::Plugin.new_formatter(@output_data_type)
    @formatter.configure(conf)
    @formatter.method(:format)
  end
end
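
The attr: form of output_data_type selects record fields by name and joins them with the configured separator; missing fields become empty strings. A standalone sketch (the record and separator here are illustrative):

# 'attr:user,action' picks those keys in order; unknown keys yield ''.
attrs = 'attr:user,action'.sub(/^attr:/, '').split(',').map(&:strip)
record = { 'user' => 'alice', 'action' => 'login', 'extra' => 'dropped' }
attrs.map { |a| record[a].nil? ? '' : record[a].to_s }.join("\t")
# => "alice\tlogin"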
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_kafka.rb, line 167
def shutdown
  super
  @kafka = nil
end
start()
Calls superclass method
# File lib/fluent/plugin/out_kafka.rb, line 162
def start
  super
  refresh_client
end