class Discobolo::Actor
Attributes
queues[RW]
Public Class Methods
new(*args)
click to toggle source
# File lib/discobolo/actor.rb, line 7 def initialize(*args) args = Hash[*args.flatten] if args.is_a?(Array) Discobolo::Config.logger.info "Initialize actor with: #{args}" @queues = Discobolo::Config.queues async.fetch if args[:fetch] end
Public Instance Methods
fetch()
click to toggle source
# File lib/discobolo/actor.rb, line 14 def fetch client = Discobolo::Config.client Discobolo::Config.logger.info "Listen Disque queues: #{self.queues} with concurrency of #{Discobolo::Config.actor_concurrency} workers" options = Discobolo::Config.fetch_options.merge({from: self.queues}) loop do jobs = client.fetch(options) jobs.to_a.each do |queue, job_id, options| Discobolo::Config.logger.info "#{queue} queue: received #{job_id} received #{options}" #since we are supervising the actor, let it crash #begin # Claims to be still working with the specified job #client.working(job_id) options = JSON.parse(options) klass = Object.const_get(options['class']) instance = klass.new instance.job_id = job_id instance.async.perform_async(*options['args']) #rescue => e # Discobolo::Config.logger.error "Terrible error happened #{e}" #end end end end