class RabbitJobs::Worker
Worker
daemon.
Attributes
consumer[R]
process_name[RW]
Public Class Methods
new(*queues)
click to toggle source
Workers should be initialized with an array of string queue names. The order is important: a Worker
will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker
will again check the first queue given, and so forth. In this way the queue list passed to a Worker
on startup defines the priorities of queues.
If passed a single “*” (or no queue names at all), this Worker
will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.
# File lib/rabbit_jobs/worker.rb, line 35
#
# Builds a worker for the given queue names (routing keys).
#
# Names may be given as separate arguments or as (nested) arrays. Every name
# is converted to a String, stripped of surrounding whitespace and
# de-duplicated, keeping the order in which names were first seen.
#
# A lone '*' or an empty list selects RabbitJobs.config.routing_keys instead.
#
# Raises RuntimeError when the resulting queue list is still empty.
def initialize(*queues)
  # Flatten BEFORE stringifying: #to_s on a nested array would collapse the
  # whole array into one bogus queue name such as "[\"a\", \"b\"]".
  @queues = queues.flatten.map { |queue| queue.to_s.strip }.uniq
  @queues = RabbitJobs.config.routing_keys if @queues == ['*'] || @queues.empty?

  fail 'Cannot initialize worker without queues.' if @queues.empty?
end
Public Instance Methods
consumer=(value)
click to toggle source
# File lib/rabbit_jobs/worker.rb, line 13
#
# Assigns the object that handles incoming messages.
#
# value must respond to #process_message; otherwise ArgumentError is raised
# and the previously assigned consumer is left untouched.
def consumer=(value)
  fail ArgumentError, 'value must implement #process_message' unless value.respond_to?(:process_message)

  @consumer = value
end
queue_params(routing_key)
click to toggle source
# File lib/rabbit_jobs/worker.rb, line 20
#
# Returns whatever is stored in RJ.config[:queues] under routing_key, after
# converting routing_key to a Symbol. The looked-up value is returned as is.
def queue_params(routing_key)
  configured_queues = RJ.config[:queues]
  configured_queues[routing_key.to_sym]
end
queues()
click to toggle source
# File lib/rabbit_jobs/worker.rb, line 44
#
# Queue names this worker operates on. Returns a new empty array when
# @queues is unset (nil or false), otherwise @queues itself.
def queues
  return [] unless @queues

  @queues
end
work()
click to toggle source
Subscribes to the worker's queues and works on jobs.
# File lib/rabbit_jobs/worker.rb, line 49
#
# Subscribes to every queue in #queues and then hands control to main_loop.
#
# Returns false when startup fails, otherwise the value returned by
# main_loop. If a StandardError is raised while subscribing or inside
# main_loop, it is passed to log_daemon_error and true is returned.
def work
  return false unless startup

  @consumer ||= RJ::Consumer::JobConsumer.new
  # Rename the process so the worker can be identified in process listings.
  $0 = process_name || "rj_worker (#{queues.join(', ')})"
  @processed_count = 0

  begin
    consumer_channel.prefetch(1)
    queues.each { |routing_key| consume_queue(routing_key) }
    RJ.logger.info 'Started.'

    # main_loop receives a block that logs the number of processed jobs.
    return main_loop { RJ.logger.info "Processed jobs: #{@processed_count}." }
  rescue => error
    log_daemon_error(error)
  end

  true
end
Private Instance Methods
consume_message(delivery_info, properties, payload)
click to toggle source
# File lib/rabbit_jobs/worker.rb, line 78
#
# Passes one delivery to @consumer, provided that
# RJ.run_before_process_message_callbacks returns a truthy value.
#
# Returns true once the callbacks passed, even if the consumer raised (the
# error is only logged and the processed counter is not incremented).
# Returns false, after logging a warning, when the callbacks failed.
def consume_message(delivery_info, properties, payload)
  unless RJ.run_before_process_message_callbacks
    RJ.logger.warn "before_process_message hook failed, requeuing payload: #{payload.inspect}"
    return false
  end

  begin
    @consumer.process_message(delivery_info, properties, payload)
    @processed_count += 1
  rescue ScriptError, StandardError => error
    # A failing job must not take the worker down: log it and carry on.
    RabbitJobs.logger.error(
      short_message: error.message,
      _payload: payload,
      _exception: error.class,
      full_message: error.backtrace.join("\r\n")
    )
  end

  true
end
consume_queue(routing_key)
click to toggle source
# File lib/rabbit_jobs/worker.rb, line 97
#
# Obtains the queue named by routing_key from consumer_channel, passing the
# queue's configured params, and subscribes to it.
#
# Manual acknowledgement is enabled when the queue's params hold a present
# :manual_ack value. In that mode a delivery is acked when consume_message
# returns true and nacked (with false as second argument) otherwise.
def consume_queue(routing_key)
  RJ.logger.info "Subscribing to #{routing_key}"

  routing_key = routing_key.to_sym
  queue = consumer_channel.queue(routing_key, queue_params(routing_key))
  explicit_ack = queue_params(routing_key)[:manual_ack].present?

  queue.subscribe(manual_ack: explicit_ack) do |delivery_info, properties, payload|
    handled = consume_message(delivery_info, properties, payload)
    next unless explicit_ack

    tag = delivery_info.delivery_tag
    if handled
      consumer_channel.ack(tag)
    else
      # NOTE(review): the original author named this positional value
      # `requeue`. If consumer_channel is a Bunny channel, the second
      # positional argument of #nack may be `multiple` instead -- confirm.
      consumer_channel.nack(tag, false)
    end
  end
end