class Fluent::Plugin::Fluent::Rdkafka2Output
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rdkafka2.rb, line 102 def initialize super @producers = nil @producers_mutex = nil @shared_producer = nil end
Public Instance Methods
add(level, &block)
click to toggle source
# File lib/fluent/plugin/out_rdkafka2.rb, line 113 def add(level, &block) return unless block # Follow rdkakfa's log level. See also rdkafka-ruby's bindings.rb: https://github.com/appsignal/rdkafka-ruby/blob/e5c7261e3f2637554a5c12b924be297d7dca1328/lib/rdkafka/bindings.rb#L117 case level when Logger::FATAL self.fatal(block.call) when Logger::ERROR self.error(block.call) when Logger::WARN self.warn(block.call) when Logger::INFO self.info(block.call) when Logger::DEBUG self.debug(block.call) else self.trace(block.call) end end
build_config()
click to toggle source
# File lib/fluent/plugin/out_rdkafka2.rb, line 163 def build_config config = {:"bootstrap.servers" => @brokers} if @ssl_ca_cert && @ssl_ca_cert[0] ssl = true config[:"ssl.ca.location"] = @ssl_ca_cert[0] config[:"ssl.certificate.location"] = @ssl_client_cert if @ssl_client_cert config[:"ssl.key.location"] = @ssl_client_cert_key if @ssl_client_cert_key config[:"ssl.key.password"] = @ssl_client_cert_key_password if @ssl_client_cert_key_password end if @principal sasl = true config[:"sasl.mechanisms"] = "GSSAPI" config[:"sasl.kerberos.principal"] = @principal config[:"sasl.kerberos.service.name"] = @service_name if @service_name config[:"sasl.kerberos.keytab"] = @keytab if @keytab end if ssl && sasl security_protocol = "SASL_SSL" elsif ssl && !sasl security_protocol = "SSL" elsif !ssl && sasl security_protocol = "SASL_PLAINTEXT" else security_protocol = "PLAINTEXT" end config[:"security.protocol"] = security_protocol config[:"compression.codec"] = @compression_codec if @compression_codec config[:"message.send.max.retries"] = @max_send_retries if @max_send_retries config[:"request.required.acks"] = @required_acks if @required_acks config[:"request.timeout.ms"] = @ack_timeout * 1000 if @ack_timeout config[:"queue.buffering.max.ms"] = @rdkafka_buffering_max_ms if @rdkafka_buffering_max_ms config[:"queue.buffering.max.messages"] = @rdkafka_buffering_max_messages if @rdkafka_buffering_max_messages config[:"message.max.bytes"] = @rdkafka_message_max_bytes if @rdkafka_message_max_bytes config[:"batch.num.messages"] = @rdkafka_message_max_num if @rdkafka_message_max_num @rdkafka_options.each { |k, v| config[k.to_sym] = v } config end
close_producer(producer)
click to toggle source
# File lib/fluent/plugin/out_rdkafka2.rb, line 248 def close_producer(producer) unless producer.close(10) log.warn("Queue is forcefully closed after 10 seconds wait") end end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rdkafka2.rb, line 110 def configure(conf) super log.instance_eval { def add(level, &block) return unless block # Follow rdkakfa's log level. See also rdkafka-ruby's bindings.rb: https://github.com/appsignal/rdkafka-ruby/blob/e5c7261e3f2637554a5c12b924be297d7dca1328/lib/rdkafka/bindings.rb#L117 case level when Logger::FATAL self.fatal(block.call) when Logger::ERROR self.error(block.call) when Logger::WARN self.warn(block.call) when Logger::INFO self.info(block.call) when Logger::DEBUG self.debug(block.call) else self.trace(block.call) end end } Rdkafka::Config.logger = log config = build_config @rdkafka = Rdkafka::Config.new(config) if @default_topic.nil? if @chunk_keys.include?(@topic_key) && !@chunk_key_tag log.warn "Use '#{@topic_key}' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer #{@topic_key},tag>" end else if @chunk_key_tag log.warn "default_topic is set. Fluentd's event tag is not used for topic" end end formatter_conf = conf.elements('format').first unless formatter_conf raise Fluent::ConfigError, "<format> section is required." end unless formatter_conf["@type"] raise Fluent::ConfigError, "format/@type is required." end @formatter_proc = setup_formatter(formatter_conf) @topic_key_sym = @topic_key.to_sym @headers_from_record_accessors = {} @headers_from_record.each do |key, value| @headers_from_record_accessors[key] = record_accessor_create(value) end end
enqueue_with_retry(producer, topic, record_buf, message_key, partition, headers)
click to toggle source
# File lib/fluent/plugin/out_rdkafka2.rb, line 338 def enqueue_with_retry(producer, topic, record_buf, message_key, partition, headers) attempt = 0 loop do begin return producer.produce(topic: topic, payload: record_buf, key: message_key, partition: partition, headers: headers) rescue Exception => e if e.respond_to?(:code) && e.code == :queue_full if attempt <= @max_enqueue_retries log.warn "Failed to enqueue message; attempting retry #{attempt} of #{@max_enqueue_retries} after #{@enqueue_retry_backoff}s" sleep @enqueue_retry_backoff attempt += 1 else raise "Failed to enqueue message although tried retry #{@max_enqueue_retries} times" end else raise e end end end end
get_producer()
click to toggle source
# File lib/fluent/plugin/out_rdkafka2.rb, line 254 def get_producer if @share_producer @shared_producer else @producers_mutex.synchronize { producer = @producers[Thread.current.object_id] unless producer producer = @rdkafka.producer @producers[Thread.current.object_id] = producer end producer } end end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_rdkafka2.rb, line 220 def multi_workers_ready? true end
setup_formatter(conf)
click to toggle source
# File lib/fluent/plugin/out_rdkafka2.rb, line 269 def setup_formatter(conf) type = conf['@type'] case type when 'ltsv' require 'ltsv' Proc.new { |tag, time, record| LTSV.dump(record) } else @formatter = formatter_create(usage: 'rdkafka-plugin', conf: conf) @formatter.method(:format) end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rdkafka2.rb, line 224 def shutdown super shutdown_producers end
shutdown_producers()
click to toggle source
# File lib/fluent/plugin/out_rdkafka2.rb, line 229 def shutdown_producers if @share_producer close_producer(@shared_producer) @shared_producer = nil else @producers_mutex.synchronize { shutdown_threads = @producers.map { |key, producer| th = Thread.new { close_producer(producer) } th.abort_on_exception = true th } shutdown_threads.each { |th| th.join } @producers = {} } end end
start()
click to toggle source
Calls superclass method
Fluent::KafkaPluginUtil::SSLSettings#start
# File lib/fluent/plugin/out_rdkafka2.rb, line 209 def start if @share_producer @shared_producer = @rdkafka.producer else @producers = {} @producers_mutex = Mutex.new end super end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_rdkafka2.rb, line 281 def write(chunk) tag = chunk.metadata.tag topic = if @topic extract_placeholders(@topic, chunk) else (chunk.metadata.variables && chunk.metadata.variables[@topic_key_sym]) || @default_topic || tag end handlers = [] record_buf = nil record_buf_bytes = nil headers = @headers.clone begin producer = get_producer chunk.msgpack_each { |time, record| begin record = inject_values_to_record(tag, time, record) record.delete(@topic_key) if @exclude_topic_key partition = (@exclude_partition ? record.delete(@partition_key) : record[@partition_key]) || @default_partition message_key = (@exclude_message_key ? record.delete(@message_key_key) : record[@message_key_key]) || @default_message_key @headers_from_record_accessors.each do |key, header_accessor| headers[key] = header_accessor.call(record) end record_buf = @formatter_proc.call(tag, time, record) record_buf_bytes = record_buf.bytesize if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes log.warn "record size exceeds max_send_limit_bytes. Skip event:", :time => time, :record => record next end rescue StandardError => e log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record next end handler = enqueue_with_retry(producer, topic, record_buf, message_key, partition, headers) if @rdkafka_delivery_handle_poll_timeout != 0 handlers << handler end } handlers.each { |handler| handler.wait(max_wait_timeout: @rdkafka_delivery_handle_poll_timeout) } end rescue Exception => e if @discard_kafka_delivery_failed log.warn "Delivery failed. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag else log.warn "Send exception occurred: #{e} at #{e.backtrace.first}" # Raise exception to retry sendind messages raise e end end