class Chimp::ChimpQueue
The ChimpQueue
is a singleton that contains the chimp work queue
Attributes
delay[RW]
group[RW]
max_threads[RW]
processing[RW]
retry_count[RW]
Public Class Methods
[](group)
click to toggle source
Allow the groups to be accessed as ChimpQueue.group[:foo]
# File lib/right_chimp/queue/chimp_queue.rb, line 139
#
# Class-level shortcut so groups can be read as ChimpQueue[:foo].
# Looks the key up in the group table of ChimpQueue.instance and returns
# whatever that table holds for it.
def self.[](group)
  groups = ChimpQueue.instance.group
  groups[group]
end
[]=(k,v)
click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 143
#
# Class-level shortcut so groups can be assigned as ChimpQueue[:foo] = group.
# Stores v under key k in the group table of ChimpQueue.instance.
def self.[]=(k, v)
  groups = ChimpQueue.instance.group
  groups[k] = v
end
new()
click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 11
#
# Set up an empty queue: no worker threads yet, nothing being processed,
# zero delay, zero retries and a ceiling of 10 worker threads. Finishes by
# calling reset! to build the group table.
def initialize
  # Bookkeeping for worker threads and in-flight work.
  @semaphore = Mutex.new
  @threads = []
  @processing = {}

  # Tunables.
  @workers_never_exit = true
  @max_threads = 10
  @retry_count = 0
  @delay = 0

  reset!
end
Public Instance Methods
create_group(name, type = :parallel, concurrency = 1)
click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 58
#
# Build an execution group and register it under +name+.
#
# name        - key the group is stored under; also assigned as its group_id.
# type        - passed to ExecutionGroupFactory.from_type (default :parallel).
# concurrency - assigned to the new group's concurrency (default 1).
#
# Returns the newly created group.
def create_group(name, type = :parallel, concurrency = 1)
  Log.debug "Creating new execution group #{name} type=#{type} concurrency=#{concurrency}"

  ExecutionGroupFactory.from_type(type).tap do |fresh_group|
    fresh_group.group_id = name
    fresh_group.concurrency = concurrency
    # Register only after the group is fully configured.
    ChimpQueue[name] = fresh_group
  end
end
get_job(id)
click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 163
#
# Find a single job by id among the jobs returned by get_jobs.
#
# id - job identifier; converted with to_i before comparison, so "42" and 42
#      refer to the same job.
#
# Returns the first job whose job_id equals id.to_i, or nil when none matches.
def get_job(id)
  wanted = id.to_i # convert once instead of on every comparison
  # find yields nil on a miss; an each-with-early-return would fall through
  # and hand back the whole (truthy) job list instead.
  get_jobs.find { |job| job.job_id == wanted }
end
get_jobs()
click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 171
#
# Collect the jobs of every group into one flat array, in group order.
#
# Returns a new Array containing each element of each group's get_jobs.
def get_jobs
  @group.values.each_with_object([]) do |group, all_jobs|
    group.get_jobs.each { |job| all_jobs << job }
  end
end
get_jobs_by_status(status)
click to toggle source
Return an array of all jobs with the requested status.
# File lib/right_chimp/queue/chimp_queue.rb, line 151
#
# Return an array of all jobs with the requested status, across every group.
#
# status - value forwarded unchanged to each group's get_jobs_by_status.
#
# Returns a new Array; empty when no group reports a matching job. A group
# that answers nil contributes nothing.
def get_jobs_by_status(status)
  @group.values.each_with_object([]) do |group, matches|
    found = group.get_jobs_by_status(status)
    # concat appends in place, so the accumulator is not rebuilt per group.
    matches.concat(found) if found
  end
end
get_jobs_by_uuid(uuid)
click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 180
#
# Select every job whose job_uuid equals +uuid+.
#
# Returns a new Array of the matching jobs (empty when none match), in the
# order get_jobs returns them.
def get_jobs_by_uuid(uuid)
  get_jobs.select { |job| job.job_uuid == uuid }
end
push(g, w)
click to toggle source
Push a task into the queue
# File lib/right_chimp/queue/chimp_queue.rb, line 51
#
# Push a task into the queue.
#
# g - group key; the group is created with create_group's defaults when
#     ChimpQueue[g] does not exist yet.
# w - work item; skipped when the group already has a job with w.job_id.
#
# Raises RuntimeError ("no group specified") when g is nil or false.
def push(g, w)
  raise "no group specified" unless g

  create_group(g) unless ChimpQueue[g]

  # Do not enqueue the same job id twice within a group.
  return if ChimpQueue[g].get_job(w.job_id)

  ChimpQueue[g].push(w)
