class Aws::S3::MultipartFileUploader

@api private

Constants

CREATE_OPTIONS

@api private

FILE_TOO_SMALL
MAX_PARTS
MIN_PART_SIZE
THREAD_COUNT
UPLOAD_PART_OPTIONS

@api private

Attributes

client[R]

@return [Client]

Public Class Methods

new(options = {}) click to toggle source

@option options [Client] :client @option options [Integer] :thread_count (THREAD_COUNT)

# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 31
def initialize(options = {})
  @client = options[:client] || Client.new
  @thread_count = options[:thread_count] || THREAD_COUNT
end

Public Instance Methods

upload(source, options = {}) click to toggle source

@param [String, Pathname, File, Tempfile] source The file to upload. @option options [required, String] :bucket The bucket to upload to. @option options [required, String] :key The key for the object. @option options [Proc] :progress_callback

A Proc that will be called when each chunk of the upload is sent.
It will be invoked with [bytes_read], [total_sizes]

@return [void]

# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 46
def upload(source, options = {})
  if File.size(source) < MIN_PART_SIZE
    raise ArgumentError, FILE_TOO_SMALL
  else
    upload_id = initiate_upload(options)
    parts = upload_parts(upload_id, source, options)
    complete_upload(upload_id, parts, options)
  end
end

Private Instance Methods

abort_upload(upload_id, options, errors) click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 82
def abort_upload(upload_id, options, errors)
  @client.abort_multipart_upload(
    bucket: options[:bucket],
    key: options[:key],
    upload_id: upload_id
  )
  msg = "multipart upload failed: #{errors.map(&:message).join("; ")}"
  raise MultipartUploadError.new(msg, errors)
rescue MultipartUploadError => error
  raise error
rescue => error
  msg = "failed to abort multipart upload: #{error.message}"
  raise MultipartUploadError.new(msg, errors + [error])
end
complete_upload(upload_id, parts, options) click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 62
def complete_upload(upload_id, parts, options)
  @client.complete_multipart_upload(
    bucket: options[:bucket],
    key: options[:key],
    upload_id: upload_id,
    multipart_upload: { parts: parts }
  )
end
compute_default_part_size(source_size) click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 165
def compute_default_part_size(source_size)
  [(source_size.to_f / MAX_PARTS).ceil, MIN_PART_SIZE].max.to_i
end
compute_parts(upload_id, source, options) click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 97
def compute_parts(upload_id, source, options)
  size = File.size(source)
  default_part_size = compute_default_part_size(size)
  offset = 0
  part_number = 1
  parts = []
  while offset < size
    parts << upload_part_opts(options).merge(
      upload_id: upload_id,
      part_number: part_number,
      body: FilePart.new(
        source: source,
        offset: offset,
        size: part_size(size, default_part_size, offset)
      )
    )
    part_number += 1
    offset += default_part_size
  end
  parts
end
create_opts(options) click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 119
def create_opts(options)
  CREATE_OPTIONS.inject({}) do |hash, key|
    hash[key] = options[key] if options.key?(key)
    hash
  end
end
initiate_upload(options) click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 58
def initiate_upload(options)
  @client.create_multipart_upload(create_opts(options)).upload_id
end
part_size(total_size, part_size, offset) click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 169
def part_size(total_size, part_size, offset)
  if offset + part_size > total_size
    total_size - offset
  else
    part_size
  end
end
upload_in_threads(pending, completed, options) click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 133
def upload_in_threads(pending, completed, options)
  threads = []
  if (callback = options[:progress_callback])
    progress = MultipartProgress.new(pending, callback)
  end
  @thread_count.times do
    thread = Thread.new do
      begin
        while part = pending.shift
          if progress
            part[:on_chunk_sent] =
              proc do |_chunk, bytes, _total|
                progress.call(part[:part_number], bytes)
              end
          end
          resp = @client.upload_part(part)
          part[:body].close
          completed.push(etag: resp.etag, part_number: part[:part_number])
        end
        nil
      rescue => error
        # keep other threads from uploading other parts
        pending.clear!
        error
      end
    end
    thread.abort_on_exception = true
    threads << thread
  end
  threads.map(&:value).compact
end
upload_part_opts(options) click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 126
def upload_part_opts(options)
  UPLOAD_PART_OPTIONS.inject({}) do |hash, key|
    hash[key] = options[key] if options.key?(key)
    hash
  end
end
upload_parts(upload_id, source, options) click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 71
def upload_parts(upload_id, source, options)
  pending = PartList.new(compute_parts(upload_id, source, options))
  completed = PartList.new
  errors = upload_in_threads(pending, completed, options)
  if errors.empty?
    completed.to_a.sort_by { |part| part[:part_number] }
  else
    abort_upload(upload_id, options, errors)
  end
end