class LogStash::SizedQueueTimeout

A minimal subset implementation of a SizedQueue that supports a timeout option on the lock.

This will become part of Logstash's main sized queue.

Constants

DEFAULT_TIMEOUT

Public Class Methods

new(max_size, options = {}) click to toggle source
# File lib/logstash/sized_queue_timeout.rb, line 14
# Builds an empty queue bounded to +max_size+ elements.
#
# max_size - capacity that `full?` compares the queue length against.
# options  - accepted for interface compatibility; not read by this method.
def initialize(max_size, options = {})
  @max_size = max_size
  @queue = []
  @mutex = Mutex.new

  # Plain stdlib condition variables are used on purpose: `concurrent-ruby`
  # is deprecating its `Condition` in favor of a Synchronization class that
  # has to be implemented by hand, which is overkill when all we need is to
  # know whether a wait timed out.
  @condition_out = ConditionVariable.new
  @condition_in = ConditionVariable.new
end

Public Instance Methods

<<(obj, timeout = DEFAULT_TIMEOUT)
Alias for: push
pop_no_timeout() click to toggle source
# File lib/logstash/sized_queue_timeout.rb, line 48
# Removes and returns the element at the head of the queue, blocking the
# calling thread for as long as the queue is empty (there is no timeout).
#
# After removing the element, signals `@condition_out` so that one thread
# waiting for free space can proceed.
def pop_no_timeout
  @mutex.synchronize do
    # Re-check emptiness after every wakeup: being woken does not guarantee
    # that an element is still available.
    loop do
      break unless @queue.empty?

      @condition_in.wait(@mutex)
    end

    @queue.shift.tap { @condition_out.signal }
  end
end
push(obj, timeout = DEFAULT_TIMEOUT) click to toggle source
# File lib/logstash/sized_queue_timeout.rb, line 26
# Appends +obj+ to the queue, blocking while the queue is full.
#
# obj     - the element to enqueue.
# timeout - maximum total time, in seconds, to wait for free space.
#
# Returns +obj+ once it has been enqueued, after signalling
# `@condition_in` so that one waiting consumer can proceed.
# Raises TimeoutError if the queue is still full once +timeout+ has elapsed.
def push(obj, timeout = DEFAULT_TIMEOUT)
  @mutex.synchronize do
    # A single deadline covers the whole call. It is computed lazily so the
    # common non-blocking path never reads the clock.
    deadline = nil

    while full? # re-checked after every wakeup
      now = Concurrent.monotonic_time
      deadline ||= now + timeout
      remaining = deadline - now

      # Only give up when the queue is *still* full at the deadline; a slot
      # freed at the last moment is used instead of being reported as a
      # timeout.
      raise TimeoutError if remaining <= 0

      # Wait only for what is left of the budget, so spurious wakeups or
      # losing the free slot to another producer cannot extend the total
      # wait beyond +timeout+.
      @condition_out.wait(@mutex, remaining)
    end

    @queue << obj
    @condition_in.signal

    obj
  end
end
Also aliased as: <<
size() click to toggle source
# File lib/logstash/sized_queue_timeout.rb, line 44
# Returns the number of elements currently held in the queue.
# The count is read while holding the queue's mutex.
def size
  @mutex.synchronize do
    @queue.length
  end
end

Private Instance Methods

full?() click to toggle source
# File lib/logstash/sized_queue_timeout.rb, line 60
# True when the number of queued elements equals the configured maximum.
# Does not take the mutex itself; `push` calls it from inside
# `@mutex.synchronize`.
def full?
  @queue.length == @max_size
end