class EventQ::Amazon::QueueWorker
Constants
- APPROXIMATE_RECEIVE_COUNT
- AWS_MAX_VISIBILITY_TIMEOUT
- MESSAGE
Attributes
context[RW]
Public Class Methods
new()
click to toggle source
# File lib/eventq/eventq_aws/aws_queue_worker.rb, line 17 def initialize @serialization_provider_manager = EventQ::SerializationProviders::Manager.new @signature_provider_manager = EventQ::SignatureProviders::Manager.new @calculate_visibility_timeout = Amazon::CalculateVisibilityTimeout.new( max_timeout: AWS_MAX_VISIBILITY_TIMEOUT ) end
Public Instance Methods
acknowledge_message(poller, msg)
click to toggle source
Logic for the RabbitMq
adapter when a message is accepted
# File lib/eventq/eventq_aws/aws_queue_worker.rb, line 77 def acknowledge_message(poller, msg) poller.delete_message(msg) end
configure(options = {})
click to toggle source
# File lib/eventq/eventq_aws/aws_queue_worker.rb, line 70 def configure(options = {}) options[:queue_poll_wait] ||= 10 EventQ.logger.info("[#{self.class}] - Configuring. Queue Poll Wait: #{options[:queue_poll_wait]}") end
deserialize_message(payload)
click to toggle source
# File lib/eventq/eventq_aws/aws_queue_worker.rb, line 60 def deserialize_message(payload) provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) provider.deserialize(payload) end
pre_process(context, options)
click to toggle source
# File lib/eventq/eventq_aws/aws_queue_worker.rb, line 25 def pre_process(context, options) # don't do anything specific to set up the process before threads are fired off. end
serialize_message(msg)
click to toggle source
# File lib/eventq/eventq_aws/aws_queue_worker.rb, line 65 def serialize_message(msg) provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) provider.serialize(msg) end
thread_process_iteration(queue, options, block)
click to toggle source
# File lib/eventq/eventq_aws/aws_queue_worker.rb, line 29 def thread_process_iteration(queue, options, block) client = options[:client] manager = options[:manager] || EventQ::Amazon::QueueManager.new({ client: client }) # get the queue queue_url = manager.get_queue(queue) poller = Aws::SQS::QueuePoller.new(queue_url, attribute_names: [APPROXIMATE_RECEIVE_COUNT]) # Polling will block indefinitely unless we force it to stop poller.before_request do |stats| unless context.running? EventQ.logger.info("AWS Poller shutting down") throw :stop_polling end end poller.poll(skip_delete: true, wait_time_seconds: options[:queue_poll_wait]) do |msg, stats| begin tag_processing_thread process_message(msg, poller, queue, block) rescue => e EventQ.logger.error do "[#{self.class}] - An unhandled error occurred. Error: #{e} | Backtrace: #{e.backtrace}" end context.call_on_error_block(error: e) ensure untag_processing_thread end end end
Private Instance Methods
process_message(msg, poller, queue, block)
click to toggle source
# File lib/eventq/eventq_aws/aws_queue_worker.rb, line 83 def process_message(msg, poller, queue, block) retry_attempts = msg.attributes[APPROXIMATE_RECEIVE_COUNT].to_i - 1 # deserialize the message payload payload = JSON.load(msg.body) message = deserialize_message(payload[MESSAGE]) @signature_provider_manager.validate_signature(message: message, queue: queue) status, message_args = context.process_message(block, message, retry_attempts, [poller, msg]) case status when :duplicate # don't do anything, this is previous logic. Not sure it is correct when :accepted # Acceptance was handled directly when QueueWorker#process_message was called when :reject reject_message(queue, poller, msg, retry_attempts, message, message_args) else raise "Unrecognized status: #{status}" end end
reject_message(queue, poller, msg, retry_attempts, message, args)
click to toggle source
# File lib/eventq/eventq_aws/aws_queue_worker.rb, line 106 def reject_message(queue, poller, msg, retry_attempts, message, args) if !queue.allow_retry || retry_attempts >= queue.max_retry_attempts EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected removing from queue. Message: #{serialize_message(message)}") # remove the message from the queue so that it does not get retried again poller.delete_message(msg) if retry_attempts >= queue.max_retry_attempts EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Retry attempt limit exceeded.") context.call_on_retry_exceeded_block(message) end elsif queue.allow_retry retry_attempts += 1 EventQ.logger.warn("[#{self.class}] - Message Id: #{args.id}. Rejected requesting retry. Attempts: #{retry_attempts}") visibility_timeout = @calculate_visibility_timeout.call( retry_attempts: retry_attempts, queue_settings: { allow_retry_back_off: queue.allow_retry_back_off, max_retry_delay: queue.max_retry_delay, retry_back_off_grace: queue.retry_back_off_grace, retry_back_off_weight: queue.retry_back_off_weight, retry_delay: queue.retry_delay } ) EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{visibility_timeout}" } poller.change_message_visibility_timeout(msg, visibility_timeout) context.call_on_retry_block(message) end end