class COS::Slice
大文件分片上传，支持断点续传和多线程并发上传。
Constants
- DEFAULT_SLICE_SIZE
默认分片大小为 3 MB
Attributes
cpt_file[RW]
offset[RW]
result[RW]
session[RW]
slice_size[RW]
Public Class Methods
new(opts = {})
click to toggle source
Calls superclass method
COS::Checkpoint::new
# File lib/cos/slice.rb, line 20
#
# Creates a slice uploader.
#
# The options are forwarded to the superclass (COS::Checkpoint) first. The
# checkpoint path is options[:cpt_file] when it is set; otherwise it defaults to
# the absolute path of file_src with a ".cpt" suffix.
#
# @param opts [Hash] uploader options, forwarded unchanged to super
def initialize(opts = {})
  super(opts)
  @cpt_file = options[:cpt_file]
  @cpt_file ||= "#{File.expand_path(file_src)}.cpt"
end
Public Instance Methods
checkpoint()
click to toggle source
记录断点续传状态：未禁用断点续传时写入 cpt_file，并在设置了 progress 时回调上传进度。@example
states = { :session => 'session', :offset => 0, :slice_size => 2048, :file => 'file', :file_meta => { :mtime => Time.now, :sha1 => 'file sha1', :size => 10000, }, :parts => [ {:number => 1, :range => [0, 100], :done => false}, {:number => 2, :range => [100, 200], :done => true} ], :sha1 => 'checkpoint file sha1' }
# File lib/cos/slice.rb, line 103
#
# Records the resumable-upload state.
#
# Verifies that the source file is unchanged and snapshots the parts. If a
# progress callback is set, reports the fraction of bytes uploaded. Unless
# options[:disable_cpt] is set, writes the state to cpt_file.
#
# Progress is computed from the real byte ranges of the finished parts. The old
# estimate (done * slice_size) overcounted when the short last slice finished.
# For an empty file (size 0) the division would have yielded NaN, so it now
# reports 1.0 because there is nothing left to transfer.
#
# @raise [FileInconsistentError] via ensure_file_not_changed if the file changed
def checkpoint
  logger.debug("Make checkpoint, options[:disable_cpt]: #{options[:disable_cpt] == true}")

  ensure_file_not_changed

  parts = sync_get_all_parts
  states = {
    :session    => session,
    :slice_size => slice_size,
    :offset     => offset,
    :file       => file_src,
    :file_meta  => @file_meta,
    :parts      => parts
  }

  done_parts = parts.select { |p| p[:done] }

  # Upload progress callback.
  if progress
    total = states[:file_meta][:size].to_i
    uploaded = done_parts.inject(offset.to_i) { |sum, p| sum + (p[:range][1] - p[:range][0]) }
    percent = total > 0 ? uploaded.to_f / total : 1.0
    progress.call([percent, 1.0].min)
  end

  write_checkpoint(states, cpt_file) unless options[:disable_cpt]

  logger.debug("Upload Parts #{done_parts.size}/#{states[:parts].count}")
end
upload()
click to toggle source
开始上传：重建或初始化上传会话后多线程上传各分片，完成后返回文件信息。
# File lib/cos/slice.rb, line 27
#
# Uploads the file in slices using @num_threads worker threads.
#
# Resumes from the checkpoint file when it is usable; otherwise initiates a new
# session on the server. If the server already reports the file as complete (for
# example on a sha hit), that reply is returned immediately. Otherwise the
# remaining parts are uploaded, progress is reported as 1.0, the checkpoint file
# is removed (unless disabled) and #result is returned.
#
# @raise the first worker error, unless the upload had already finished (finish?)
def upload
  logger.info("Begin upload, file: #{file_src}, threads: #{@num_threads}")

  # Rebuild from the checkpoint or initiate a fresh session with the server;
  # a sha hit may complete the upload right away.
  data = rebuild
  return data if data

  # Split the file into parts when none were restored.
  divide_parts if @parts.empty?

  # Parts that still need uploading.
  @todo_parts = @parts.reject { |p| p[:done] }

  workers = @num_threads.times.map do
    Thread.new do
      # Re-raise a worker failure in the main thread right away. This is scoped to
      # this worker only; the former global Thread.abort_on_exception = true leaked
      # into the whole process and affected unrelated threads.
      Thread.current.abort_on_exception = true
      while (part = sync_get_todo_part)
        upload_part(part)
      end
    end
  end

  workers.each do |worker|
    begin
      worker.join
    rescue => error
      # The server may already have returned the final access_url; then the error is moot.
      next if finish?

      # Some server errors (code -288) require re-initialising the session, so drop the
      # checkpoint. Only delete an existing file, so that ENOENT cannot mask the real error.
      if error.is_a?(ServerError) && error.error_code == -288 &&
         !options[:disable_cpt] && File.exist?(cpt_file)
        File.delete(cpt_file)
      end
      workers.each(&:exit)
      raise error
    end
  end

  # Report 100% progress.
  progress.call(1.0) if progress

  # Upload finished: the checkpoint file is no longer needed.
  File.delete(cpt_file) if !options[:disable_cpt] && File.exist?(cpt_file)

  logger.info("Done upload, file: #{file_src}")

  # File information returned by the server.
  result