end
quit()
click to toggle source
Quit - empty the queue and wait for remaining jobs to complete
# File lib/right_chimp/queue/chimp_queue.rb, line 98
#
# Wait for running groups to finish, then kill the worker threads.
#
# Each poll callback sleeps one second and prints a dot. The tick counter is
# shared by all groups, so once 30 ticks have been spent the wait for the
# current group and for every later group is abandoned via break.
def quit
  ticks = 0

  @group.keys.each do |group_name|
    wait_until_done(group_name) do
      # break leaves wait_until_done for this group once the budget is spent.
      break unless ticks < 30

      sleep 1
      ticks += 1
      print "."
    end
  end

  @threads.each(&:kill)
  puts " done."
end
reset!()
click to toggle source
Reset the queue and the :default group
This doesn't do anything to the groups' jobs
# File lib/right_chimp/queue/chimp_queue.rb, line 27
#
# Replace the group table with a fresh one holding only a :default
# ParallelExecutionGroup. Groups from the previous table are simply dropped
# from the table; nothing is called on them.
#
# Returns the new :default group.
def reset!
  @group = {}
  @group.store(:default, ParallelExecutionGroup.new(:default))
end
run_threads()
click to toggle source
Run all threads forever (used by chimpd)
# File lib/right_chimp/queue/chimp_queue.rb, line 119
#
# Join each worker thread in turn, waiting at most 5 seconds per thread.
# This is a single pass over @threads.
# NOTE(review): the page describes this as running "forever" for chimpd;
# presumably the caller invokes it in a loop - confirm.
#
# Returns @threads.
def run_threads
  @threads.each { |worker_thread| worker_thread.join(5) }
end
shift()
click to toggle source
Grab the oldest work item available
# File lib/right_chimp/queue/chimp_queue.rb, line 69
#
# Take one work item from the first group that reports ready?.
#
# Runs under @semaphore. Only the first ready group is consulted: if its
# shift yields nil, later groups are not tried on this call.
#
# Returns the shifted job, or nil when no group is ready or the ready group
# had nothing to give.
def shift
  @semaphore.synchronize do
    ready_group = @group.values.find { |group| group.ready? }
    job = ready_group && ready_group.shift
    Log.debug "Shifting job '#{job.job_id}' from group '#{ready_group.group_id}'" unless job.nil?
    job
  end
end
size()
click to toggle source
return the total number of queued (non-executing) objects
# File lib/right_chimp/queue/chimp_queue.rb, line 128
#
# Total of size over every group in the table.
#
# Returns the sum as a number; 0 when there are no groups.
def size
  @group.values.inject(0) { |total, group| total + group.size }
end
start()
click to toggle source
Start up queue runners
# File lib/right_chimp/queue/chimp_queue.rb, line 35
#
# Start up queue runners: sort every group, then spawn max_threads worker
# threads. Each thread builds its own QueueWorker, copies the queue's delay
# and retry_count onto it, and calls run. The threads are appended to
# @threads.
def start
  sort_queues!

  (1..max_threads).each do |slot|
    @threads << Thread.new(slot) do |_slot|
      runner = QueueWorker.new
      runner.delay = @delay
      runner.retry_count = @retry_count
      runner.run
    end
  end
end
wait_until_done(g) { || ... }
click to toggle source
Wait until a group is done
# File lib/right_chimp/queue/chimp_queue.rb, line 86
#
# Wait until a group is done.
#
# While @group[g] reports running?, join each worker thread for up to one
# second and yield to the caller's block after every join. The caller can
# stop the wait early with break inside the block.
#
# g - key of the group to wait for; looked up again on every pass.
#
# Returns nil.
def wait_until_done(g, &block)
  while @group[g].running?
    @threads.each do |worker_thread|
      worker_thread.join(1)
      yield
    end
  end
end
Protected Instance Methods
sort_queues!()
click to toggle source
Sort all the things, er, queues
# File lib/right_chimp/queue/chimp_queue.rb, line 196
#
# Call sort! on every group in the table.
#
# Returns the array of groups (the table's values).
def sort_queues!
  @group.values.each(&:sort!)
end