class Delayed::Threaded::Worker

Threaded DJ worker implementation.

Constants

THREAD_LOCAL_ACCESSORS

Public Class Methods

lifecycle() click to toggle source

@override to return the same as Delayed::Worker.lifecycle (uses class instance state)

# File lib/delayed/threaded/worker.rb, line 17
# Overrides the inherited class-level reader so the threaded worker reuses
# the lifecycle object owned by Delayed::Worker.
#
# @return whatever Delayed::Worker.lifecycle returns
def self.lifecycle
  Delayed::Worker.lifecycle
end
setup_lifecycle() click to toggle source

@override since `initialize` (DJ 4.1) does: `self.class.setup_lifecycle`

# File lib/delayed/threaded/worker.rb, line 20
# Forwards lifecycle setup to Delayed::Worker, so the call made on this
# class ends up configuring the lifecycle held by Delayed::Worker.
#
# @return whatever Delayed::Worker.setup_lifecycle returns
def self.setup_lifecycle
  Delayed::Worker.setup_lifecycle
end

Public Instance Methods

exit!() click to toggle source
# File lib/delayed/threaded/worker.rb, line 131
# Flags the worker as exiting and releases the locks held under its name.
#
# Does nothing when the exit flag is already set. Lock clearing is attempted
# only when Delayed::Job is defined and responds to `clear_locks!`.
#
# @return the result of `Delayed::Job.clear_locks!`, or nil when it was not called
def exit!
  return if @exit # already stopping (see #stop?)

  say "Stoping job worker"
  @exit = true # same effect as #stop

  return unless Delayed.const_defined?(:Job)

  job_class = Delayed::Job
  job_class.clear_locks!(name) if job_class.respond_to?(:clear_locks!)
end
name() click to toggle source

e.g. :

def self.sleep_delay=(value)
  (Thread.current[:delayed_threaded_worker_config] ||= Config.new).sleep_delay = value
end

def self.sleep_delay
  if (config = Thread.current[:delayed_threaded_worker_config]) && config.key?(:sleep_delay)
    config.sleep_delay
  else
    superclass.sleep_delay # Delayed::Worker.sleep_delay
  end
end
# File lib/delayed/threaded/worker.rb, line 93
# Worker name, computed once and then cached in @name.
#
# The inherited name (super - [prefix]host:hostname pid:process_pid) is
# suffixed with the thread identifier. If computing that raises, the name is
# built from @name_prefix and the thread identifier instead.
#
# @return [String] the frozen name, or a previously assigned @name as-is
def name
  @name ||= nil # make sure the ivar exists before it is read
  return @name unless @name.nil?

  @name = begin
    "#{super} thread:#{thread_id}".freeze
  rescue
    "#{@name_prefix}thread:#{thread_id}".freeze
  end
end
start() click to toggle source
# File lib/delayed/threaded/worker.rb, line 145
# Runs the worker loop until the exit flag (@exit) is set.
#
# Each pass times a `work_off` call and sums its result to get the number of
# jobs handled. An idle pass (zero jobs) sleeps for the class-level
# `sleep_delay`; otherwise a throughput line is logged, using the last
# element of the result as the failed count. The exit flag is checked both
# right after the work and again at the end of the pass.
def start
  say "Starting job worker"
  trap

  loop do
    outcome = nil
    elapsed = Benchmark.realtime { outcome = work_off }
    count = outcome.sum

    break if @exit

    if count.zero?
      sleep(self.class.sleep_delay)
    else
      say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / elapsed, outcome.last]
    end

    break if @exit
  end
end
stop() click to toggle source
# File lib/delayed/threaded/worker.rb, line 171
# Sets the exit flag that the #start loop checks.
#
# @return [true]
def stop
  @exit = true
end
stop?() click to toggle source
# File lib/delayed/threaded/worker.rb, line 170
# Reports whether the exit flag has been set.
#
# @return [Boolean] strictly true or false, whatever @exit holds
def stop?
  @exit ? true : false
end
thread_id() click to toggle source
# File lib/delayed/threaded/worker.rb, line 108
# Identifier of the current thread, used when building the worker name.
#
# The Ruby-level thread name is returned when it is non-empty. Otherwise the
# Java thread name is used; when that is longer than 100 characters and
# matches the pattern below, only the first capture group is kept.
#
# @return [String]
def thread_id
  label = Thread.current.name || ''
  return label unless label.empty?

  label = java.lang.Thread.currentThread.getName
  if label.size > 100
    shortened = label.match(/(.*?)\:\s.*?[\/\\]+/)
    label = shortened[1] if shortened
  end
  label
end
to_s() click to toggle source
# File lib/delayed/threaded/worker.rb, line 105
def to_s; name; end

Protected Instance Methods

trap(name = nil) click to toggle source
# File lib/delayed/threaded/worker.rb, line 177
# Registers an at_exit hook that calls #exit!, instead of installing a
# signal handler.
#
# The hook is registered when no name is given (as #start does) or when the
# name is TERM; any other name is ignored.
#
# @param name [String, Symbol, nil] signal name, compared via `to_s`
# @return [Proc, nil] the registered hook, or nil when nothing was registered
def trap(name = nil)
  return if name && name.to_s != 'TERM'

  at_exit { exit! }
end