class Subservient::Worker

Attributes

state[R]
tube[R]

Public Class Methods

new(tube='default') click to toggle source
# File lib/subservient/worker.rb, line 4
def initialize tube='default'
  @tube    = [Subservient::namespace, tube].join('.')
  @state   = :stopped
  @thread_pool = []
end

Public Instance Methods

go(threads: 5) click to toggle source
# File lib/subservient/worker.rb, line 10
def go threads: 5
  start threads: threads

  trap('SIGINT') do
    exit
  end
  at_exit do
    Thread.main.wakeup
    stop
  end
  sleep
end
start(threads: 5) click to toggle source
# File lib/subservient/worker.rb, line 22
def start threads: 5
  raise MissingTaskError if Subservient::tasks.empty?
  raise AlreadyRunningError if @state != :stopped
  @state = :started
  threads.times do
    start_worker_thread
  end
end
stop(timeout: 30) click to toggle source
# File lib/subservient/worker.rb, line 30
def stop timeout: 30
  @state = :shutdown        # Tell worker threads not to accept any more jobs
  @thread_pool.each do |thread|
    thread.join(timeout)    # Wait for all workers to exit gracefully...
    thread.kill             # but kill them if they take too long
  end
  @state = :stopped
end

Private Instance Methods

start_worker_thread() click to toggle source
# File lib/subservient/worker.rb, line 41
def start_worker_thread
  @thread_pool << Thread.new do
    conn = Beaneater::Pool.new(Subservient::pool)
    begin
      work conn
    ensure
      conn.close
    end
  end
end
work(conn) click to toggle source
# File lib/subservient/worker.rb, line 52
def work conn
  puts "Worker started"
  conn.tubes.watch!(@tube)
  while @state == :started do
    begin
      # Increasing this will decrease load on beanstalkd, but will
      # also increase the time it takes to shut down the worker
      timeout = 2

      bs_job = conn.tubes.reserve(timeout)
      begin
        job = Job.new bs_job
        job.execute
      rescue JobError => e
        puts "#{e} - BURYING JOB #{bs_job.id}"
        bs_job.bury
      end
    rescue Beaneater::TimedOutError
    end
  end
  puts "Worker stopped"
end