class ExceptionalSynchrony::LimitedWorkQueue

Public Class Methods

new(em, limit) click to toggle source
# File lib/exceptional_synchrony/limited_work_queue.rb, line 4
# Builds an empty, unpaused queue that allows at most +limit+ jobs to be in
# flight at once.
#
# em    - stored in @em; not otherwise used by this method.
# limit - maximum number of concurrently running jobs; must be > 0.
#
# Raises ArgumentError when +limit+ is not greater than zero.
def initialize(em, limit)
  @em = em
  raise ArgumentError, "limit must be positive" unless limit > 0

  @limit        = limit
  @job_procs    = []     # jobs waiting to be started, oldest first
  @worker_count = 0      # jobs currently in flight
  @paused       = false
end

Public Instance Methods

add(proc = nil, &block)
Alias for: add!
add!(proc = nil, &block) click to toggle source

Adds a job_proc to the queue and, unless the queue is paused, starts working it.

# File lib/exceptional_synchrony/limited_work_queue.rb, line 14
# Enqueues a job and, unless the queue is paused, starts working the queue.
#
# The job may be given either as a positional callable or as a block; the
# positional argument wins when both are supplied.
#
# If jobs are already waiting and the new job responds to +merge+, it is
# offered the current queue via job.merge(queue). A truthy result replaces
# the whole queue; a falsy result means the job is simply appended.
#
# Raises RuntimeError when the job does not respond to +call+.
# Returns whatever work! returns, or nil while paused.
def add!(proc = nil, &block)
  callable = proc || block
  raise "Must respond_to?(:call)! #{callable.inspect}" unless callable.respond_to?(:call)

  # Merging is only attempted when there is something already queued to merge with.
  merged = callable.merge(@job_procs) if @job_procs.any? && callable.respond_to?(:merge)

  if merged
    @job_procs = merged
  else
    @job_procs << callable
  end

  work! unless paused?
end
Also aliased as: add
items() click to toggle source
# File lib/exceptional_synchrony/limited_work_queue.rb, line 50
# Returns the jobs that are queued but not yet started (work! shifts a job
# off this array before running it).
#
# This is the queue's own @job_procs object, not a copy.
def items
  @job_procs
end
pause!() click to toggle source
# File lib/exceptional_synchrony/limited_work_queue.rb, line 42
# Marks the queue as paused.
#
# While paused, add! still enqueues jobs but does not call work!.
# Note that work! itself does not consult the paused flag.
def pause!
  @paused = true
end
paused?() click to toggle source
# File lib/exceptional_synchrony/limited_work_queue.rb, line 38
# Returns the paused flag: false after initialize/unpause!, true after pause!.
def paused?
  @paused
end
queue_empty?() click to toggle source
# File lib/exceptional_synchrony/limited_work_queue.rb, line 34
# Returns true when no jobs are waiting to be started.
# Jobs that are already running are not counted here; see workers_empty?.
def queue_empty?
  @job_procs.empty?
end
unpause!() click to toggle source
# File lib/exceptional_synchrony/limited_work_queue.rb, line 46
# Clears the paused flag and immediately starts working any queued jobs.
#
# add! skips work! while the queue is paused, so jobs added during a pause
# only sit in the queue. Without the work! call here they would stay idle
# after unpausing until some later add! (or a finishing worker) happened to
# trigger work!.
#
# Returns whatever work! returns.
def unpause!
  @paused = false
  work!
end
work!() click to toggle source
# File lib/exceptional_synchrony/limited_work_queue.rb, line 54
# Starts queued jobs, oldest first, until either the queue is empty or the
# number of in-flight jobs has reached the limit.
#
# Each job runs inside its own Fiber. The worker slot is claimed before the
# fiber is resumed, and released by worker_done in an ensure clause, so the
# slot is given back whether the job returns normally or raises. Exceptions
# matched by a bare rescue (StandardError) are logged through
# ExceptionHandling.log_error and not re-raised.
#
# This method does not check paused?.
def work!
  while !queue_empty? && !workers_full?
    next_job = @job_procs.shift
    @worker_count += 1

    worker = Fiber.new do
      begin
        next_job.call
      rescue => error
        ExceptionHandling.log_error(error, "LimitedWorkQueue encountered an exception")
      ensure
        # Frees the slot and re-enters work! to pick up the next queued job.
        worker_done
      end
    end
    worker.resume
  end
end
workers_empty?() click to toggle source
# File lib/exceptional_synchrony/limited_work_queue.rb, line 26
# Returns true when no jobs are currently in flight (worker count is zero).
# Jobs still waiting in the queue are not counted here; see queue_empty?.
def workers_empty?
  @worker_count.zero?
end
workers_full?() click to toggle source
# File lib/exceptional_synchrony/limited_work_queue.rb, line 30
# Returns true when the number of in-flight jobs has reached (or exceeded)
# the limit given to the constructor, i.e. work! must not start another job.
def workers_full?
  @worker_count >= @limit
end

Private Instance Methods

worker_done() click to toggle source
# File lib/exceptional_synchrony/limited_work_queue.rb, line 71
# Releases one worker slot and then calls work! so a queued job can take it.
#
# Invoked from the ensure clause of each worker fiber in work!, so it runs
# whether the job returned normally or raised.
def worker_done
  # Decrement first: work! checks workers_full? before starting another job.
  @worker_count -= 1
  work!
end