class Belated::Queue

Job queues that Belated uses. queue holds the jobs that are currently waiting for a worker to start working on them. future_jobs is a SortedSet of jobs that will be added to queue at some point in the future.

Constants

FILE_NAME

Attributes

future_jobs[RW]
future_jobs_db[RW]

Public Class Methods

new(queue: Thread::Queue.new, future_jobs: SortedSet.new) click to toggle source
# File lib/belated/queue.rb, line 20
# Builds the queue pair used by Belated.
#
# @param queue [Thread::Queue] holds jobs that are ready to be worked on
# @param future_jobs [SortedSet] jobs scheduled for a later time
def initialize(queue: Thread::Queue.new, future_jobs: SortedSet.new)
  @mutex = Mutex.new
  @queue = queue
  self.future_jobs = future_jobs

  # One store file per environment; the second argument asks PStore to be
  # thread safe.
  store_path = "future_jobs_#{Belated.environment}.pstore"
  self.future_jobs_db = PStore.new(store_path, true)
end

Public Instance Methods

clear() click to toggle source
# File lib/belated/queue.rb, line 60
# Empties both the ready queue and the in-memory future jobs.
#
# The future jobs collection is emptied in place rather than replaced with
# an Array, so it keeps its type (and with it its ordering and uniqueness
# behaviour) after a clear. The persistent store (future_jobs_db) is not
# touched by this method.
#
# @return [Object] the (now empty) future jobs collection
def clear
  @queue.clear
  future_jobs.clear
end
connected?() click to toggle source
# File lib/belated/queue.rb, line 98
# Always reports the queue as connected.
# NOTE(review): presumably exists so callers can probe the queue for
# reachability (e.g. through a remote reference) — confirm against callers.
#
# @return [true]
def connected?
  true
end
delete_job(job) click to toggle source
# File lib/belated/queue.rb, line 111
# Removes +job+ from the in-memory future jobs and from the persistent store.
#
# @param job [#id] the scheduled job to remove
# @return [Object, nil] whatever the store's +delete+ returns for +job.id+
def delete_job(job)
  # Remove first, as its own statement. Interpolating the result of the
  # removal into the log line printed the whole collection, because
  # Set#delete returns the set itself rather than the removed element.
  future_jobs.delete(job)
  log "Deleting #{job.id} from future jobs"
  future_jobs_db.transaction do
    future_jobs_db.delete(job.id)
  end
end
empty?() click to toggle source
# File lib/belated/queue.rb, line 65
# Tells whether the ready queue (@queue) holds no jobs.
# Only @queue is consulted; future_jobs is not taken into account.
#
# @return [Boolean]
def empty?
  @queue.empty?
end
enqueue_future_jobs() click to toggle source
# File lib/belated/queue.rb, line 27
# Runs forever, moving scheduled jobs onto the ready queue once they are due.
#
# Each iteration looks at the earliest future job (+future_jobs.min+):
# * no job at all: sleep for one heartbeat;
# * earliest job not yet due: sleep until it is due, but never longer than
#   one heartbeat so a job scheduled earlier in the meantime is picked up;
# * earliest job due: remove it from the future jobs and push it.
#
# DRb connection errors raised inside an iteration are reported and the
# loop carries on.
def enqueue_future_jobs
  loop do
    job = future_jobs.min
    if job.nil?
      sleep Belated.heartbeat
      next
    end

    seconds_until_due = job.at - Time.now.to_f
    if seconds_until_due.positive?
      # Without this sleep the loop spins at full speed for as long as the
      # earliest job lies in the future.
      sleep [seconds_until_due, Belated.heartbeat].min
      next
    end

    delete_job(job)
    push(job)
  rescue DRb::DRbConnError
    error 'DRb connection error!!!!!!'
    log stats
  end
end
find(job_id) click to toggle source
# File lib/belated/queue.rb, line 102
# Looks a job up by id, first in the persistent store and then, when the
# store has nothing under that id, among the in-memory future jobs.
#
# @param job_id [Object] id the job was stored under
# @return [Object, nil] the matching job, or nil when neither place has it
def find(job_id)
  # Read-only transaction; its value is the value of the block.
  persisted = future_jobs_db.transaction(true) { future_jobs_db[job_id] }
  return persisted unless persisted.nil?

  future_jobs.find { |candidate| candidate.id == job_id }
end
length() click to toggle source
# File lib/belated/queue.rb, line 69
# Number of jobs currently in the ready queue (@queue).
# Jobs held in future_jobs are not counted.
#
# @return [Integer]
def length
  @queue.length
end
load_jobs() click to toggle source
# File lib/belated/queue.rb, line 73
# Restores jobs from disk.
#
# Every root of the persistent store is added to the future jobs. Then, if
# the dump file FILE_NAME exists, each job in it is pushed onto the ready
# queue and the file is deleted.
#
# NOTE(review): YAML.load on newer Psych versions only permits basic types
# by default; confirm the dumped job objects still deserialize here.
def load_jobs
  future_jobs_db.transaction(true) do
    future_jobs_db.roots.each { |root| future_jobs << future_jobs_db[root] }
  end
  return unless File.exist?(FILE_NAME)

  YAML.load(File.binread(FILE_NAME)).each { |saved_job| @queue.push(saved_job) }
  File.delete(FILE_NAME)
end
pop() click to toggle source
# File lib/belated/queue.rb, line 56
# Removes and returns the next item from the ready queue (@queue).
# With the default Thread::Queue this blocks while the queue is empty.
#
# @return [Object] the next queued item
def pop
  @queue.pop
end
push(job) click to toggle source
# File lib/belated/queue.rb, line 44
# Adds a job, either to the ready queue or to the future jobs.
#
# Symbols, jobs without an +at+ time and jobs whose +at+ is not in the
# future go straight onto the ready queue. Anything else is added to the
# future jobs under the mutex and, unless the job has a +proc_klass+,
# written to the persistent store as well.
#
# @param job [Symbol, #at] the job (or symbol) to add
def push(job)
  run_now = job.is_a?(Symbol) || job.at.nil? || job.at <= Time.now.to_f
  return @queue.push(job) if run_now

  @mutex.synchronize do
    @future_jobs << job
    insert_into_future_jobs_db(job) unless job.proc_klass
  end
end
save_jobs() click to toggle source
# File lib/belated/queue.rb, line 88
# Drains the ready queue and dumps the jobs that can be persisted to
# FILE_NAME as YAML. Items for which proc_or_shutdown? is true are dropped.
#
# @return [Integer] number of bytes written
def save_jobs
  drained = Array.new(@queue.length) { @queue.pop }
  persistable = drained.reject { |job| proc_or_shutdown?(job) }
  # Plain binary write; the previous version wrapped this in a leftover
  # debugging `pp`, which printed the byte count to stdout.
  File.binwrite(FILE_NAME, YAML.dump(persistable))
end

Private Instance Methods

insert_into_future_jobs_db(job) click to toggle source
# File lib/belated/queue.rb, line 124
# Writes +job+ to the persistent store, keyed by its id, inside a
# transaction.
#
# @param job [#id] the job to persist
def insert_into_future_jobs_db(job)
  future_jobs_db.transaction { future_jobs_db[job.id] = job }
end
proc_or_shutdown?(job) click to toggle source
# File lib/belated/queue.rb, line 120
# Tells whether +job+ is a Symbol or wraps a Proc.
#
# @param job [Symbol, #job] queue item to inspect
# @return [Boolean] true for a Symbol, or when +job.job+ is exactly a Proc
def proc_or_shutdown?(job)
  return true if job.is_a?(Symbol)

  job.job.instance_of?(Proc)
end