class DatWorkerPool::DefaultQueue
Attributes
on_pop_callbacks[R]
on_push_callbacks[R]
Public Class Methods
new()
click to toggle source
# File lib/dat-worker-pool/default_queue.rb, line 12
# Sets up an empty queue: the work-item store (a LockedArray, the project
# type whose `with_lock`/`values`/`empty?` the other methods call), the
# condition variable that `pop!` waits on, and empty callback lists for
# `on_push` / `on_pop`.
def initialize
  @work_items        = LockedArray.new
  @cond_var          = ConditionVariable.new
  @on_push_callbacks = []
  @on_pop_callbacks  = []
end
Public Instance Methods
empty?()
click to toggle source
# File lib/dat-worker-pool/default_queue.rb, line 21
# Reports whether the queue currently holds no work items; simply delegates
# to the underlying store's `empty?`.
def empty?
  @work_items.empty?
end
on_pop(&block)
click to toggle source
# File lib/dat-worker-pool/default_queue.rb, line 24
# Registers a block to run after a work item is popped. `pop!` calls each
# registered block with (queue, work_item). Returns the callback list, since
# that is the value of `<<`.
def on_pop(&block)
  @on_pop_callbacks << block
end
on_push(&block)
click to toggle source
# File lib/dat-worker-pool/default_queue.rb, line 23
# Registers a block to run after a work item is pushed. `push!` calls each
# registered block with (queue, work_item). Returns the callback list, since
# that is the value of `<<`.
def on_push(&block)
  @on_push_callbacks << block
end
work_items()
click to toggle source
# File lib/dat-worker-pool/default_queue.rb, line 20
# Returns the queued work items by asking the underlying store for its
# `values`.
# NOTE(review): whether `values` hands back a copy or the live array is
# decided by LockedArray, which is not shown here -- confirm before mutating.
def work_items
  @work_items.values
end
Private Instance Methods
pop!()
click to toggle source
Check whether the queue is empty and, if so, sleep (`@cond_var.wait(mutex)`, using the mutex yielded by `with_lock`) until signaled (via `push!` or `shutdown!`). Once a work item is available, pop it from the front and return it. If the queue has been shut down, return `nil`, which the workers will ignore.
# File lib/dat-worker-pool/default_queue.rb, line 47
# Waits until a work item is available or the queue reports `shutdown?`, then
# returns the item shifted off the front of the queue, or nil when shut down.
# Relies on `with_lock` returning the value of its block.
# On-pop callbacks are called with (queue, work_item), outside the
# `with_lock` block, and only when an item was actually popped.
def pop!
  work_item = @work_items.with_lock do |mutex, work_items|
    # Loop rather than wait once: being woken does not guarantee that an
    # item is still there, so the condition is re-checked after every wakeup.
    @cond_var.wait(mutex) while !shutdown? && work_items.empty?
    work_items.shift unless shutdown?
  end
  @on_pop_callbacks.each { |callback| callback.call(self, work_item) } unless work_item.nil?
  work_item
end
push!(work_item)
click to toggle source
Add the work item and wake up (`@cond_var.signal`) the first worker sleeping in `pop!`.
# File lib/dat-worker-pool/default_queue.rb, line 35
# Appends `work_item` to the queue and signals the condition variable so a
# worker waiting in `pop!` can pick it up. Both steps happen inside
# `with_lock`; the on-push callbacks are then called with (queue, work_item)
# outside that block. Returns the callback list (the value of `each`).
def push!(work_item)
  @work_items.with_lock do |_mutex, work_items|
    work_items << work_item
    @cond_var.signal
  end
  @on_push_callbacks.each { |callback| callback.call(self, work_item) }
end
shutdown!()
click to toggle source
Wake up (`@cond_var.broadcast`) all workers that are sleeping in `pop!`.
# File lib/dat-worker-pool/default_queue.rb, line 29
# Broadcasts on the condition variable, inside `with_lock`, so that every
# worker waiting in `pop!` wakes up and re-checks `shutdown?`.
# NOTE(review): this method does not itself change the shutdown state; that
# is presumably handled by the caller -- confirm against the queue mixin.
def shutdown!
  @work_items.with_lock { @cond_var.broadcast }
end