module UploadService

Constants

MAX_BLOCK_SIZE
MAX_PUT_SIZE
THREAD_COUNT

Public Class Methods

extended(base) click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 9
def self.extended(base)
end

Public Instance Methods

block_size(total_size, block_size, offset) click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 101
def block_size(total_size, block_size, offset)
  if offset + block_size > total_size
    total_size - offset
  else
    block_size
  end
end
complete_upload(blocks, options) click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 26
def complete_upload(blocks, options)
  options[:blob_content_type] = options[:content_type]

  self.commit_blob_blocks(options[:container], options[:blob], blocks.map{ |block| [block[:block_id], :uncommitted] }, options)
end
compute_blocks(source, options) click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 44
def compute_blocks(source, options)
  size = File.size(source)
  offset = 0
  block_number = 1
  blocks = []
  while offset < size
    blocks << {
        container: options[:container],
        blob: options[:blob],
        block_id: block_number.to_s.rjust(5, '0'),
        block_number: block_number,
        body: FilePart.new(
            source: source,
            offset: offset,
            size: block_size(size, MAX_BLOCK_SIZE, offset)
        )
    }
    block_number += 1
    offset += MAX_BLOCK_SIZE
  end
  blocks
end
upload(source, options = {}) click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 12
def upload(source, options = {})
  @thread_count = options[:thread_count] || THREAD_COUNT

  size = File.size(source)

  if size <= MAX_PUT_SIZE
    content = File.open(source, 'rb') { |file| file.read }
    self.create_block_blob(options[:container], options[:blob], content)
  else
    blocks = upload_blocks(source, options)
    complete_upload(blocks, options)
  end
end
upload_blocks(source, options) click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 32
def upload_blocks(source, options)
  pending = BlockList.new(compute_blocks(source, options))
  completed = BlockList.new
  errors = upload_in_threads(pending, completed)
  if errors.empty?
    completed.to_a.sort_by { |block| block[:block_number] }
  else
    msg = "multipart upload failed: #{errors.map(&:message).join("; ")}"
    raise BlockUploadError.new(msg, errors)
  end
end
upload_in_threads(pending, completed) click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 67
def upload_in_threads(pending, completed)
  threads = []
  @thread_count.times do
    thread = Thread.new do
      begin
        while block = pending.shift
          content = block[:body].read
          block[:body].close

          options = {}
          options[:content_md5] = Base64.strict_encode64(Digest::MD5.digest(content))
          options[:timeout] = 30

          content_md5 = self.create_blob_block(block[:container], block[:blob], block[:block_id], content, options)

          if content_md5 != options[:content_md5]
            raise "The block is corrupt: block = #{block[:block_id]}"
          end

          completed.push(block_id: block[:block_id], block_number: block[:block_number])
        end
        nil
      rescue => error
        # keep other threads from uploading other parts
        pending.clear!
        error
      end
    end
    thread.abort_on_exception = true
    threads << thread
  end
  threads.map(&:value).compact
end