class EventQ::RabbitMq::QueueWorkerV2
Attributes
is_running[RW]
Public Class Methods
new()
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 8
#
# Builds a worker in its idle state: not running, no forked children
# recorded, and no callbacks registered.
def initialize
  @forks = []
  @is_running = false

  # Callbacks stay nil until set through on_retry_exceeded / on_retry / on_error.
  @retry_exceeded_block = nil
  @on_retry_block       = nil
  @on_error_block       = nil

  # Collaborators used by the worker.
  @hash_helper = HashKit::Helper.new
  @serialization_provider_manager = EventQ::SerializationProviders::Manager.new
  @signature_provider_manager = EventQ::SignatureProviders::Manager.new
end
Public Instance Methods
call_on_error_block(error:, message: nil)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 103
#
# Invokes the registered on_error callback, if any, with the error and the
# (optional) message being processed. A StandardError raised by the callback
# itself is logged and swallowed so it cannot escape to the caller.
#
# @param error [Exception] the error to hand to the callback
# @param message [Object, nil] the message that was being processed, when known
def call_on_error_block(error:, message: nil)
  unless @on_error_block
    return EventQ.logger.debug { "[#{self.class}] - No on_error block specified to execute." }
  end

  EventQ.logger.debug { "[#{self.class}] - Executing on_error block." }

  begin
    @on_error_block.call(error, message)
  rescue => callback_error
    EventQ.logger.error("[#{self.class}] - An error occurred executing the on_error block. Error: #{callback_error}")
  end
end
call_on_retry_block(message)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 172
#
# Invokes the registered on_retry callback, if any, with the message and a
# flag telling whether the retry was caused by an explicit abort. A
# StandardError raised by the callback itself is logged and swallowed.
#
# @param message [Object] the message that has been queued for retry
# @param abort [Boolean] whether the message was aborted by the handler;
#   defaults to false so existing single-argument callers keep working
def call_on_retry_block(message, abort = false)
  unless @on_retry_block
    return EventQ.logger.debug { "[#{self.class}] - No on_retry block specified." }
  end

  EventQ.logger.debug { "[#{self.class}] - Executing on_retry block." }

  begin
    # `abort` must be the parameter above. With no local of that name the
    # identifier resolves to Kernel#abort, which raises SystemExit (not
    # caught by `rescue => e`) and terminates the worker process.
    @on_retry_block.call(message, abort)
  rescue => e
    EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry block. Error: #{e}")
  end
end
call_on_retry_exceeded_block(message)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 159
#
# Invokes the registered on_retry_exceeded callback, if any, with the message
# whose retry limit was reached. A StandardError raised by the callback
# itself is logged and swallowed.
#
# @param message [Object] the message that ran out of retry attempts
def call_on_retry_exceeded_block(message)
  if @retry_exceeded_block.nil?
    return EventQ.logger.debug { "[#{self.class}] - No on_retry_exceeded block specified." }
  end

  EventQ.logger.debug { "[#{self.class}] - Executing on_retry_exceeded block." }

  begin
    @retry_exceeded_block.call(message)
  rescue => callback_error
    EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry_exceeded block. Error: #{callback_error}")
  end
end
configure(queue, options = {})
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 227
#
# Stores the queue and reads worker settings from the options hash.
# :thread_count and :sleep are accepted but only trigger a deprecation
# warning; :fork_count sets the number of processes (1 when not given).
#
# @param queue [Object] the queue this worker will consume
# @param options [Hash] worker options
# @return [true]
def configure(queue, options = {})
  @queue = queue

  EventQ.logger.warn("[#{self.class}] - :thread_count is deprecated.") if options.key?(:thread_count)
  EventQ.logger.warn("[#{self.class}] - :sleep is deprecated.") if options.key?(:sleep)

  # fetch only falls back to 1 when the key is absent, so an explicit value
  # (even nil) is taken as-is.
  @fork_count = options.fetch(:fork_count, 1)

  EventQ.logger.info("[#{self.class}] - Configuring. Process Count: #{@fork_count}.")

  true
end
deserialize_message(payload)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 149
#
# Deserializes a raw payload using the serialization provider named in
# EventQ::Configuration.serialization_provider.
#
# @param payload [Object] the raw payload received from the queue
# @return [Object] whatever the provider's #deserialize returns
def deserialize_message(payload)
  provider_name = EventQ::Configuration.serialization_provider
  @serialization_provider_manager.get_provider(provider_name).deserialize(payload)
end
on_error(&block)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 140
#
# Registers the callback run by call_on_error_block. The callback is invoked
# with (error, message). Calling without a block clears the callback.
#
# @return [nil]
def on_error(&block)
  @on_error_block = block
  nil
end
on_retry(&block)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 135
#
# Registers the callback run by call_on_retry_block. Calling without a block
# clears the callback.
#
# @return [nil]
def on_retry(&block)
  @on_retry_block = block
  nil
