class Chimp::ChimpQueue

The ChimpQueue is a singleton that contains the chimp work queue

Attributes

delay[RW]
group[RW]
max_threads[RW]
processing[RW]
retry_count[RW]

Public Class Methods

[](group) click to toggle source

Allow the groups to be accessed as ChimpQueue.group[:foo]

# File lib/right_chimp/queue/chimp_queue.rb, line 139
# Class-level reader so callers can write ChimpQueue[:foo] instead of
# going through the singleton instance themselves.
#
# group - key to look up in the singleton's group hash.
#
# Returns whatever the singleton's group hash yields for that key.
def self.[](group)
  ChimpQueue.instance.group[group]
end
[]=(k,v) click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 143
# Class-level writer so callers can write ChimpQueue[:foo] = group.
#
# k - key under which to store the value in the singleton's group hash.
# v - value to store.
#
# Returns v.
def self.[]=(k,v)
  ChimpQueue.instance.group.store(k, v)
end
new() click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 11
# Set up an empty queue with default settings, then call reset! to
# build the initial group table.
def initialize
  # Settings copied onto each worker / used to size the pool in #start.
  @max_threads = 10
  @retry_count = 0
  @delay = 0

  # NOTE(review): not read anywhere in the code visible here; presumably
  # consulted by the workers -- confirm before removing.
  @workers_never_exit = true

  # Internal bookkeeping: worker threads, the lock used by #shift, and
  # the processing table exposed through the accessor.
  @processing = {}
  @threads = []
  @semaphore = Mutex.new

  reset!
end

Public Instance Methods

create_group(name, type = :parallel, concurrency = 1) click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 58
# Build a new execution group and register it under +name+.
#
# name        - key for the group; also assigned as its group_id.
# type        - group type handed to ExecutionGroupFactory.from_type
#               (defaults to :parallel).
# concurrency - value assigned to the group's concurrency (defaults to 1).
#
# Returns the newly created group.
def create_group(name, type = :parallel, concurrency = 1)
  Log.debug "Creating new execution group #{name} type=#{type} concurrency=#{concurrency}"

  # Configure the group fully before it becomes visible via ChimpQueue[name].
  ChimpQueue[name] = ExecutionGroupFactory.from_type(type).tap do |fresh|
    fresh.group_id = name
    fresh.concurrency = concurrency
  end
end
get_job(id) click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 163
# Look up a single job by its id across every group.
#
# id - job id; converted with to_i before comparison, so numeric strings
#      are accepted.
#
# Returns the first job whose job_id equals id.to_i, or nil when no job
# matches. (The previous each-based loop fell through and returned the
# entire jobs array on a miss, which callers would see as truthy.)
def get_job(id)
  wanted = id.to_i
  get_jobs.find { |job| job.job_id == wanted }
end
get_jobs() click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 171
# Collect the jobs of every group into one flat array.
#
# Returns a new Array containing each job yielded by each group's
# get_jobs, in group order.
def get_jobs
  # .values takes a snapshot of the groups before iterating.
  @group.values.each_with_object([]) do |grp, all_jobs|
    grp.get_jobs.each { |entry| all_jobs.push(entry) }
  end
end
get_jobs_by_status(status) click to toggle source

Return an array of all jobs with the requested status.

# File lib/right_chimp/queue/chimp_queue.rb, line 151
# Gather, across all groups, the jobs each group reports for +status+.
#
# status - value passed through to each group's get_jobs_by_status.
#
# Returns an Array; groups answering nil or [] contribute nothing.
def get_jobs_by_status(status)
  @group.values.reduce([]) do |matches, grp|
    found = grp.get_jobs_by_status(status)
    if found == nil || found == []
      matches
    else
      matches + found
    end
  end
end
get_jobs_by_uuid(uuid) click to toggle source
# File lib/right_chimp/queue/chimp_queue.rb, line 180
# Find every job, in any group, carrying the given uuid.
#
# uuid - value compared (with ==) against each job's job_uuid.
#
# Returns an Array of matching jobs; empty when nothing matches.
def get_jobs_by_uuid(uuid)
  get_jobs.select { |candidate| candidate.job_uuid == uuid }
end
push(g, w) click to toggle source

Push a task into the queue

