class Mimi::Messaging::SQS_SNS::TimeoutQueue
TimeoutQueue
solves the problem the native Ruby Queue class has with waiting for elements.
See the excellent blog post discussing the issue: medium.com/workday-engineering/ruby-concurrency-building-a-timeout-queue-5d7c588ca80d
TLDR – using Ruby standard Timeout.timeout() around Queue#pop() is unsafe
Public Class Methods
new()
click to toggle source
# File lib/mimi/messaging/sqs_sns/timeout_queue.rb, line 17 def initialize @elems = [] @mutex = Mutex.new @cond_var = ConditionVariable.new end
Public Instance Methods
<<(elem)
click to toggle source
Pushes an element into the queue
@param elem [Object]
# File lib/mimi/messaging/sqs_sns/timeout_queue.rb, line 27 def <<(elem) @mutex.synchronize do @elems << elem @cond_var.signal end end
Also aliased as: push
pop(blocking = true, timeout = nil)
click to toggle source
Pops an element from the queue in either non-blocking or a blocking (with an optional timeout) way.
@param blocking [true,false] wait for a new element (true) or return immediately @param timeout [nil,Integer] if in blocking mode,
wait at most given number of seconds or forever (nil)
@raise [Timeout::Error] if a timeout in blocking mode was reached
# File lib/mimi/messaging/sqs_sns/timeout_queue.rb, line 43 def pop(blocking = true, timeout = nil) @mutex.synchronize do if blocking if timeout.nil? while @elems.empty? @cond_var.wait(@mutex) end else timeout_time = Time.now.to_f + timeout while @elems.empty? && (remaining_time = timeout_time - Time.now.to_f) > 0 @cond_var.wait(@mutex, remaining_time) end end end raise Timeout::Error, "queue timeout expired" if @elems.empty? @elems.shift end end