end
on_retry_exceeded(&block)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 130
#
# Registers the callback run by call_on_retry_exceeded_block. The callback is
# invoked with the message. Calling without a block clears the callback.
#
# @return [nil]
def on_retry_exceeded(&block)
  @retry_exceeded_block = block
  nil
end
reject_message(channel, message, delivery_tag, retry_exchange, queue, abort)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 185
#
# Rejects a message (without requeue) and then decides what happens to it:
#
# * retry limit reached -> run the on_retry_exceeded callback;
# * retries allowed     -> bump the attempt counter, publish the message to
#   the retry exchange with an :expiration, and run the on_retry callback;
# * otherwise           -> nothing further.
#
# With back off enabled the expiration is attempts * retry_delay, capped at
# max_retry_delay; without it the expiration is the fixed retry_delay.
#
# @param channel [Object] channel used to reject the delivery
# @param message [Object] message being rejected; its retry_attempts is mutated
# @param delivery_tag [Object] delivery tag of the message
# @param retry_exchange [Object] exchange the retry copy is published to
# @param queue [Object] queue settings (retry limits and delays)
# @param abort [Boolean] accepted for interface compatibility; not read here
# @return [true]
def reject_message(channel, message, delivery_tag, retry_exchange, queue, abort)
  EventQ.logger.info("[#{self.class}] - Message rejected removing from queue.")

  # second argument false: do not requeue, the message leaves the queue
  channel.reject(delivery_tag, false)

  # retry limit reached: notify and stop
  if message.retry_attempts >= queue.max_retry_attempts
    EventQ.logger.info("[#{self.class}] - Message retry attempt limit exceeded. Msg: #{serialize_message(message)}")
    call_on_retry_exceeded_block(message)
    return true
  end

  # retries disabled for this queue: nothing more to do
  return true unless queue.allow_retry

  EventQ.logger.debug { "[#{self.class}] - Incrementing retry attempts count." }
  message.retry_attempts += 1

  if queue.allow_retry_back_off == true
    EventQ.logger.debug do
      "[#{self.class}] - Calculating message back off retry delay. "\
      "Attempts: #{message.retry_attempts} * Retry Delay: #{queue.retry_delay}"
    end

    message_ttl = message.retry_attempts * queue.retry_delay

    if message_ttl > queue.max_retry_delay
      EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." }
      message_ttl = queue.max_retry_delay
    end
  else
    EventQ.logger.debug { "[#{self.class}] - Setting fixed retry delay for message." }
    message_ttl = queue.retry_delay
  end

  EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{message_ttl}" }
  retry_exchange.publish(serialize_message(message), :expiration => message_ttl)
  EventQ.logger.debug { "[#{self.class}] - Published message to retry exchange." }

  call_on_retry_block(message)

  true
end
running?()
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 145
#
# Reports the current value of the running flag, which start_process sets to
# true and stop sets to false.
def running?
  @is_running
end
serialize_message(msg)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 154
#
# Serializes a message using the serialization provider named in
# EventQ::Configuration.serialization_provider.
#
# @param msg [Object] the message to serialize
# @return [Object] whatever the provider's #serialize returns
def serialize_message(msg)
  provider_name = EventQ::Configuration.serialization_provider
  @serialization_provider_manager.get_provider(provider_name).serialize(msg)
end
start(queue, options = {}, &block)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 20
#
# Configures the worker and begins consuming the queue. With a fork count of
# 1 (the default) the consumer is started in the current process; with a
# larger count a background thread forks that many child processes, each
# running start_process, and then waits on them.
#
# @param queue [Object] the queue to consume
# @param options [Hash] worker options; :client is required
# @param block the message handler, passed through to start_process
# @raise [RuntimeError] if the worker is already running or :client is missing
# @return the result of start_process, or the supervising Thread when forking
def start(queue, options = {}, &block)
  EventQ.logger.info("[#{self.class}] - Preparing to start listening for messages.")

  configure(queue, options)

  raise "[#{self.class}] - Worker is already running." if running?
  raise "[#{self.class}] - :client (QueueClient) must be specified." if options[:client].nil?

  EventQ.logger.info("[#{self.class}] - Listening for messages.")
  EventQ.logger.debug do
    "[#{self.class} #start] - Listening for messages on queue: #{EventQ.create_queue_name(queue.name)}"
  end

  @forks = []

  # single process: consume right here
  return start_process(options, queue, block) unless @fork_count > 1

  # multiple processes: fork from a supervising thread so the caller is not
  # blocked while the children are waited on
  Thread.new do
    @fork_count.times do
      child_pid = fork { start_process(options, queue, block) }
      @forks.push(child_pid)
    end

    @forks.each { |child_pid| Process.wait(child_pid) }
  end
