class ActiveMessaging::Adapters::Sqs::Connection

Constants

GET_QUEUE_ATTRIBUTES
NUMBER_OF_MESSAGES
QUEUE_NAME_LENGTH
SET_QUEUE_ATTRIBUTES
URI_ENCODING_REPLACEMENTS
VISIBILITY_TIMEOUT

Attributes

access_key_id[RW]

configurable params

aws_version[RW]

configurable params

cache_queue_list[RW]

configurable params

content_type[RW]

configurable params

host[RW]

configurable params

max_message_size[RW]

configurable params

poll_interval[RW]

configurable params

port[RW]

configurable params

reconnect_delay[RW]

configurable params

secret_access_key[RW]

configurable params

Public Class Methods

new(cfg) click to toggle source

generic init method needed by a13g

# File lib/activemessaging/adapters/sqs.rb, line 32
def initialize cfg
  raise "Must specify a access_key_id" if (cfg[:access_key_id].nil? || cfg[:access_key_id].empty?)
  raise "Must specify a secret_access_key" if (cfg[:secret_access_key].nil? || cfg[:secret_access_key].empty?)

  @access_key_id = cfg[:access_key_id]
  @secret_access_key = cfg[:secret_access_key]
  @region = cfg[:region]                          || 'us-east-1'
  @request_expires = cfg[:requestExpires]         || 10
  @request_retry_count = cfg[:requestRetryCount]  || 5
  @aws_version = cfg[:aws_version]                || '2012-11-05'
  @content_type = cfg[:content_type]              || 'text/plain'
  @host = cfg[:host]                              || "sqs.#{@region}.amazonaws.com"
  @port = cfg[:port]                              || 80
  @protocol = cfg[:protocol]                      || 'http'
  @poll_interval = cfg[:poll_interval]            || 1
  @reconnect_delay = cfg[:reconnectDelay]         || 5

  @max_message_size = cfg[:max_message_size].to_i > 0 ? cfg[:max_message_size].to_i : 8

  @aws_url = "#{@protocol}://#{@host}/"

  @cache_queue_list = cfg[:cache_queue_list].nil? ? true : cfg[:cache_queue_list]
  @reliable =         cfg[:reliable].nil?         ? true : cfg[:reliable]

  #initialize the subscriptions and queues
  @subscriptions = {}
  @queues_by_priority = {}
  @current_subscription = 0
  queues
end

Public Instance Methods

add_queue(url) click to toggle source

internal data structure methods

# File lib/activemessaging/adapters/sqs.rb, line 319
def add_queue(url)
  q = Queue.from_url url
  queues[q.name] = q if self.cache_queue_list
  return q
end
check_errors(response) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 304
def check_errors(response)
  raise 'http response was nil' if (response.nil?)
  raise response.errors if (response && response.errors?)
  response
end
create_queue(name) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 148
def create_queue(name)
  validate_new_queue name
        response = make_request('CreateQueue', nil, { 'QueueName' => name }, {
    'DelaySeconds' => 0,
    'MaximumMessageSize' => 262144,
    'MessageRetentionPeriod' => 4 * 24 * 60 * 60,
    'ReceiveMessageWaitTimeSeconds' => 0,
    'VisibilityTimeout' => 90 * 60
  })
  add_queue(response.get_text("/CreateQueueResponse/CreateQueueResult/QueueUrl")) unless response.nil?
end
delete_message(message) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 221
def delete_message message
  response = make_request('DeleteMessage', message.queue.queue_url, { 'ReceiptHandle' => message.receipt_handle })
end
delete_queue(queue) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 160
def delete_queue(queue)
  validate_queue queue
  response = make_request('DeleteQueue', queue.queue_url)
  queues.delete(queue.name)
end
disconnect() click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 63
def disconnect
  return true
end
get_or_create_queue(queue_name) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 325
def get_or_create_queue queue_name
  qs = queues
  q = qs.has_key?(queue_name) ? qs[queue_name] : create_queue(queue_name)
  raise "could not get or create queue: #{queue_name}" unless q
  q
end
get_queue_attributes(queue, attributes = ['All']) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 173
def get_queue_attributes(queue, attributes = ['All'])
  params = {}
  attributes.each_with_index do |attribute, i|
    validate_get_queue_attribute(attribute)
    params["AttributeName.#{i+1}"] = attribute
  end
  response = make_request('GetQueueAttributes', queue.queue_url, params)
  attributes = {}
  response.each_node('/GetQueueAttributesResponse/GetQueueAttributesResult/Attribute') { |n|
    name = n.elements['Name'].text
    value = n.elements['Value'].text
    attributes[name] = value
  }
  attributes
