class CloseableQueue

A wrapper around Queue to provide support for `#close`.

Once the queue is closed, consumers drain any remaining items and then receive nil from every subsequent pop. If the queue was closed with close(true), pop on an empty closed queue raises ClosedQueue, a subclass of StopIteration, instead of returning nil. Pushing to a closed queue raises regardless of how it was closed.

Example usage:

 queue = CloseableQueue.new
 consumer = Thread.new { while number = queue.pop ; puts number ; end }
 5.times {|x| queue.push(x) }
 queue.close
 consumer.join
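
Because ClosedQueue is a subclass of StopIteration, a consumer written with Kernel#loop should end cleanly once the queue is closed with close(true), since loop rescues StopIteration on its own. The sketch below shows only that mechanism using the standard library; DemoClosedQueue and the array standing in for the queue are hypothetical names and are not part of this library.

```ruby
# Hypothetical stand-in for the library's exception class.
class DemoClosedQueue < StopIteration; end

items = [1, 2, 3] # stands in for a closed queue that is being drained
seen  = []

# Kernel#loop rescues StopIteration (and its subclasses) and returns normally,
# so the consumer needs no explicit rescue clause.
loop do
  raise DemoClosedQueue, "queue closed" if items.empty?
  seen << items.shift
end

finished = true # reached only because loop swallowed the exception
```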

`#close` is thread-safe and can be called safely multiple times.

This class is expected to become obsolete with Ruby 2.3, which adds Queue#close.
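
For comparison, the same producer/consumer shape can be written against the built-in Queue#close on Ruby 2.3 or later. This is a sketch using only the standard library, not this class; the results array is an illustrative addition so the outcome can be inspected.

```ruby
queue   = Queue.new
results = []

# The consumer stops when pop returns nil, which the built-in Queue does
# once it is both closed and empty.
consumer = Thread.new do
  while (number = queue.pop)
    results << number
  end
end

5.times { |x| queue.push(x) }
queue.close   # items already pushed can still be popped
consumer.join
```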

Constants

VERSION

Public Class Methods

new(limit = nil)

Set up a new queue. If limit is given, the queue is backed by a SizedQueue of that capacity; otherwise it is backed by an unbounded Queue.

# File lib/closeable_queue.rb, line 32
def initialize(limit = nil)
  @mutex           = Mutex.new
  @waiting         = Set.new
  @num_waiting     = Concurrent::AtomicFixnum.new
  @closed          = Concurrent::AtomicBoolean.new
  @raise_exception = Concurrent::AtomicBoolean.new(false)

  if limit
    @queue = SizedQueue.new(Integer(limit))
  else
    @queue = Queue.new
  end
end
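
The choice of backing queue can be tried in isolation with the standard library. In this sketch, build_queue is a hypothetical helper that mirrors only the limit branch of the constructor above; it is not part of the library.

```ruby
# Hypothetical helper: same limit handling as the constructor, nothing else.
def build_queue(limit = nil)
  # Integer() accepts numeric strings such as "2" and raises on junk input.
  limit ? SizedQueue.new(Integer(limit)) : Queue.new
end

bounded   = build_queue("2")
unbounded = build_queue

capacity = bounded.max # capacity of the SizedQueue
```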

Public Instance Methods

close(raise_exception = false)

Close the queue if it hasn't been already. Wake up waiting threads if any.

# File lib/closeable_queue.rb, line 102
def close(raise_exception = false)
  @raise_exception.make_true if raise_exception
  @mutex.synchronize { @waiting.each(&:wakeup) } if @closed.make_true
  self
end
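
Repeated calls to close are harmless because Concurrent::AtomicBoolean#make_true returns true only for the call that actually changes the value, so the wake-up pass runs once. The sketch below imitates that contract with a Mutex so it runs without concurrent-ruby; OnceFlag is a hypothetical stand-in, not the library's or the gem's implementation.

```ruby
# Hypothetical stand-in for the make_true contract of an atomic boolean.
class OnceFlag
  def initialize
    @mutex = Mutex.new
    @value = false
  end

  # Returns true only for the caller that flips the flag from false to true.
  def make_true
    @mutex.synchronize do
      return false if @value
      @value = true
    end
  end

  def true?
    @mutex.synchronize { @value }
  end
end

flag    = OnceFlag.new
threads = Array.new(10) { Thread.new { flag.make_true } }

# Thread#value joins each thread and returns what its block returned.
winners = threads.map(&:value).count(true)
```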
closed?()

Return true if the queue has been closed.

# File lib/closeable_queue.rb, line 97
def closed?
  @closed.true?
end
empty?()

Return true if the queue is empty.

# File lib/closeable_queue.rb, line 92
def empty?
  @queue.empty?
end
inspect()
# File lib/closeable_queue.rb, line 46
def inspect
  "#<#{self.class.name} size=#{length} closed=#{closed?} waiting=#{num_waiting}>"
end
length()

Get the number of items remaining on the queue.

# File lib/closeable_queue.rb, line 87
def length
  @queue.length
end
name()
Alias for: pop
num_waiting()

Get an atomic snapshot of the number of threads waiting on the queue.

# File lib/closeable_queue.rb, line 82
def num_waiting
  @num_waiting.value
end
pop()

Take the first element off the queue.

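If the queue is empty and closed, returns nil, or raises ClosedQueue (a subclass of StopIteration) if the queue was closed with close(true). If the queue is empty but still open, the calling thread sleeps until it is woken by a push or a close, then retries.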

# File lib/closeable_queue.rb, line 54
def pop
  @queue.pop(true)
rescue ThreadError
  if closed?
    raise ClosedQueue if @raise_exception.true?
    return nil
  else
    sleep
    retry
  end
end
Also aliased as: name
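
The rescue in pop depends on the non-blocking form of Queue#pop, which raises ThreadError on an empty queue instead of waiting. A minimal standard-library sketch of that behaviour, with illustrative variable names:

```ruby
queue = Queue.new
queue.push(:job)

# With an item present, the non-blocking pop returns it immediately.
first = queue.pop(true)

# With the queue empty, the non-blocking pop raises instead of blocking.
empty_error =
  begin
    queue.pop(true)
    nil
  rescue ThreadError => e
    e
  end
```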
push(item)

Add an item to the queue and wake up one sleeping consumer, if any.

If the queue is closed, raises ClosedQueueError.

# File lib/closeable_queue.rb, line 71
def push(item)
  fail ClosedQueueError if closed?

  @queue.push(item)
  wakeup
  self
end
Also aliased as: name
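
Ruby's built-in Queue (2.3 and later) rejects a push after close in a similar way, raising its own ClosedQueueError, which is also a StopIteration subclass. The sketch below uses the built-in class only, so it runs without this library; the variable names are illustrative.

```ruby
queue = Queue.new
queue.push(:accepted)
queue.close

# A push after close raises rather than silently dropping the item.
rejected =
  begin
    queue.push(:too_late)
    false
  rescue ClosedQueueError
    true
  end

remaining = queue.length # the item pushed before close is still queued
```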

Private Instance Methods

sleep()
# File lib/closeable_queue.rb, line 110
def sleep
  @mutex.synchronize do
    @waiting << Thread.current
    @num_waiting.increment
    @mutex.sleep
    @waiting.delete(Thread.current)
    @num_waiting.decrement
  end
end
wakeup()
# File lib/closeable_queue.rb, line 120
def wakeup
  return if @num_waiting.value.zero?

  @mutex.synchronize do
    @waiting.first.wakeup if @waiting.any?
  end
end
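
The sleep and wakeup helpers pair Mutex#sleep, which releases the lock while the thread is parked, with Thread#wakeup. The following standalone sketch shows that pairing under stated assumptions: the ready flag and the 0.1 second timeout are defensive additions for the demo and do not appear in the library source, and the local names are illustrative.

```ruby
mutex   = Mutex.new
waiting = []    # parked threads; plays the role of @waiting
ready   = false # assumption: explicit predicate so a missed wakeup cannot hang the demo
woke    = false

waiter = Thread.new do
  mutex.synchronize do
    waiting << Thread.current
    # Mutex#sleep releases the lock while parked and reacquires it on wake.
    mutex.sleep(0.1) until ready
    waiting.delete(Thread.current)
    woke = true
  end
end

# Poll until the waiter has registered itself.
sleep 0.01 until mutex.synchronize { waiting.any? }

mutex.synchronize do
  ready = true
  waiting.first.wakeup # same call the wakeup helper makes
end

waiter.join
```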