class Subservient::Worker
Attributes
state[R]
tube[R]
Public Class Methods
new(tube='default')
click to toggle source
# File lib/subservient/worker.rb, line 4 def initialize tube='default' @tube = [Subservient::namespace, tube].join('.') @state = :stopped @thread_pool = [] end
Public Instance Methods
go(threads: 5)
click to toggle source
# File lib/subservient/worker.rb, line 10 def go threads: 5 start threads: threads trap('SIGINT') do exit end at_exit do Thread.main.wakeup stop end sleep end
start(threads: 5)
click to toggle source
# File lib/subservient/worker.rb, line 22 def start threads: 5 raise MissingTaskError if Subservient::tasks.empty? raise AlreadyRunningError if @state != :stopped @state = :started threads.times do start_worker_thread end end
stop(timeout: 30)
click to toggle source
# File lib/subservient/worker.rb, line 30 def stop timeout: 30 @state = :shutdown # Tell worker threads not to accept any more jobs @thread_pool.each do |thread| thread.join(timeout) # Wait for all workers to exit gracefully... thread.kill # but kill them if they take too long end @state = :stopped end
Private Instance Methods
start_worker_thread()
click to toggle source
# File lib/subservient/worker.rb, line 41 def start_worker_thread @thread_pool << Thread.new do conn = Beaneater::Pool.new(Subservient::pool) begin work conn ensure conn.close end end end
work(conn)
click to toggle source
# File lib/subservient/worker.rb, line 52 def work conn puts "Worker started" conn.tubes.watch!(@tube) while @state == :started do begin # Increasing this will decrease load on beanstalkd, but will # also increase the time it takes to shut down the worker timeout = 2 bs_job = conn.tubes.reserve(timeout) begin job = Job.new bs_job job.execute rescue JobError => e puts "#{e} - BURYING JOB #{bs_job.id}" bs_job.bury end rescue Beaneater::TimedOutError end end puts "Worker stopped" end