module OpenTelemetry::Instrumentation::RubyKafka::Patches::Client
The Client
module contains the instrumentation patch the Client#deliver_message
and Client#each_message
methods.
Public Instance Methods
deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, retries: 1)
click to toggle source
Calls superclass method
# File lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb, line 13 def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, retries: 1) attributes = { 'messaging.system' => 'kafka', 'messaging.destination' => topic, 'messaging.destination_kind' => 'topic' } attributes['messaging.kafka.message_key'] = key if key attributes['messaging.kafka.partition'] = partition if partition tracer.in_span("#{topic} send", attributes: attributes, kind: :producer) do OpenTelemetry.propagation.inject(headers) super end end
each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576) { |message| ... }
click to toggle source
Calls superclass method
# File lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb, line 29 def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block) super do |message| attributes = { 'messaging.system' => 'kafka', 'messaging.destination' => message.topic, 'messaging.destination_kind' => 'topic', 'messaging.kafka.partition' => message.partition } attributes['messaging.kafka.message_key'] = message.key if message.key parent_context = OpenTelemetry.propagation.extract(message.headers) OpenTelemetry::Context.with_current(parent_context) do tracer.in_span("#{topic} process", attributes: attributes, kind: :consumer) do yield message end end end end
Private Instance Methods
tracer()
click to toggle source
# File lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb, line 51 def tracer RubyKafka::Instrumentation.instance.tracer end