module UploadService
Constants
- MAX_BLOCK_SIZE
- MAX_PUT_SIZE
- THREAD_COUNT
Public Class Methods
extended(base)
click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 9
# Hook that receives the extending object as +base+.
# The body is intentionally empty: nothing is done with +base+ and nil is returned.
def self.extended(base); end
Public Instance Methods
block_size(total_size, block_size, offset)
click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 101
# Size of the block that starts at +offset+.
#
# Returns +block_size+ when a full block still fits before +total_size+,
# otherwise the number of bytes left between +offset+ and +total_size+.
def block_size(total_size, block_size, offset)
  remaining = total_size - offset
  # A full block would run past the end, so only the tail is left.
  block_size > remaining ? remaining : block_size
end
complete_upload(blocks, options)
click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 26
# Commits the previously uploaded +blocks+ as a single blob.
#
# blocks  - Array of Hashes; only each entry's :block_id is read here.
# options - Hash; :container and :blob locate the blob, and :content_type
#           is forwarded to the commit call as :blob_content_type.
#
# Returns whatever +commit_blob_blocks+ returns.
def complete_upload(blocks, options)
  # Work on a merged copy so the caller's hash is not mutated (and a frozen
  # options hash does not raise).
  commit_options = options.merge(blob_content_type: options[:content_type])
  block_list = blocks.map { |block| [block[:block_id], :uncommitted] }
  self.commit_blob_blocks(options[:container], options[:blob], block_list, commit_options)
end
compute_blocks(source, options)
click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 44
# Splits the file at +source+ into consecutive block descriptors.
#
# Block start offsets advance by MAX_BLOCK_SIZE from 0 while they stay below
# the file size. Each descriptor is a Hash with :container and :blob (taken
# from +options+), a 1-based :block_number, a :block_id (the number
# left-padded with zeros to 5 characters) and a :body FilePart covering that
# slice of the file.
#
# Returns an Array of descriptors in file order (empty for an empty file).
def compute_blocks(source, options)
  total = File.size(source)

  (0...total).step(MAX_BLOCK_SIZE).each_with_index.map do |start, index|
    number = index + 1
    {
      container: options[:container],
      blob: options[:blob],
      block_id: number.to_s.rjust(5, '0'),
      block_number: number,
      body: FilePart.new(
        source: source,
        offset: start,
        size: block_size(total, MAX_BLOCK_SIZE, start)
      )
    }
  end
end
upload(source, options = {})
click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 12
# Uploads the file at +source+ as a block blob.
#
# Files no larger than MAX_PUT_SIZE are read fully into memory and sent with
# a single +create_block_blob+ call; larger files are uploaded block by block
# and then committed.
#
# source  - path of the file to upload.
# options - Hash:
#           :container, :blob - destination of the upload.
#           :content_type     - forwarded as :blob_content_type on both paths.
#           :thread_count     - number of upload threads (default THREAD_COUNT),
#                               stored in @thread_count.
#
# Returns the result of +create_block_blob+ or of +complete_upload+.
def upload(source, options = {})
  @thread_count = options[:thread_count] || THREAD_COUNT

  if File.size(source) <= MAX_PUT_SIZE
    # Forward the content type under the same key the multipart path uses
    # (see complete_upload); previously it was silently dropped here.
    put_options = {}
    put_options[:blob_content_type] = options[:content_type] if options[:content_type]
    self.create_block_blob(options[:container], options[:blob], File.binread(source), put_options)
  else
    blocks = upload_blocks(source, options)
    complete_upload(blocks, options)
  end
end
upload_blocks(source, options)
click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 32
# Uploads every block of the file at +source+ and reports the uploaded blocks.
#
# The blocks from +compute_blocks+ are queued in one BlockList, and
# +upload_in_threads+ moves them into a second one as they finish.
#
# Returns the completed entries as an Array sorted by :block_number.
# Raises BlockUploadError (built from the joined error messages and the error
# list) when +upload_in_threads+ reports any errors.
def upload_blocks(source, options)
  queue = BlockList.new(compute_blocks(source, options))
  finished = BlockList.new
  failures = upload_in_threads(queue, finished)

  unless failures.empty?
    msg = "multipart upload failed: #{failures.map(&:message).join("; ")}"
    raise BlockUploadError.new(msg, failures)
  end

  finished.to_a.sort_by { |block| block[:block_number] }
end
upload_in_threads(pending, completed)
click to toggle source
# File lib/fluent/plugin/upload_service.rb, line 67
# Uploads the blocks in +pending+ using @thread_count worker threads.
#
# Each worker repeatedly shifts a block off +pending+, reads and closes its
# :body, uploads the content with +create_blob_block+ (sending a Base64 MD5
# digest and a 30 second timeout), and compares the value returned by that
# call with the digest it sent. Finished blocks are pushed onto +completed+
# as { block_id:, block_number: }.
#
# pending   - object responding to +shift+ and +clear!+ (e.g. a BlockList).
# completed - object responding to +push+.
#
# Returns an Array with the error each failed worker rescued (empty when all
# workers finished cleanly).
def upload_in_threads(pending, completed)
  workers = @thread_count.times.map do
    worker = Thread.new do
      begin
        while (block = pending.shift)
          body = block[:body]
          content =
            begin
              body.read
            ensure
              # Release the file part even when the read itself fails.
              body.close
            end

          options = {
            content_md5: Base64.strict_encode64(Digest::MD5.digest(content)),
            timeout: 30
          }
          content_md5 = self.create_blob_block(block[:container], block[:blob], block[:block_id], content, options)
          if content_md5 != options[:content_md5]
            raise "The block is corrupt: block = #{block[:block_id]}"
          end

          completed.push(block_id: block[:block_id], block_number: block[:block_number])
        end
        nil
      rescue => error
        # keep other threads from uploading other parts
        pending.clear!
        error
      end
    end
    worker.abort_on_exception = true
    worker
  end

  # Thread#value joins each worker; nil marks a worker that saw no error.
  workers.map(&:value).compact
end