class Mimi::Messaging::SQS_SNS::Adapter

AWS SQS/SNS adapter class

An adapter implementation must implement the following methods:

Constants

DEFAULT_OPTIONS
SQS_SNS_ALPHABET_MAP

NOTE: AWS SQS/SNS alphabet for queue and topic names is different from what mimi-messaging allows: '.' is not an allowed character.

SQS_SNS_ALPHABET_MAP structure is used to convert names from mimi-messaging alphabet to SQS/SNS alphabet.

Mimi::Messaging still accepts queue and topic names containing the '.', but the adapter will convert those to valid SQS/SNS names using this mapping.

Attributes

options[R]
sns_client[R]
sqs_client[R]
worker_pool[R]

Public Class Methods

new(options) click to toggle source

Initializes SQS/SNS adapter

@param options [Hash] @option options [String] :mq_adapter @option options [String,nil] :mq_aws_region @option options [String,nil] :mq_aws_access_key_id @option options [String,nil] :mq_aws_secret_access_key @option options [String,nil] :mq_aws_sqs_endpoint @option options [String,nil] :mq_namespace @option options [String,nil] :mq_reply_queue_prefix @option options [Integer,nil] :mq_default_query_timeout

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 81
def initialize(options)
  @options = DEFAULT_OPTIONS.merge(options).dup
  @reply_consumer_mutex = Mutex.new
end

Public Instance Methods

command(target, message, _opts = {}) click to toggle source

Sends the command to the given target

Example:

Mimi::Messaging.command("users/create", name: "John Smith")

@param target [String] “<queue>/<method>” @param message [Hash,Mimi::Messaging::Message] @param opts [Hash] additional adapter-specific options

@return nil

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 124
def command(target, message, _opts = {})
  queue_name, method_name = target.split("/")
  message = Mimi::Messaging::Message.new(message, __method: method_name)
  queue_url = find_queue!(queue_name)
  deliver_message_queue(queue_url, message)
end
create_queue(queue_name) click to toggle source

Creates a new queue

@param queue_name [String] name of the topic to be created @return [String] a new queue URL

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 215
def create_queue(queue_name)
  fqn = sqs_sns_converted_full_name(queue_name)
  Mimi::Messaging.log "Creating a queue: #{fqn}"
  attrs = {}
  if options[:mq_aws_sqs_sns_kms_master_key_id]
    attrs["KmsMasterKeyId"] = options[:mq_aws_sqs_sns_kms_master_key_id]
  end
  result = sqs_client.create_queue(queue_name: fqn, attributes: attrs)
  result.queue_url
rescue StandardError => e
  raise Mimi::Messaging::ConnectionError, "Failed to create queue '#{queue_name}': #{e}"
end
delete_queue(queue_url) click to toggle source

Deletes a queue identified by the queue URL

@param queue_url [String]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 244
def delete_queue(queue_url)
  Mimi::Messaging.log "Deleting a queue: #{queue_url}"
  sqs_client.delete_queue(queue_url: queue_url)
rescue StandardError => e
  raise Mimi::Messaging::ConnectionError,
    "Failed to delete queue with url '#{queue_url}': #{e}"
end
event(target, message, _opts = {}) click to toggle source

Broadcasts the event with the given target

@param target [String] “<topic>#<event_type>”, e.g. “customers#created” @param message [Mimi::Messaging::Message] @param opts [Hash] additional options

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 164
def event(target, message, _opts = {})
  topic_name, event_type = target.split("#")
  message = Mimi::Messaging::Message.new(message, __event_type: event_type)
  topic_arn = find_or_create_topic(topic_name) # TODO: or find_topic!(...) ?
  deliver_message_topic(topic_arn, message)
end
find_or_create_queue(queue_name) click to toggle source

Finds a queue URL for a queue with given name.

If an existing queue with this name is not found, the method will try to create a new one.

@param queue_name [String] @return [String] a queue URL

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 236
def find_or_create_queue(queue_name)
  queue_registry(queue_name) || create_queue(queue_name)
end
query(target, message, opts = {}) click to toggle source

Executes the query to the given target and returns response

@param target [String] “<queue>/<method>” @param message [Hash,Mimi::Messaging::Message] @param opts [Hash] additional options, e.g. :timeout

@return [Hash] @raise [SomeError,Timeout::Error]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 140
def query(target, message, opts = {})
  queue_name, method_name = target.split("/")
  queue_url = find_queue!(queue_name)
  request_id = SecureRandom.hex(8)
  reply_queue = reply_consumer.register_request_id(request_id)

  message = Mimi::Messaging::Message.new(
    message,
    __method: method_name,
    __reply_queue_url: reply_consumer.reply_queue_url,
    __request_id: request_id
  )
  deliver_message_queue(queue_url, message)
  timeout = opts[:timeout] || options[:mq_default_query_timeout]
  response = reply_queue.pop(true, timeout)
  deserialize(response.body)
