class COS::Download
大文件分片下载, 支持断点续传与多线程下载。基于 HTTP/1.1 (RFC 2616) 的 Range 请求头实现。
Constants
- PART_SIZE
默认分块大小
- READ_SIZE
默认文件读取大小
Attributes
cpt_file[RW]
session[RW]
Public Class Methods
new(opts = {})
click to toggle source
Calls superclass method
COS::Checkpoint::new
# File lib/cos/download.rb, line 22
# Builds a download task.
#
# Option handling is delegated to the superclass initializer. Afterwards the
# local target path is expanded to an absolute path and the checkpoint file
# location is chosen: options[:cpt_file] when given, otherwise the target
# path with a ".cpt" suffix.
#
# @param opts [Hash] task options, passed through to the superclass
def initialize(opts = {})
  super(opts)

  @file_store = File.expand_path(file_store)
  default_cpt = "#{@file_store}.cpt"
  @cpt_file = options[:cpt_file] || default_cpt
end
Public Instance Methods
checkpoint()
click to toggle source
断点续传状态记录 @example
states = { :session => 'session', :file => 'file', :file_meta => { :sha1 => 'file sha1', :size => 10000, }, :parts => [ {:number => 1, :range => [0, 100], :done => false}, {:number => 2, :range => [100, 200], :done => true} ], :sha1 => 'checkpoint file sha1' }
# File lib/cos/download.rb, line 84
# Records the current download state and reports progress.
#
# Takes a snapshot of all parts, invokes the progress callback (when one is
# set) with the fraction of finished parts, and writes the state hash to the
# checkpoint file unless options[:disable_cpt] is set.
#
# The state hash has the keys :session, :file, :file_meta and :parts.
def checkpoint
  logger.debug("Make checkpoint, options[:disable_cpt]: #{options[:disable_cpt] == true}")

  parts = sync_get_all_parts
  states = {
    session: session,
    file: file_store,
    file_meta: @file_meta,
    parts: parts
  }

  done = parts.count { |part| part[:done] }

  # Progress callback: a Float in 0.0..1.0. An empty part list reports 0.0
  # instead of dividing by zero.
  if progress
    ratio = parts.empty? ? 0.0 : done.to_f / parts.size
    progress.call([ratio, 1.0].min)
  end

  write_checkpoint(states, cpt_file) unless options[:disable_cpt]

  logger.debug("Download Parts #{done}/#{parts.size}")
end
download()
click to toggle source
开始下载
# File lib/cos/download.rb, line 30
# Runs the download from start to finish.
#
# Restores (or initiates) the session, splits the file into parts when none
# are known yet, downloads every unfinished part on @num_threads worker
# threads, merges the result and finally verifies it with finish?.
#
# @raise [DownloadError] when finish? reports a mismatch; the merged file is
#   deleted first if it exists
def download
  logger.info("Begin download, file: #{file_store}, threads: #{@num_threads}")

  # Restore checkpoint state, or start a fresh session.
  rebuild

  # Split into parts only when nothing was restored.
  divide_parts if @parts.empty?

  # Work queue: every part not yet marked done.
  @todo_parts = @parts.reject { |part| part[:done] }

  workers = (1..@num_threads).map do
    logger.debug("#{@num_threads} Threads Downloads")

    Thread.new do
      logger.debug("Create Thread #{Thread.current.object_id}")

      # Keep pulling parts until the queue is drained.
      while (part = sync_get_todo_part)
        download_part(part)
      end
    end
  end
  workers.each(&:join)

  # All parts fetched: merge them into the target file.
  complete

  return if finish?

  File.delete(file_store) if File.exist?(file_store)
  raise DownloadError, 'File downloaded sha1 not match, deleted!'
end
Private Instance Methods
complete()
click to toggle source
# File lib/cos/download.rb, line 119
# Merges the downloaded part files into the target file and cleans up.
#
# Reports 100% progress (when a callback is set), concatenates the part files
# in ascending :number order into @file_store, removes the checkpoint file
# unless options[:disable_cpt] is set, and deletes the part files.
#
# Both the output and the part files are opened in binary mode so that bytes
# are copied verbatim on platforms where text mode translates line endings.
def complete
  # Report 100% progress.
  progress.call(1.to_f) if progress

  parts = sync_get_all_parts

  File.open(@file_store, 'wb') do |out|
    parts.sort_by { |part| part[:number] }.each do |part|
      File.open(get_part_file(part), 'rb') do |input|
        # Copy in READ_SIZE chunks.
        out.write(input.read(READ_SIZE)) until input.eof?
      end
    end
  end

  # Download finished: the checkpoint file is no longer needed.
  File.delete(cpt_file) unless options[:disable_cpt]

  parts.each { |part| File.delete(get_part_file(part)) }

  logger.info("Done download, file: #{@file_store}")
