module Dynamo::Record::BatchWrite::ClassMethods
Public Instance Methods
batch_delete!(records)
click to toggle source
# File lib/dynamo/record/batch_write.rb, line 14 def batch_delete!(records) batch_method(records, :delete) end
batch_save!(records)
click to toggle source
# File lib/dynamo/record/batch_write.rb, line 10 def batch_save!(records) batch_method(records, :save) end
Private Instance Methods
batch_method(records, type)
click to toggle source
# File lib/dynamo/record/batch_write.rb, line 20 def batch_method(records, type) requests = records.map do |record| BatchRequest.new(record, type) end process_batch_write_requests(requests) end
pre_batch(requests)
click to toggle source
# File lib/dynamo/record/batch_write.rb, line 71 def pre_batch(requests) return [] if requests.empty? current_batch_size = 0 requests.each_with_object([[]]) do |request, batches| if batches.last.length >= BATCH_SIZE || current_batch_size + request.request_size > MAX_PAYLOAD_SIZE batches.push([]) current_batch_size = 0 end batches.last.push(request) current_batch_size += request.request_size end end
process_batch_write_requests(requests, retry_count = 1)
click to toggle source
# File lib/dynamo/record/batch_write.rb, line 27 def process_batch_write_requests(requests, retry_count = 1) # Need to batch with every request. This allows for unprocessed items # to be batched in with the rest of the requests. batched_requests = pre_batch(requests) # Need to start the exponential backoff at the first sign of a failed request request_failed = false # TODO: Can further optimize this request with parallelization # i.e. `unprocessed_items = Parallel.map(batched_requests) do |batch|` # Note that `Parallel` doesn't seem to play well with Rspec mocks. unprocessed_items = batched_requests.each_with_object([]) do |batch, unprocessed| # Returns the batch as the defacto unprocessed items if one request # has already failed. This will cut down on the number of # `ProvisionedThroughputExceededException`s seen if request_failed unprocessed << batch else response = write_batch_to_dynamo(batch) if response.present? request_failed = true unprocessed << response end end end reprocess_items(unprocessed_items.flatten, retry_count) if unprocessed_items.present? end
reprocess_items(unprocessed_items, retry_count)
click to toggle source
# File lib/dynamo/record/batch_write.rb, line 62 def reprocess_items(unprocessed_items, retry_count) if retry_count > max_retries raise NumberOfRetriesExceeded.new(unprocessed_items.map(&:record)), 'Number of retries exceeded' end sleep(rand(1 << retry_count) + 1) process_batch_write_requests(unprocessed_items, retry_count + 1) end
write_batch_to_dynamo(batch)
click to toggle source
# File lib/dynamo/record/batch_write.rb, line 55 def write_batch_to_dynamo(batch) response = dynamodb_client.batch_write_item(request_items: { table_name => batch.map(&:request) }) batch.select { |i| response.unprocessed_items[table_name]&.map(&:to_h)&.include?(i.request) } rescue Aws::DynamoDB::Errors::ProvisionedThroughputExceededException batch end