end
start() click to toggle source
# File lib/mimi/messaging/sqs_sns/adapter.rb, line 86
def start
  @sqs_client = Aws::SQS::Client.new(sqs_client_config)
  @sns_client = Aws::SNS::Client.new(sns_client_config)
  start_worker_pool!
  check_availability!
end
start_event_processor(topic_name, processor, opts = {}) click to toggle source
# File lib/mimi/messaging/sqs_sns/adapter.rb, line 193
def start_event_processor(topic_name, processor, opts = {})
  # NOTE: due to SQS/SNS limitations, implementing this will
  # require creating a temporary queue and subscribing it to the topic
  raise "Not implemented"
end
start_event_processor_with_queue(topic_name, queue_name, processor, opts = {}) click to toggle source
# File lib/mimi/messaging/sqs_sns/adapter.rb, line 199
def start_event_processor_with_queue(topic_name, queue_name, processor, opts = {})
  @consumers ||= []
  opts = opts.dup
  topic_arn = find_or_create_topic(topic_name) # TODO: or find_topic!(...) ?
  queue_url = find_or_create_queue(queue_name)
  subscribe_topic_queue(topic_arn, queue_url)
  @consumers << Consumer.new(self, queue_url) do |m|
    process_event_message(processor, m)
  end
end
start_request_processor(queue_name, processor, opts = {}) click to toggle source

Starts a request (command/query) processor.

Processor must respond to call_command() AND call_query() which accepts 3 arguments: (method, message, opts).

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

