class DatWorkerPool::DefaultQueue

Attributes

on_pop_callbacks[R]
on_push_callbacks[R]

Public Class Methods

new() click to toggle source
# File lib/dat-worker-pool/default_queue.rb, line 12
def initialize
  @work_items = LockedArray.new
  @cond_var   = ConditionVariable.new

  @on_push_callbacks = []
  @on_pop_callbacks  = []
end

Public Instance Methods

empty?() click to toggle source
# File lib/dat-worker-pool/default_queue.rb, line 21
def empty?;     @work_items.empty?; end
on_pop(&block) click to toggle source
# File lib/dat-worker-pool/default_queue.rb, line 24
def on_pop(&block);  @on_pop_callbacks  << block; end
on_push(&block) click to toggle source
# File lib/dat-worker-pool/default_queue.rb, line 23
def on_push(&block); @on_push_callbacks << block; end
work_items() click to toggle source
# File lib/dat-worker-pool/default_queue.rb, line 20
def work_items; @work_items.values; end

Private Instance Methods

pop!() click to toggle source

check if the queue is empty, if so sleep (`@cond_var.wait(@mutex)`) until signaled (via `push!` or `shutdown!`); once a work item is available pop it from the front and return it; if shutdown, return `nil` which the workers will ignore

# File lib/dat-worker-pool/default_queue.rb, line 47
def pop!
  work_item = @work_items.with_lock do |mutex, work_items|
    while !self.shutdown? && work_items.empty?
      @cond_var.wait(mutex)
    end
    work_items.shift unless self.shutdown?
  end
  @on_pop_callbacks.each{ |p| p.call(self, work_item) } if !work_item.nil?
  work_item
end
push!(work_item) click to toggle source

add the work item and wakeup (`@cond_var.signal`) the first sleeping worker (from calling `pop`)

# File lib/dat-worker-pool/default_queue.rb, line 35
def push!(work_item)
  @work_items.with_lock do |mutex, work_items|
    work_items << work_item
    @cond_var.signal
  end
  @on_push_callbacks.each{ |p| p.call(self, work_item) }
end
shutdown!() click to toggle source

wake up workers (`@cond_var.broadcast`) who are sleeping because of `pop`

# File lib/dat-worker-pool/default_queue.rb, line 29
def shutdown!
  @work_items.with_lock{ @cond_var.broadcast }
end