class Aws::S3::MultipartStreamUploader
@api private
Constants
- CREATE_OPTIONS
@api private
- PART_SIZE
api private
- TEMPFILE_PREIX
api private
- THREAD_COUNT
api private
- UPLOAD_PART_OPTIONS
@api private
Attributes
client[R]
@return [Client]
Public Class Methods
new(options = {})
click to toggle source
@option options [Client] :client
# File lib/aws-sdk-s3/multipart_stream_uploader.rb, line 30 def initialize(options = {}) @client = options[:client] || Client.new @tempfile = options[:tempfile] @part_size = options[:part_size] || PART_SIZE @thread_count = options[:thread_count] || THREAD_COUNT end
Public Instance Methods
upload(options = {}, &block)
click to toggle source
@option options [required,String] :bucket @option options [required,String] :key @return [void]
# File lib/aws-sdk-s3/multipart_stream_uploader.rb, line 43 def upload(options = {}, &block) upload_id = initiate_upload(options) parts = upload_parts(upload_id, options, &block) complete_upload(upload_id, parts, options) end
Private Instance Methods
abort_upload(upload_id, options, errors)
click to toggle source
# File lib/aws-sdk-s3/multipart_stream_uploader.rb, line 87 def abort_upload(upload_id, options, errors) @client.abort_multipart_upload( bucket: options[:bucket], key: options[:key], upload_id: upload_id ) msg = "multipart upload failed: #{errors.map(&:message).join("; ")}" raise MultipartUploadError.new(msg, errors) rescue MultipartUploadError => error raise error rescue => error msg = "failed to abort multipart upload: #{error.message}" raise MultipartUploadError.new(msg, errors + [error]) end
complete_upload(upload_id, parts, options)
click to toggle source
# File lib/aws-sdk-s3/multipart_stream_uploader.rb, line 55 def complete_upload(upload_id, parts, options) @client.complete_multipart_upload( bucket: options[:bucket], key: options[:key], upload_id: upload_id, multipart_upload: { parts: parts }) end
create_opts(options)
click to toggle source
# File lib/aws-sdk-s3/multipart_stream_uploader.rb, line 102 def create_opts(options) CREATE_OPTIONS.inject({}) do |hash, key| hash[key] = options[key] if options.key?(key) hash end end
initiate_upload(options)
click to toggle source
# File lib/aws-sdk-s3/multipart_stream_uploader.rb, line 51 def initiate_upload(options) @client.create_multipart_upload(create_opts(options)).upload_id end
read_to_part_body(read_pipe)
click to toggle source
# File lib/aws-sdk-s3/multipart_stream_uploader.rb, line 116 def read_to_part_body(read_pipe) return if read_pipe.closed? temp_io = @tempfile ? Tempfile.new(TEMPFILE_PREIX) : StringIO.new(String.new) temp_io.binmode bytes_copied = IO.copy_stream(read_pipe, temp_io, @part_size) temp_io.rewind if bytes_copied == 0 if Tempfile === temp_io temp_io.close temp_io.unlink end nil else temp_io end end
upload_in_threads(read_pipe, completed, options)
click to toggle source
# File lib/aws-sdk-s3/multipart_stream_uploader.rb, line 133 def upload_in_threads(read_pipe, completed, options) mutex = Mutex.new part_number = 0 @thread_count.times.map do thread = Thread.new do begin loop do body, thread_part_number = mutex.synchronize do [read_to_part_body(read_pipe), part_number += 1] end break unless (body || thread_part_number == 1) begin part = options.merge( body: body, part_number: thread_part_number, ) resp = @client.upload_part(part) completed << {etag: resp.etag, part_number: part[:part_number]} ensure if Tempfile === body body.close body.unlink elsif StringIO === body body.string.clear end end end nil rescue => error # keep other threads from uploading other parts mutex.synchronize { read_pipe.close_read unless read_pipe.closed? } error end end thread.abort_on_exception = true thread end end
upload_part_opts(options)
click to toggle source
# File lib/aws-sdk-s3/multipart_stream_uploader.rb, line 109 def upload_part_opts(options) UPLOAD_PART_OPTIONS.inject({}) do |hash, key| hash[key] = options[key] if options.key?(key) hash end end
upload_parts(upload_id, options, &block)
click to toggle source
# File lib/aws-sdk-s3/multipart_stream_uploader.rb, line 63 def upload_parts(upload_id, options, &block) completed = Queue.new errors = begin IO.pipe do |read_pipe, write_pipe| threads = upload_in_threads(read_pipe, completed, upload_part_opts(options).merge(upload_id: upload_id)) begin block.call(write_pipe) ensure # Ensure the pipe is closed to avoid https://github.com/jruby/jruby/issues/6111 write_pipe.close end threads.map(&:value).compact end rescue => e [e] end if errors.empty? Array.new(completed.size) { completed.pop }.sort_by { |part| part[:part_number] } else abort_upload(upload_id, options, errors) end end