@param queue_name [String] “<queue>” @param processor [#call_command(),#call_query()] @param opts [Hash] additional adapter-specific options

Calls superclass method
# File lib/mimi/messaging/sqs_sns/adapter.rb, line 183
def start_request_processor(queue_name, processor, opts = {})
  super
  @consumers ||= []
  opts = opts.dup
  queue_url = find_or_create_queue(queue_name)
  @consumers << Consumer.new(self, queue_url) do |m|
    process_request_message(processor, m)
  end
end
stop() click to toggle source
# File lib/mimi/messaging/sqs_sns/adapter.rb, line 93
def stop
  stop_all_processors
  stop_worker_pool!
  @sqs_client = nil
  @sns_client = nil
end
stop_all_processors() click to toggle source

Stops all message (command, query and event) processors.

Stops currently registered processors and stops accepting new messages for processors.

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 105
def stop_all_processors
  @consumers&.each(&:signal_stop)
  @consumers&.each(&:stop)
  @consumers = nil
  @reply_consumer&.stop
  @reply_consumer = nil
end

Private Instance Methods

check_availability!() click to toggle source

Checks SQS and SNS clients availability

@raise [Mimi::Messaging::ConnectionError]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 286
def check_availability!
  begin
    queue_registry("test")
  rescue StandardError => e
    raise Mimi::Messaging::ConnectionError, "SQS connection is not available: #{e}"
  end
  begin
    topic_registry("test")
  rescue StandardError => e
    raise Mimi::Messaging::ConnectionError, "SNS connection is not available: #{e}"
  end
end
create_topic(topic_name) click to toggle source

Creates a new topic

@param topic_name [String] name of the topic to be created @return [String] a new topic ARN

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 497
def create_topic(topic_name)
  fqn = sqs_sns_converted_full_name(topic_name)
  Mimi::Messaging.log "Creating a topic: #{fqn}"
  attrs = {}
  if options[:mq_aws_sqs_sns_kms_master_key_id]
    attrs["KmsMasterKeyId"] = options[:mq_aws_sqs_sns_kms_master_key_id]
  end
  result = sns_client.create_topic(name: fqn, attributes: attrs)
  result.topic_arn
rescue StandardError => e
  raise Mimi::Messaging::ConnectionError, "Failed to create topic '#{topic_name}': #{e}"
end
deliver_message_queue(queue_url, message) click to toggle source

Delivers a message to a queue with given URL.

@param queue_url [String] @param message [Mimi::Messaging::Message]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 304
def deliver_message_queue(queue_url, message)
  raise ArgumentError, "Non-empty queue URL is expected" unless queue_url
  unless message.is_a?(Mimi::Messaging::Message)
    raise ArgumentError, "Message is expected as argument"
  end

  Mimi::Messaging.log "Delivering message to: #{queue_url}, headers: #{message.headers}"
  sqs_client.send_message(
    queue_url: queue_url,
    message_body: serialize(message),
    message_attributes: message.headers.map do |k, v|
      [k.to_s, { data_type: "String", string_value: v.to_s }]
    end.to_h
  )
rescue StandardError => e
  raise Mimi::Messaging::ConnectionError, "Failed to deliver message to '#{queue_url}': #{e}"
end
deliver_message_topic(topic_arn, message) click to toggle source

Delivers a message to a topic with given ARN.

@param topic_arn [String] @param message [Mimi::Messaging::Message]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 538
def deliver_message_topic(topic_arn, message)
  raise ArgumentError, "Non-empty topic ARN is expected" unless topic_arn
  unless message.is_a?(Mimi::Messaging::Message)
    raise ArgumentError, "Message is expected as argument"
  end

  Mimi::Messaging.log "Delivering message to: #{topic_arn}"
  sns_client.publish(
    topic_arn: topic_arn,
    message: serialize(message),
    message_attributes: message.headers.map do |k, v|
      [k.to_s, { data_type: "String", string_value: v.to_s }]
    end.to_h
  )
rescue StandardError => e
  raise Mimi::Messaging::ConnectionError, "Failed to deliver message to '#{topic_arn}': #{e}"
end
deliver_query_response(queue_url, message) click to toggle source

Delivers a message as a response to a QUERY

Responses are allowed to fail. There can be a number of reasons why responses fail: reply queue does not exist (anymore?), response message is too big. In any case the error is reported, but the QUERY message is acknowledged as a successfully processed.

@param queue_url [String] @param message [Mimi::Messaging::Message]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 356
def deliver_query_response(queue_url, message)
  deliver_message_queue(queue_url, message)
rescue Mimi::Messaging::ConnectionError => e
  Mimi::Messaging.logger&.warn("Failed to deliver QRY response: #{e}")
  # NOTE: error is recovered
end
deserialize_headers(message) click to toggle source

Deserializes headers from the message

@param message @return [Hash<Symbol,String>] symbolized keys, string values

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 428
def deserialize_headers(message)
  message.message_attributes.to_h.map { |k, v| [k.to_sym, v.string_value] }.to_h
end
find_or_create_topic(topic_name) click to toggle source

Finds a topic ARN for a topic with given name.

If an existing topic with this name is not found, the method will try to create a new one.

@param topic_name [String] @return [String] a topic ARN

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 488
def find_or_create_topic(topic_name)
  topic_registry(topic_name) || create_topic(topic_name)
end
find_queue!(queue_name) click to toggle source

Finds a queue URL for a queue with a given name, or raises an error if the queue is not found.

@param queue_name [String] @return [String] a queue URL

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 403
def find_queue!(queue_name)
  queue_registry(queue_name) || (
    raise Mimi::Messaging::ConnectionError,
      "Failed to find a queue with given name: '#{queue_name}'"
  )
end
find_topic!(topic_name) click to toggle source

Finds a topic ARN for a topic with a given name, or raises an error if the topic is not found.

@param topic_name [String] @return [String] a topic ARN

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 473
def find_topic!(topic_name)
  topic_registry(topic_name) || (
    raise Mimi::Messaging::ConnectionError,
      "Failed to find a topic with given name: '#{topic_name}'"
  )
end
process_event_message(processor, sqs_message) click to toggle source

Processes an incoming EVENT message

@param processor [#call_event()] event processor object @param sqs_message []

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 561
def process_event_message(processor, sqs_message)
  message = Mimi::Messaging::Message.new(
    deserialize(sqs_message.body),
    deserialize_headers(sqs_message)
  )
  event_type = message.headers[:__event_type]
  processor.call_event(event_type, message, {})
end
process_request_message(processor, sqs_message) click to toggle source

Processes an incoming COMMAND or QUERY message

@param processor [#call_query(),#call_command()] request processor object @param sqs_message

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 327
def process_request_message(processor, sqs_message)
  message = Mimi::Messaging::Message.new(
    deserialize(sqs_message.body),
    deserialize_headers(sqs_message)
  )
  method_name = message.headers[:__method]
  reply_to = message.headers[:__reply_queue_url]
  if reply_to
    response = processor.call_query(method_name, message, {})
    response_message = Mimi::Messaging::Message.new(
      response,
      __request_id: message.headers[:__request_id]
    )
    deliver_query_response(reply_to, response_message)
  else
    processor.call_command(method_name, message, {})
  end
end
queue_registry(queue_name) click to toggle source

Returns URL of a queue with a given name.

If the queue with given name does not exist, returns nil

@param queue_name [String] @return [String,nil] queue URL

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 370
def queue_registry(queue_name)
  fqn = sqs_sns_converted_full_name(queue_name)
  @queue_registry ||= {}
  @queue_registry[fqn] ||= begin
    result = sqs_client.get_queue_url(queue_name: fqn)
    result.queue_url
  end
rescue Aws::SQS::Errors::NonExistentQueue
  nil
rescue StandardError => e
  raise Mimi::Messaging::ConnectionError, "Failed to get queue url '#{queue_name}': #{e}"
end
reply_consumer() click to toggle source

Returns the configured reply listener for this process

@return [ReplyConsumer]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 414
def reply_consumer
  @reply_consumer_mutex.synchronize do
    @reply_consumer ||= begin
      reply_queue_name = options[:mq_reply_queue_prefix] + SecureRandom.hex(8)
      Mimi::Messaging::SQS_SNS::ReplyConsumer.new(self, reply_queue_name)
    end
  end
end
sns_client_config() click to toggle source

Returns configuration parameters for AWS SNS client

@return [Hash]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 272
def sns_client_config
  params = {
    region: options[:mq_aws_region],
    endpoint: options[:mq_aws_sns_endpoint],
    access_key_id: options[:mq_aws_access_key_id],
    secret_access_key: options[:mq_aws_secret_access_key]
  }
  params.compact
end
sns_list_topics() click to toggle source

Lists all SNS topics by their ARNs.

NOTE: iterates over all topics at SNS every time

@return [Array<String>] array of topic ARNs

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 438
def sns_list_topics
  result = []
  next_token = nil
  loop do
    response = sns_client.list_topics(next_token: next_token)
    result += response.topics.map(&:topic_arn)
    next_token = response.next_token
    break unless next_token
  end
  result
rescue StandardError => e
  raise Mimi::Messaging::ConnectionError, "Failed to list topics: #{e}"
end
sqs_client_config() click to toggle source

Returns configuration parameters for AWS SQS client

@return [Hash]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 258
def sqs_client_config
  params = {
    region: options[:mq_aws_region],
    endpoint: options[:mq_aws_sqs_endpoint],
    access_key_id: options[:mq_aws_access_key_id],
    secret_access_key: options[:mq_aws_secret_access_key]
  }
  params.compact
end
sqs_sns_converted_full_name(name) click to toggle source

Converts a topic or queue name to a fully qualified (with namespace) and in a valid SQS/SNS alphabet.

@param name [String] a mimi-messaging valid name @return [String] an SQS/SNS valid name

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 389
def sqs_sns_converted_full_name(name)
  name = "#{options[:mq_namespace]}#{name}"
  SQS_SNS_ALPHABET_MAP.each do |from, to|
    name = name.gsub(from, to)
  end
  name
end
start_worker_pool!() click to toggle source

Starts the worker pool using current configuration

@return [Concurrent::ThreadPoolExecutor]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 574
def start_worker_pool!
  Mimi::Messaging.log "Starting worker pool, " \
    "min_threads:#{options[:mq_worker_pool_min_threads]}, " \
    "max_threads:#{options[:mq_worker_pool_max_threads]}, " \
    "max_backlog:#{options[:mq_worker_pool_max_backlog]}"

  @worker_pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: options[:mq_worker_pool_min_threads],
    max_threads: options[:mq_worker_pool_max_threads],
    max_queue: options[:mq_worker_pool_max_backlog],
    fallback_policy: :abort
  )
end
stop_worker_pool!() click to toggle source

Gracefully stops the worker pool, allowing all threads to finish their jobs

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 590
def stop_worker_pool!
  Mimi::Messaging.log "Stopping worker pool"
  @worker_pool.shutdown
  @worker_pool.wait_for_termination
end
subscribe_topic_queue(topic_arn, queue_url) click to toggle source

Subscribes an existing queue to an existing topic

@param topic_arn [String] @param queue_url [String]

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 515
def subscribe_topic_queue(topic_arn, queue_url)
  result = sqs_client.get_queue_attributes(
    queue_url: queue_url, attribute_names: ["QueueArn"]
  )
  queue_arn = result.attributes["QueueArn"]
  Mimi::Messaging.log "Subscribing queue to a topic: '#{topic_arn}'->'#{queue_url}'"
  _result = sns_client.subscribe(
    topic_arn: topic_arn,
    protocol: "sqs",
    endpoint: queue_arn,
    attributes: { "RawMessageDelivery" => "true" }
  )
  true
rescue StandardError => e
  raise Mimi::Messaging::ConnectionError,
    "Failed to subscribe queue to topic '#{topic_arn}'->'#{queue_url}': #{e}"
end
topic_registry(topic_name) click to toggle source

Returns ARN of a topic with a given name.

If the topic with given name does not exist, returns nil

@param topic_name [String] @return [String,nil] topic ARN or nil, if not found

# File lib/mimi/messaging/sqs_sns/adapter.rb, line 459
def topic_registry(topic_name)
  fqn = sqs_sns_converted_full_name(topic_name)
  @topic_registry ||= {}
  @topic_registry[fqn] ||= begin
    sns_list_topics.find { |topic_arn| topic_arn.split(":").last == fqn }
  end
end