class ActiveMessaging::Adapters::Sqs::Connection
Constants
- GET_QUEUE_ATTRIBUTES
- NUMBER_OF_MESSAGES
- QUEUE_NAME_LENGTH
- SET_QUEUE_ATTRIBUTES
- URI_ENCODING_REPLACEMENTS
- VISIBILITY_TIMEOUT
Attributes
access_key_id[RW]
configurable params
aws_version[RW]
configurable params
cache_queue_list[RW]
configurable params
content_type[RW]
configurable params
host[RW]
configurable params
max_message_size[RW]
configurable params
poll_interval[RW]
configurable params
port[RW]
configurable params
reconnect_delay[RW]
configurable params
secret_access_key[RW]
configurable params
Public Class Methods
new(cfg)
click to toggle source
generic init method needed by a13g
# File lib/activemessaging/adapters/sqs.rb, line 32
# Generic init method needed by a13g.
#
# cfg is a hash of settings. :access_key_id and :secret_access_key are
# required (a RuntimeError is raised when either is nil or empty); every
# other key falls back to the default shown below.
def initialize(cfg)
  key_id = cfg[:access_key_id]
  raise "Must specify a access_key_id" if key_id.nil? || key_id.empty?

  secret = cfg[:secret_access_key]
  raise "Must specify a secret_access_key" if secret.nil? || secret.empty?

  # credentials and region
  @access_key_id       = key_id
  @secret_access_key   = secret
  @region              = cfg[:region] || 'us-east-1'

  # request behaviour (note the camelCase config keys)
  @request_expires     = cfg[:requestExpires] || 10
  @request_retry_count = cfg[:requestRetryCount] || 5
  @aws_version         = cfg[:aws_version] || '2012-11-05'
  @content_type        = cfg[:content_type] || 'text/plain'

  # endpoint; the default host is derived from the region
  @host                = cfg[:host] || "sqs.#{@region}.amazonaws.com"
  @port                = cfg[:port] || 80
  @protocol            = cfg[:protocol] || 'http'

  # polling / reconnect timing
  @poll_interval       = cfg[:poll_interval] || 1
  @reconnect_delay     = cfg[:reconnectDelay] || 5

  # any non-positive or missing size falls back to 8
  requested_size       = cfg[:max_message_size].to_i
  @max_message_size    = requested_size > 0 ? requested_size : 8

  @aws_url             = "#{@protocol}://#{@host}/"

  # both flags default to true only when the key is absent (nil),
  # so an explicit false is honoured
  cache_flag           = cfg[:cache_queue_list]
  @cache_queue_list    = cache_flag.nil? ? true : cache_flag
  reliable_flag        = cfg[:reliable]
  @reliable            = reliable_flag.nil? ? true : reliable_flag

  # initialize the subscriptions and queues
  @subscriptions       = {}
  @queues_by_priority  = {}
  @current_subscription = 0

  # populate the queue list up front
  queues
end
Public Instance Methods
add_queue(url)
click to toggle source
internal data structure methods
# File lib/activemessaging/adapters/sqs.rb, line 319
# Internal data structure method: builds a Queue from its URL and, when
# queue-list caching is enabled, records it in the queues hash under its name.
# Returns the queue object either way.
def add_queue(url)
  parsed = Queue.from_url(url)
  queues[parsed.name] = parsed if cache_queue_list
  parsed
end
check_errors(response)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 304
# Raises when the response is nil or reports errors; otherwise hands the
# response back unchanged.
def check_errors(response)
  if response.nil?
    raise 'http response was nil'
  elsif response && response.errors?
    raise response.errors
  end

  response
end
create_queue(name)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 148
# Validates the name, issues a CreateQueue request with a fixed set of queue
# attributes, and registers the queue from the returned URL.
# Returns the result of add_queue, or nil when the request produced no response.
def create_queue(name)
  validate_new_queue name

  queue_attributes = {
    'DelaySeconds'                  => 0,
    'MaximumMessageSize'            => 262144,
    'MessageRetentionPeriod'        => 4 * 24 * 60 * 60,
    'ReceiveMessageWaitTimeSeconds' => 0,
    'VisibilityTimeout'             => 90 * 60
  }
  response = make_request('CreateQueue', nil, { 'QueueName' => name }, queue_attributes)
  return if response.nil?

  add_queue(response.get_text("/CreateQueueResponse/CreateQueueResult/QueueUrl"))
end
delete_message(message)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 221
# Issues a DeleteMessage request against the message's queue URL using the
# message's receipt handle. Returns whatever make_request returns.
def delete_message(message)
  target_url = message.queue.queue_url
  make_request('DeleteMessage', target_url, { 'ReceiptHandle' => message.receipt_handle })
end
delete_queue(queue)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 160
# Validates that the queue is known, issues a DeleteQueue request for it,
# then drops it from the local queues hash.
# Returns the value removed from the hash (nil if there was no entry).
def delete_queue(queue)
  validate_queue(queue)
  make_request('DeleteQueue', queue.queue_url)
  queues.delete(queue.name)
end
disconnect()
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 63
# Nothing to tear down here; always reports success.
def disconnect
  true
end
get_or_create_queue(queue_name)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 325
# Looks the queue up by name in the current queues hash and creates it when
# there is no entry. Raises if neither path yields a queue.
def get_or_create_queue(queue_name)
  known = queues
  found = known.fetch(queue_name) { create_queue(queue_name) }
  raise "could not get or create queue: #{queue_name}" unless found

  found
end
get_queue_attributes(queue, attributes = ['All'])
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 173
# Fetches attributes of a queue via a GetQueueAttributes request.
#
# queue      - object responding to queue_url
# attributes - list of attribute names to request (default ['All']); each is
#              checked with validate_get_queue_attribute before the request
#
# Returns a hash of attribute name => value text. Returns an empty hash when
# make_request yields no response (it returns nil once its retries are
# exhausted), instead of failing on nil as the other request helpers avoid.
def get_queue_attributes(queue, attributes = ['All'])
  params = {}
  attributes.each_with_index do |attribute, i|
    validate_get_queue_attribute(attribute)
    params["AttributeName.#{i + 1}"] = attribute
  end

  response = make_request('GetQueueAttributes', queue.queue_url, params)

  # collect into a separate hash rather than reusing the parameter name
  result = {}
  return result if response.nil?

  response.each_node('/GetQueueAttributesResponse/GetQueueAttributesResult/Attribute') do |node|
    result[node.elements['Name'].text] = node.elements['Value'].text
  end
  result
end
http_request(h, p, r)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 293
# Sends request r to host h on port p and returns the HTTP response.
# SSL is switched on only when the configured protocol is 'https'.
def http_request h, p, r
  client = Net::HTTP.new(h, p)
  # client.set_debug_output(STDOUT)
  client.use_ssl = ('https' == @protocol)
  # Don't carp about SSL cert verification
  # NOTE(review): VERIFY_NONE turns off server certificate checking for every
  # https request made here -- confirm this is still intended.
  client.verify_mode = OpenSSL::SSL::VERIFY_NONE
  client.request(r)
end
list_queues(queue_name_prefix = nil)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 166
# Issues a ListQueues request, optionally restricted to a (validated) name
# prefix, and passes every returned queue URL through add_queue.
# Returns the resulting list of queues, or [] when there was no response.
def list_queues(queue_name_prefix = nil)
  params = {}
  unless queue_name_prefix.nil?
    validate_queue_name queue_name_prefix
    params["QueueNamePrefix"] = queue_name_prefix
  end

  response = make_request('ListQueues', nil, params)
  return [] if response.nil?

  response.nodes("/ListQueuesResponse/ListQueuesResult/QueueUrl").map { |node| add_queue(node.text) }
end
make_request(action, url=nil, params = {}, attributes = {})
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 225
# Builds, signs and sends a GET request for the given action.
#
# action     - action name placed in the 'Action' query parameter
# url        - base URL for the request; defaults to @aws_url
# params     - query parameters; this hash is modified in place (Action,
#              Version, Expires and the numbered Attributes.* entries are added)
# attributes - name => value pairs sent as Attributes.N.Name / Attributes.N.Value,
#              numbered in sorted key order
#
# Returns the SQSResponse once an attempt passes check_errors. Makes up to
# @request_retry_count attempts. On failure it re-raises immediately unless
# `reliable` is set; in reliable mode it sleeps @reconnect_delay and tries
# again, returning nil when every attempt has failed.
#
# Only StandardError is retried or suppressed: Interrupt, SystemExit and other
# non-StandardError exceptions always propagate, so a reliable connection can
# still be interrupted or shut down.
def make_request(action, url=nil, params = {}, attributes = {})
  url ||= @aws_url

  params['Action'] = action
  params['Version'] = @aws_version
  params['Expires'] = (Time.now + @request_expires).utc.iso8601

  attributes.keys.sort.each_with_index do |k, i|
    params["Attributes.#{i + 1}.Name"] = k
    params["Attributes.#{i + 1}.Value"] = attributes[k]
  end

  # Sort and encode query params
  query_params = params.keys.sort.map { |key| "#{key}=#{url_encode(params[key])}" }

  # Put these together with the uri to get the request query string
  request_url = "#{url}?#{query_params.join("&")}"

  # Create the request
  init_headers = {
    'Date' => Time.now.utc.iso8601,
    'Host' => @host
  }
  request = Net::HTTP::Get.new(request_url, init_headers)

  # Sign the request
  signer = AWS4Signer.new({
    :access_key => @access_key_id,
    :secret_key => @secret_access_key,
    :region => @region
  })

  headers = {}
  request.canonical_each { |k, v| headers[k] = v }

  signature = signer.sign('GET', URI.parse(request_url), headers, nil, false)
  signature.each { |k, v| request[k] = v }

  # Make the request, retrying a bounded number of times
  @request_retry_count.to_i.times do
    begin
      http_response = http_request(host, port, request)
      response = SQSResponse.new(http_response)
      check_errors(response)
      return response
    rescue StandardError
      raise unless reliable
      sleep(@reconnect_delay)
    end
  end

  # every attempt failed in reliable mode
  nil
end
message_size_range()
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 352
# Allowed message size range: 1 up to max_message_size * 1024.
# Computed on first use and cached afterwards.
def message_size_range
  return @_message_size_range if @_message_size_range

  @_message_size_range = 1..(max_message_size * 1024)
end
queues()
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 310
# Returns the name => queue hash. The cached hash is reused when caching is
# enabled; otherwise it is rebuilt from list_queues.
def queues
  return @queues if @queues && cache_queue_list

  # assign the empty hash before calling list_queues, so @queues is already
  # set while the listing runs
  @queues = {}
  list_queues.each { |listed| @queues[listed.name] = listed }
  @queues
end
receive(options = {})
click to toggle source
new receive respects priorities
# File lib/activemessaging/adapters/sqs.rb, line 103
# Receive that respects priorities: walks the subscribed priorities in sorted
# order and, within one priority, tries the queues in a freshly shuffled order.
# Returns the first message found, or nil when nothing was retrieved.
#
# options[:priorities] - optional list; when given, only priorities whose
#                        to_i is in the list are polled
def receive(options = {})
  wanted = options[:priorities]

  @queues_by_priority.keys.sort.each do |priority|
    # skip this priority if there is a list, and it is not in the list
    next if wanted && !wanted.include?(priority.to_i)

    # random order each time for queues sharing a priority
    @queues_by_priority[priority].shuffle.each do |queue_name|
      queue = queues[queue_name]
      subscription = @subscriptions[queue_name]
      next if queue.nil? || subscription.nil?

      batch = retrieve_messsages(queue, 1, subscription.headers[:visibility_timeout])
      next unless batch && !batch.empty?

      found = batch[0]
      return found if found
    end
  end

  nil
end
received(message, headers={})
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 134
# Acknowledges a message by deleting it. Best effort: an error raised by the
# delete is logged and ignored rather than raised to the caller.
#
# Only StandardError is suppressed, so Interrupt, SystemExit and similar
# still propagate. The log line names this class (Sqs::Connection).
#
# message - the message to delete
# headers - accepted for interface compatibility; not used here
def received(message, headers={})
  delete_message(message)
rescue StandardError => exception
  logger.error "Exception in ActiveMessaging::Adapters::Sqs::Connection.received() logged and ignored: "
  logger.error exception
end
retrieve_messsages(queue, num_messages = 1, timeout = nil, waittime = nil)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 206
# Issues a ReceiveMessage request for the queue and wraps each returned
# Message node via Message.from_element.
#
# queue        - a known queue (validated)
# num_messages - how many messages to request (validated)
# timeout      - optional visibility timeout; validated and sent only when given
# waittime     - optional wait time; sent only when given
#
# Returns the list of messages, or nil when there was no response.
def retrieve_messsages(queue, num_messages = 1, timeout = nil, waittime = nil)
  validate_queue queue
  validate_number_of_messages num_messages
  validate_timeout timeout if timeout

  params = { 'MaxNumberOfMessages' => num_messages.to_s, 'AttributeName' => 'All' }
  params['VisibilityTimeout'] = timeout.to_s if timeout
  params['WaitTimeSeconds'] = waittime.to_s if waittime

  response = make_request('ReceiveMessage', queue.queue_url, params)
  return nil if response.nil?

  response.nodes('/ReceiveMessageResponse/ReceiveMessageResult/Message').map do |node|
    Message.from_element(node, response, queue)
  end
end
send(queue_name, message_body, message_headers = {})
click to toggle source
queue_name string, body string, headers hash. Sends a single message to a queue.
# File lib/activemessaging/adapters/sqs.rb, line 97
# Sends a single message body to the named queue, creating the queue first if
# it does not exist yet. message_headers is accepted but not used here.
def send(queue_name, message_body, message_headers = {})
  target = get_or_create_queue(queue_name)
  send_messsage(target, message_body)
end
send_messsage(queue, message)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 199
# Validates the queue and message, then issues a SendMessage request.
# Returns the MessageId text from the response, or nil when there was no response.
def send_messsage(queue, message)
  validate_queue queue
  validate_message message

  response = make_request('SendMessage', queue.queue_url, { 'MessageBody' => message })
  return nil if response.nil?

  response.get_text('/SendMessageResponse/SendMessageResult/MessageId')
end
set_queue_attributes(queue, attributes)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 189
# Validates every attribute name, then issues a SetQueueAttributes request
# carrying the attributes. Returns whatever make_request returns.
def set_queue_attributes(queue, attributes)
  attributes.each_key { |attr_name| validate_set_queue_attribute(attr_name) }
  make_request('SetQueueAttributes', queue.queue_url, {}, attributes)
end
subscribe(queue_name, message_headers={})
click to toggle source
queue_name string, headers hash. For SQS, make sure the queue exists (create it if it does not), then add it to the list of polled queues.
# File lib/activemessaging/adapters/sqs.rb, line 69
# Makes sure the queue exists (creating it when missing), registers or bumps
# its subscription, and lists the queue under the subscription's priority so
# that receive will poll it.
def subscribe queue_name, message_headers={}
  # look at the existing queues, create any that are missing
  name = get_or_create_queue(queue_name).name

  if @subscriptions.key?(name)
    @subscriptions[name].add
  else
    @subscriptions[name] = Subscription.new(name, message_headers)
  end

  priority = @subscriptions[name].priority
  @queues_by_priority[priority] = [] unless @queues_by_priority.key?(priority)

  # each queue name appears at most once per priority
  bucket = @queues_by_priority[priority]
  bucket << name unless bucket.include?(name)
end
unreceive(message, headers = {})
click to toggle source
Do nothing; because the message is not deleted, it will eventually become visible again.
# File lib/activemessaging/adapters/sqs.rb, line 144
# Intentionally a no-op that reports success; both arguments are ignored.
def unreceive(message, headers = {})
  true
end
unsubscribe(queue_name, message_headers={})
click to toggle source
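queue_name string, headers hash. For SQS, attempt to delete the queues; this won't work if they are not empty, and that's OK. (Note: the source shown below only removes the subscription and its priority entry; it does not issue a queue delete.)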
# File lib/activemessaging/adapters/sqs.rb, line 85
# Releases one subscription on the named queue. Once the subscription's count
# drops to zero or below, the subscription is removed and the queue name is
# taken out of its priority list. Does nothing for an unknown queue name.
def unsubscribe queue_name, message_headers={}
  subscription = @subscriptions[queue_name]
  return unless subscription

  subscription.remove
  return unless @subscriptions[queue_name].count <= 0

  removed = @subscriptions.delete(queue_name)
  @queues_by_priority[removed.priority].delete(queue_name)
end
url_encode(param)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 281
# Converts the value to a UTF-8 string, CGI-escapes it, then applies each
# substitution listed in URI_ENCODING_REPLACEMENTS in turn.
# Returns the encoded string.
def url_encode(param)
  text = param.to_s
  text = text.encode('UTF-8') if text.respond_to?(:encode)
  escaped = CGI::escape(text)

  URI_ENCODING_REPLACEMENTS.inject(escaped) do |encoded, (pattern, replacement)|
    encoded.gsub(pattern, replacement)
  end
end
validate_get_queue_attribute(qa)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 360
# Raises unless the attribute name is listed in GET_QUEUE_ATTRIBUTES.
def validate_get_queue_attribute qa
  return if GET_QUEUE_ATTRIBUTES.include?(qa)

  raise "Queue Attribute name, #{qa}, not in list of valid attributes to get: #{GET_QUEUE_ATTRIBUTES.to_sentence}."
end
validate_message(m)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 347
# Raises when the message is nil or its size falls outside message_size_range.
#
# The range's upper bound is max_message_size * 1024, so the size is measured
# with bytesize when the message supports it. This stops multibyte text from
# being under-counted by its character length. Objects without bytesize fall
# back to length.
def validate_message m
  raise "Message cannot be nil." if m.nil?

  size = m.respond_to?(:bytesize) ? m.bytesize : m.length
  return if message_size_range.include?(size)

  raise "Message length, #{size}, must be between #{message_size_range.min} and #{message_size_range.max}."
end
validate_new_queue(qn)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 338
# Checks that the name is a valid queue name and that no queue with that name
# is already known; raises otherwise.
def validate_new_queue qn
  validate_queue_name(qn)
  return unless queues.key?(qn)

  raise "Queue already exists: #{qn}"
end
validate_number_of_messages(nom)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 368
# Raises unless the requested message count lies within NUMBER_OF_MESSAGES.
def validate_number_of_messages nom
  return if NUMBER_OF_MESSAGES.include?(nom)

  raise "Number of messages, #{nom}, must be between #{NUMBER_OF_MESSAGES.min} and #{NUMBER_OF_MESSAGES.max}."
end
validate_queue(q)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 343
# Raises unless the queue's name is present in the known queues hash.
def validate_queue q
  return if queues.key?(q.name)

  raise "Never heard of queue, can't use it: #{q.name}"
end
validate_queue_name(qn)
click to toggle source
validation methods
# File lib/activemessaging/adapters/sqs.rb, line 333
# Validation method: raises when the name's length is outside
# QUEUE_NAME_LENGTH, or when it contains any character other than a word
# character or a hyphen.
def validate_queue_name qn
  unless QUEUE_NAME_LENGTH.include?(qn.length)
    raise "Queue name, '#{qn}', must be between #{QUEUE_NAME_LENGTH.min} and #{QUEUE_NAME_LENGTH.max} characters."
  end

  if qn =~ /[^\w\-\_]/
    raise "Queue name, '#{qn}', must be alphanumeric only."
  end
end
validate_set_queue_attribute(qa)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 364
# Raises unless the attribute name is listed in SET_QUEUE_ATTRIBUTES.
def validate_set_queue_attribute qa
  return if SET_QUEUE_ATTRIBUTES.include?(qa)

  raise "Queue Attribute name, #{qa}, not in list of valid attributes to set: #{SET_QUEUE_ATTRIBUTES.to_sentence}."
end
validate_timeout(to)
click to toggle source
# File lib/activemessaging/adapters/sqs.rb, line 356
# Raises unless the timeout lies within VISIBILITY_TIMEOUT.
def validate_timeout to
  return if VISIBILITY_TIMEOUT.include?(to)

  raise "Timeout, #{to}, must be between #{VISIBILITY_TIMEOUT.min} and #{VISIBILITY_TIMEOUT.max}."
end