class BatchQueue
Constants
- VERSION
Public Class Methods
new(max_batch_size: nil, max_interval_seconds: nil, &block)
click to toggle source
Starts the queue. Either max_batch_size or max_interval_seconds (or both) must be set.
# File lib/batch_queue/batch_queue.rb, line 4
# Sets up the queue state and immediately starts the worker thread.
#
# max_batch_size::       stored in @max_batch_size (may be nil)
# max_interval_seconds:: stored in @max_interval_seconds (may be nil)
# block::                stored in @block; the worker hands batches to it
#
# Raises a RuntimeError when both limits are nil.
def initialize(max_batch_size: nil, max_interval_seconds: nil, &block)
  if [max_batch_size, max_interval_seconds].all?(&:nil?)
    raise 'either max_batch_size or max_interval_seconds or both must be set'
  end

  # Configuration supplied by the caller.
  @max_batch_size = max_batch_size
  @max_interval_seconds = max_interval_seconds
  @block = block

  # Shared state and the primitives guarding it.
  @queue = Queue.new
  @mutex = Mutex.new
  @cond_var = ConditionVariable.new
  @is_running = true

  # Every instance variable must be assigned before the worker starts,
  # because #run reads them right away.
  @runner = Thread.new { run }

  # Ask the worker to stop (and wait for it) when the process exits.
  at_exit { stop }
end
Public Instance Methods
push(object)
click to toggle source
# File lib/batch_queue/batch_queue.rb, line 22
# Appends +object+ to the internal queue and signals the condition variable.
#
# Returns +object+ itself.
# Raises a RuntimeError when @is_running is false.
def push(object)
  @mutex.synchronize do
    raise 'BatchQueue is stopped' if !@is_running

    @queue << object
    # Signal while still holding the lock, as the waiter re-acquires it on wake-up.
    @cond_var.signal
  end

  object
end
Also aliased as: <<
size()
click to toggle source
# File lib/batch_queue/batch_queue.rb, line 32
# Returns the number of items currently held in the internal queue,
# read while holding the mutex.
def size
  @mutex.synchronize { @queue.length }
end
stop()
click to toggle source
Stops the queue and signals the worker to flush any remaining items, blocking until done.
# File lib/batch_queue/batch_queue.rb, line 39
# Clears the running flag, signals the condition variable and then joins
# the worker thread, so the call blocks until that thread has finished.
#
# Returns whatever Thread#join returns for the worker thread.
def stop
  @mutex.synchronize do
    @is_running = false
    @cond_var.signal
  end

  # Join outside the lock: the worker needs the mutex to finish its work.
  worker = @runner
  worker.join
end
Private Instance Methods
process_batch(arr)
click to toggle source
Assumes the caller already holds the mutex lock.
# File lib/batch_queue/batch_queue.rb, line 80
# Hands +arr+ to the stored block with the mutex released, then takes the
# mutex again before returning.
#
# The caller must hold @mutex when calling this method.
# A StandardError raised by the block is printed with +puts+ and swallowed.
def process_batch(arr)
  # Release the lock for the duration of the callback. This stays outside the
  # begin block so a failing unlock is neither swallowed nor followed by a lock.
  @mutex.unlock
  begin
    @block.(arr)
  rescue StandardError => error
    puts "BatchQueue: Unhandled exception #{error.inspect}"
  ensure
    # Give the lock back to the caller whatever the callback did.
    @mutex.lock
  end
end
run()
click to toggle source
# File lib/batch_queue/batch_queue.rb, line 49
# Worker loop: runs until @is_running becomes false, handing batches to the
# stored block, then flushes whatever is still queued.
#
# A batch is dispatched when either
# * @max_batch_size is set and at least that many items are queued, or
# * @max_interval_seconds is set and that much time has passed since the
#   worker first saw the oldest pending items.
#
# Fixes over the previous version:
# * a nil @max_batch_size (interval-only mode) no longer raises;
# * the interval is no longer restarted on every wake-up, so a steady stream
#   of pushes cannot postpone the timed flush forever;
# * state is re-checked after every callback, so a stop or push that happened
#   while the lock was released is not missed;
# * elapsed time is measured with the monotonic clock, so wall-clock
#   adjustments cannot affect it.
def run
  @mutex.synchronize do
    # Monotonic timestamp by which the pending items must be flushed, or nil
    # while nothing is pending or no interval is configured.
    deadline = nil

    while @is_running
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      if batch_ready?(now, deadline)
        process_batch(take_batch)
        deadline = nil
        # The lock was released during the callback, so re-evaluate before
        # sleeping; a signal sent meanwhile had no waiter to wake.
        next
      end

      # Start the interval clock when items are first seen pending.
      if deadline.nil? && !@max_interval_seconds.nil? && !@queue.empty?
        deadline = now + @max_interval_seconds
      end

      # A nil timeout waits until signalled. With a deadline the remaining
      # time is positive here, because batch_ready? was false for this +now+.
      @cond_var.wait(@mutex, deadline && (deadline - now))
    end

    # exiting: flush everything that is left
    process_batch(take_batch) until @queue.empty?
  end
end

# True when the queued items should be dispatched now.
# +now+ and +deadline+ are monotonic timestamps; +deadline+ may be nil.
def batch_ready?(now, deadline)
  return false if @queue.empty?
  return true if !@max_batch_size.nil? && @queue.size >= @max_batch_size

  !deadline.nil? && now >= deadline
end

# Removes and returns up to @max_batch_size items, oldest first.
# With no @max_batch_size it returns everything currently queued.
def take_batch
  count = @queue.size
  count = [count, @max_batch_size].min unless @max_batch_size.nil?
  count.times.map { @queue.pop }
end

# we assume that we have the mutex lock before calling
# Runs the stored block on +arr+ with the mutex released and re-acquires it
# afterwards. A StandardError from the block is printed and swallowed.
def process_batch(arr)
  @mutex.unlock
  begin
    @block.call(arr)
  rescue StandardError => exc
    puts "BatchQueue: Unhandled exception #{exc.inspect}"
  ensure
    @mutex.lock
  end
end
take_batch()
click to toggle source
# File lib/batch_queue/batch_queue.rb, line 71
# Removes and returns up to @max_batch_size items from the queue, oldest
# first. When @max_batch_size is nil (only an interval was configured) it
# returns everything currently queued; previously that case raised because
# nil was compared with the queue size.
def take_batch
  count = @queue.size
  count = [count, @max_batch_size].min unless @max_batch_size.nil?
  # Integer#times yields nothing for a non-positive count, so the result is
  # then simply an empty array.
  count.times.map { @queue.pop }
end