class Aws::S3::MultipartFileUploader
@api private
Constants
- CREATE_OPTIONS
@api private
- FILE_TOO_SMALL
- MAX_PARTS
- MIN_PART_SIZE
- THREAD_COUNT
- UPLOAD_PART_OPTIONS
@api private
Attributes
client[R]
@return [Client]
Public Class Methods
new(options = {})
click to toggle source
@option options [Client] :client @option options [Integer] :thread_count (THREAD_COUNT
)
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 31 def initialize(options = {}) @client = options[:client] || Client.new @thread_count = options[:thread_count] || THREAD_COUNT end
Public Instance Methods
upload(source, options = {})
click to toggle source
@param [String, Pathname, File, Tempfile] source The file to upload. @option options [required, String] :bucket The bucket to upload to. @option options [required, String] :key The key for the object. @option options [Proc] :progress_callback
A Proc that will be called when each chunk of the upload is sent. It will be invoked with [bytes_read], [total_sizes]
@return [void]
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 46 def upload(source, options = {}) if File.size(source) < MIN_PART_SIZE raise ArgumentError, FILE_TOO_SMALL else upload_id = initiate_upload(options) parts = upload_parts(upload_id, source, options) complete_upload(upload_id, parts, options) end end
Private Instance Methods
abort_upload(upload_id, options, errors)
click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 82 def abort_upload(upload_id, options, errors) @client.abort_multipart_upload( bucket: options[:bucket], key: options[:key], upload_id: upload_id ) msg = "multipart upload failed: #{errors.map(&:message).join("; ")}" raise MultipartUploadError.new(msg, errors) rescue MultipartUploadError => error raise error rescue => error msg = "failed to abort multipart upload: #{error.message}" raise MultipartUploadError.new(msg, errors + [error]) end
complete_upload(upload_id, parts, options)
click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 62 def complete_upload(upload_id, parts, options) @client.complete_multipart_upload( bucket: options[:bucket], key: options[:key], upload_id: upload_id, multipart_upload: { parts: parts } ) end
compute_default_part_size(source_size)
click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 165 def compute_default_part_size(source_size) [(source_size.to_f / MAX_PARTS).ceil, MIN_PART_SIZE].max.to_i end
compute_parts(upload_id, source, options)
click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 97 def compute_parts(upload_id, source, options) size = File.size(source) default_part_size = compute_default_part_size(size) offset = 0 part_number = 1 parts = [] while offset < size parts << upload_part_opts(options).merge( upload_id: upload_id, part_number: part_number, body: FilePart.new( source: source, offset: offset, size: part_size(size, default_part_size, offset) ) ) part_number += 1 offset += default_part_size end parts end
create_opts(options)
click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 119 def create_opts(options) CREATE_OPTIONS.inject({}) do |hash, key| hash[key] = options[key] if options.key?(key) hash end end
initiate_upload(options)
click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 58 def initiate_upload(options) @client.create_multipart_upload(create_opts(options)).upload_id end
part_size(total_size, part_size, offset)
click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 169 def part_size(total_size, part_size, offset) if offset + part_size > total_size total_size - offset else part_size end end
upload_in_threads(pending, completed, options)
click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 133 def upload_in_threads(pending, completed, options) threads = [] if (callback = options[:progress_callback]) progress = MultipartProgress.new(pending, callback) end @thread_count.times do thread = Thread.new do begin while part = pending.shift if progress part[:on_chunk_sent] = proc do |_chunk, bytes, _total| progress.call(part[:part_number], bytes) end end resp = @client.upload_part(part) part[:body].close completed.push(etag: resp.etag, part_number: part[:part_number]) end nil rescue => error # keep other threads from uploading other parts pending.clear! error end end thread.abort_on_exception = true threads << thread end threads.map(&:value).compact end
upload_part_opts(options)
click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 126 def upload_part_opts(options) UPLOAD_PART_OPTIONS.inject({}) do |hash, key| hash[key] = options[key] if options.key?(key) hash end end
upload_parts(upload_id, source, options)
click to toggle source
# File lib/aws-sdk-s3/multipart_file_uploader.rb, line 71 def upload_parts(upload_id, source, options) pending = PartList.new(compute_parts(upload_id, source, options)) completed = PartList.new errors = upload_in_threads(pending, completed, options) if errors.empty? completed.to_a.sort_by { |part| part[:part_number] } else abort_upload(upload_id, options, errors) end end