end
http_request(h, p, r) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 293
def http_request h, p, r
  http = Net::HTTP.new(h, p)
  # http.set_debug_output(STDOUT)

  http.use_ssl = 'https' == @protocol

  # Don't carp about SSL cert verification
  http.verify_mode = OpenSSL::SSL::VERIFY_NONE
  return http.request(r)
end
list_queues(queue_name_prefix = nil) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 166
def list_queues(queue_name_prefix = nil)
  validate_queue_name queue_name_prefix unless queue_name_prefix.nil?
  params = queue_name_prefix.nil? ? {} : { "QueueNamePrefix" => queue_name_prefix }
        response = make_request('ListQueues', nil, params)
  response.nil? ? [] : response.nodes("/ListQueuesResponse/ListQueuesResult/QueueUrl").collect{ |n| add_queue(n.text) }
end
make_request(action, url=nil, params = {}, attributes = {}) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 225
def make_request(action, url=nil, params = {}, attributes = {})
  url ||= @aws_url

        params['Action'] = action
        params['Version'] = @aws_version
  params['Expires']= (Time.now + @request_expires).utc.iso8601

  attributes.keys.sort.each_with_index do |k, i|
    params["Attributes.#{i + 1}.Name"] = k
    params["Attributes.#{i + 1}.Value"] = attributes[k]
  end

  # Sort and encode query params
  query_params = params.keys.sort.map do |key|
    key + "=" + url_encode(params[key])
  end

  # Put these together with the uri to get the request query string
  request_url = "#{url}?#{query_params.join("&")}"

  # Create the request
  init_headers = {
    'Date' => Time.now.utc.iso8601,
    'Host' => @host
  }
  request = Net::HTTP::Get.new(request_url, init_headers)

  # Sign the request
  signer = AWS4Signer.new({
    :access_key => @access_key_id,
    :secret_key => @secret_access_key,
    :region => @region
  })

  headers = {}
  request.canonical_each { |k, v| headers[k] = v }

  signature = signer.sign('GET', URI.parse(request_url), headers, nil, false)
  signature.each { |k, v| request[k] = v }

  # Make the request
  retry_count = 0
  while retry_count < @request_retry_count.to_i
          retry_count = retry_count + 1
    begin
      http_response = http_request(host,port,request)
      response = SQSResponse.new(http_response)
      check_errors(response)
      return response
    rescue Object=>ex
      raise ex unless reliable
                  sleep(@reconnect_delay)
    end
  end
end
message_size_range() click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 352
def message_size_range
  @_message_size_range ||= 1..(max_message_size * 1024)
end
queues() click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 310
def queues
  return @queues if (@queues && cache_queue_list)
  @queues = {}
  list_queues.each { |q| @queues[q.name] = q }
  return @queues
end
receive(options = {}) click to toggle source

new receive respects priorities

# File lib/activemessaging/adapters/sqs.rb, line 103
def receive(options = {})
  message = nil

  only_priorities = options[:priorities]

  # loop through the priorities
  @queues_by_priority.keys.sort.each do |priority|

    # skip this priority if there is a list, and it is not in the list
    next if only_priorities && !only_priorities.include?(priority.to_i)

    # loop through queues for the same priority in random order each time
    @queues_by_priority[priority].shuffle.each do |queue_name|
      queue = queues[queue_name]
      subscription = @subscriptions[queue_name]

      next if queue.nil? || subscription.nil?
      messages = retrieve_messsages(queue, 1, subscription.headers[:visibility_timeout])

      if (messages && !messages.empty?)
        message = messages[0]
      end

      break if message
    end

    break if message
  end
  message
end
received(message, headers={}) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 134
def received(message, headers={})
  begin
    delete_message(message)
  rescue Object => exception
    logger.error "Exception in ActiveMessaging::Adapters::AmazonSWS::Connection.received() logged and ignored: "
    logger.error exception
  end
end
retrieve_messsages(queue, num_messages = 1, timeout = nil, waittime = nil) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 206
def retrieve_messsages(queue, num_messages = 1, timeout = nil, waittime = nil)
  validate_queue queue
  validate_number_of_messages num_messages
  validate_timeout timeout if timeout

  params = { 'MaxNumberOfMessages' => num_messages.to_s, 'AttributeName' => 'All' }
  params['VisibilityTimeout'] = timeout.to_s if timeout
  params['WaitTimeSeconds'] = waittime.to_s if waittime

  response = make_request('ReceiveMessage', queue.queue_url, params)
  response.nodes('/ReceiveMessageResponse/ReceiveMessageResult/Message').map do |n|
    Message.from_element(n, response, queue)
  end unless response.nil?
end
send(queue_name, message_body, message_headers = {}) click to toggle source

queue_name string, body string, headers hash send a single message to a queue

