class EventQ::RabbitMq::QueueWorker
Attributes
context[RW]
is_running[RW]
Public Class Methods
new()
click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb, line 8 def initialize @serialization_provider_manager = EventQ::SerializationProviders::Manager.new @signature_provider_manager = EventQ::SignatureProviders::Manager.new end
Public Instance Methods
acknowledge_message(channel, delivery_tag)
click to toggle source
Logic for the RabbitMq
adapter when a message is accepted
# File lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb, line 106 def acknowledge_message(channel, delivery_tag) channel.acknowledge(delivery_tag, false) end
configure(options = {})
click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb, line 101 def configure(options = {}) options[:durable] ||= true end
deserialize_message(payload)
click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb, line 56 def deserialize_message(payload) provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) return provider.deserialize(payload) end
pre_process(context, options)
click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb, line 13 def pre_process(context, options) manager = EventQ::RabbitMq::QueueManager.new manager.durable = options[:durable] options[:manager] = manager connection = options[:client].dup.get_connection options[:connection] = connection end
reject_message(channel, message, delivery_tag, retry_exchange, queue, abort)
click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb, line 66 def reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) EventQ.logger.info("[#{self.class}] - Message rejected removing from queue.") # reject the message to remove from queue channel.reject(delivery_tag, false) # check if the message retry limit has been exceeded if message.retry_attempts >= queue.max_retry_attempts EventQ.logger.info("[#{self.class}] - Message retry attempt limit exceeded. Msg: #{serialize_message(message)}") context.call_on_retry_exceeded_block(message) # check if the message is allowed to be retried elsif queue.allow_retry message.retry_attempts += 1 retry_attempts = message.retry_attempts - queue.retry_back_off_grace retry_attempts = 1 if retry_attempts < 1 if queue.allow_retry_back_off == true message_ttl = retry_attempts * queue.retry_delay if (retry_attempts * queue.retry_delay) > queue.max_retry_delay EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." } message_ttl = queue.max_retry_delay end else message_ttl = queue.retry_delay end EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{message_ttl}" } retry_exchange.publish(serialize_message(message), :expiration => message_ttl) context.call_on_retry_block(message) end return true end
serialize_message(msg)
click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb, line 61 def serialize_message(msg) provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) return provider.serialize(msg) end
thread_process_iteration(queue, options, block)
click to toggle source
This method should not be called iteratively and will sit in a loop The reason is because this uses a push notification from the subscribe mechanism to trigger the block and will exit if you do not block.
# File lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb, line 25 def thread_process_iteration(queue, options, block) manager = options[:manager] channel = options[:connection].create_channel channel.prefetch(1) q = manager.get_queue(channel, queue) retry_exchange = manager.get_retry_exchange(channel, queue) q.subscribe(:manual_ack => true, :block => false, :exclusive => false) do |delivery_info, properties, payload| begin tag_processing_thread process_message(payload, queue, channel, retry_exchange, delivery_info.delivery_tag, block) rescue => e EventQ.logger.error( "[#{self.class}] - An error occurred attempting to process a message. Error: #{e} | "\ "Backtrace: #{e.backtrace}" ) context.call_on_error_block(error: e) ensure untag_processing_thread end end # we don't want to stop the subscribe process as it will not block. sleep 5 while context.running? if channel != nil && channel.open? channel.close end end
Private Instance Methods
process_message(payload, queue, channel, retry_exchange, delivery_tag, block)
click to toggle source
# File lib/eventq/eventq_rabbitmq/rabbitmq_queue_worker.rb, line 112 def process_message(payload, queue, channel, retry_exchange, delivery_tag, block) message = deserialize_message(payload) retry_attempts = message.retry_attempts @signature_provider_manager.validate_signature(message: message, queue: queue) status, message_args = context.process_message(block, message, retry_attempts, [channel, delivery_tag]) case status when :duplicate channel.acknowledge(delivery_tag, false) when :accepted # Acceptance was handled directly when QueueWorker#process_message was called when :reject reject_message(channel, message, delivery_tag, retry_exchange, queue, message_args.abort) else raise "Unrecognized status: #{status}" end end