module GirlFriday

QUEUE1 = GirlFriday::Queue.new(‘ham_cannon’, :size => 15) do |msg|

puts msg

end QUEUE2 = GirlFriday::Queue.new(‘image_crawler’, :size => 5) do |msg|

puts msg

end

Constants

Queue
VERSION

Public Class Methods

add_queue(ref) click to toggle source
# File lib/girl_friday.rb, line 24
def self.add_queue(ref)
  @lock.synchronize do
    @queues ||= []
    @queues.reject! { |q| !q.weakref_alive? }
    @queues << ref
  end
end
queues() click to toggle source
# File lib/girl_friday.rb, line 38
def self.queues
  @queues || []
end
remove_queue(ref) click to toggle source
# File lib/girl_friday.rb, line 32
def self.remove_queue(ref)
  @lock.synchronize do
    @queues.delete ref
  end
end
shutdown!(timeout=30) click to toggle source

Notify girl_friday to shutdown ASAP. Workers will not pick up any new work; any new work pushed onto the queues will be pushed onto the backlog (and persisted). This method will block until all queues are quiet or the timeout has passed.

Note that shutdown! just works with existing queues. If you create a new queue, it will act as normal.

# File lib/girl_friday.rb, line 60
def self.shutdown!(timeout=30)
  qs = queues.select { |q| q.weakref_alive? }
  count = qs.size

  if count > 0
    m = Mutex.new
    var = ConditionVariable.new

    qs.each do |q|
      next if !q.weakref_alive?
      begin
        q.__getobj__.shutdown do |queue|
          m.synchronize do
            count -= 1
            var.signal if count == 0
          end
        end
      rescue WeakRef::RefError
        m.synchronize do
          count -= 1
          var.signal if count == 0
        end
      end
    end

    m.synchronize do
      var.wait(m, timeout) if count != 0
    end
  end
  count
end
status() click to toggle source
# File lib/girl_friday.rb, line 42
def self.status
  queues.inject({}) do |memo, queue|
    begin
      memo = memo.merge(queue.__getobj__.status)
    rescue WeakRef::RefError
    end
    memo
  end
end