class EventQ::Amazon::SubscriptionManager

Public Class Methods

new(options) click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 6
def initialize(options)
  mandatory = %i[client queue_manager]
  missing = mandatory - options.keys
  raise "[#{self.class}] - Missing options #{missing} must be specified." unless missing.empty?

  @client = options[:client]
  @manager = options[:queue_manager]
end

Public Instance Methods

subscribe(event_type, queue, topic_region = nil, queue_region = nil, topic_namespaces = [EventQ.namespace]) click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 15
def subscribe(event_type, queue, topic_region = nil, queue_region = nil, topic_namespaces = [EventQ.namespace])
  if queue.isolated
    method = :get_topic_arn
  else
    method = :create_topic_arn
  end

  topic_arn = @client.sns_helper(topic_region).public_send(method, event_type, topic_region)
  raise Exceptions::EventTypeNotFound, "SNS topic not found, unable to subscribe to #{event_type}" unless topic_arn

  queue_arn = configure_queue(queue, queue_region)

  # subscribe the queue to the topic with the namespaces provided
  topic_namespaces.each do |namespace|
    namespaced_topic_arn = topic_arn.gsub(":#{EventQ.namespace}-", ":#{namespace}-")

    # create the sns topic - this method is idempotent & returns the topic arn if it already exists
    @client.sns_helper.create_topic_arn("#{namespace}-#{event_type}".delete('.')) unless queue.isolated

    # skip subscribe if subscription for given queue/topic already exists
    # this is a workaround for a localstack issue: https://github.com/localstack/localstack/issues/933
    return true if existing_subscription?(queue_arn, namespaced_topic_arn)

    EventQ.logger.debug do
      "[#{self.class} #subscribe] - Subscribing Queue: #{queue.name} to topic_arn: #{namespaced_topic_arn}, endpoint: #{queue_arn}"
    end

    @client.sns(topic_region).subscribe(
      topic_arn: namespaced_topic_arn,
      protocol: 'sqs',
      endpoint: queue_arn
    )
  end

  true
end
unsubscribe(_queue) click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 52
def unsubscribe(_queue)
  raise "[#{self.class}] - Not implemented. Please unsubscribe the queue from the topic inside the AWS Management Console."
end

Private Instance Methods

configure_queue(queue, region) click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 58
def configure_queue(queue, region)
  q = @manager.get_queue(queue)
  queue_arn = @client.sqs_helper(region).get_queue_arn(queue)

  attributes = default_queue_attributes(q, queue_arn)
  @client.sqs(region).set_queue_attributes(attributes)
  queue_arn
end
default_queue_attributes(queue, queue_arn) click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 67
def default_queue_attributes(queue, queue_arn)
  {
    queue_url: queue,
    attributes:
      {
        'Policy' => queue_policy(queue_arn)
      }
  }
end
existing_subscription?(queue_arn, topic_arn) click to toggle source

check if there is an existing subscription for the given queue/topic

# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 94
def existing_subscription?(queue_arn, topic_arn)
  subscriptions = @client.sns.list_subscriptions.subscriptions
  subscriptions.any? { |subscription| subscription.topic_arn == topic_arn && subscription.endpoint == queue_arn }
end
queue_policy(queue_arn) click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 77
def queue_policy(queue_arn)
  '{
    "Version": "2012-10-17",
    "Id": "SNStoSQS",
    "Statement": [
      {
        "Sid":"rule1",
        "Effect": "Allow",
        "Principal": "*",
        "Action": "sqs:*",
        "Resource": "' + queue_arn + '"
      }
    ]
  }'
end