class Freddy::Consumers::TapIntoConsumer

Public Class Methods

consume(*attrs, &block) click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 4
def self.consume(*attrs, &block)
  new(*attrs).consume(&block)
end
new(thread_pool:, pattern:, channel:, options:) click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 8
def initialize(thread_pool:, pattern:, channel:, options:)
  @consume_thread_pool = thread_pool
  @pattern = pattern
  @channel = channel
  @options = options
end

Public Instance Methods

consume(&block) click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 15
def consume(&block)
  queue = create_queue

  consumer = queue.subscribe(manual_ack: true) do |delivery|
    process_message(queue, delivery, &block)
  end

  ResponderHandler.new(consumer, @consume_thread_pool)
end

Private Instance Methods

create_queue() click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 27
def create_queue
  topic_exchange = @channel.topic(Freddy::FREDDY_TOPIC_EXCHANGE_NAME)
  group = @options.fetch(:group, nil)

  if group
    @channel
      .queue("groups.#{group}")
      .bind(topic_exchange, routing_key: @pattern)
  else
    @channel
      .queue('', exclusive: true)
      .bind(topic_exchange, routing_key: @pattern)
  end
end
process_message(queue, delivery, &block) click to toggle source
# File lib/freddy/consumers/tap_into_consumer.rb, line 42
def process_message(queue, delivery, &block)
  @consume_thread_pool.process do
    begin
      Freddy.trace = delivery.build_trace("freddy:observe:#{@pattern}",
        tags: {
          'message_bus.destination': @pattern,
          'component': 'freddy',
          'span.kind': 'consumer' # Message Bus
        },
        force_follows_from: true
      )
      Freddy.trace.log(
        event: 'Received message through tap_into',
        payload: delivery.payload,
        correlation_id: delivery.correlation_id
      )

      block.call delivery.payload, delivery.routing_key
    ensure
      @channel.acknowledge(delivery.tag, false)
      Freddy.trace.finish
      Freddy.trace = nil
    end
  end
end