end
start_process(options, queue, block)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 54
#
# Runs the consumer in the current process: marks the worker as running,
# installs INT/TERM handlers that stop the worker and exit, opens a
# connection and channel, and subscribes to the queue with manual
# acknowledgement. Each delivery is handed to process_message; any
# StandardError raised there is logged and passed to the on_error callback.
#
# When options[:wait] is true or options[:fork_count] is greater than 1 the
# method then blocks, polling every 5 seconds, until the worker stops.
#
# @param options [Hash] worker options; :client is read, and :durable is
#   defaulted to true on this hash when not given
# @param queue [Object] the queue to consume
# @param block [Proc] the message handler
# @return [true]
def start_process(options, queue, block)
  @is_running = true

  %w[INT TERM].each do |signal_name|
    Signal.trap(signal_name) do
      stop
      exit
    end
  end

  options[:durable] = true unless options.key?(:durable)

  # work on a copy of the client supplied by the caller
  client = options[:client].dup

  manager = EventQ::RabbitMq::QueueManager.new
  manager.durable = options[:durable]

  @connection = client.get_connection

  channel = @connection.create_channel
  channel.prefetch(1)

  rabbit_queue = manager.get_queue(channel, queue)
  retry_exchange = manager.get_retry_exchange(channel, queue)

  rabbit_queue.subscribe(manual_ack: true, block: false, exclusive: false) do |delivery_info, properties, payload|
    begin
      tag_processing_thread
      process_message(payload, queue, channel, retry_exchange, delivery_info.delivery_tag, block)
    rescue => err
      EventQ.logger.error(
        "[#{self.class}] - An error occurred attempting to process a message. Error: #{err} | "\
        "Backtrace: #{err.backtrace}"
      )
      call_on_error_block(error: err)
    ensure
      untag_processing_thread
    end
  end

  wait_requested = options.key?(:wait) && options[:wait] == true
  forked_worker = options.key?(:fork_count) && options[:fork_count] > 1

  # the subscription is non-blocking, so keep this process alive while running
  if wait_requested || forked_worker
    sleep 5 while running?
  end

  true
end
stop()
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 116
#
# Stops the worker: clears the running flag and closes the connection if one
# exists and is still open. A Timeout::Error raised while closing is logged
# and otherwise ignored.
#
# @return [true]
def stop
  EventQ.logger.info { "[#{self.class}] - Stopping..." }

  @is_running = false

  # no connection was ever established
  return true if @connection.nil?

  begin
    @connection.close if @connection.open?
  rescue Timeout::Error
    EventQ.logger.error { 'Timeout occurred closing connection.' }
  end

  true
end
Private Instance Methods
process_message(payload, queue, channel, retry_exchange, delivery_tag, block)
click to toggle source
# File lib/eventq_rabbitmq/rabbitmq_queue_worker_v2.rb, line 252
#
# Handles a single delivery: deserializes the payload, validates its
# signature, and runs the handler block with the message content and a
# MessageArgs describing it.
#
# * If EventQ::NonceManager.is_allowed? returns false the message is logged
#   as a duplicate, acknowledged and dropped (returns false).
# * If the handler finishes without setting message_args.abort, the message
#   is acknowledged and marked complete.
# * If the handler raises a StandardError (reported via the on_error
#   callback) or sets message_args.abort to true, the message is marked
#   failed and passed to reject_message.
#
# @param payload [Object] raw payload from the queue
# @param queue [Object] the queue being consumed
# @param channel [Object] channel used to acknowledge/reject the delivery
# @param retry_exchange [Object] exchange used for retries
# @param delivery_tag [Object] delivery tag of the message
# @param block [Proc] handler called with (content, message_args)
def process_message(payload, queue, channel, retry_exchange, delivery_tag, block)
  message = deserialize_message(payload)

  EventQ.logger.info("[#{self.class}] - Message received. Retry Attempts: #{message.retry_attempts}")

  @signature_provider_manager.validate_signature(message: message, queue: queue)

  message_args = EventQ::MessageArgs.new(
    type: message.type,
    retry_attempts: message.retry_attempts,
    context: message.context,
    content_type: message.content_type,
    id: message.id,
    sent: message.created
  )

  unless EventQ::NonceManager.is_allowed?(message.id)
    EventQ.logger.info("[#{self.class}] - Duplicate Message received. Dropping message.")
    channel.acknowledge(delivery_tag, false)
    return false
  end

  aborted = false
  failed = false

  # run the handler for this message
  begin
    block.call(message.content, message_args)

    if message_args.abort == true
      aborted = true
      EventQ.logger.info("[#{self.class}] - Message aborted.")
    else
      # handler finished normally: accept the message as processed
      channel.acknowledge(delivery_tag, false)
      EventQ.logger.info("[#{self.class}] - Message acknowledged.")
    end
  rescue => handler_error
    EventQ.logger.error do
      "[#{self.class}] - An unhandled error happened attempting to process a queue message. "\
      "Error: #{handler_error} | Backtrace: #{handler_error.backtrace}"
    end
    failed = true
    call_on_error_block(error: handler_error, message: message)
  end

  if failed || aborted
    EventQ::NonceManager.failed(message.id)
    reject_message(channel, message, delivery_tag, retry_exchange, queue, aborted)
  else
    EventQ::NonceManager.complete(message.id)
  end
end