class LogStash::Outputs::Qingstor::Uploader::MultipartUploader

Constants

MAXIMAL_PART_SIZE
MINIMAL_PART_SIZE

According to the QingStor API documentation, the minimal part size is 4 MB and the maximal part size is 1 GB. Here the minimal size is never used, and the maximal part size is limited to 50 MB: larger parts would consume too much Java heap space and raise exceptions.

Attributes

bucket[R]
logger[R]
upload_headers[RW]

Public Class Methods

new(bucket, logger, object, upload_headers) click to toggle source
# File lib/logstash/outputs/qingstor/multipart_uploader.rb, line 20
# Stores the collaborators that the upload methods work with.
#
# @param bucket bucket client that receives the multipart calls
# @param logger logger used for debug/info messages
# @param object object being uploaded (the other methods read its key, path and size)
# @param upload_headers hash of request options; the upload methods add keys to it
def initialize(bucket, logger, object, upload_headers)
  @bucket, @logger = bucket, logger
  @object, @upload_headers = object, upload_headers
end

Public Instance Methods

calculate_segment() click to toggle source
# File lib/logstash/outputs/qingstor/multipart_uploader.rb, line 70
# Chooses the part size for a multipart upload of @object.
#
# Starts from the full object size and halves it (rounding up) for as long
# as the value is still greater than or equal to MAXIMAL_PART_SIZE.
#
# @return the resulting part size; @object.size itself when it is already
#   below MAXIMAL_PART_SIZE
def calculate_segment
  part_size = @object.size
  part_size = (part_size / 2.0).ceil while part_size >= MAXIMAL_PART_SIZE
  part_size
end
complete_multipart_upload(last_part_number) click to toggle source
# File lib/logstash/outputs/qingstor/multipart_uploader.rb, line 63
# Finalizes the multipart upload by sending the list of parts to the bucket.
#
# Builds one {'part_number' => n} entry for every n from 0 through
# last_part_number, stores the list under @upload_headers['object_parts'],
# and passes the headers to the bucket's complete_multipart_upload.
#
# @param last_part_number [Integer] number of the final part (parts start at 0)
# @return the value returned by @logger.info
def complete_multipart_upload(last_part_number)
  @upload_headers['object_parts'] =
    (0..last_part_number).map { |number| { 'part_number' => number } }
  # The response is not inspected here, so it is not bound to a local.
  @bucket.complete_multipart_upload(@object.key, @upload_headers)
  @logger.info('multipart uploading completed', :file => @object.key)
end
initiate_multipart_upload() click to toggle source
# File lib/logstash/outputs/qingstor/multipart_uploader.rb, line 33
# Asks the bucket to start a multipart upload for @object and remembers the
# upload id under @upload_headers['upload_id'].
#
# @return the :upload_id value from the bucket's response
# @raise [QingStor::Error] when the response has no :upload_id key
def initiate_multipart_upload
  response = @bucket.initiate_multipart_upload(@object.key, @upload_headers)
  upload_id = response.fetch(:upload_id) { raise(QingStor::Error, :error_msg => response) }
  @upload_headers['upload_id'] = upload_id
  @logger.debug('initiated upload id', :file => @object.key, :upload_id => upload_id)
  upload_id
end
resume_multipart_upload(upload_id) click to toggle source
# File lib/logstash/outputs/qingstor/multipart_uploader.rb, line 81
# Continues a previously started multipart upload.
#
# Stores upload_id in @upload_headers, asks the bucket which parts already
# exist, and restarts upload_multipart from the next part number using the
# size of the first listed part as the segment size.
#
# NOTE(review): this assumes the listed parts are numbered contiguously from 0
# and were all cut with the first part's size - confirm against the service.
#
# @param upload_id id of the multipart upload to resume
# @return the value returned by upload_multipart
def resume_multipart_upload(upload_id)
  @logger.debug('resume multipart uploading', :file => @object.key,
                                              :upload_id => upload_id)
  @upload_headers['upload_id'] = upload_id
  res = @bucket.list_multipart(@object.key, @upload_headers)
  uploaded_parts = res[:object_parts]

  # No part was stored before the interruption: there is no first part to
  # take the size from, so start over from part 0 with a fresh segment size
  # while keeping the same upload id.
  return upload_multipart(calculate_segment) if uploaded_parts.nil? || uploaded_parts.empty?

  upload_multipart(uploaded_parts.count, uploaded_parts.first[:size])
end
upload() click to toggle source
# File lib/logstash/outputs/qingstor/multipart_uploader.rb, line 27
# Runs a complete multipart upload: initiates it, computes the segment size,
# then uploads every part starting from part 0.
#
# @return the value returned by upload_multipart
def upload
  initiate_multipart_upload
  upload_multipart(calculate_segment)
end
upload_multipart(part_number = 0, segment_size) click to toggle source
# File lib/logstash/outputs/qingstor/multipart_uploader.rb, line 43
# Uploads the file at @object.path part by part, then completes the upload.
#
# For every chunk read from the file the chunk, its byte length, its MD5 hex
# digest and its part number are written into @upload_headers before the
# bucket's upload_multipart is called.
#
# @param part_number [Integer] number of the first part to send; a non-zero
#   value skips the first part_number * segment_size bytes of the file
# @param segment_size [Integer] number of bytes read per part; must be positive
# @return the value returned by complete_multipart_upload
# @raise [ArgumentError] if segment_size is not positive
def upload_multipart(part_number = 0, segment_size)
  # IO#read(0) always returns "" (which is truthy), so a zero size would make
  # the loop below run forever.
  unless segment_size > 0
    raise ArgumentError, "segment_size must be positive, got #{segment_size.inspect}"
  end

  # Binary mode keeps the bytes, and therefore the length and MD5, untouched.
  ::File.open(@object.path, 'rb') do |f|
    f.seek(part_number * segment_size) if part_number != 0
    while (data = f.read(segment_size))
      @upload_headers['body'] = data
      # The final part is usually shorter than segment_size, so report the
      # number of bytes actually read.
      @upload_headers['content_length'] = data.bytesize
      @upload_headers['content_md5'] = Digest::MD5.hexdigest(data)
      @upload_headers['part_number'] = part_number
      @logger.debug('multipart uploading: ',
                    :file => @object.key,
                    :part_number => part_number)
      @bucket.upload_multipart(@object.key, @upload_headers)
      part_number += 1
    end
  end

  # part_number now points one past the last part that was sent.
  complete_multipart_upload(part_number - 1)
end