class LogStash::SizedQueueTimeout
A minimal implementation of a subset of SizedQueue that supports a timeout option on the lock.
This will become part of Logstash's main sized queue.
Constants
- DEFAULT_TIMEOUT
Public Class Methods
new(max_size, options = {})
# File lib/logstash/sized_queue_timeout.rb, line 14
def initialize(max_size, options = {})
  # `concurrent-ruby` is deprecating the `Condition` class
  # in favor of a Synchronization class that you need to implement.
  # That was a bit overkill just to check whether the wait timed out.
  @condition_in = ConditionVariable.new
  @condition_out = ConditionVariable.new
  @max_size = max_size
  @queue = []
  @mutex = Mutex.new
end
Public Instance Methods
pop_no_timeout()
# File lib/logstash/sized_queue_timeout.rb, line 48
def pop_no_timeout
  @mutex.synchronize do
    @condition_in.wait(@mutex) while @queue.empty? # Wake up check
    obj = @queue.shift
    @condition_out.signal
    return obj
  end
end
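The "wake up check" above is the usual condition-variable idiom: the wait sits in a `while` loop so the predicate is re-tested after every wakeup, which guards against spurious wakeups and against another consumer taking the item first. A minimal standalone sketch of that idiom follows. The `Mailbox` class and its `put`/`take` methods are hypothetical names used only for illustration and are not part of Logstash.

```ruby
# Hypothetical helper, not part of Logstash: a tiny unbounded blocking mailbox.
class Mailbox
  def initialize
    @items = []
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
  end

  # Append an item and wake one waiting consumer, if any.
  def put(item)
    @mutex.synchronize do
      @items << item
      @not_empty.signal
    end
    item
  end

  # Block until an item is available, then remove and return it.
  def take
    @mutex.synchronize do
      # Re-check the predicate after every wakeup instead of trusting the signal.
      @not_empty.wait(@mutex) while @items.empty?
      @items.shift
    end
  end
end

mailbox = Mailbox.new
consumer = Thread.new { mailbox.take }
mailbox.put(:event)
result = consumer.value # joins the thread and returns what take produced
```

The sketch works regardless of which thread runs first: if `put` wins the race, `take` finds the array non-empty and never waits.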
push(obj, timeout = DEFAULT_TIMEOUT)
# File lib/logstash/sized_queue_timeout.rb, line 26
def push(obj, timeout = DEFAULT_TIMEOUT)
  @mutex.synchronize do
    while full? # wake up check
      start_time = Concurrent.monotonic_time
      @condition_out.wait(@mutex, timeout)
      if start_time + timeout - Concurrent.monotonic_time < 0
        raise TimeoutError
      end
    end
    @queue << obj
    @condition_in.signal
    return obj
  end
end
Also aliased as: <<
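To illustrate how a timed push behaves when the queue is full, here is a self-contained sketch of the same technique using only the Ruby standard library. It is not the Logstash class: `TimedBoundedQueue`, its nested `TimeoutError`, and the private `now` helper are hypothetical names, and `Process.clock_gettime` stands in for `Concurrent.monotonic_time`. It also differs from the source in one respect: the deadline is computed once before the loop, so repeated wakeups cannot extend the total wait.

```ruby
# Hypothetical stand-in for illustration only, not LogStash::SizedQueueTimeout.
class TimedBoundedQueue
  class TimeoutError < StandardError; end

  def initialize(max_size)
    @max_size = max_size
    @queue = []
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end

  # Push obj, waiting at most `timeout` seconds in total for free space.
  def push(obj, timeout)
    deadline = now + timeout
    @mutex.synchronize do
      while @queue.size >= @max_size
        remaining = deadline - now
        raise TimeoutError if remaining <= 0
        @not_full.wait(@mutex, remaining)
      end
      @queue << obj
      @not_empty.signal
    end
    obj
  end

  # Block until an item is available, then remove and return it.
  def pop
    @mutex.synchronize do
      @not_empty.wait(@mutex) while @queue.empty?
      obj = @queue.shift
      @not_full.signal
      obj
    end
  end

  private

  # Monotonic clock reading in seconds, unaffected by wall-clock changes.
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

queue = TimedBoundedQueue.new(1)
queue.push(:first, 0.1)

# The queue is full and nothing consumes from it, so this push times out.
timed_out = begin
  queue.push(:second, 0.1)
  false
rescue TimedBoundedQueue::TimeoutError
  true
end

first = queue.pop
```

A caller would typically rescue the timeout error and decide whether to retry, drop the event, or apply back-pressure upstream.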
size()
# File lib/logstash/sized_queue_timeout.rb, line 44
def size
  @mutex.synchronize { @queue.size }
end
Private Instance Methods
full?()
# File lib/logstash/sized_queue_timeout.rb, line 60
def full?
  @queue.size == @max_size
end