module Dynamo::Record::BatchWrite::ClassMethods

Public Instance Methods

batch_delete!(records) click to toggle source
# File lib/dynamo/record/batch_write.rb, line 14
# Deletes every record in +records+ through the batch-write pipeline.
#
# @param records [Enumerable] records to delete; each one is wrapped in a
#   delete request by +batch_method+
# @return whatever +batch_method+ returns
def batch_delete!(records)
  operation = :delete
  batch_method(records, operation)
end
batch_save!(records) click to toggle source
# File lib/dynamo/record/batch_write.rb, line 10
# Saves every record in +records+ through the batch-write pipeline.
#
# @param records [Enumerable] records to save; each one is wrapped in a
#   save request by +batch_method+
# @return whatever +batch_method+ returns
def batch_save!(records)
  operation = :save
  batch_method(records, operation)
end

Private Instance Methods

batch_method(records, type) click to toggle source
# File lib/dynamo/record/batch_write.rb, line 20
# Wraps each record in a BatchRequest of the given type and hands the whole
# list to +process_batch_write_requests+.
#
# @param records [Enumerable] records to operate on
# @param type [Symbol] operation passed to BatchRequest (callers in this
#   module use :save and :delete)
# @return the result of +process_batch_write_requests+
def batch_method(records, type)
  process_batch_write_requests(records.map { |item| BatchRequest.new(item, type) })
end
pre_batch(requests) click to toggle source
# File lib/dynamo/record/batch_write.rb, line 71
# Splits +requests+ into consecutive batches, preserving order.
#
# A new batch is started when the current one already holds BATCH_SIZE
# requests, or when adding the next request would push the running total of
# +request_size+ past MAX_PAYLOAD_SIZE.
#
# @param requests [Array<#request_size>] requests to partition
# @return [Array<Array>] non-empty batches; +[]+ when +requests+ is empty
def pre_batch(requests)
  return [] if requests.empty?

  current_batch_size = 0
  requests.each_with_object([[]]) do |request, batches|
    size = request.request_size
    current = batches.last
    over_limit = current.length >= BATCH_SIZE || current_batch_size + size > MAX_PAYLOAD_SIZE

    # Only roll over when the current batch already holds something. A first
    # request that is larger than MAX_PAYLOAD_SIZE on its own would otherwise
    # leave an empty batch at the front of the result.
    if over_limit && !current.empty?
      batches.push([])
      current_batch_size = 0
    end

    batches.last.push(request)
    current_batch_size += size
  end
end
process_batch_write_requests(requests, retry_count = 1) click to toggle source
# File lib/dynamo/record/batch_write.rb, line 27
# Writes +requests+ to DynamoDB batch by batch and schedules a retry for
# anything that was not written.
#
# @param requests [Array] requests to write
# @param retry_count [Integer] attempt number forwarded to +reprocess_items+
# @return [nil] when nothing is left unprocessed; otherwise the result of
#   +reprocess_items+
def process_batch_write_requests(requests, retry_count = 1)
  # Batching happens on every pass so that items left over from a previous
  # attempt are packed together with the rest of the requests.
  #
  # TODO: Can further optimize this with parallelization, e.g.
  # `Parallel.map(batches) do |batch|`. Note that `Parallel` doesn't seem to
  # play well with Rspec mocks.
  failure_seen = false
  leftovers = []

  pre_batch(requests).each do |batch|
    # Once one write has come back incomplete, stop calling DynamoDB and
    # carry the remaining batches over untouched. This backs off at the first
    # sign of trouble and cuts down on the number of
    # `ProvisionedThroughputExceededException`s seen.
    if failure_seen
      leftovers << batch
      next
    end

    unwritten = write_batch_to_dynamo(batch)
    next unless unwritten.present?

    failure_seen = true
    leftovers << unwritten
  end

  return unless leftovers.present?

  reprocess_items(leftovers.flatten, retry_count)
end
reprocess_items(unprocessed_items, retry_count) click to toggle source
# File lib/dynamo/record/batch_write.rb, line 62
# Waits with randomized exponential backoff, then resubmits the items that
# were not written.
#
# @param unprocessed_items [Array<#record>] requests still to be written
# @param retry_count [Integer] current attempt number
# @return the result of +process_batch_write_requests+
# @raise [NumberOfRetriesExceeded] when +retry_count+ is greater than
#   +max_retries+; the exception is built from the affected records
def reprocess_items(unprocessed_items, retry_count)
  if retry_count > max_retries
    failed_records = unprocessed_items.map(&:record)
    raise NumberOfRetriesExceeded.new(failed_records), 'Number of retries exceeded'
  end

  # The upper bound doubles with every attempt; the +1 guarantees a wait of
  # at least one second.
  backoff_ceiling = 1 << retry_count
  sleep(rand(backoff_ceiling) + 1)

  process_batch_write_requests(unprocessed_items, retry_count + 1)
end
write_batch_to_dynamo(batch) click to toggle source
# File lib/dynamo/record/batch_write.rb, line 55
# Sends one batch to DynamoDB and reports which of its requests were not
# written.
#
# @param batch [Array<#request>] requests to send in a single
#   +batch_write_item+ call against +table_name+
# @return [Array] the members of +batch+ whose +request+ appears among the
#   response's unprocessed items for +table_name+ (empty when the response
#   lists none), or the whole +batch+ when the call is throttled
def write_batch_to_dynamo(batch)
  response = dynamodb_client.batch_write_item(request_items: { table_name => batch.map(&:request) })

  # Convert the unprocessed entries to hashes once up front, not once per
  # batch member inside the select below.
  unprocessed = response.unprocessed_items[table_name]&.map(&:to_h)
  return [] if unprocessed.nil?

  batch.select { |item| unprocessed.include?(item.request) }
rescue Aws::DynamoDB::Errors::ProvisionedThroughputExceededException
  # Throttled: report every request in the batch as unprocessed so the
  # caller retries all of them.
  batch
end