end
Private Instance Methods
divide_parts()
click to toggle source
从 offset 开始，按 slice_size 将文件拆分为待上传的片段
# File lib/cos/slice.rb, line 249
#
# Splits file_src, starting at offset, into consecutive parts of slice_size bytes.
#
# Each part is { :number => n (1-based), :range => [first_byte, end_byte), :done => false }.
# The last range is clipped to the file size. The new parts replace @parts, and a
# checkpoint is recorded afterwards.
def divide_parts
  logger.info("Begin divide parts, file: #{file_src}")

  total_bytes = File.size(file_src)
  # Ceiling division of the remaining bytes by the slice size.
  count = (total_bytes - offset - 1) / slice_size + 1

  @parts = 1.upto(count).map do |number|
    first_byte = offset + (number - 1) * slice_size
    end_byte = [offset + number * slice_size, total_bytes].min
    { :number => number, :range => [first_byte, end_byte], :done => false }
  end

  checkpoint

  logger.info("Done divide parts, parts: #{@parts.size}")
end
ensure_file_not_changed()
click to toggle source
确保上传过程中文件没有发生变化
# File lib/cos/slice.rb, line 289
#
# Ensures the source file has not changed since @file_meta was recorded.
#
# An unchanged mtime is trusted. If the mtime differs, the file's SHA-1 is compared
# with @file_meta[:sha1]. When the content still matches, the new mtime is stored
# in @file_meta. Without that, every later call would re-hash the entire file;
# this method runs on every checkpoint, i.e. after every uploaded part.
#
# @raise [FileInconsistentError] if the file content no longer matches
# @return [nil]
def ensure_file_not_changed
  current_mtime = File.mtime(file_src)
  return if current_mtime == @file_meta[:mtime]

  if @file_meta[:sha1] != Util.file_sha1(file_src)
    raise FileInconsistentError, 'The file to upload is changed'
  end

  # Content verified identical: remember the new mtime to skip re-hashing next time.
  @file_meta[:mtime] = current_mtime
  nil
end
finish?()
click to toggle source
是否完成上传
# File lib/cos/slice.rb, line 134
#
# Whether the upload has completed, i.e. the stored server result carries an
# :access_url.
#
# @return [Boolean]
def finish?
  return false if result.nil?
  !result[:access_url].nil?
end
initiate()
click to toggle source
初始化分片上传会话（若服务器返回 access_url，表示已上传完成或秒传成功）
# File lib/cos/slice.rb, line 169
#
# Opens a slice-upload session on the server (op 'upload_slice').
#
# If the reply already contains :access_url (a previous upload finished, or an
# instant upload succeeded by sha match), that reply is returned. Otherwise the
# session, slice size and offset from the reply are stored, together with the
# local file metadata (mtime, sha1, size). A checkpoint is then recorded and
# false is returned.
def initiate
  logger.info('Begin initiate session')

  target_bucket = config.get_bucket(options[:bucket])
  signature     = http.signature.multiple(target_bucket)
  resource_path = Util.get_resource_path(config.app_id, target_bucket, path, file_name)

  local_size = File.size(file_src)
  local_sha1 = Util.file_sha1(file_src)

  request = {
    op:         'upload_slice',
    slice_size: options[:slice_size] || DEFAULT_SLICE_SIZE,
    sha:        local_sha1,
    filesize:   local_size,
    biz_attr:   options[:biz_attr],
    session:    session,
    multipart:  true
  }
  response = http.post(resource_path, {}, signature, request)

  # Already complete (previous run or instant upload): hand back the server reply.
  return response if response[:access_url]

  @session    = response[:session]
  @slice_size = response[:slice_size]
  @offset     = response[:offset]

  @file_meta = {
    mtime: File.mtime(file_src),
    sha1:  local_sha1,
    size:  local_size
  }

  # Record the new session state (written to disk unless checkpoints are disabled).
  checkpoint

  logger.info("Done initiate session: #{@session}")
  false
