class Fluent::Plugin::DeisOutput

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_deis.rb, line 43
def initialize
  super
  require 'kafka'
  require 'fluent/plugin/kafka_producer_ext'

  @kafka = nil
  @producers = {}
  @producers_mutex = Mutex.new
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_deis.rb, line 104
def configure(conf)
  super

  @producer_opts = { max_retries: @max_send_retries, required_acks: @required_acks }
  @producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout
  @producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec

  return unless @discard_kafka_delivery_failed
  log.warn "'discard_kafka_delivery_failed' option discards events which "\
           'cause delivery failure, e.g. invalid topic or something.'
  log.warn 'If this is unexpected, you need to check your configuration or data.'
end
deliver_messages(producer, tag)
# File lib/fluent/plugin/out_deis.rb, line 82
def deliver_messages(producer, tag)
  if @discard_kafka_delivery_failed
    begin
      producer.deliver_messages
    rescue Kafka::DeliveryFailed => e
      log.warn 'DeliveryFailed occurred. Discard broken event:',
               error: e.to_s, error_class: e.class.to_s, tag: tag
      producer.clear_buffer
    end
  else
    producer.deliver_messages
  end
end
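A hedged sketch of what the discard_kafka_delivery_failed switch means for callers; output and producer stand in for a hypothetical configured plugin instance and its ruby-kafka producer:

begin
  output.deliver_messages(producer, 'some.tag')
rescue Kafka::DeliveryFailed
  # Reached only when discard_kafka_delivery_failed is false: the buffer
  # is left intact and the error propagates so the chunk can be retried.
end
# When discard_kafka_delivery_failed is true, the rescue above never fires:
# the failure is logged, the producer buffer is cleared, and the batch is
# dropped for good.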
emit(tag, es, chain)
Calls superclass method
def emit(tag, es, chain)
  super(tag, es, chain, tag)
end
filter_record(record)
# File lib/fluent/plugin/out_deis.rb, line 121
def filter_record(record)
  return unless from_router?(record)
  data = build_series(record)
  return unless data
  return data.map do |point|
    InfluxDB::PointValue.new(point).dump
  end.join("\n")
rescue Exception => e # rubocop:disable RescueException
  puts "Error:#{e.backtrace}"
end
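filter_record relies on from_router? and build_series, which are not shown here; assuming build_series yields point hashes in the shape the influxdb gem expects, InfluxDB::PointValue#dump renders one line of InfluxDB line protocol per point:

require 'influxdb'

# A hypothetical point in the shape build_series is assumed to return.
point = {
  series: 'cpu',
  tags: { app: 'web' },
  values: { value: 0.64 },
  timestamp: 1_500_000_000
}
puts InfluxDB::PointValue.new(point).dump
# => cpu,app=web value=0.64 1500000000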
get_producer()
# File lib/fluent/plugin/out_deis.rb, line 71
def get_producer # rubocop:disable AccessorMethodName
  @producers_mutex.synchronize do
    producer = @producers[Thread.current.object_id]
    unless producer
      producer = @kafka.producer(@producer_opts)
      @producers[Thread.current.object_id] = producer
    end
    producer
  end
end
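A usage sketch of the per-thread caching; output stands in for a hypothetical, already-configured plugin instance:

# Two threads each get their own producer...
producers = Array.new(2) { Thread.new { output.get_producer }.value }
producers.uniq.length                            # => 2
# ...while repeated calls on one thread return the same cached instance.
output.get_producer.equal?(output.get_producer)  # => true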
refresh_client(raise_error = true)
# File lib/fluent/plugin/out_deis.rb, line 96
def refresh_client(raise_error = true)
  @kafka = Kafka.new(seed_brokers: @brokers.split(','), client_id: @client_id)
  log.info "initialized kafka producer: #{@client_id}"
rescue Exception => e # rubocop:disable RescueException
  raise e if raise_error
  log.error e
end
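Since @brokers is a single comma-separated string, the split(',') above turns it into the broker list ruby-kafka expects; the hostnames here are hypothetical:

require 'kafka'

brokers = 'kafka-0:9092,kafka-1:9092'  # hypothetical value of @brokers
kafka = Kafka.new(seed_brokers: brokers.split(','), client_id: 'fluentd-deis')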
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_deis.rb, line 58
def shutdown
  super
  shutdown_producers
  @kafka = nil
end
shutdown_producers()
# File lib/fluent/plugin/out_deis.rb, line 64
def shutdown_producers
  @producers_mutex.synchronize do
    @producers.each_value(&:shutdown)
    @producers = {}
  end
end
start()
Calls superclass method
# File lib/fluent/plugin/out_deis.rb, line 53
def start
  super
  refresh_client
end
write(chunk)
# File lib/fluent/plugin/out_deis.rb, line 132
def write(chunk)
  tag = chunk.metadata.tag
  producer = get_producer

  records_by_topic = {}
  bytes_by_topic = {}
  messages = 0
  messages_bytes = 0
  record_buf_bytes = nil
  begin
    Fluent::Engine.msgpack_factory.unpacker(chunk.open).each do |time, record|
      begin
        topic = @metrics_topic
        records_by_topic[topic] ||= 0
        bytes_by_topic[topic] ||= 0
        line = filter_record(record)

        next unless line
        record_buf_bytes = line.bytesize
        if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes
          log.warn 'record size exceeds max_send_limit_bytes. Skip event:',
                   time: time, record: record
          next
        end
      rescue StandardError => e
        log.warn 'unexpected error while formatting record. Skipping broken event:',
                 error: e.to_s, error_class: e.class.to_s, time: time, record: record
        next
      end

      if messages.positive? &&
         ((messages_bytes + record_buf_bytes > @kafka_agg_max_bytes) ||
          (@kafka_agg_max_messages && messages >= @kafka_agg_max_messages))
        log.debug do
          "#{messages} messages sent because the batch transmission limit was reached."
        end
        deliver_messages(producer, tag)
        messages = 0
        messages_bytes = 0
      end

      log.trace do
        "message will be sent to #{topic} with value: #{line}."
      end

      messages += 1
      producer.produce2(
        line,
        topic: topic
      )
      messages_bytes += record_buf_bytes

      records_by_topic[topic] += 1
      bytes_by_topic[topic] += record_buf_bytes
    end
    if messages.positive?
      log.debug { "#{messages} messages send." }
      deliver_messages(producer, tag)
    end
    log.debug { "(records|bytes) (#{records_by_topic}|#{bytes_by_topic})" }
  end
rescue Exception => e # rubocop:disable RescueException
  log.warn "Send exception occurred: #{e}"
  log.warn "Exception Backtrace : #{e.backtrace.join("\n")}"
  # For safety, refresh client and its producers
  shutdown_producers
  refresh_client(false)
  # Raise the exception so Fluentd retries sending the messages
  raise e
end
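The flush condition inside the loop is easier to read in isolation. A minimal sketch, with hypothetical limit values in place of @kafka_agg_max_bytes and @kafka_agg_max_messages:

# Should the pending batch be delivered before adding a record of
# record_bytes more bytes? max_messages may be nil, meaning no count limit.
def flush_before_adding?(messages, messages_bytes, record_bytes, max_bytes, max_messages = nil)
  messages.positive? &&
    ((messages_bytes + record_bytes > max_bytes) ||
     (max_messages && messages >= max_messages))
end

flush_before_adding?(10, 900, 200, 1000)     # => true  (byte limit exceeded)
flush_before_adding?(10, 100, 50, 1000, 10)  # => true  (message limit reached)
flush_before_adding?(0, 0, 2000, 1000)       # => false (nothing buffered yet)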