class Mimi::Messaging::SQS_SNS::ReplyConsumer

ReplyConsumer listens on a single SQS reply queue and passes each reply to the Queue registered for its request ID (see Ruby's ::Queue class).

Attributes

reply_queue_name[R]
reply_queue_url[R]

Public Class Methods

new(adapter, reply_queue_name)
# File lib/mimi/messaging/sqs_sns/reply_consumer.rb, line 13
def initialize(adapter, reply_queue_name)
  @mutex = Mutex.new
  @queues = {}
  @adapter = adapter
  @reply_queue_name = reply_queue_name
  @consumer = TemporaryQueueConsumer.new(adapter, reply_queue_name) do |message|
    dispatch_message(message)
  end
  @reply_queue_url = @consumer.queue_url
end

Public Instance Methods

register_request_id(request_id)

Register a new request_id to listen for.

When a message with the given request_id is received, it is dispatched to the returned Queue. Registering the same request_id again returns the Queue that is already registered.

@param request_id [String]
@return [Queue] the Queue object registered for this request_id

# File lib/mimi/messaging/sqs_sns/reply_consumer.rb, line 38
def register_request_id(request_id)
  queue = TimeoutQueue.new
  @mutex.synchronize do
    queue = @queues[request_id] ||= queue
  end
  queue
end
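
The `||=` in the method above makes registration idempotent: a second call with the same request_id gets back the queue stored by the first. The following is a minimal sketch of that behaviour only. `Registry` is a hypothetical class written for illustration, and Ruby's built-in `Queue` stands in for `TimeoutQueue`, whose interface is not shown on this page.

```ruby
# Hypothetical stand-in for the registration part of ReplyConsumer.
# Ruby's built-in Queue replaces TimeoutQueue here (an assumption).
class Registry
  def initialize
    @mutex = Mutex.new
    @queues = {}
  end

  # Returns the queue registered for request_id, creating it on first use.
  def register_request_id(request_id)
    @mutex.synchronize { @queues[request_id] ||= Queue.new }
  end
end

registry = Registry.new
first  = registry.register_request_id("req-1")
second = registry.register_request_id("req-1")
other  = registry.register_request_id("req-2")

same_queue     = first.equal?(second)  # repeated request_id: same object
distinct_queue = !first.equal?(other)  # different request_id: separate object
```

A caller would typically register the request_id before sending the request, so that a fast reply cannot arrive before its queue exists.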

stop()
# File lib/mimi/messaging/sqs_sns/reply_consumer.rb, line 24
def stop
  @consumer.stop
rescue StandardError => e
  raise Mimi::Messaging::Error, "Failed to stop reply consumer: #{e}"
end

Private Instance Methods

deserialize_headers(message)

Deserializes headers from the message

@param message
@return [Hash<Symbol,String>] symbolized keys, string values

# File lib/mimi/messaging/sqs_sns/reply_consumer.rb, line 53
def deserialize_headers(message)
  message.message_attributes.to_h.map { |k, v| [k.to_sym, v.string_value] }.to_h
end
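
To make the transformation concrete, here is a small sketch that runs the same one-liner against stand-in objects. `AttributeValue` and `FakeMessage` are hypothetical structs that model only the two readers the method uses (`message_attributes` and `string_value`). They are not the AWS SDK types, and the header names are made up for the example.

```ruby
# Hypothetical stand-ins for the SQS message and its attribute values.
# Only the readers used by deserialize_headers are modelled.
AttributeValue = Struct.new(:string_value)
FakeMessage = Struct.new(:message_attributes)

# Same body as the method documented above.
def deserialize_headers(message)
  message.message_attributes.to_h.map { |k, v| [k.to_sym, v.string_value] }.to_h
end

message = FakeMessage.new({
  "__request_id" => AttributeValue.new("abc123"),
  "content_type" => AttributeValue.new("application/json")
})

# String keys become symbols; attribute objects are reduced to their strings.
headers = deserialize_headers(message)
```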

dispatch_message(message)

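Dispatches a message received on the reply queue to the Queue registered for its request_id and removes that registration. A message whose request_id has no registered Queue is dropped.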

@param message [] an AWS SQS message

# File lib/mimi/messaging/sqs_sns/reply_consumer.rb, line 61
def dispatch_message(message)
  queue = nil
  @mutex.synchronize do
    headers = deserialize_headers(message)
    request_id = headers[:__request_id]
    Mimi::Messaging.log "dispatching response, headers:#{headers}"
    queue = @queues.delete(request_id)
  end
  queue&.push(message)
rescue StandardError => e
  Mimi::Messaging.log "reply listener failed to process reply: #{e}"
  # TODO: propagate exception to main thread?
end
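
The dispatch logic above is one-shot: `@queues.delete` removes the registration as it looks it up, so only the first reply for a request_id is delivered. The sketch below illustrates that pattern under simplified assumptions. `Dispatcher` is a hypothetical class, messages are plain hashes rather than SQS messages, Ruby's built-in `Queue` replaces `TimeoutQueue`, and the boolean return value is added only to make the outcome visible.

```ruby
# Hypothetical stand-in for the dispatch part of ReplyConsumer.
# Messages are plain hashes carrying their request id, not SQS messages.
class Dispatcher
  def initialize
    @mutex = Mutex.new
    @queues = {}
  end

  def register_request_id(request_id)
    @mutex.synchronize { @queues[request_id] ||= Queue.new }
  end

  # Hands the message to the waiting queue and forgets the registration.
  # Returns true if a queue was waiting, false if the message was dropped.
  def dispatch_message(message)
    queue = @mutex.synchronize { @queues.delete(message[:request_id]) }
    queue&.push(message)
    !queue.nil?
  end
end

dispatcher = Dispatcher.new
reply_queue = dispatcher.register_request_id("req-1")

delivered = dispatcher.dispatch_message({ request_id: "req-1", body: "pong" })
repeated  = dispatcher.dispatch_message({ request_id: "req-1", body: "late" })
unknown   = dispatcher.dispatch_message({ request_id: "req-9", body: "stray" })

reply = reply_queue.pop # does not block: the first reply is already queued
```

As in the original method, the push happens outside the mutex, so the lock is held only for the hash lookup.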