class Belated::Queue
Job queues that Belated
uses. queue is the jobs that are currenly waiting for a worker to start working on them. future_jobs
is a SortedSet of jobs that are going to be added to queue at some point in the future.
Constants
- FILE_NAME
Attributes
future_jobs[RW]
future_jobs_db[RW]
Public Class Methods
new(queue: Thread::Queue.new, future_jobs: SortedSet.new)
click to toggle source
# File lib/belated/queue.rb, line 20 def initialize(queue: Thread::Queue.new, future_jobs: SortedSet.new) @queue = queue @mutex = Mutex.new self.future_jobs = future_jobs self.future_jobs_db = PStore.new("future_jobs_#{Belated.environment}.pstore", true) # pass true for thread safety end
Public Instance Methods
clear()
click to toggle source
# File lib/belated/queue.rb, line 60 def clear @queue.clear self.future_jobs = [] end
connected?()
click to toggle source
# File lib/belated/queue.rb, line 98 def connected? true end
delete_job(job)
click to toggle source
# File lib/belated/queue.rb, line 111 def delete_job(job) log "Deleting #{future_jobs.delete(job)} from future jobs" future_jobs_db.transaction do future_jobs_db.delete(job.id) end end
empty?()
click to toggle source
# File lib/belated/queue.rb, line 65 def empty? @queue.empty? end
enqueue_future_jobs()
click to toggle source
# File lib/belated/queue.rb, line 27 def enqueue_future_jobs loop do job = future_jobs.min if job.nil? sleep Belated.heartbeat next end if job.at <= Time.now.to_f delete_job(job) push(job) end rescue DRb::DRbConnError error 'DRb connection error!!!!!!' log stats end end
find(job_id)
click to toggle source
# File lib/belated/queue.rb, line 102 def find(job_id) job = nil future_jobs_db.transaction(true) do job = future_jobs_db[job_id] end job = future_jobs.find { |j| j.id == job_id } if job.nil? job end
length()
click to toggle source
# File lib/belated/queue.rb, line 69 def length @queue.length end
load_jobs()
click to toggle source
# File lib/belated/queue.rb, line 73 def load_jobs future_jobs_db.transaction(true) do future_jobs_db.roots.each do |id| future_jobs << future_jobs_db[id] end end return unless File.exist?(FILE_NAME) jobs = YAML.load(File.binread(FILE_NAME)) jobs.each do |job| @queue.push(job) end File.delete(FILE_NAME) end
pop()
click to toggle source
# File lib/belated/queue.rb, line 56 def pop @queue.pop end
push(job)
click to toggle source
# File lib/belated/queue.rb, line 44 def push(job) if job.is_a?(Symbol) || job.at.nil? || job.at <= Time.now.to_f @queue.push(job) else @mutex.synchronize do @future_jobs << job insert_into_future_jobs_db(job) unless job.proc_klass end end end
save_jobs()
click to toggle source
# File lib/belated/queue.rb, line 88 def save_jobs class_array = [] @queue.length.times do |_i| unless proc_or_shutdown?(klass = @queue.pop) class_array << klass end end pp File.open(FILE_NAME, 'wb') { |f| f.write(YAML.dump(class_array)) } end
Private Instance Methods
insert_into_future_jobs_db(job)
click to toggle source
# File lib/belated/queue.rb, line 124 def insert_into_future_jobs_db(job) future_jobs_db.transaction do future_jobs_db[job.id] = job end end
proc_or_shutdown?(job)
click to toggle source
# File lib/belated/queue.rb, line 120 def proc_or_shutdown?(job) job.is_a?(Symbol) || job.job.instance_of?(Proc) end