class RabbitJobs::Worker

Worker daemon.

Attributes

consumer[R]
process_name[RW]

Public Class Methods

new(*queues) click to toggle source

Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.

If passed a single “*”, or no queue names at all, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.

# File lib/rabbit_jobs/worker.rb, line 35
# Builds a worker for the given queue names.
#
# Names may be passed as separate arguments, as a single array, or as a mix
# of both. Each name is converted with #to_s, stripped of surrounding
# whitespace, and duplicates are dropped (first occurrence wins).
#
# When the resulting list is empty or is exactly ['*'], the worker uses
# RabbitJobs.config.routing_keys instead.
#
# @param queues [Array] queue names, optionally nested in arrays
# @raise [RuntimeError] if no queue names remain after the fallback
def initialize(*queues)
  # Flatten BEFORE stringifying: calling #to_s on a nested array would turn
  # it into a single bogus name such as '["a", "b"]'.
  @queues = queues.flatten.map { |queue| queue.to_s.strip }.uniq
  @queues = RabbitJobs.config.routing_keys if @queues.empty? || @queues == ['*']

  fail 'Cannot initialize worker without queues.' if @queues.empty?
end

Public Instance Methods

consumer=(value) click to toggle source
# File lib/rabbit_jobs/worker.rb, line 13
# Assigns the object that will process incoming messages.
#
# @param value [#process_message] the consumer to store in @consumer
# @raise [ArgumentError] if value does not respond to #process_message
def consumer=(value)
  # Reject anything that cannot handle a delivery before storing it.
  raise ArgumentError, 'value must implement #process_message' unless value.respond_to?(:process_message)

  @consumer = value
end
queue_params(routing_key) click to toggle source
# File lib/rabbit_jobs/worker.rb, line 20
# Looks up the configured parameters for a queue.
#
# @param routing_key [#to_sym] queue name; converted to a symbol for the lookup
# @return the entry stored under that symbol in RJ.config[:queues]
def queue_params(routing_key)
  queue_settings = RJ.config[:queues]
  queue_settings[routing_key.to_sym]
end
queues() click to toggle source
# File lib/rabbit_jobs/worker.rb, line 44
# Queue names this worker operates on.
#
# @return [Array] the value of @queues, or an empty array when it is unset
def queues
  return [] unless @queues

  @queues
end
work() click to toggle source

Subscribes to the queues and works on jobs.

# File lib/rabbit_jobs/worker.rb, line 49
# Subscribes to every queue in #queues and runs the main loop.
#
# Steps: call startup, default @consumer to a new RJ::Consumer::JobConsumer,
# set the process title ($0), reset the processed-job counter, set the channel
# prefetch to 1, subscribe to each queue, then hand control to main_loop.
#
# @return [false] when startup returns a falsy value
# @return the result of main_loop when it finishes without raising
# @return [true] when a StandardError was raised and passed to log_daemon_error
def work
  return false unless startup

  @consumer ||= RJ::Consumer::JobConsumer.new
  $0 = process_name || "rj_worker (#{queues.join(', ')})"
  @processed_count = 0

  begin
    consumer_channel.prefetch(1)
    queues.each { |routing_key| consume_queue(routing_key) }
    RJ.logger.info 'Started.'

    # The block is passed to main_loop; it logs the number of processed jobs.
    return main_loop { RJ.logger.info "Processed jobs: #{@processed_count}." }
  rescue => error
    log_daemon_error(error)
  end

  true
end

Private Instance Methods

consume_message(delivery_info, properties, payload) click to toggle source
# File lib/rabbit_jobs/worker.rb, line 78
# Hands one delivery to @consumer after running the before-process hook.
#
# Errors raised by the consumer (ScriptError or StandardError) are logged via
# RabbitJobs.logger and swallowed, so the message still counts as handled;
# @processed_count only advances when the consumer finishes without raising.
#
# @param delivery_info delivery metadata, passed through to the consumer
# @param properties message properties, passed through to the consumer
# @param payload message body, passed through to the consumer
# @return [true] when the hook passed (whether or not the consumer raised)
# @return [false] when RJ.run_before_process_message_callbacks is falsy
def consume_message(delivery_info, properties, payload)
  unless RJ.run_before_process_message_callbacks
    RJ.logger.warn "before_process_message hook failed, requeuing payload: #{payload.inspect}"
    return false
  end

  begin
    @consumer.process_message(delivery_info, properties, payload)
    @processed_count += 1
  rescue ScriptError, StandardError => error
    RabbitJobs.logger.error(
      short_message: error.message,
      _payload: payload,
      _exception: error.class,
      full_message: error.backtrace.join("\r\n")
    )
  end

  true
end
consume_queue(routing_key) click to toggle source
# File lib/rabbit_jobs/worker.rb, line 97
# Declares the queue named by +routing_key+ on the consumer channel and
# subscribes to it, passing each delivery through #consume_message.
#
# Acknowledgements are sent only when the queue's parameters carry a present
# :manual_ack value: #ack when consume_message returns truthy, #nack otherwise.
#
# @param routing_key [#to_sym] name of the queue to consume
# @return whatever queue.subscribe returns
def consume_queue(routing_key)
  RJ.logger.info "Subscribing to #{routing_key}"
  queue_name = routing_key.to_sym

  queue = consumer_channel.queue(queue_name, queue_params(queue_name))

  # present? is not core Ruby; presumably provided by ActiveSupport - confirm.
  explicit_ack = queue_params(queue_name)[:manual_ack].present?

  queue.subscribe(manual_ack: explicit_ack) do |delivery_info, properties, payload|
    handled = consume_message(delivery_info, properties, payload)
    next unless explicit_ack

    if handled
      consumer_channel.ack(delivery_info.delivery_tag)
    else
      # NOTE(review): the original names this literal `requeue`, but if
      # consumer_channel is a Bunny::Channel the second positional argument of
      # #nack is `multiple`, not `requeue` - confirm which was intended.
      consumer_channel.nack(delivery_info.delivery_tag, false)
    end
  end
end