class Bunny::Concurrent::ContinuationQueue

Continuation queue implementation for MRI and Rubinius

@private

Public Class Methods

new() click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 9
def initialize
  @q    = []
  @lock = ::Mutex.new
  @cond = ::ConditionVariable.new
end

Public Instance Methods

<<(item)
Alias for: push
clear() click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 47
def clear
  @lock.synchronize do
    @q.clear
  end
end
empty?() click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 53
def empty?
  @q.empty?
end
length()
Alias for: size
poll(timeout_in_ms = nil) click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 27
def poll(timeout_in_ms = nil)
  timeout_in_sec = timeout_in_ms ? timeout_in_ms / 1000.0 : nil

  @lock.synchronize do
    started_at = Bunny::Timestamp.monotonic
    while @q.empty?
      wait = !(timeout_in_sec.nil?)
      @cond.wait(@lock, timeout_in_sec)

      if wait
        ended_at = Bunny::Timestamp.monotonic
        elapsed = ended_at - started_at
        raise ::Timeout::Error if (elapsed > timeout_in_sec)
      end
    end
    item = @q.shift
    item
  end
end
pop() click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 23
def pop
  poll
end
push(item) click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 15
def push(item)
  @lock.synchronize do
    @q.push(item)
    @cond.signal
  end
end
Also aliased as: <<
size() click to toggle source
# File lib/bunny/concurrent/continuation_queue.rb, line 57
def size
  @q.size
end
Also aliased as: length