# File lib/activemessaging/adapters/sqs.rb, line 97
def send(queue_name, message_body, message_headers = {})
  queue = get_or_create_queue(queue_name)
  send_messsage queue, message_body
end
send_messsage(queue, message) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 199
def send_messsage(queue, message)
  validate_queue queue
  validate_message message
  response = make_request('SendMessage', queue.queue_url, { 'MessageBody' => message })
  response.get_text('/SendMessageResponse/SendMessageResult/MessageId') unless response.nil?
end
set_queue_attributes(queue, attributes) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 189
def set_queue_attributes(queue, attributes)
  attributes.keys.each { |a| validate_set_queue_attribute(a) }
  response = make_request('SetQueueAttributes', queue.queue_url, {}, attributes)
end
subscribe(queue_name, message_headers={}) click to toggle source

queue_name string, headers hash for sqs, make sure queue exists, if not create, then add to list of polled queues

# File lib/activemessaging/adapters/sqs.rb, line 69
def subscribe queue_name, message_headers={}
  # look at the existing queues, create any that are missing
  queue = get_or_create_queue queue_name
  if @subscriptions.has_key? queue.name
    @subscriptions[queue.name].add
  else
    @subscriptions[queue.name] = Subscription.new(queue.name, message_headers)
  end
  priority = @subscriptions[queue.name].priority

  @queues_by_priority[priority] = [] unless @queues_by_priority.has_key?(priority)
  @queues_by_priority[priority] << queue.name unless @queues_by_priority[priority].include?(queue.name)
end
unreceive(message, headers = {}) click to toggle source

do nothing; by not deleting the message will eventually become visible again

# File lib/activemessaging/adapters/sqs.rb, line 144
def unreceive(message, headers = {})
  return true
end
unsubscribe(queue_name, message_headers={}) click to toggle source

queue_name string, headers hash for sqs, attempt delete the queues, won't work if not empty, that's ok

# File lib/activemessaging/adapters/sqs.rb, line 85
def unsubscribe queue_name, message_headers={}
  if @subscriptions[queue_name]
    @subscriptions[queue_name].remove
    if @subscriptions[queue_name].count <= 0
      sub = @subscriptions.delete(queue_name)
      @queues_by_priority[sub.priority].delete(queue_name)
    end
  end
end
url_encode(param) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 281
def url_encode(param)
  param = param.to_s

  if param.respond_to?(:encode)
    param = param.encode('UTF-8')
  end

  param = CGI::escape(param)
  URI_ENCODING_REPLACEMENTS.each { |k, v| param = param.gsub(k, v) }
  param
end
validate_get_queue_attribute(qa) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 360
def validate_get_queue_attribute qa
  raise "Queue Attribute name, #{qa}, not in list of valid attributes to get: #{GET_QUEUE_ATTRIBUTES.to_sentence}." unless GET_QUEUE_ATTRIBUTES.include?(qa)
end
validate_message(m) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 347
def validate_message m
  raise "Message cannot be nil." if m.nil?
  raise "Message length, #{m.length}, must be between #{message_size_range.min} and #{message_size_range.max}." unless message_size_range.include?(m.length)
end
validate_new_queue(qn) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 338
def validate_new_queue qn
  validate_queue_name qn
  raise "Queue already exists: #{qn}" if queues.has_key? qn
end
validate_number_of_messages(nom) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 368
def validate_number_of_messages nom
  raise "Number of messages, #{nom}, must be between #{NUMBER_OF_MESSAGES.min} and #{NUMBER_OF_MESSAGES.max}." unless NUMBER_OF_MESSAGES.include?(nom)
end
validate_queue(q) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 343
def validate_queue q
    raise "Never heard of queue, can't use it: #{q.name}" unless queues.has_key? q.name
end
validate_queue_name(qn) click to toggle source

validation methods

# File lib/activemessaging/adapters/sqs.rb, line 333
def validate_queue_name qn
  raise "Queue name, '#{qn}', must be between #{QUEUE_NAME_LENGTH.min} and #{QUEUE_NAME_LENGTH.max} characters." unless QUEUE_NAME_LENGTH.include?(qn.length)
  raise "Queue name, '#{qn}', must be alphanumeric only." if (qn =~ /[^\w\-\_]/ )
end
validate_set_queue_attribute(qa) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 364
def validate_set_queue_attribute qa
  raise "Queue Attribute name, #{qa}, not in list of valid attributes to set: #{SET_QUEUE_ATTRIBUTES.to_sentence}." unless SET_QUEUE_ATTRIBUTES.include?(qa)
end
validate_timeout(to) click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 356
def validate_timeout to
  raise "Timeout, #{to}, must be between #{VISIBILITY_TIMEOUT.min} and #{VISIBILITY_TIMEOUT.max}." unless VISIBILITY_TIMEOUT.include?(to)
end