class Disc::Worker

Attributes

count[R]
disque[R]
queues[R]
timeout[R]

Public Class Methods

current() click to toggle source
# File lib/disc/worker.rb, line 10
# Returns the shared worker for this class, building it with default options
# the first time it is requested and reusing that instance afterwards.
def self.current
  return @current if @current

  @current = new
end
new(options = {}) click to toggle source
# File lib/disc/worker.rb, line 22
# Builds a worker. Each setting is taken from +options+ first, then from the
# environment, then from a built-in default.
#
# options - Hash of settings, all optional:
#   :disque  - connection object used by #run; defaults to Disc.disque.
#   :queues  - queue names, either one comma-separated String or an Array
#              (whose entries may themselves be comma-separated); defaults
#              to ENV['QUEUES'], then Disc.default_queue.
#   :count   - passed to disque.fetch as count:; defaults to
#              ENV['DISQUE_COUNT'], then '1'. Coerced with Integer().
#   :timeout - passed to disque.fetch as timeout:; defaults to
#              ENV['DISQUE_TIMEOUT'], then '2000' (presumably milliseconds,
#              confirm against the Disque client). Coerced with Integer().
#   :run     - when truthy, #run is called before the constructor returns.
#
# Raises ArgumentError or TypeError (from Integer()) when :count or :timeout
# is not integer-like.
def initialize(options = {})
  # Block-form fetch keeps the fallbacks lazy: Disc.disque and the ENV lookups
  # are only touched when the caller did not supply the option.
  @disque = options.fetch(:disque) { Disc.disque }

  queue_spec = options.fetch(:queues) do
    ENV.fetch('QUEUES') { Disc.default_queue }
  end
  # Accept both "a,b" and ["a", "b"]; a plain String behaves exactly as before.
  @queues = Array(queue_spec).flat_map { |name| name.to_s.split(',') }

  @count = Integer(options.fetch(:count) { ENV.fetch('DISQUE_COUNT', '1') })
  @timeout = Integer(options.fetch(:timeout) { ENV.fetch('DISQUE_TIMEOUT', '2000') })

  run if options[:run]
end
run() click to toggle source
# File lib/disc/worker.rb, line 14
# Starts the shared worker (see .current) and returns whatever its #run returns.
def self.run
  worker = current
  worker.run
end
stop() click to toggle source
# File lib/disc/worker.rb, line 18
# Asks the shared worker (see .current) to stop and returns whatever its #stop returns.
def self.stop
  worker = current
  worker.stop
end

Public Instance Methods

run() click to toggle source
# File lib/disc/worker.rb, line 49
# Main worker loop: repeatedly fetches a batch of jobs from +disque+, performs
# each one and ACKs it, until @stop is set (checked after every batch).
#
# For each fetched [queue, msgid, serialized_job] triple the job is loaded via
# Disc.load_job, performed, acknowledged with ACKJOB and logged to $stdout.
# Any StandardError raised along the way is handed to Disc.on_error together
# with the job context; the job is then left un-ACKed and the loop continues.
#
# The connection is always closed with disque.quit when the method exits,
# whether normally or through an exception.
def run
  $stdout.puts("Disc::Worker listening in #{queues}")
  loop do
    jobs = disque.fetch(from: queues, timeout: timeout, count: count)
    Array(jobs).each do |queue, msgid, serialized_job|
      # Start from nil so the rescue below can tell "job never loaded" apart
      # from a loaded job. `defined?` cannot do that: it is truthy for any
      # local the parser has already seen assigned.
      job_instance = nil
      arguments = nil

      begin
        job_instance, arguments = Disc.load_job(serialized_job, msgid)
        job_instance.perform(*arguments)
        disque.call('ACKJOB', msgid)
        $stdout.puts("Completed #{ job_instance.class.name } id #{ msgid }")
      rescue => err
        Disc.on_error(err, {
          disque_id: msgid,
          queue: queue,
          class: job_instance ? job_instance.class.name : '',
          arguments: arguments || []
        })
      end
    end

    break if @stop
  end
ensure
  disque.quit
end
stop() click to toggle source
# File lib/disc/worker.rb, line 45
# Flags this worker to stop. #run checks @stop after each fetched batch has
# been processed and leaves its loop once the flag is set, so a batch that is
# already in progress is finished first.
#
# Returns true.
def stop
  @stop = true
end