class EventQ::Amazon::SubscriptionManager
Public Class Methods
new(options)
click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 6 def initialize(options) mandatory = %i[client queue_manager] missing = mandatory - options.keys raise "[#{self.class}] - Missing options #{missing} must be specified." unless missing.empty? @client = options[:client] @manager = options[:queue_manager] end
Public Instance Methods
subscribe(event_type, queue, topic_region = nil, queue_region = nil, topic_namespaces = [EventQ.namespace])
click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 15 def subscribe(event_type, queue, topic_region = nil, queue_region = nil, topic_namespaces = [EventQ.namespace]) if queue.isolated method = :get_topic_arn else method = :create_topic_arn end topic_arn = @client.sns_helper(topic_region).public_send(method, event_type, topic_region) raise Exceptions::EventTypeNotFound, "SNS topic not found, unable to subscribe to #{event_type}" unless topic_arn queue_arn = configure_queue(queue, queue_region) # subscribe the queue to the topic with the namespaces provided topic_namespaces.each do |namespace| namespaced_topic_arn = topic_arn.gsub(":#{EventQ.namespace}-", ":#{namespace}-") # create the sns topic - this method is idempotent & returns the topic arn if it already exists @client.sns_helper.create_topic_arn("#{namespace}-#{event_type}".delete('.')) unless queue.isolated # skip subscribe if subscription for given queue/topic already exists # this is a workaround for a localstack issue: https://github.com/localstack/localstack/issues/933 return true if existing_subscription?(queue_arn, namespaced_topic_arn) EventQ.logger.debug do "[#{self.class} #subscribe] - Subscribing Queue: #{queue.name} to topic_arn: #{namespaced_topic_arn}, endpoint: #{queue_arn}" end @client.sns(topic_region).subscribe( topic_arn: namespaced_topic_arn, protocol: 'sqs', endpoint: queue_arn ) end true end
unsubscribe(_queue)
click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 52 def unsubscribe(_queue) raise "[#{self.class}] - Not implemented. Please unsubscribe the queue from the topic inside the AWS Management Console." end
Private Instance Methods
configure_queue(queue, region)
click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 58 def configure_queue(queue, region) q = @manager.get_queue(queue) queue_arn = @client.sqs_helper(region).get_queue_arn(queue) attributes = default_queue_attributes(q, queue_arn) @client.sqs(region).set_queue_attributes(attributes) queue_arn end
default_queue_attributes(queue, queue_arn)
click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 67 def default_queue_attributes(queue, queue_arn) { queue_url: queue, attributes: { 'Policy' => queue_policy(queue_arn) } } end
existing_subscription?(queue_arn, topic_arn)
click to toggle source
check if there is an existing subscription for the given queue/topic
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 94 def existing_subscription?(queue_arn, topic_arn) subscriptions = @client.sns.list_subscriptions.subscriptions subscriptions.any? { |subscription| subscription.topic_arn == topic_arn && subscription.endpoint == queue_arn } end
queue_policy(queue_arn)
click to toggle source
# File lib/eventq/eventq_aws/aws_subscription_manager.rb, line 77 def queue_policy(queue_arn) '{ "Version": "2012-10-17", "Id": "SNStoSQS", "Statement": [ { "Sid":"rule1", "Effect": "Allow", "Principal": "*", "Action": "sqs:*", "Resource": "' + queue_arn + '" } ] }' end