class QC::Worker
A Worker object can process jobs from one or many queues.
Attributes
Public Class Methods
Creates a new worker but does not start the worker. See Worker#start. This method takes a single hash argument. The following keys are read:
- fork_worker: Worker forks each job execution.
- wait_interval: Time to wait between failed lock attempts.
- connection: PG::Connection object.
- q_name: Name of a single queue to process.
- q_names: Names of queues to process. Will process left to right.
- top_bound: Offset to the head of the queue. 1 == strict FIFO.
# File lib/queue_classic/worker.rb, line 18
def initialize(args={})
  @fork_worker = args[:fork_worker] || QC.fork_worker?
  @wait_interval = args[:wait_interval] || QC.wait_time

  if args[:connection]
    @conn_adapter = ConnAdapter.new(args[:connection])
  else
    @conn_adapter = QC.default_conn_adapter
  end

  @queues = setup_queues(@conn_adapter,
    (args[:q_name] || QC.queue),
    (args[:q_names] || QC.queues),
    (args[:top_bound] || QC.top_bound))
  log(args.merge(:at => "worker_initialized"))
  @running = true
end
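The constructor resolves each option with `args[key] || default`. A minimal sketch of what that pattern implies, assuming a hypothetical `defaults` hash in place of the `QC.*` settings: because `||` treats `false` like `nil`, an explicit `false` cannot override a truthy default, whereas `Hash#fetch` would honor it.

```ruby
# Hypothetical defaults standing in for QC.fork_worker? and QC.wait_time.
defaults = { fork_worker: true, wait_interval: 5 }

# Mirrors the constructor's `args[key] || default` resolution.
def resolve_with_or(args, defaults, key)
  args[key] || defaults[key]
end

# Alternative that only falls back when the key is absent.
def resolve_with_fetch(args, defaults, key)
  args.fetch(key, defaults[key])
end

args = { fork_worker: false }

or_result    = resolve_with_or(args, defaults, :fork_worker)    # explicit false is ignored
fetch_result = resolve_with_fetch(args, defaults, :fork_worker) # explicit false is kept
interval     = resolve_with_or(args, defaults, :wait_interval)  # missing key falls back
```

If the global default enables forking, this suggests disabling it through the global setting rather than by passing `:fork_worker => false`.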
Public Instance Methods
Each job includes a method column. We use Ruby's eval to resolve the receiver object named in that column, then send the method to the object and pass the args.
# File lib/queue_classic/worker.rb, line 129
def call(job)
  args = job[:args]
  receiver_str, _, message = job[:method].rpartition('.')
  receiver = eval(receiver_str)
  receiver.send(message, *args)
end
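The dispatch described above can be tried outside the worker. The sketch below repeats the same three steps (split on the last dot, eval the receiver, send the message) in a hypothetical helper named `dispatch`; the job hashes are made up for illustration.

```ruby
# Hypothetical helper that mirrors the steps in Worker#call.
def dispatch(job)
  # "Math.sqrt" splits into receiver "Math" and message "sqrt".
  receiver_str, _, message = job[:method].rpartition('.')
  # eval turns the receiver string into the live Ruby object.
  receiver = eval(receiver_str)
  receiver.send(message, *job[:args])
end

sqrt_result = dispatch(:method => "Math.sqrt", :args => [16])
base_result = dispatch(:method => "File.basename", :args => ["/tmp/report.rb"])
```

Because the receiver string is passed to eval, the method column should only ever contain values written by trusted code.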
Calls Worker#work, but after the current process is forked. The parent process will wait on the child process to exit.
# File lib/queue_classic/worker.rb, line 62
def fork_and_work
  cpid = fork {setup_child; work}
  log(:at => :fork, :pid => cpid)
  Process.wait(cpid)
end
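The fork-then-wait shape can be seen in isolation with plain Ruby. This is a sketch only: the pipe and the message are assumptions added so the parent can observe what the child did, and `fork` is unavailable on platforms such as Windows.

```ruby
reader, writer = IO.pipe

# The child runs the block and exits when the block returns.
cpid = fork do
  reader.close
  writer.puts("job done in child")
  writer.close
end

# The parent closes its copy of the write end, then blocks until the child exits.
writer.close
Process.wait(cpid)
status = $?

child_message = reader.read.chomp
reader.close
```

Since the parent blocks in `Process.wait`, a forking worker still handles one job at a time; the fork isolates each job's memory and crashes from the parent.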
This method will be called when an exception is raised during the execution of the job.
# File lib/queue_classic/worker.rb, line 138
def handle_failure(job,e)
  $stderr.puts("count#qc.job-error=1 job=#{job} error=#{e.inspect} at=#{e.backtrace.first}")
end
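A subclass can replace this behavior by overriding handle_failure. The sketch below is illustrative: `RecordingWorker` and its `failures` list are hypothetical names, and a stand-in `QC::Worker` is defined only when the real class is not loaded, so the example runs without the gem or a database.

```ruby
# Stand-in used only when queue_classic is not loaded (an assumption for this sketch).
unless defined?(QC::Worker)
  module QC
    class Worker
      def handle_failure(job, e)
        $stderr.puts("job=#{job} error=#{e.inspect}")
      end
    end
  end
end

# Hypothetical subclass that records failures instead of printing them.
class RecordingWorker < QC::Worker
  def failures
    @failures ||= []
  end

  def handle_failure(job, e)
    failures << { :id => job[:id], :error => e.class.name, :message => e.message }
  end
end

worker = RecordingWorker.new
worker.handle_failure({ :id => 42, :method => "Kernel.puts", :args => [] },
                      RuntimeError.new("boom"))
```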
Attempt to lock a job in the queue's table. If a job can be locked, this method returns an array with 2 elements. The first element is the queue from which the job was locked and the second is a hash representation of the job. If a job is returned, its locked_at column has been set in the job's row. It is the caller's responsibility to delete the job row from the table when the job is complete.
# File lib/queue_classic/worker.rb, line 86
def lock_job
  log(:at => "lock_job")
  job = nil
  while @running
    @queues.each do |queue|
      if job = queue.lock
        return [queue, job]
      end
    end
    @conn_adapter.wait(@wait_interval, *@queues.map {|q| q.name})
  end
end
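The left-to-right polling can be sketched with in-memory stand-ins. `StubQueue` and `lock_first` are hypothetical, `sleep` replaces the connection adapter's wait, and a `max_rounds` limit is added so the sketch always terminates.

```ruby
# Hypothetical in-memory queue; lock returns the next job or nil.
class StubQueue
  attr_reader :name

  def initialize(name, jobs)
    @name = name
    @jobs = jobs
  end

  def lock
    @jobs.shift
  end
end

# Tries each queue in order and returns the first [queue, job] pair found.
def lock_first(queues, wait_interval, max_rounds)
  max_rounds.times do
    queues.each do |queue|
      job = queue.lock
      return [queue, job] if job
    end
    sleep(wait_interval) # stand-in for @conn_adapter.wait
  end
  nil
end

high = StubQueue.new("high", [])
low  = StubQueue.new("low", [{ :id => 7 }])

queue, job = lock_first([high, low], 0.01, 3)
```

Queues earlier in the list are always checked first, so a busy first queue can starve the later ones.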
# File lib/queue_classic/worker.rb, line 149
def log(data)
  QC.log(data)
end
A job is processed by evaluating the target code. If the job is evaluated with no exceptions, it is deleted from the queue. If the job raises an exception, the responsibility of what to do with the job is delegated to Worker#handle_failure. If the job is not finished and an INT signal is trapped, this method will unlock the job in the queue.
# File lib/queue_classic/worker.rb, line 106
def process(queue, job)
  start = Time.now
  finished = false
  begin
    call(job).tap do
      queue.delete(job[:id])
      finished = true
    end
  rescue => e
    handle_failure(job, e)
    finished = true
  ensure
    if !finished
      queue.unlock(job[:id])
    end
    ttp = Integer((Time.now - start) * 1000)
    QC.measure("time-to-process=#{ttp} source=#{queue.name}")
  end
end
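The three outcomes (delete on success, handle_failure on an ordinary exception, unlock on an interrupt) follow from how Ruby's bare `rescue` works: it catches StandardError only, so an Interrupt skips it and reaches `ensure` with `finished` still false. A simplified sketch, using a hypothetical `FakeQueue` and `process_sketch` and omitting the timing code:

```ruby
# Hypothetical queue that records which job ids were deleted or unlocked.
class FakeQueue
  attr_reader :deleted, :unlocked

  def initialize
    @deleted = []
    @unlocked = []
  end

  def delete(id)
    @deleted << id
  end

  def unlock(id)
    @unlocked << id
  end
end

# Simplified version of the begin/rescue/ensure flow; the block plays the job.
def process_sketch(queue, job, failures)
  finished = false
  begin
    yield
    queue.delete(job[:id])
    finished = true
  rescue => e # StandardError only; Interrupt is not caught here
    failures << [job[:id], e.class]
    finished = true
  ensure
    queue.unlock(job[:id]) unless finished
  end
end

queue = FakeQueue.new
failures = []

process_sketch(queue, { :id => 1 }, failures) { :ok }
process_sketch(queue, { :id => 2 }, failures) { raise ArgumentError, "bad input" }
begin
  process_sketch(queue, { :id => 3 }, failures) { raise Interrupt, "simulated INT" }
rescue Interrupt
  # The interrupt propagates to the caller after the job is unlocked.
end
```

Note that a job whose failure was handled is neither deleted nor unlocked in this flow, so any cleanup is left to the handle_failure implementation.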
This method should be overridden if your worker is forking and you need to re-establish database connections.
# File lib/queue_classic/worker.rb, line 145
def setup_child
  log(:at => "setup_child")
end
Commences the working of jobs. start() loops while @running is true, which is its initial value. This method is the primary entry point for starting the worker. The canonical example of starting a worker is as follows: QC::Worker.new.start
# File lib/queue_classic/worker.rb, line 41
def start
  QC.unlock_jobs_of_dead_workers
  while @running
    @fork_worker ? fork_and_work : work
  end
end
Signals the worker to stop taking new work. This method has no immediate effect. However, there are two loops in the worker (one in start and another in lock_job) which check the @running variable to determine if further progress is desirable. In the case that @running is false, the aforementioned methods will short circuit and cause the blocking call to start to unblock.
# File lib/queue_classic/worker.rb, line 56
def stop
  @running = false
end
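The cooperative shutdown can be illustrated with a stripped-down loop. `LoopSketch` is a hypothetical class, and its work method stops itself after three iterations purely so the example terminates; in a real worker, stop would more plausibly be called from elsewhere, for example a signal handler.

```ruby
# Hypothetical class showing only the @running flag protocol.
class LoopSketch
  attr_reader :iterations

  def initialize
    @running = true
    @iterations = 0
  end

  # Blocks until @running becomes false.
  def start
    while @running
      work
    end
  end

  # Takes effect the next time the loop condition is checked.
  def stop
    @running = false
  end

  def work
    @iterations += 1
    stop if @iterations == 3
  end
end

sketch = LoopSketch.new
sketch.start
```

The unit of work in progress always runs to completion; the flag is only consulted between iterations.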
Blocks on locking a job, and once a job is locked, it will process the job.
# File lib/queue_classic/worker.rb, line 70
def work
  queue, job = lock_job
  if queue && job
    QC.log_yield(:at => "work", :job => job[:id]) do
      process(queue, job)
    end
  end
end
Private Instance Methods
# File lib/queue_classic/worker.rb, line 155
def setup_queues(adapter, queue, queues, top_bound)
  names = queues.length > 0 ? queues : [queue]
  names.map do |name|
    QC::Queue.new(name, top_bound).tap do |q|
      q.conn_adapter = adapter
    end
  end
end
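The name selection in setup_queues means a non-empty list of queue names takes precedence over the single queue name. A small sketch of just that rule, using a hypothetical helper `queue_names` and made-up queue names:

```ruby
# Mirrors the first line of setup_queues: prefer the list when it is non-empty.
def queue_names(queue, queues)
  queues.length > 0 ? queues : [queue]
end

single = queue_names("default", [])
many   = queue_names("default", ["high", "low"])
```

When both q_name and q_names are supplied, the single name is therefore ignored.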