class Osbourne::Subscription

Attributes

queue[R]
topics[R]

Public Class Methods

new(topics, queue) click to toggle source
# File lib/osbourne/subscription.rb, line 8
def initialize(topics, queue)
  @topics = topics
  @queue = queue
  subscribe_all
end

Private Instance Methods

build_policy() click to toggle source
# File lib/osbourne/subscription.rb, line 35
def build_policy
  # The aws ruby SDK doesn't have a policy builder :{
  {
    "Policy" => {
      "Version"   => "2012-10-17",
      "Id"        => "Osbourne/#{queue.prefixed_name}/SNSPolicy",
      "Statement" => topics.map {|t| build_policy_statement(t) }
    }.to_json
  }
end
build_policy_statement(topic) click to toggle source
# File lib/osbourne/subscription.rb, line 46
def build_policy_statement(topic)
  {
    "Sid"       => "Sid#{topic.prefixed_name}",
    "Effect"    => "Allow",
    "Principal" => {"AWS" => "*"},
    "Action"    => "SQS:SendMessage",
    "Resource"  => queue.arn,
    "Condition" => {
      "ArnEquals" => {"aws:SourceArn" => topic.arn}
    }
  }
end
set_queue_policy() click to toggle source
# File lib/osbourne/subscription.rb, line 30
def set_queue_policy
  Osbourne.logger.info("[Osbourne] Setting policy for #{queue.prefixed_name} (attributes: #{build_policy})")
  sqs.set_queue_attributes(queue_url: queue.url, attributes: build_policy)
end
subscribe(topic) click to toggle source
# File lib/osbourne/subscription.rb, line 21
def subscribe(topic)
  Osbourne.logger.info("[Osbourne] Checking subscription for #{queue.prefixed_name} to #{topic.prefixed_name}")
  return if Osbourne.existing_subscriptions_for(topic).include? queue.arn

  Osbourne.logger.info("[Osbourne] Subscribing #{queue.prefixed_name} to #{topic.prefixed_name}")
  sns.subscribe(topic_arn: topic.arn, protocol: "sqs", endpoint: queue.arn).subscription_arn
  Osbourne.clear_subscriptions_for(topic)
end
subscribe_all() click to toggle source
# File lib/osbourne/subscription.rb, line 16
def subscribe_all
  topics.each {|topic| subscribe(topic) }
  set_queue_policy
end