# File lib/right_chimp/queue/chimp_queue.rb, line 51
# Add a work item to a group, creating the group on first use.
#
# g - group key; must be truthy.
# w - work item; must respond to job_id.
#
# Returns the result of the group's push, or nil when the group already
# holds a job with the same job_id (the item is not queued twice).
# Raises RuntimeError when no group is given.
def push(g, w)
  raise "no group specified" if !g

  create_group(g) unless ChimpQueue[g]

  # Skip duplicates: the group is asked whether it already knows this job.
  return nil if ChimpQueue[g].get_job(w.job_id)

  ChimpQueue[g].push(w)
end
quit() click to toggle source

Quit - empty the queue and wait for remaining jobs to complete

# File lib/right_chimp/queue/chimp_queue.rb, line 98
# Wait for every group to finish, then kill the worker threads.
#
# While waiting, each yield from wait_until_done sleeps one second and
# prints a "."; after 30 such ticks in total (the counter is shared by
# all groups) the wait for the current group is abandoned. Prints
# " done." at the end.
def quit
  ticks = 0

  @group.keys.each do |name|
    wait_until_done(name) do
      # break leaves wait_until_done itself, giving up on this group.
      break unless ticks < 30

      sleep 1
      ticks += 1
      print "."
    end
  end

  @threads.each { |worker| worker.kill }
  puts " done."
end
reset!() click to toggle source

Reset the queue and the :default group

This doesn't do anything to the groups' jobs.

# File lib/right_chimp/queue/chimp_queue.rb, line 27
# Replace the group table with a fresh one holding only a new :default
# ParallelExecutionGroup.
#
# Returns the new default group.
def reset!
  @group = {}
  @group.store(:default, ParallelExecutionGroup.new(:default))
end
run_threads() click to toggle source

Run all threads forever (used by chimpd)

# File lib/right_chimp/queue/chimp_queue.rb, line 119
# Join each worker thread in turn, waiting at most 5 seconds per thread.
#
# Returns the thread list.
def run_threads
  @threads.each { |worker| worker.join(5) }
end
shift() click to toggle source

Grab the oldest work item available

# File lib/right_chimp/queue/chimp_queue.rb, line 69
# Take one work item from the first group that reports ready?.
#
# The lookup and the group's shift both happen while holding @semaphore.
# Only the first ready group is consulted, even if it yields nil.
#
# Returns the shifted job, or nil when no group is ready or the ready
# group had nothing to hand out.
def shift
  @semaphore.synchronize do
    source = @group.values.find { |grp| grp.ready? }
    next nil if source.nil?

    job = source.shift
    Log.debug "Shifting job '#{job.job_id}' from group '#{source.group_id}'" unless job.nil?
    job
  end
end
size() click to toggle source

Return the total number of queued (non-executing) objects.

# File lib/right_chimp/queue/chimp_queue.rb, line 128
# Total of every group's size.
#
# Returns 0 when there are no groups.
def size
  @group.values.inject(0) { |total, grp| total + grp.size }
end
start() click to toggle source

Start up queue runners

# File lib/right_chimp/queue/chimp_queue.rb, line 35
# Sort the queues, then spawn max_threads worker threads.
#
# Each thread builds its own QueueWorker, copies the queue's delay and
# retry_count onto it, and calls its run. The threads are appended to
# @threads.
def start
  sort_queues!

  (1..max_threads).each do |slot|
    @threads.push(
      Thread.new(slot) do
        runner = QueueWorker.new
        runner.delay = @delay
        runner.retry_count = @retry_count
        runner.run
      end
    )
  end
end
wait_until_done(g) { || ... } click to toggle source

Wait until a group is done

# File lib/right_chimp/queue/chimp_queue.rb, line 86
# Block until the named group stops reporting running?.
#
# While the group is running, each worker thread is joined for up to one
# second; after every join the caller's block (if any) is yielded to, so
# the caller can report progress or `break` to abandon the wait.
#
# g     - key of the group in @group.
# block - optional; invoked with no arguments after each join. The
#         method previously raised LocalJumpError when called without a
#         block; it now simply waits.
#
# Returns nil.
def wait_until_done(g, &block)
  while @group[g].running?
    @threads.each do |t|
      t.join(1)
      yield if block_given?
    end
  end
end

Protected Instance Methods

sort_queues!() click to toggle source

Sort all the things, er, queues

# File lib/right_chimp/queue/chimp_queue.rb, line 196
# Call sort! on every group.
#
# Returns the Array of groups that were sorted.
def sort_queues!
  @group.values.each(&:sort!)
end