end
rebuild()
click to toggle source
从断点续传文件重建上传状态；若禁用断点续传或 checkpoint 文件不存在，则向服务器初始化上传会话
# File lib/cos/slice.rb, line 139
#
# Restores the upload state from the checkpoint file, or initiates a new session.
#
# If checkpoints are disabled or cpt_file does not exist, delegates to initiate.
# initiate's reply is returned when the server reports the upload as complete.
# Otherwise the checkpoint is loaded, and the current file's SHA-1 must equal the
# one recorded in the checkpoint's :file_meta. The stored session, file
# metadata, parts, slice size and offset are then restored.
#
# The previous check read states[:file_sha1], a key the checkpoint never contains.
# It compared that against @file_meta, which is not loaded yet at this point.
#
# @return [Hash, false] the server reply if already complete, otherwise false
# @raise [FileInconsistentError] if the file differs from the checkpointed one
def rebuild
  logger.info("Begin rebuild session, checkpoint: #{cpt_file}")

  if options[:disable_cpt] || !File.exist?(cpt_file)
    # No usable checkpoint: initialise from the server.
    data = initiate
    return data if data
  else
    states = load_checkpoint(cpt_file)

    # Make sure the file to upload is the same one the checkpoint was made for.
    saved_meta = states[:file_meta] || {}
    if saved_meta[:sha1] != Util.file_sha1(file_src)
      raise FileInconsistentError, 'The file to upload is changed'
    end

    @session    = states[:session]
    @file_meta  = states[:file_meta]
    @parts      = states[:parts]
    @slice_size = states[:slice_size]
    @offset     = states[:offset]
  end

  logger.info("Done rebuild session, Parts: #{@parts.count}")
  false
end
sync_get_all_parts()
click to toggle source
加锁获取所有片段的副本
# File lib/cos/slice.rb, line 282
#
# Returns a shallow copy of @parts, taken while holding @all_mutex.
def sync_get_all_parts
  snapshot = nil
  @all_mutex.synchronize do
    snapshot = @parts.dup
  end
  snapshot
end
sync_get_todo_part()
click to toggle source
加锁取出下一个待上传片段
# File lib/cos/slice.rb, line 268
#
# Removes and returns the next pending part from @todo_parts under @todo_mutex.
# Returns nil once no parts remain.
def sync_get_todo_part
  next_part = nil
  @todo_mutex.synchronize do
    next_part = @todo_parts.shift
  end
  next_part
end
sync_update_part(p)
click to toggle source
加锁更新指定编号的片段
# File lib/cos/slice.rb, line 275
#
# Stores part p in @parts at the slot for its 1-based :number, under @all_mutex.
#
# @param p [Hash] part descriptor containing :number
# @return [Hash] p
def sync_update_part(p)
  slot = p[:number] - 1
  @all_mutex.synchronize do
    @parts[slot] = p
  end
end
upload_part(p)
click to toggle source
上传单个分片
# File lib/cos/slice.rb, line 212
#
# Uploads a single slice.
#
# Copies bytes [p[:range][0], p[:range][1]) of file_src into a temporary file and
# posts it (op 'upload_slice') with the slice's SHA-1. It then marks the part done
# and records a checkpoint. A server reply containing :access_url is kept as
# @result. The temporary file is always closed and removed.
#
# @param p [Hash] part descriptor: { :number, :range => [start, end), :done }
def upload_part(p)
  logger.debug("Begin upload slice: #{p}")

  bucket = config.get_bucket(options[:bucket])
  sign = http.signature.multiple(bucket)
  resource_path = Util.get_resource_path(config.app_id, bucket, path, file_name)

  range_start, range_end = p[:range]
  temp_file = Tempfile.new("#{session}-#{p[:number]}")
  begin
    # Slices are raw bytes: disable any newline/encoding conversion.
    temp_file.binmode
    # Copy the slice into the temporary file.
    IO.copy_stream(file_src, temp_file, range_end - range_start, range_start)
    temp_file.flush
    temp_file.rewind
    slice_sha1 = Util.file_sha1(temp_file)
    # copy_stream leaves the position at EOF, and hashing may also read the IO.
    # Rewind so whatever reads :filecontent starts at the slice's first byte.
    temp_file.rewind

    payload = {
      op: 'upload_slice',
      sha: slice_sha1,
      offset: range_start,
      session: session,
      filecontent: temp_file,
      multipart: true
    }
    re = http.post(resource_path, {}, sign, payload)
    @result = re if re[:access_url]
  ensure
    # Always remove the temporary file, even when the upload fails.
    temp_file.close
    temp_file.unlink
  end

  sync_update_part(p.merge(done: true))
  checkpoint
  logger.debug("Done upload part: #{p}")
end