class BatchQueue

Constants

VERSION

Public Class Methods

new(max_batch_size: nil, max_interval_seconds: nil, &block) click to toggle source

Starts the queue. Either max_batch_size or max_interval_seconds (or both) must be set.

# File lib/batch_queue/batch_queue.rb, line 4
# Sets up the queue state and starts the background runner thread.
#
# max_batch_size::       stored in @max_batch_size
# max_interval_seconds:: stored in @max_interval_seconds
# block::                stored in @block
#
# Raises RuntimeError when both max_batch_size and max_interval_seconds
# are nil.
def initialize(max_batch_size: nil, max_interval_seconds: nil, &block)
  if [max_batch_size, max_interval_seconds].all?(&:nil?)
    raise 'either max_batch_size or max_interval_seconds or both must be set'
  end

  @max_batch_size = max_batch_size
  @max_interval_seconds = max_interval_seconds
  @block = block
  @queue = Queue.new
  @mutex = Mutex.new
  @cond_var = ConditionVariable.new
  @is_running = true
  # Start the runner last so every instance variable above is already set
  # when the thread begins executing #run.
  @runner = Thread.new { run }

  # Call #stop when the process exits.
  at_exit { stop }
end

Public Instance Methods

<<(object)
Alias for: push
push(object) click to toggle source
# File lib/batch_queue/batch_queue.rb, line 22
# Appends +object+ to the queue and signals the condition variable, all
# while holding the mutex.
#
# Returns +object+. Raises RuntimeError ('BatchQueue is stopped') when the
# queue is no longer running.
def push(object)
  @mutex.synchronize do
    raise 'BatchQueue is stopped' unless @is_running

    @queue << object
    @cond_var.signal
  end
  object
end
Also aliased as: <<
size() click to toggle source
# File lib/batch_queue/batch_queue.rb, line 32
# Returns the number of objects currently in the queue, read while holding
# the mutex.
def size
  @mutex.synchronize { @queue.length }
end
stop() click to toggle source

Stops the queue and signals the runner thread to flush any remaining items, blocking until it is done.

# File lib/batch_queue/batch_queue.rb, line 39
# Marks the queue as no longer running, signals the condition variable and
# then blocks until the runner thread has finished.
#
# Returns the result of joining the runner thread.
def stop
  worker = @runner
  @mutex.synchronize do
    @is_running = false
    @cond_var.signal
  end
  worker.join
end

Private Instance Methods

process_batch(arr) click to toggle source

Assumes the caller already holds the mutex lock.

# File lib/batch_queue/batch_queue.rb, line 80
# Calls the stored block with +arr+ while the mutex is released, so the
# lock is not held for the duration of the block.
#
# The caller must hold the mutex: it is unlocked before the block runs and
# locked again afterwards, whether or not the block raises. A StandardError
# raised by the block is printed to standard output and not re-raised.
def process_batch(arr)
  @mutex.unlock
  begin
    @block.call(arr)
  rescue => error
    $stdout.puts "BatchQueue: Unhandled exception #{error.inspect}"
  ensure
    # Restore the lock the caller expects to be holding.
    @mutex.lock
  end
end
run() click to toggle source
# File lib/batch_queue/batch_queue.rb, line 49
  # Worker loop executed on the runner thread.
  #
  # While the queue is running, a batch is handed to the block when either
  # * @max_batch_size is set and at least that many items are queued, or
  # * @max_interval_seconds is set and that much time has passed since the
  #   worker first saw the currently pending items.
  # Once @is_running becomes false, everything still queued is flushed.
  #
  # The whole loop runs under @mutex; the lock is only released inside
  # #process_batch and while waiting on the condition variable.
  def run
    @mutex.synchronize do
      # Monotonic timestamp by which pending items must be flushed, or nil
      # when nothing is pending / no interval is configured.
      deadline = nil

      while @is_running
        if @queue.empty?
          deadline = nil
        elsif @max_interval_seconds
          # Start the timer once per pending batch; later wake-ups (one per
          # push) must not push the deadline back.
          deadline ||= monotonic_now + @max_interval_seconds
        end

        if batch_ready?(deadline)
          process_batch(take_batch)
          deadline = nil
          # The mutex was released while the block ran, so pushes or a stop
          # may have been signalled with nobody waiting. Re-evaluate the
          # state (including @is_running) instead of waiting right away.
          next
        end

        # A nil timeout waits until signalled; clamp at zero because the
        # deadline may have just passed and a negative timeout is invalid.
        timeout = deadline && [deadline - monotonic_now, 0].max
        @cond_var.wait(@mutex, timeout)
      end

      # exiting: flush whatever is left
      process_batch(take_batch) until @queue.empty?
    end
  end

  # True when the pending items should be handed to the block now.
  def batch_ready?(deadline)
    return false if @queue.empty?
    return true if @max_batch_size && @queue.size >= @max_batch_size

    !deadline.nil? && monotonic_now >= deadline
  end

  # Current reading of the monotonic clock in seconds; unlike Time.now it is
  # unaffected by wall-clock adjustments.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Removes and returns the next batch: at most @max_batch_size items, or
  # everything currently queued when no batch size is configured.
  def take_batch
    count = @queue.size
    count = [count, @max_batch_size].min unless @max_batch_size.nil?
    count.times.map { @queue.pop }
  end

  # Calls the block with +arr+ while the mutex is released.
  # we assume that we have the mutex lock before calling; it is re-acquired
  # before returning, even if the block raises. A StandardError raised by
  # the block is printed and not re-raised.
  def process_batch(arr)
    @mutex.unlock
    begin
      @block.call(arr)
    rescue StandardError => exc
      puts "BatchQueue: Unhandled exception #{exc.inspect}"
    ensure
      @mutex.lock
    end
  end
end
take_batch() click to toggle source
# File lib/batch_queue/batch_queue.rb, line 71
# Removes and returns the next batch from the queue as an array: at most
# @max_batch_size items, or everything currently queued when no batch size
# is configured (only max_interval_seconds was given).
def take_batch
  count = @queue.size
  # Comparing against a nil batch size would raise, so only cap the count
  # when a batch size is actually set.
  count = [count, @max_batch_size].min unless @max_batch_size.nil?
  count.times.map { @queue.pop }
end