class Backports::FilteredQueue
Like ::Queue, but with:
- filtering
- timeout
- raises on closed queues

Independent from other Ractor-related backports.
Constants
- CONSUME_ON_ESCAPE
Attributes
num_waiting[R]
Public Class Methods
new()
click to toggle source
Timeout processing based on spin.atomicobject.com/2017/06/28/queue-pop-with-timeout-fixed/
# File lib/backports/ractor/filtered_queue.rb, line 34 def initialize @mutex = ::Mutex.new @queue = [] @closed = false @received = ::ConditionVariable.new @num_waiting = 0 end
Public Instance Methods
<<(x)
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 54
# Wraps +x+ in a Message, appends it to the queue and wakes one waiter.
# The open-check runs first (ensure_open raises when the queue is closed),
# all under @mutex.
#
# @return self, so that pushes can be chained
def <<(x)
  @mutex.synchronize do
    ensure_open
    message = Message.new(x)
    @queue.push(message)
    @received.signal
  end
  self
end
Also aliased as: push
clear()
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 64
# Empties the underlying message array while holding @mutex.
#
# @return self
def clear
  @mutex.synchronize { @queue.clear }
  self
end
close()
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 42
# Marks the queue as closed and wakes every thread waiting on @received,
# both under @mutex.
#
# @return self
def close
  tap do
    @mutex.synchronize do
      @closed = true
      @received.broadcast
    end
  end
end
closed?()
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 50 def closed? @closed end
empty?()
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 85
# True when available! finds no message (looked up while holding @mutex).
# Messages currently reserved are not counted, since available! skips them.
def empty?
  !@mutex.synchronize { available! }
end
pop(timeout: nil, &block)
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 71 def pop(timeout: nil, &block) msg = nil exclude = [] if block # exclusion list of messages rejected by this call timeout_time = timeout + Time.now.to_f if timeout while true do # rubocop:disable Style/InfiniteLoop, Style/WhileUntilDo @mutex.synchronize do reenter if reentrant? msg = acquire!(timeout_time, exclude) return consume!(msg).value unless block end return msg.value if filter?(msg, &block) end end
Protected Instance Methods
closed_queue_value()
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 97 def closed_queue_value ensure_open end
reenter()
click to toggle source
@return whether the outer message should be consumed
# File lib/backports/ractor/filtered_queue.rb, line 102 def reenter true end
timeout_value()
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 93
# Hook whose result acquire! returns once the deadline has passed.
# This implementation raises the TimeoutError constant resolved through the
# receiver's class.
def timeout_value
  error_class = self.class::TimeoutError
  raise error_class, "timeout elapsed"
end
Private Instance Methods
acquire!(timeout_time, exclude = nil)
click to toggle source
Private methods assume @mutex is synchronized. Adds the acquired message to the exclude list.
# File lib/backports/ractor/filtered_queue.rb, line 172 def acquire!(timeout_time, exclude = nil) while true do # rubocop:disable Style/InfiniteLoop, Style/WhileUntilDo if (msg = available!(exclude)) msg.reserved = true exclude << msg if exclude return msg end return closed_queue_value if @closed # wait for element or timeout if timeout_time remaining_time = timeout_time - ::Time.now.to_f return timeout_value if remaining_time <= 0 end begin @num_waiting += 1 @received.wait(@mutex, remaining_time) ensure @num_waiting -= 1 end end end
available!(exclude = nil)
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 194
# Finds the first queued message that is neither listed in +exclude+ nor
# currently reserved.
#
# exclude: optional collection of messages to skip (nil skips nothing).
# Returns the message, or nil when none qualifies.
def available!(exclude = nil)
  @queue.find do |candidate|
    skipped = exclude && exclude.include?(candidate)
    !skipped && !candidate.reserved
  end
end
commit(msg, consume)
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 135
# Settles a previously acquired message while holding @mutex:
# consume!(msg) when +consume+ is truthy, reject!(msg) otherwise.
def commit(msg, consume)
  @mutex.synchronize { consume ? consume!(msg) : reject!(msg) }
end
consume!(msg)
click to toggle source
@returns msg
# File lib/backports/ractor/filtered_queue.rb, line 126 def consume!(msg) @queue.delete(msg) end
consume_on_reentry(msg) { || ... }
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 145 def consume_on_reentry(msg) q_map = current_filtered_queues if (outer_msg = q_map[self]) commit(outer_msg, reenter) end q_map[self] = msg begin yield ensure reentered = !q_map.delete(self) end reentered end
current_filtered_queues()
click to toggle source
@returns Hash { FilteredQueue => Message }
# File lib/backports/ractor/filtered_queue.rb, line 164
# Returns the hash stored in the current thread's
# :backports_currently_filtered_queues thread variable, creating an
# identity-compared hash on first use.
def current_filtered_queues
  key = :backports_currently_filtered_queues
  thread = Thread.current
  existing = thread.thread_variable_get(key)
  return existing if existing

  thread.thread_variable_set(key, {}.compare_by_identity)
end
ensure_open()
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 201
# Raises the ClosedQueueError constant resolved through the receiver's class
# when @closed is set; returns nil otherwise.
def ensure_open
  return unless @closed

  raise self.class::ClosedQueueError, 'queue closed'
end
filter?(msg) { |value| ... }
click to toggle source
@returns:
- true if message consumed (block result truthy or due to reentrant call)
- false if rejected
# File lib/backports/ractor/filtered_queue.rb, line 113 def filter?(msg) consume = self.class::CONSUME_ON_ESCAPE begin reentered = consume_on_reentry(msg) do consume = !!(yield msg.value) end reentered ? reenter : consume ensure commit(msg, consume) unless reentered end end
reentrant?()
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 159
# True when the current thread's map holds a truthy entry for this queue,
# i.e. a filtering call on this queue is in progress on this thread.
def reentrant?
  current_filtered_queues[self] ? true : false
end
reject!(msg)
click to toggle source
# File lib/backports/ractor/filtered_queue.rb, line 130 def reject!(msg) msg.reserved = false @received.broadcast end