module GirlFriday
QUEUE1 = GirlFriday::Queue.new
(‘ham_cannon’, :size => 15) do |msg|
puts msg
end QUEUE2 = GirlFriday::Queue.new
(‘image_crawler’, :size => 5) do |msg|
puts msg
end
Constants
- Queue
- VERSION
Public Class Methods
add_queue(ref)
click to toggle source
# File lib/girl_friday.rb, line 24 def self.add_queue(ref) @lock.synchronize do @queues ||= [] @queues.reject! { |q| !q.weakref_alive? } @queues << ref end end
queues()
click to toggle source
# File lib/girl_friday.rb, line 38 def self.queues @queues || [] end
remove_queue(ref)
click to toggle source
# File lib/girl_friday.rb, line 32 def self.remove_queue(ref) @lock.synchronize do @queues.delete ref end end
shutdown!(timeout=30)
click to toggle source
Notify girl_friday to shutdown ASAP. Workers will not pick up any new work; any new work pushed onto the queues will be pushed onto the backlog (and persisted). This method will block until all queues are quiet or the timeout has passed.
Note that shutdown! just works with existing queues. If you create a new queue, it will act as normal.
# File lib/girl_friday.rb, line 60 def self.shutdown!(timeout=30) qs = queues.select { |q| q.weakref_alive? } count = qs.size if count > 0 m = Mutex.new var = ConditionVariable.new qs.each do |q| next if !q.weakref_alive? begin q.__getobj__.shutdown do |queue| m.synchronize do count -= 1 var.signal if count == 0 end end rescue WeakRef::RefError m.synchronize do count -= 1 var.signal if count == 0 end end end m.synchronize do var.wait(m, timeout) if count != 0 end end count end
status()
click to toggle source
# File lib/girl_friday.rb, line 42 def self.status queues.inject({}) do |memo, queue| begin memo = memo.merge(queue.__getobj__.status) rescue WeakRef::RefError end memo end end