class Chore::Strategy::Batcher

Holds jobs in memory until the batch is full, per the developer's configured threshold, or until enough time has elapsed that Chore stops waiting (20 seconds by default).

Attributes

batch[RW]
callback[RW]

Public Class Methods

new(size)
# File lib/chore/strategies/consumer/batcher.rb, line 10
def initialize(size)
  @size = size
  @batch = []
  @mutex = Mutex.new
  @callback = nil
  @running = true
end

Public Instance Methods

add(item)

Adds the item to the current batch, then executes the batch if it is now full.

# File lib/chore/strategies/consumer/batcher.rb, line 45
def add(item)
  @batch << item
  execute if ready?
end

execute(force = false)

Executes the batch by passing up to size items to the callback. If force is true, the batch executes even if it is not yet full.

# File lib/chore/strategies/consumer/batcher.rb, line 51
def execute(force = false)
  batch = nil
  @mutex.synchronize do
    if force || ready?
      batch = @batch.slice!(0...@size)
    end
  end

  if batch && !batch.empty?
    @callback.call(batch)
  end
end
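
The interaction between add, execute, and the callback can be sketched as follows. SketchBatcher is a hypothetical stand-in that copies the add, execute, and ready? source shown on this page so the example runs without the chore gem; the batch size of 3 and the string items are assumptions chosen for illustration.

```ruby
# Stand-in that mirrors the add/execute/ready? source shown above, so the
# example runs without the chore gem installed. Not part of Chore itself.
class SketchBatcher
  attr_accessor :batch, :callback

  def initialize(size)
    @size = size
    @batch = []
    @mutex = Mutex.new
    @callback = nil
  end

  def add(item)
    @batch << item
    execute if ready?
  end

  def execute(force = false)
    batch = nil
    @mutex.synchronize do
      batch = @batch.slice!(0...@size) if force || ready?
    end
    @callback.call(batch) if batch && !batch.empty?
  end

  def ready?
    @batch.size >= @size
  end
end

flushed = []
batcher = SketchBatcher.new(3)
batcher.callback = ->(items) { flushed << items }

# The third add fills the batch, so the callback fires once with three items.
%w[a b c d e].each { |item| batcher.add(item) }

# Two items remain, which is below the threshold; a forced execute flushes them.
batcher.execute(true)
```

Because execute slices at most size items off the front of the batch, a forced execute on a partially filled batch hands the callback whatever is there, and an empty batch never reaches the callback.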

ready?()

Determines whether the batch is ready to fire by comparing its size to the configured batch_size.

# File lib/chore/strategies/consumer/batcher.rb, line 65
def ready?
  @batch.size >= @size
end

schedule(batch_timeout)

The main entry point of the Batcher. schedule starts a thread, passing the provided batch_timeout as its only argument. While the Batcher is running, the thread checks once per second whether batch_timeout has elapsed since the oldest message was added and, if so, executes the items in the batch. A batch that becomes full is executed directly by add.

Calling stop causes the thread to finish its current check and exit.

# File lib/chore/strategies/consumer/batcher.rb, line 24
def schedule(batch_timeout)
  @thread = Thread.new(batch_timeout) do |timeout|
    Chore.logger.info "Batching thread starting with #{batch_timeout} second timeout"
    while @running do
      begin
        oldest_item = @batch.first
        timestamp = oldest_item && oldest_item.created_at
        Chore.logger.debug "Oldest message in batch: #{timestamp}, size: #{@batch.size}"
        if timestamp && Time.now > (timestamp + timeout)
          Chore.logger.debug "Batching timeout reached (#{timestamp + timeout}), current size: #{@batch.size}"
          self.execute(true)
        end
        sleep(1)
      rescue => e
        Chore.logger.error "Batcher#schedule raised an exception: #{e.inspect}"
      end
    end
  end
end
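
The timeout decision made on each pass of the loop can be looked at in isolation. The sketch below is not Chore code: SketchItem and batch_timed_out? are hypothetical names, and the injectable now argument is an assumption added so the check can be exercised without waiting. The only requirement taken from the source above is that items respond to created_at.

```ruby
# Hypothetical stand-in for a queued message; the check only needs an object
# that responds to created_at.
SketchItem = Struct.new(:body, :created_at)

# Mirrors the condition inside the schedule loop: the batch is due when the
# oldest (first) item is older than the timeout. `now` is injectable so the
# check can be tested deterministically.
def batch_timed_out?(batch, timeout, now = Time.now)
  oldest_item = batch.first
  timestamp = oldest_item && oldest_item.created_at
  !!(timestamp && now > (timestamp + timeout))
end

now = Time.now
fresh = [SketchItem.new("fresh", now - 5)]
stale = [SketchItem.new("stale", now - 25), SketchItem.new("newer", now - 1)]

empty_due = batch_timed_out?([], 20, now)     # empty batch: nothing to flush
fresh_due = batch_timed_out?(fresh, 20, now)  # oldest is 5 seconds old
stale_due = batch_timed_out?(stale, 20, now)  # oldest is 25 seconds old
```

Only the first item is inspected, so a single old message is enough to flush the batch even when every later message is recent.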

stop()

Sets a flag that begins shutting down the Batcher.

# File lib/chore/strategies/consumer/batcher.rb, line 70
def stop
  @running = false
end
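
Since stop only flips a flag, the scheduling thread notices it on its next pass, which in the loop above can be up to one sleep interval later. A minimal sketch of that flag-based shutdown pattern follows; SketchLoop, its join and alive? helpers, and the 0.01 second interval are all hypothetical and exist only to make the example observable. The Batcher source shown on this page does not expose the thread.

```ruby
# Hypothetical worker that uses the same @running flag pattern as the
# Batcher's schedule/stop pair. Not part of Chore.
class SketchLoop
  def initialize
    @running = true
  end

  # Starts a background thread that loops until the flag is cleared.
  def schedule(interval)
    @thread = Thread.new do
      while @running
        sleep(interval) # stands in for one pass of the timeout check
      end
    end
  end

  # Requests shutdown; returns immediately without waiting for the thread.
  def stop
    @running = false
  end

  # Helpers added for this sketch so the caller can wait and inspect.
  def join
    @thread.join
  end

  def alive?
    @thread.alive?
  end
end

worker = SketchLoop.new
worker.schedule(0.01)
worker.stop
worker.join # blocks until the loop sees the flag and the thread exits
```

A caller that needs to know the loop has actually finished would have to wait on the thread, as join does here; stop on its own makes no such guarantee.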