class EventQ::RabbitMq::QueueWorker
Attributes
is_running[RW]
Public Class Methods
new()
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 10 def initialize @is_running = false @retry_exceeded_block = nil @on_retry_block = nil @on_error_block = nil @hash_helper = HashKit::Helper.new @serialization_provider_manager = EventQ::SerializationProviders::Manager.new @signature_provider_manager = EventQ::SignatureProviders::Manager.new @last_gc_flush = Time.now @gc_flush_interval = 10 end
Public Instance Methods
call_on_error_block(error:, message: nil)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 126 def call_on_error_block(error:, message: nil) if @on_error_block EventQ.logger.debug { "[#{self.class}] - Executing on_error block." } begin @on_error_block.call(error, message) rescue => e EventQ.logger.error("[#{self.class}] - An error occurred executing the on_error block. Error: #{e}") end else EventQ.logger.debug { "[#{self.class}] - No on_error block specified to execute." } end end
call_on_retry_block(message)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 233 def call_on_retry_block(message) if @on_retry_block EventQ.logger.debug { "[#{self.class}] - Executing on_retry block." } begin @on_retry_block.call(message, abort) rescue => e EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry block. Error: #{e}") end else EventQ.logger.debug { "[#{self.class}] - No on_retry block specified." } end end
call_on_retry_exceeded_block(message)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 220 def call_on_retry_exceeded_block(message) if @retry_exceeded_block != nil EventQ.logger.debug { "[#{self.class}] - Executing on_retry_exceeded block." } begin @retry_exceeded_block.call(message) rescue => e EventQ.logger.error("[#{self.class}] - An error occurred executing the on_retry_exceeded block. Error: #{e}") end else EventQ.logger.debug { "[#{self.class}] - No on_retry_exceeded block specified." } end end
configure(queue, options = {})
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 289 def configure(queue, options = {}) @queue = queue #default thread count @thread_count = 4 if options.key?(:thread_count) @thread_count = options[:thread_count] end #default sleep time in seconds @sleep = 15 if options.key?(:sleep) @sleep = options[:sleep] end @gc_flush_interval = 10 if options.key?(:gc_flush_interval) @gc_flush_interval = options[:gc_flush_interval] end EventQ.logger.info("[#{self.class}] - Configuring. Thread Count: #{@thread_count} | Interval Sleep: #{@sleep}.") return true end
deserialize_message(payload)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 210 def deserialize_message(payload) provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) return provider.deserialize(payload) end
gc_flush()
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 139 def gc_flush if Time.now - last_gc_flush > @gc_flush_interval GC.start @last_gc_flush = Time.now end end
last_gc_flush()
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 146 def last_gc_flush @last_gc_flush end
on_error(&block)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 201 def on_error(&block) @on_error_block = block return nil end
on_retry(&block)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 196 def on_retry(&block) @on_retry_block = block return nil end
on_retry_exceeded(&block)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 191 def on_retry_exceeded(&block) @retry_exceeded_block = block return nil end
reject_message(channel, message, delivery_tag, retry_exchange, queue, abort)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 246 def reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) EventQ.logger.info("[#{self.class}] - Message rejected removing from queue.") #reject the message to remove from queue channel.reject(delivery_tag, false) #check if the message retry limit has been exceeded if message.retry_attempts >= queue.max_retry_attempts EventQ.logger.info("[#{self.class}] - Message retry attempt limit exceeded. Msg: #{serialize_message(message)}") call_on_retry_exceeded_block(message) #check if the message is allowed to be retried elsif queue.allow_retry EventQ.logger.debug { "[#{self.class}] - Incrementing retry attempts count." } message.retry_attempts += 1 if queue.allow_retry_back_off == true EventQ.logger.debug { "[#{self.class}] - Calculating message back off retry delay. Attempts: #{message.retry_attempts} * Retry Delay: #{queue.retry_delay}" } message_ttl = message.retry_attempts * queue.retry_delay if (message.retry_attempts * queue.retry_delay) > queue.max_retry_delay EventQ.logger.debug { "[#{self.class}] - Max message back off retry delay reached." } message_ttl = queue.max_retry_delay end else EventQ.logger.debug { "[#{self.class}] - Setting fixed retry delay for message." } message_ttl = queue.retry_delay end EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{message_ttl}" } retry_exchange.publish(serialize_message(message), :expiration => message_ttl) EventQ.logger.debug { "[#{self.class}] - Published message to retry exchange." } call_on_retry_block(message) end return true end
running?()
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 206 def running? return @is_running end
serialize_message(msg)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 215 def serialize_message(msg) provider = @serialization_provider_manager.get_provider(EventQ::Configuration.serialization_provider) return provider.serialize(msg) end
start(queue, options = {}, &block)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 23 def start(queue, options = {}, &block) EventQ.logger.info("[#{self.class}] - Preparing to start listening for messages.") configure(queue, options) raise "[#{self.class}] - Worker is already running." if running? if options[:client] == nil raise "[#{self.class}] - :client (QueueClient) must be specified." end EventQ.logger.info("[#{self.class}] - Listening for messages.") EventQ.logger.debug do "[#{self.class} #start] - Listening for messages on queue: #{EventQ.create_queue_name(queue.name)}" end start_process(options, queue, block) return true end
start_process(options, queue, block)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 45 def start_process(options, queue, block) @is_running = true %w'INT TERM'.each do |sig| Signal.trap(sig) { stop exit } end if !options.key?(:durable) options[:durable] = true end client = options[:client] manager = EventQ::RabbitMq::QueueManager.new manager.durable = options[:durable] @connection = client.get_connection @executor = java.util.concurrent.Executors::newFixedThreadPool @thread_count #loop through each thread count @thread_count.times do @executor.execute do #begin the queue loop for this thread while true do #check if the worker is still allowed to run and break out of thread loop if not unless running? break end if @executor.is_shutdown break end has_received_message = false begin channel = @connection.create_channel has_received_message = thread_process_iteration(channel, manager, queue, block) rescue => e EventQ.logger.error("An unhandled error occurred. Error: #{e} | Backtrace: #{e.backtrace}") call_on_error_block(error: e) end if channel != nil && channel.open? channel.close end gc_flush if !has_received_message EventQ.logger.debug { "[#{self.class}] - No message received." } if @sleep > 0 EventQ.logger.debug { "[#{self.class}] - Sleeping for #{@sleep} seconds" } sleep(@sleep) end end end end end if options.key?(:wait) && options[:wait] == true while running? do end @connection.close if @connection.open? end return true end
stop()
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 181 def stop EventQ.logger.info { "[#{self.class}] - Stopping..." } @is_running = false @executor.shutdown if @connection != nil @connection.close if @connection.open? end return true end
thread_process_iteration(channel, manager, queue, block)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 150 def thread_process_iteration(channel, manager, queue, block) #get the queue q = manager.get_queue(channel, queue) retry_exchange = manager.get_retry_exchange(channel, queue) received = false begin delivery_info, payload = manager.pop_message(queue: q) #check that message was received if payload != nil received = true begin tag_processing_thread process_message(payload, queue, channel, retry_exchange, delivery_info, block) ensure untag_processing_thread end end rescue => e EventQ.logger.error("[#{self.class}] - An error occurred attempting to process a message. Error: #{e} | Backtrace: #{e.backtrace}") call_on_error_block(error: e) end return received end
Private Instance Methods
process_message(payload, queue, channel, retry_exchange, delivery_tag, block)
click to toggle source
# File lib/eventq_rabbitmq/jruby/rabbitmq_queue_worker.rb, line 318 def process_message(payload, queue, channel, retry_exchange, delivery_tag, block) abort = false error = false message = deserialize_message(payload) EventQ.logger.info("[#{self.class}] - Message received. Retry Attempts: #{message.retry_attempts}") @signature_provider_manager.validate_signature(message: message, queue: queue) message_args = EventQ::MessageArgs.new(type: message.type, retry_attempts: message.retry_attempts, context: message.context, content_type: message.content_type) if(!EventQ::NonceManager.is_allowed?(message.id)) EventQ.logger.info("[#{self.class}] - Duplicate Message received. Dropping message.") channel.acknowledge(delivery_tag, false) return false end #begin worker block for queue message begin block.call(message.content, message_args) if message_args.abort == true abort = true EventQ.logger.info("[#{self.class}] - Message aborted.") else #accept the message as processed channel.acknowledge(delivery_tag, false) EventQ.logger.info("[#{self.class}] - Message acknowledged.") end rescue => e EventQ.logger.error("[#{self.class}] - An unhandled error happened attempting to process a queue message. Error: #{e} | Backtrace: #{e.backtrace}") error = true call_on_error_block(error: e, message: message) end if error || abort EventQ::NonceManager.failed(message.id) reject_message(channel, message, delivery_tag, retry_exchange, queue, abort) else EventQ::NonceManager.complete(message.id) end end