class Freddy::Consumers::RespondToConsumer

Public Class Methods

consume(*attrs, &block) click to toggle source
# File lib/freddy/consumers/respond_to_consumer.rb, line 4
def self.consume(*attrs, &block)
  new(*attrs).consume(&block)
end
new(thread_pool:, destination:, channel:, handler_adapter_factory:) click to toggle source
# File lib/freddy/consumers/respond_to_consumer.rb, line 8
def initialize(thread_pool:, destination:, channel:, handler_adapter_factory:)
  @consume_thread_pool = thread_pool
  @destination = destination
  @channel = channel
  @handler_adapter_factory = handler_adapter_factory
end

Public Instance Methods

consume(&block) click to toggle source
# File lib/freddy/consumers/respond_to_consumer.rb, line 15
def consume(&block)
  consumer = consume_from_destination do |delivery|
    adapter = @handler_adapter_factory.for(delivery)

    msg_handler = MessageHandler.new(adapter, delivery)
    block.call(delivery.payload, msg_handler)
  end

  ResponderHandler.new(consumer, @consume_thread_pool)
end

Private Instance Methods

consume_from_destination(&block) click to toggle source
# File lib/freddy/consumers/respond_to_consumer.rb, line 28
def consume_from_destination(&block)
  @channel.queue(@destination).subscribe(manual_ack: true) do |delivery|
    process_message(delivery, &block)
  end
end
process_message(delivery, &block) click to toggle source
# File lib/freddy/consumers/respond_to_consumer.rb, line 34
def process_message(delivery, &block)
  @consume_thread_pool.process do
    begin
      Freddy.trace = delivery.build_trace("freddy:respond:#{@destination}",
        tags: {
          'peer.address': "#{@destination}:#{delivery.payload[:type]}",
          'component': 'freddy',
          'span.kind': 'server' # RPC
        }
      )
      Freddy.trace.log(
        event: 'Received message through respond_to',
        queue: @destination,
        payload: delivery.payload,
        correlation_id: delivery.correlation_id
      )

      block.call(delivery)
    ensure
      @channel.acknowledge(delivery.tag, false)
      Freddy.trace.finish
      Freddy.trace = nil
    end
  end
end