end
divide_parts()
click to toggle source
文件片段拆分
# File lib/cos/download.rb, line 205
# Splits the file into download parts.
#
# Part size comes from @options[:part_size], falling back to PART_SIZE. Each
# part is a hash with :number (1-based), :range ([start, end) byte offsets,
# end exclusive) and :done (false). The result is stored in @parts and a
# checkpoint is taken.
#
# @raise [ArgumentError] when the part size is not a positive Integer; a zero
#   size would otherwise fail with a bare ZeroDivisionError and a negative one
#   would silently produce no parts
def divide_parts
  logger.info("Begin divide parts, file: #{file_store}")

  object_size = @file_meta[:size]
  part_size = @options[:part_size] || PART_SIZE

  unless part_size.is_a?(Integer) && part_size > 0
    raise ArgumentError, "part_size must be a positive integer, got #{part_size.inspect}"
  end

  # Ceiling division; yields 0 parts for a 0-byte object.
  num_parts = (object_size - 1) / part_size + 1

  @parts = (1..num_parts).map do |number|
    offset = (number - 1) * part_size
    {
      number: number,
      range: [offset, [offset + part_size, object_size].min],
      done: false
    }
  end

  checkpoint

  logger.info("Done divide parts, parts: #{@parts.size}")
end
download_part(p)
click to toggle source
下载片段
# File lib/cos/download.rb, line 181
# Downloads a single part into its own part file.
#
# The part's :range holds [start, end) byte offsets; the request header uses
# an inclusive end, hence the "- 1" (e.g. Range: bytes=0-11). Afterwards the
# part is marked done and a checkpoint is taken.
#
# @param p [Hash] part description with :number and :range
def download_part(p)
  logger.debug("Begin download slice: #{p}")

  part_file = get_part_file(p)
  url = cos_file.url

  first_byte, end_offset = p[:range]
  byte_range = "bytes=#{first_byte}-#{end_offset - 1}"

  bucket.client.api.download(
    url,
    part_file,
    headers: { Range: byte_range },
    bucket: bucket.bucket_name
  )

  finished = p.merge(done: true)
  sync_update_part(finished)

  checkpoint

  logger.debug("Done download part: #{p}")
end
finish?()
click to toggle source
是否完成下载并比对sha1
# File lib/cos/download.rb, line 115
# Checks whether the merged file matches the expected SHA-1.
#
# Compares the downcased :sha1 from @file_meta with the value
# Util.file_sha1 computes for @file_store.
#
# @return [Boolean] true when both digests are equal
def finish?
  expected_sha1 = @file_meta[:sha1].downcase
  actual_sha1 = Util.file_sha1(@file_store)

  expected_sha1 == actual_sha1
end
get_part_file(p)
click to toggle source
获取分块文件名
# File lib/cos/download.rb, line 247
# Builds the file name used to store one part.
#
# @param p [Hash] part description; only :number is read
# @return [String] "<@file_store>.part.<number>"
def get_part_file(p)
  format('%s.part.%s', @file_store, p[:number])
end
initiate()
click to toggle source
# File lib/cos/download.rb, line 164
# Starts a fresh download session.
#
# Builds the session id from the bucket name, the remote path and the
# current Unix timestamp, captures the remote file's sha and size in
# @file_meta, and takes an initial checkpoint.
def initiate
  logger.info('Begin initiate session')

  bucket_name = cos_file.bucket.bucket_name
  remote_path = cos_file.path
  @session = "#{bucket_name}-#{remote_path}-#{Time.now.to_i}"

  @file_meta = { sha1: cos_file.sha, size: cos_file.filesize }

  # Persist the initial state.
  checkpoint

  logger.info("Done initiate session: #{@session}")
end
rebuild()
click to toggle source
断点续传文件重建
# File lib/cos/download.rb, line 145
# Restores the session from the checkpoint file, or starts a new one.
#
# A checkpoint is loaded only when checkpoints are enabled
# (options[:disable_cpt] is not set) and the checkpoint file exists;
# otherwise initiate is called.
def rebuild
  logger.info("Begin rebuild session, checkpoint: #{cpt_file}")

  resumable = !options[:disable_cpt] && File.exist?(cpt_file)

  if resumable
    # Resume: take session, file metadata and parts from the saved state.
    states = load_checkpoint(cpt_file)
    @session = states[:session]
    @file_meta = states[:file_meta]
    @parts = states[:parts]
  else
    # Nothing to resume from.
    initiate
  end

  logger.info("Done rebuild session, Parts: #{@parts.count}")
end
sync_get_all_parts()
click to toggle source
同步获取所有片段
# File lib/cos/download.rb, line 240
# Returns a snapshot of all parts.
#
# The copy is taken while holding @all_mutex; it is a shallow copy, so the
# part hashes themselves are shared with @parts.
#
# @return [Array<Hash>] a duplicate of @parts
def sync_get_all_parts
  @all_mutex.synchronize do
    @parts.dup
  end
end
sync_get_todo_part()
click to toggle source
同步获取下一片段
# File lib/cos/download.rb, line 226
# Takes the next pending part off the work queue.
#
# The first element of @todo_parts is removed while holding @todo_mutex.
#
# @return [Hash, nil] the next part, or nil when the queue is empty
def sync_get_todo_part
  @todo_mutex.synchronize do
    @todo_parts.shift
  end
end
sync_update_part(p)
click to toggle source
同步更新片段
# File lib/cos/download.rb, line 233
# Stores an updated part description.
#
# The part is written to @parts at index (number - 1) while holding
# @all_mutex, i.e. parts are addressed by their 1-based :number.
#
# @param p [Hash] part description including :number
# @return [Hash] the stored part
def sync_update_part(p)
  slot = p[:number] - 1

  @all_mutex.synchronize { @parts[slot] = p }
end