class Sidekiq::Addons::Prioritize::Deqr
Attributes
script_sha[RW]
Public Class Methods
bulk_requeue(inprogress, options)
click to toggle source
Calls superclass method
# File lib/sidekiq/addons/prioritize/deqr.rb, line 36 def self.bulk_requeue(inprogress, options) return if inprogress.empty? jobs_unhandled = [] inprogress.each do |unit_work| unit_work = JSON.load(unit_work) priority = Sidekiq::Addons::Prioritize::Enqr.get_priority_from_msg(unit_work) if ( priority > 0 ) Sidekiq.redis do |con| Sidekiq::Addons::Prioritize::Enqr.enqueue_with_priority(con, unit_work["queue"], priority, unit_work) end else jobs_unhandled << unit_work.to_json end end Sidekiq.logger.info("Pushed #{inprogress.size - jobs_unhandled.size} jobs back to priority_queue in Redis") super(jobs_unhandled, options) end
Public Instance Methods
retrieve_work()
click to toggle source
Calls superclass method
# File lib/sidekiq/addons/prioritize/deqr.rb, line 20 def retrieve_work priority_job = nil @queues.uniq.each do |q| q_name = Sidekiq::Addons::Util.priority_job_queue_name(q.split("queue:").last) priority_job = zpop(q_name) break if priority_job end if priority_job.nil? return super else work = JSON.load(priority_job) return UnitOfWork.new(work["queue"], priority_job) end end
zpop(queue)
click to toggle source
# File lib/sidekiq/addons/prioritize/deqr.rb, line 9 def zpop(queue) Sidekiq.redis do |con| #TODO: for some reason pipeline and multi didnt work; revisit if ( self.script_sha.nil? ) # or !con.script(:exists, self.script_sha) ignore this check; let it crash self.script_sha = con.script(:load, Sidekiq::Addons::Util::ZPOP) end con.evalsha(self.script_sha, [queue]) end end