class Fluent::Plugin::DeisOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_deis.rb, line 43 def initialize super require 'kafka' require 'fluent/plugin/kafka_producer_ext' @kafka = nil @producers = {} @producers_mutex = Mutex.new end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_deis.rb, line 104 def configure(conf) super @producer_opts = { max_retries: @max_send_retries, required_acks: @required_acks } @producer_opts[:ack_timeout] = @ack_timeout if @ack_timeout @producer_opts[:compression_codec] = @compression_codec.to_sym if @compression_codec return unless @discard_kafka_delivery_failed log.warn "'discard_kafka_delivery_failed' option discards events which "\ 'cause delivery failure, e.g. invalid topic or something.' log.warn 'If this is unexpected, you need to check your configuration or data.' end
deliver_messages(producer, tag)
click to toggle source
# File lib/fluent/plugin/out_deis.rb, line 82 def deliver_messages(producer, tag) if @discard_kafka_delivery_failed begin producer.deliver_messages rescue Kafka::DeliveryFailed => e log.warn 'DeliveryFailed occurred. Discard broken event:', error: e.to_s, error_class: e.class.to_s, tag: tag producer.clear_buffer end else producer.deliver_messages end end
filter_record(record)
click to toggle source
def emit(tag, es, chain)
super(tag, es, chain, tag)
end
# File lib/fluent/plugin/out_deis.rb, line 121 def filter_record(record) return unless from_router?(record) data = build_series(record) return unless data return data.map do |point| InfluxDB::PointValue.new(point).dump end.join("\n") rescue Exception => e # rubocop:disable RescueException puts "Error:#{e.backtrace}" end
get_producer()
click to toggle source
# File lib/fluent/plugin/out_deis.rb, line 71 def get_producer # rubocop:disable AccessorMethodName @producers_mutex.synchronize do producer = @producers[Thread.current.object_id] unless producer producer = @kafka.producer(@producer_opts) @producers[Thread.current.object_id] = producer end producer end end
refresh_client(raise_error = true)
click to toggle source
# File lib/fluent/plugin/out_deis.rb, line 96 def refresh_client(raise_error = true) @kafka = Kafka.new(seed_brokers: @brokers.split(','), client_id: @client_id) log.info "initialized kafka producer: #{@client_id}" rescue Exception => e # rubocop:disable RescueException raise e if raise_error log.error e end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_deis.rb, line 58 def shutdown super shutdown_producers @kafka = nil end
shutdown_producers()
click to toggle source
# File lib/fluent/plugin/out_deis.rb, line 64 def shutdown_producers @producers_mutex.synchronize do @producers.each_value(&:shutdown) @producers = {} end end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_deis.rb, line 53 def start super refresh_client end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_deis.rb, line 132 def write(chunk) tag = chunk.metadata.tag producer = get_producer records_by_topic = {} bytes_by_topic = {} messages = 0 messages_bytes = 0 record_buf = nil record_buf_bytes = nil begin Fluent::Engine.msgpack_factory.unpacker(chunk.open).each do |time, record| begin topic = @metrics_topic records_by_topic[topic] ||= 0 bytes_by_topic[topic] ||= 0 line = filter_record(record) next unless line record_buf_bytes = line.bytesize if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes log.warn 'record size exceeds max_send_limit_bytes. Skip event:', time: time, record: record next end rescue StandardError => e log.warn 'unexpected error during format record. Skip broken event:', error: e.to_s, error_class: e.class.to_s, time: time, record: record next end if messages.positive? && (messages_bytes + record_buf_bytes > @kafka_agg_max_bytes) || (@kafka_agg_max_messages && messages >= @kafka_agg_max_messages) log.debug do "#{messages} messages send because reaches the limit of batch transmission." end deliver_messages(producer, tag) messages = 0 messages_bytes = 0 end log.trace do "message will send to #{topic} with partition_key: #{partition_key},"\ "partition: #{partition}, message_key: #{message_key} and value: #{record_buf}." end messages += 1 producer.produce2( line, topic: topic ) messages_bytes += record_buf_bytes records_by_topic[topic] += 1 bytes_by_topic[topic] += record_buf_bytes end if messages.positive? log.debug { "#{messages} messages send." } deliver_messages(producer, tag) end log.debug { "(records|bytes) (#{records_by_topic}|#{bytes_by_topic})" } end rescue Exception => e # rubocop:disable RescueException log.warn "Send exception occurred: #{e}" log.warn "Exception Backtrace : #{e.backtrace.join("\n")}" # For safety, refresh client and its producers shutdown_producers refresh_client(false) # Raise exception to retry sendind messages raise e end