class Leveret::Worker

Subscribes to one or more queues and forks a child process to perform each job as it arrives.

Call {#do_work} to subscribe to all queues and block the main thread.

Attributes

consumers[RW]

@return [Array<Bunny::Consumer>] All of the actively subscribed queues

queues[RW]

@return [Array<Queue>] All of the queues this worker is going to subscribe to

Public Class Methods

new(*queue_names)

Create a new worker to process jobs from the given list of queue names. If no names are passed, the worker subscribes to the configured default queue.

@param [Array<String>] queue_names ([Leveret.configuration.default_queue_name]) A list of queue names for this worker to subscribe to and process
# File lib/leveret/worker.rb, line 20
def initialize(*queue_names)
  queue_names << configuration.default_queue_name if queue_names.empty?

  self.queues = queue_names.map { |name| Leveret::Queue.new(name) }
  self.consumers = []
  @time_to_die = false
end
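
The defaulting behaviour of the constructor can be sketched in isolation. This is a minimal illustration only: `DEFAULT_QUEUE_NAME`, `NamedQueue` and `build_queues` are hypothetical stand-ins (not part of Leveret), and the default name used here is an assumed value rather than Leveret's actual configuration.

```ruby
# Assumed default, for illustration only; Leveret reads this from its configuration.
DEFAULT_QUEUE_NAME = 'standard'

# Hypothetical stand-in for Leveret::Queue: it only remembers its name.
NamedQueue = Struct.new(:name)

# Accept any number of names via a splat and fall back to the default
# when the caller passes none, as the constructor above does.
def build_queues(*queue_names)
  queue_names << DEFAULT_QUEUE_NAME if queue_names.empty?
  queue_names.map { |name| NamedQueue.new(name) }
end

build_queues                      # one queue, using the default name
build_queues('mailers', 'images') # one queue per name passed
```

Because `*queue_names` always yields a fresh array, appending the default with `<<` does not mutate anything owned by the caller.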

Public Instance Methods

do_work()

Subscribe to all of the {#queues} and begin processing jobs from them. This will block the main thread until an interrupt is received.

# File lib/leveret/worker.rb, line 30
def do_work
  log.info "Starting master process for #{queues.map(&:name).join(', ')}"
  prepare_for_work

  loop do
    if @time_to_die
      cancel_subscriptions
      break
    end
    sleep 1
  end
  log.info "Exiting master process"
end
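
The flag-and-poll shutdown pattern used by `do_work` can be sketched without any queue at all. The class below is a hypothetical stand-in (`ShutdownFlagLoop` is not part of Leveret) that assumes the same approach: signal handlers only set a flag, and the main loop checks it between sleeps.

```ruby
# Hypothetical illustration of a main loop that exits when a signal sets a flag.
class ShutdownFlagLoop
  attr_reader :iterations

  def initialize(poll_interval: 0.05)
    @poll_interval = poll_interval
    @time_to_die = false
    @iterations = 0
  end

  # The handlers do nothing except flip the flag; cleanup happens in the loop.
  def install_traps
    %w[INT TERM].each do |signal|
      Signal.trap(signal) { @time_to_die = true }
    end
  end

  # Block the calling thread until one of the trapped signals arrives.
  def run
    until @time_to_die
      @iterations += 1
      sleep @poll_interval
    end
    :stopped
  end
end
```

Keeping the handler this small is a deliberate choice: Ruby restricts what may run in trap context (locking a `Mutex`, for example, raises a `ThreadError`), so work such as logging or cancelling subscriptions is safer in the main loop.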

Private Instance Methods

cancel_subscriptions()

Send cancel to each consumer in the {#consumers} list. This will end the current subscription.

# File lib/leveret/worker.rb, line 79
def cancel_subscriptions
  log.info "Interrupt received, preparing to exit"
  consumers.each do |consumer|
    log.debug "Cancelling consumer on #{consumer.queue.name}"
    consumer.cancel
  end
end
fork_and_run(incoming_message)

Fork the current process and run the job described by incoming_message in the newly created child process. Detach the main process from the child so we can return to the main loop without waiting for it to finish processing the job.

@param [Message] incoming_message Message meta and payload to process

# File lib/leveret/worker.rb, line 92
def fork_and_run(incoming_message)
  pid = fork do
    self.process_name = 'leveret-worker-child'
    # `pid` is still nil inside the child, so report the child's own PID via Process.pid
    log.info "[#{incoming_message.delivery_tag}] Forked to child process #{Process.pid} to run " \
      "#{incoming_message.params[:job]}"

    Leveret.reset_connection!
    Leveret.configuration.after_fork.call

    result = perform_job(incoming_message.params)
    result_handler = Leveret::ResultHandler.new(incoming_message)
    result_handler.handle(result)

    log.info "[#{incoming_message.delivery_tag}] Exiting child process #{Process.pid}"
    exit!(0)
  end

  # Master doesn't need to know how it all went down; the worker will report its own status back to the queue
  Process.detach(pid)
end
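
The fork-and-detach step can be tried on its own, without a message queue. The sketch below assumes a platform where `fork` is available; `run_in_detached_child` is a hypothetical helper, and the pipe exists only so the parent can observe what the child did.

```ruby
# Hypothetical helper: run the given block in a forked child and return
# immediately. Process.detach returns a thread that reaps the child, so the
# parent never has to call Process.wait itself.
def run_in_detached_child
  pid = fork do
    # Inside the child, Process.pid is the child's own PID.
    yield Process.pid
    exit!(0) # leave without running at_exit handlers inherited from the parent
  end
  Process.detach(pid)
end

reader, writer = IO.pipe
reaper = run_in_detached_child do |child_pid|
  reader.close
  writer.write("child #{child_pid}")
  writer.close
end

writer.close          # the parent keeps only the read end
message = reader.read # returns once the child has closed its write end
reader.close
status = reaper.value # joined here purely to inspect the exit status
```

A long-running parent would normally ignore the thread returned by `Process.detach`; joining it here is only for demonstration.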
perform_job(payload)

Constantize the class name in the payload and execute the job with its parameters.

@param [Parameters] payload The job name and parameters the job requires

@return [Symbol] :success, :reject or :requeue depending on how job execution went

# File lib/leveret/worker.rb, line 117
def perform_job(payload)
  job_klass = Object.const_get(payload[:job])
  job_klass.perform(Leveret::Parameters.new(payload[:params]))
end
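
Looking a job class up by name can be sketched as follows. `EchoJob` and `perform_named_job` are hypothetical names used only for illustration, and the plain hash stands in for `Leveret::Parameters`.

```ruby
# Hypothetical job class: succeeds when given a :value, rejects otherwise.
class EchoJob
  def self.perform(params)
    params[:value] ? :success : :reject
  end
end

# Resolve the class named in the payload and hand it the job parameters.
# Object.const_get raises NameError when no such constant exists.
def perform_named_job(payload)
  job_klass = Object.const_get(payload[:job])
  job_klass.perform(payload[:params])
end

perform_named_job({ job: 'EchoJob', params: { value: 1 } })
perform_named_job({ job: 'EchoJob', params: {} })
```

Since the class name arrives as data, an unknown or misspelled name surfaces as a `NameError` in the child process rather than at enqueue time.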
prepare_for_work()

Setup steps that must run before we can begin processing jobs: install the signal traps, set the process title and start the subscriptions.

# File lib/leveret/worker.rb, line 47
def prepare_for_work
  setup_traps
  self.process_name = 'leveret-worker-parent'
  start_subscriptions
end
process_name=(name)

Set the title of this process so it is easier to identify in tools such as top.

# File lib/leveret/worker.rb, line 64
def process_name=(name)
  Process.setproctitle(name)
end
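
Because `process_name=` is a private setter, it has to be called with an explicit `self.` receiver, which Ruby permits for assignment methods. A minimal sketch, using a hypothetical `TitledProcess` class that is not part of Leveret:

```ruby
# Hypothetical class showing a private setter that renames the process.
class TitledProcess
  def rename(title)
    # Without `self.`, Ruby would treat this as a local variable assignment.
    self.process_name = title
  end

  private

  # Process.setproctitle changes the title shown by tools such as ps and top.
  def process_name=(name)
    Process.setproctitle(name)
  end
end

TitledProcess.new.rename('leveret-demo')
```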
setup_traps()

Catch INT and TERM signals and set an instance variable that tells the main loop to quit at its next check.

# File lib/leveret/worker.rb, line 54
def setup_traps
  trap('INT') do
    @time_to_die = true
  end
  trap('TERM') do
    @time_to_die = true
  end
end
start_subscriptions()

Subscribe to each queue defined in {#queues} and add the returned consumer to {#consumers}. This will allow us to gracefully cancel these subscriptions when we need to quit.

# File lib/leveret/worker.rb, line 70
def start_subscriptions
  queues.each do |queue|
    consumers << queue.subscribe do |incoming_message|
      fork_and_run(incoming_message)
    end
  end
end