class Mimi::Messaging::SQS_SNS::ReplyConsumer
ReplyConsumer
listens on a particular SQS queue for replies and passes them to registered Queues (see Ruby ::Queue class).
Attributes
reply_queue_name[R]
reply_queue_url[R]
Public Class Methods
new(adapter, reply_queue_name)
click to toggle source
# File lib/mimi/messaging/sqs_sns/reply_consumer.rb, line 13 def initialize(adapter, reply_queue_name) @mutex = Mutex.new @queues = {} @adapter = adapter @reply_queue_name = reply_queue_name @consumer = TemporaryQueueConsumer.new(adapter, reply_queue_name) do |message| dispatch_message(message) end @reply_queue_url = @consumer.queue_url end
Public Instance Methods
register_request_id(request_id)
click to toggle source
Register a new request_id to listen for.
Whenever the message with given request_id will be received, it will be dispatched to a returned Queue.
@param request_id [String] @return [Queue] a new Queue object registered for this request_id
# File lib/mimi/messaging/sqs_sns/reply_consumer.rb, line 38 def register_request_id(request_id) queue = TimeoutQueue.new @mutex.synchronize do queue = @queues[request_id] ||= queue end queue end
stop()
click to toggle source
# File lib/mimi/messaging/sqs_sns/reply_consumer.rb, line 24 def stop @consumer.stop rescue StandardError => e raise Mimi::Messaging::Error, "Failed to stop reply consumer: #{e}" end
Private Instance Methods
deserialize_headers(message)
click to toggle source
Deserializes headers from the message
@param message @return [Hash<Symbol,String>] symbolized keys, string values
# File lib/mimi/messaging/sqs_sns/reply_consumer.rb, line 53 def deserialize_headers(message) message.message_attributes.to_h.map { |k, v| [k.to_sym, v.string_value] }.to_h end
dispatch_message(message)
click to toggle source
Dispatch message received on a reply queue
@param message [] an AWS SQS message
# File lib/mimi/messaging/sqs_sns/reply_consumer.rb, line 61 def dispatch_message(message) queue = nil @mutex.synchronize do headers = deserialize_headers(message) request_id = headers[:__request_id] Mimi::Messaging.log "dispatching response, headers:#{headers}" queue = @queues.delete(request_id) end queue&.push(message) rescue StandardError => e Mimi::Messaging.log "reply listener failed to process reply: #{e}" # TODO: propagate exception to main thread? end