class Disc::Worker
Attributes
count[R]
disque[R]
queues[R]
timeout[R]
Public Class Methods
current()
click to toggle source
# File lib/disc/worker.rb, line 10
#
# Returns the memoized class-level worker, building it with `new`
# (no options) the first time it is requested.
def self.current
  return @current if @current

  @current = new
end
new(options = {})
click to toggle source
# File lib/disc/worker.rb, line 22
#
# Configures the worker from +options+, falling back to environment
# variables and then to built-in defaults.
#
# Recognized keys in +options+:
#   :disque  - client object; defaults to Disc.disque.
#   :queues  - comma-separated String or an Array of queue names;
#              defaults to ENV['QUEUES'], then Disc.default_queue.
#   :count   - value passed through Integer(); defaults to
#              ENV['DISQUE_COUNT'], then '1'.
#   :timeout - value passed through Integer(); defaults to
#              ENV['DISQUE_TIMEOUT'], then '2000'.
#   :run     - when truthy, #run is invoked before the constructor returns.
#
# Raises whatever Integer() raises (ArgumentError / TypeError) when
# :count or :timeout cannot be converted.
def initialize(options = {})
  # Block-form fetch keeps the fallbacks lazy: Disc.disque and
  # Disc.default_queue are only evaluated when no explicit value exists.
  @disque = options.fetch(:disque) { Disc.disque }

  requested_queues = options.fetch(:queues) do
    ENV.fetch('QUEUES') { Disc.default_queue }
  end
  # Accept either "a,b" or ["a", "b"]; a plain String behaves as before.
  @queues = Array(requested_queues).flat_map { |name| name.to_s.split(',') }

  @count = Integer(options.fetch(:count) { ENV.fetch('DISQUE_COUNT', '1') })
  @timeout = Integer(options.fetch(:timeout) { ENV.fetch('DISQUE_TIMEOUT', '2000') })

  # `new` discards the return value of initialize, so nothing is returned
  # explicitly here.
  run if options[:run]
end
run()
click to toggle source
# File lib/disc/worker.rb, line 14
#
# Class-level shortcut: starts the worker returned by `current`.
def self.run
  worker = current
  worker.run
end
stop()
click to toggle source
# File lib/disc/worker.rb, line 18
#
# Class-level shortcut: asks the worker returned by `current` to stop.
def self.stop
  worker = current
  worker.stop
end
Public Instance Methods
run()
click to toggle source
# File lib/disc/worker.rb, line 49
#
# Main worker loop.
#
# Prints a startup line, then repeatedly asks `disque` for jobs using the
# configured `queues`, `timeout` and `count`. Each fetched entry is
# destructured as (queue, msgid, serialized_job); the job is loaded through
# Disc.load_job, performed, acknowledged with ACKJOB and logged to $stdout.
#
# Any StandardError raised while loading, performing or acknowledging a job
# is handed to Disc.on_error together with the job's context and does not
# abort the loop. The loop ends after the current batch once @stop is set.
# `disque.quit` is always called on the way out.
def run
  $stdout.puts("Disc::Worker listening in #{queues}")

  loop do
    jobs = disque.fetch(from: queues, timeout: timeout, count: count)

    Array(jobs).each do |queue, msgid, serialized_job|
      begin
        job_instance, arguments = Disc.load_job(serialized_job, msgid)
        job_instance.perform(*arguments)
        disque.call('ACKJOB', msgid)
        $stdout.puts("Completed #{ job_instance.class.name } id #{ msgid }")
      rescue => err
        # `defined?(local)` is always truthy once the parser has seen the
        # assignment, so it cannot tell whether load_job succeeded. Check
        # the values instead: both locals are still nil when load_job raised.
        Disc.on_error(err, {
          disque_id: msgid,
          queue: queue,
          class: job_instance ? job_instance.class.name : '',
          arguments: arguments || []
        })
      end
    end

    break if @stop
  end
ensure
  disque.quit
end
stop()
click to toggle source
# File lib/disc/worker.rb, line 45
#
# Raises the @stop flag and returns true.
def stop
  @stop = true
end