class Rabbitek::Batcher

A service that groups messages pulled from a queue into batches.

Public Class Methods

new(consumer) click to toggle source
# File lib/rabbitek/server/batcher.rb, line 7
# Builds a batcher bound to the given consumer, starting with an empty batch.
#
# @param consumer [#opts] object whose +opts[:batch][:of]+ holds the batch
#   size limit used while collecting messages
def initialize(consumer)
  @consumer = consumer

  # The size limit is read from the consumer's :batch options under :of.
  batch_options = consumer.opts[:batch]
  @batch_size = batch_options[:of]

  @batch = []
end

Public Instance Methods

perform(message) { |batch| ... } click to toggle source
# File lib/rabbitek/server/batcher.rb, line 13
# Collects a batch starting with +message+ and hands it to the given block.
#
# If a StandardError is raised while collecting the batch or inside the
# block, every message collected so far is passed to the retry routine and
# the error is re-raised to the caller.
#
# @param message the first message of the batch
# @yieldparam batch [Array] the collected messages
# @return whatever the block returns
def perform(message)
  collect_batch(message)
  yield @batch
rescue StandardError
  # Nothing in the batch can be considered processed; retry all of it,
  # then let the original error propagate.
  retry_all_messages
  raise
end

Private Instance Methods

collect_batch(message) click to toggle source
# File lib/rabbitek/server/batcher.rb, line 23
# Appends +message+ to the current batch, then keeps pulling messages from
# the consumer until either the batch reaches the configured size or the
# consumer has no further message to hand out.
#
# @param message the first message to place in the batch
# @return [nil]
def collect_batch(message)
  @batch << message

  # Keep filling until the maximum batch size has been reached.
  until @batch.size >= @batch_size
    next_message = @consumer.pop_message_manually
    break unless next_message # no more messages waiting

    @batch << next_message
  end
end
retry_all_messages() click to toggle source
# File lib/rabbitek/server/batcher.rb, line 33
# Passes every message in the current batch, together with the consumer,
# to Rabbitek::Retryer.call.
#
# @return [Array] the batch that was iterated
def retry_all_messages
  @batch.each do |queued_message|
    Rabbitek::Retryer.call(@consumer, queued_message)
  end
end