class Chimp::ExecutionGroup
An ExecutionGroup
contains a set of Executors to be processed.
Only the subclasses SerialExecutionGroup
and ParallelExecutionGroup
should be used directly.
Attributes
Public Class Methods
# File lib/right_chimp/queue/execution_group.rb, line 28
#
# Create an empty execution group.
#
# new_group_id - value stored in @group_id (defaults to nil).
def initialize(new_group_id = nil)
  @group_id = new_group_id
  @queue = []        # jobs waiting to be handed out by #shift
  @jobs_by_id = {}   # every job ever pushed, keyed by job_id
  @log = nil
  @time_start = nil  # set on the first #shift
  @time_end = nil    # set by #job_completed
  @concurrency = 1
  @started = 0
end
Public Instance Methods
Cancel a job by id
# File lib/right_chimp/queue/execution_group.rb, line 228
#
# Cancel a job by id: mark it as errored, release its owner, stamp its end
# time and take it out of the active queue.
#
# id - job id as used as a key in @jobs_by_id.
#
# Returns the job removed from the queue, or nil when the id is unknown or
# the job was no longer queued.
def cancel(id)
  Log.warn "Cancelling job id #{id}"

  job = @jobs_by_id[id]
  # An unknown id used to blow up with NoMethodError on nil.
  return nil if job.nil?

  job.status = Executor::STATUS_ERROR
  job.owner = nil
  job.time_end = Time.now
  @queue.delete(job)
end
An execution group is “done” if nothing is queued or running and at least one job has completed.
# File lib/right_chimp/queue/execution_group.rb, line 172
#
# An execution group is "done" when no job is in STATUS_NONE or
# STATUS_RUNNING and at least one job is in STATUS_DONE.
#
# Returns true or false.
def done?
  get_jobs_by_status(Executor::STATUS_NONE).empty? &&
    get_jobs_by_status(Executor::STATUS_RUNNING).empty? &&
    !get_jobs_by_status(Executor::STATUS_DONE).empty?
end
Get a particular job
# File lib/right_chimp/queue/execution_group.rb, line 131
#
# Look up a single job.
#
# i - job id used as a key in @jobs_by_id.
#
# Returns the job, or nil when no job has that id.
def get_job(i)
  @jobs_by_id[i]
end
Get all job ids
# File lib/right_chimp/queue/execution_group.rb, line 124
#
# Returns an array with the ids of every job registered in this group.
def get_job_ids
  @jobs_by_id.keys
end
Get all jobs
# File lib/right_chimp/queue/execution_group.rb, line 117
#
# Returns an array with every job registered in this group, whether or not
# it is still in the active queue.
def get_jobs
  @jobs_by_id.values
end
Get jobs by status
# File lib/right_chimp/queue/execution_group.rb, line 138
#
# Select the registered jobs that are in a given status.
#
# status - anything responding to #to_sym; :all matches every job.
#
# Returns a new array of matching jobs (possibly empty).
def get_jobs_by_status(status)
  wanted = status.to_sym # convert once instead of once per job
  jobs = @jobs_by_id.values
  return jobs if wanted == :all

  jobs.select { |job| job.status == wanted }
end
Return total execution time
# File lib/right_chimp/queue/execution_group.rb, line 240
#
# Total execution time of the group in whole seconds.
#
# Returns 0 when the group has not started (@time_start is nil), the time
# elapsed so far when it has not finished (@time_end is nil), and the
# difference between @time_end and @time_start otherwise.
def get_total_exec_time
  return 0 if @time_start.nil?

  finish = @time_end.nil? ? Time.now : @time_end
  finish.to_i - @time_start.to_i
end
# File lib/right_chimp/queue/execution_group.rb, line 146
#
# Record the current time as the group's end time (@time_end), which
# #get_total_exec_time uses as the end of the measured interval.
#
# Returns the Time that was stored.
def job_completed
  @time_end = Time.now
end
Add something to the work queue
# File lib/right_chimp/queue/execution_group.rb, line 42
#
# Add a job to the work queue and register it by id.
#
# j - the job; it is assigned an id from IDManager.get when it has none,
#     and its group is set to this execution group.
#
# Returns the job.
# Raises RuntimeError ("invalid work") when j is nil.
def push(j)
  raise "invalid work" if j.nil?

  j.job_id = IDManager.get if j.job_id.nil?
  j.group = self
  @queue.push(j)
  @jobs_by_id[j.job_id] = j
end
Queue a held job by id
# File lib/right_chimp/queue/execution_group.rb, line 202
#
# Queue a held job by id: clear its owner, restart its clock, set it to
# STATUS_NONE and make sure it is in the active queue.
#
# id - job id used as a key in @jobs_by_id.
#
# Returns the job, or nil when the id is unknown.
def queue(id)
  Log.debug "Queuing held job id #{id}"

  job = @jobs_by_id[id]
  # An unknown id used to blow up with NoMethodError on nil.
  return nil if job.nil?

  job.owner = nil
  job.time_start = Time.now
  job.time_end = nil
  job.status = Executor::STATUS_NONE

  # #shift leaves STATUS_HOLDING jobs in @queue, so a held job is normally
  # still queued; pushing it again would list it twice and inflate #size.
  return job if @queue.include?(job)

  push(job)
end
An execution group is “ready” if it has work that can be done; see implementation in child classes.
# File lib/right_chimp/queue/execution_group.rb, line 164
#
# An execution group is "ready" if it has work that can be done. This base
# implementation is a placeholder that child classes are expected to
# override.
#
# Raises RuntimeError ("unimplemented") always.
def ready?
  raise "unimplemented"
