class Rabbitek::Batcher
A service that groups messages from a queue into batches.
Public Class Methods
new(consumer)
click to toggle source
# File lib/rabbitek/server/batcher.rb, line 7
#
# Sets up a batcher bound to +consumer+ with an empty batch.
#
# @param consumer [#opts] object whose +opts[:batch][:of]+ value is used as
#   the maximum batch size; it is also kept for later message popping/retrying
def initialize(consumer)
  @consumer = consumer
  @batch_size = consumer.opts[:batch][:of]
  @batch = []
end
Public Instance Methods
perform(message) { |batch| ... }
click to toggle source
# File lib/rabbitek/server/batcher.rb, line 13
#
# Collects a batch starting with +message+ and yields the collected batch to
# the given block.
#
# If collecting or the block raises a StandardError, every message currently
# in the batch is handed to the retry step and the same error is re-raised.
#
# @param message the first message of the batch
# @yieldparam batch [Array] the collected messages
# @return whatever the block returns
# @raise [StandardError] re-raises any error raised while collecting or yielding
def perform(message)
  collect_batch(message)
  yield(@batch)
rescue StandardError
  retry_all_messages
  raise # re-raise the original error after scheduling retries
end
Private Instance Methods
collect_batch(message)
click to toggle source
# File lib/rabbitek/server/batcher.rb, line 23
#
# Appends +message+ to the batch, then keeps pulling further messages from the
# consumer until the batch is full or the consumer has nothing more to give.
#
# @param message the first message to put into the batch
def collect_batch(message)
  loop do
    @batch << message
    break if @batch.size >= @batch_size # stop collecting batch when maximum batch size has been reached

    message = @consumer.pop_message_manually
    break unless message # stop collecting batch when there are no more messages waiting
  end
end
retry_all_messages()
click to toggle source
# File lib/rabbitek/server/batcher.rb, line 33
#
# Passes every message currently in the batch, together with the consumer,
# to Rabbitek::Retryer.call, in batch order.
def retry_all_messages
  @batch.each { |message| Rabbitek::Retryer.call(@consumer, message) }
end