end
Requeue a job by id
# File lib/right_chimp/queue/execution_group.rb, line 215
#
# Requeue a job by id: set it back to STATUS_NONE, clear its owner, restart
# its clock and make sure it is in the active queue.
#
# id - job id used as a key in @jobs_by_id.
#
# Returns the job, or nil when the id is unknown.
def requeue(id)
  Log.debug "Requeuing job id #{id}"

  job = @jobs_by_id[id]
  # An unknown id used to blow up with NoMethodError on nil.
  return nil if job.nil?

  job.status = Executor::STATUS_NONE
  job.owner = nil
  job.time_start = Time.now
  job.time_end = nil

  # Do not list the job twice if it never left the active queue.
  return job if @queue.include?(job)

  push(job)
end
Requeue all failed jobs
# File lib/right_chimp/queue/execution_group.rb, line 193
#
# Requeue every job currently in STATUS_ERROR.
#
# Returns the array of jobs that were in STATUS_ERROR.
def requeue_failed_jobs!
  get_jobs_by_status(Executor::STATUS_ERROR).each { |job| requeue(job.job_id) }
end
Reset the queue
# File lib/right_chimp/queue/execution_group.rb, line 110
#
# Reset the active queue to empty. Only @queue is replaced; the jobs
# registered in @jobs_by_id are left untouched.
#
# Returns the new, empty queue.
def reset!
  @queue = []
end
Return the results as an array of hashes
# File lib/right_chimp/queue/execution_group.rb, line 71
#
# Summarize every registered job that has a server.
#
# Returns an array of hashes with the keys :job_id, :name, :host, :status,
# :error, :total, :start, :end and :worker (the job object itself). Jobs
# that are nil or have no server are left out; they used to show up as nil
# entries in the array.
def results
  reportable = get_jobs.reject { |task| task.nil? || task.server.nil? }

  reportable.map do |task|
    {
      :job_id => task.job_id,
      :name   => task.info[0],
      :host   => task.server.name,
      :status => task.status,
      :error  => task.error,
      :total  => get_total_execution_time(task.status, task.time_start, task.time_end),
      :start  => task.time_start,
      :end    => task.time_end,
      :worker => task
    }
  end
end
Is this execution group running anything?
# File lib/right_chimp/queue/execution_group.rb, line 183
#
# Is this execution group running anything? Jobs in STATUS_NONE,
# STATUS_RUNNING or STATUS_RETRYING all count as active.
#
# Returns true or false.
def running?
  active = [Executor::STATUS_NONE, Executor::STATUS_RUNNING, Executor::STATUS_RETRYING]
  active.any? { |status| !get_jobs_by_status(status).empty? }
end
Reset all jobs and bulk set them
# File lib/right_chimp/queue/execution_group.rb, line 153
#
# Empty the active queue, then push each of the given jobs.
#
# jobs - enumerable of jobs (defaults to an empty array).
#
# NOTE(review): #reset! only clears @queue, so jobs already registered in
# @jobs_by_id stay registered -- confirm that this is intended.
#
# Returns the jobs argument.
def set_jobs(jobs = [])
  reset!
  jobs.each { |job| push(job) }
end
Take something from the queue
# File lib/right_chimp/queue/execution_group.rb, line 53
#
# Take the first runnable job (STATUS_NONE) from the queue.
#
# While scanning for that job, STATUS_HOLDING jobs are kept in place and
# jobs in any other status are dropped from the queue. Everything after
# the job that is taken is kept untouched. The group's start time is
# recorded on the first call, whether or not a job was found.
#
# Returns the job taken, or nil when no job is in STATUS_NONE.
def shift
  found_job = nil

  @queue = @queue.select do |job|
    if found_job || job.status == Executor::STATUS_HOLDING
      true
    else
      # The first STATUS_NONE job is taken; anything else here is discarded.
      found_job = job if job.status == Executor::STATUS_NONE
      false
    end
  end

  @time_start = Time.now if @time_start.nil?
  found_job
end
Size of the active queue
# File lib/right_chimp/queue/execution_group.rb, line 92
#
# Returns the number of jobs in the active queue (@queue), not the number
# of jobs registered in the group.
def size
  @queue.size
end
Sort queue by server nickname
# File lib/right_chimp/queue/execution_group.rb, line 99
#
# Sort the active queue in place by each job's server nickname.
#
# Returns the sorted queue, or nil when @queue is nil.
def sort!
  return nil if @queue.nil?

  @queue.sort! { |a, b| a.server.nickname <=> b.server.nickname }
end
Print out ExecutionGroup
information
# File lib/right_chimp/queue/execution_group.rb, line 253
#
# Build a one-line summary of the group: class name, group id, readiness,
# number of registered jobs and number of queued jobs.
#
# Calls #ready?, so it raises whenever #ready? does.
#
# Returns a String.
def to_s
  "#{self.class}[#{group_id}]: ready=#{ready?} total_jobs=#{@jobs_by_id.size} queued_jobs=#{size}"
end
Protected Instance Methods
Return total execution time or -1 for errors
# File lib/right_chimp/queue/execution_group.rb, line 264
#
# Execution time of a single job in whole seconds.
#
# status     - job status; :error short-circuits to -1.
# time_begin - start time (anything responding to #to_i), or nil when the
#              job has not started.
# time_end   - end time, or nil when the job has not finished yet.
#
# Returns -1 for errored jobs, 0 for jobs that have not started, the time
# elapsed so far for jobs without an end time, and end minus begin
# otherwise.
def get_total_execution_time(status, time_begin, time_end)
  return -1 if status == :error
  return 0 if time_begin.nil?

  # A nil end time used to be treated as epoch 0, giving a large negative
  # total for jobs that were still running.
  finish = time_end.nil? ? Time.now : time_end
  finish.to_i - time_begin.to_i
end