class COS::Download

大文件分片下载, 支持断点续传, 支持多线程 Range Headers support in HTTP1.1(rfc2616)

Constants

PART_SIZE

默认分块大小

READ_SIZE

默认文件读取大小

Attributes

cpt_file[RW]
session[RW]

Public Class Methods

new(opts = {}) click to toggle source
Calls superclass method COS::Checkpoint::new
# File lib/cos/download.rb, line 22
def initialize(opts = {})
  super(opts)

  @file_store = File.expand_path(file_store)
  @cpt_file = options[:cpt_file] || "#{@file_store}.cpt"
end

Public Instance Methods

checkpoint() click to toggle source

断点续传状态记录 @example

states = {
  :session => 'session',
  :file => 'file',
  :file_meta => {
    :sha1 => 'file sha1',
    :size => 10000,
  },
  :parts => [
    {:number => 1, :range => [0, 100], :done => false},
    {:number => 2, :range => [100, 200], :done => true}
  ],
  :sha1 => 'checkpoint file sha1'
}
# File lib/cos/download.rb, line 84
def checkpoint
  logger.debug("Make checkpoint, options[:disable_cpt]: #{options[:disable_cpt] == true}")

  parts = sync_get_all_parts
  states = {
      :session    => session,
      :file       => file_store,
      :file_meta  => @file_meta,
      :parts      => parts
  }

  done = parts.count { |p| p[:done] }

  # 下载进度回调
  if progress
    if done == 0 or parts.count == 0
      progress.call(0.to_f)
    else
      percent = done.to_f / parts.size
      progress.call(percent > 1 ? 1.to_f : percent)
    end
  end

  write_checkpoint(states, cpt_file) unless options[:disable_cpt]

  logger.debug("Download Parts #{done}/#{parts.size}")
end
download() click to toggle source

开始下载

# File lib/cos/download.rb, line 30
def download
  logger.info("Begin download, file: #{file_store}, threads: #{@num_threads}")

  # 重建断点续传
  rebuild

  # 文件分片
  divide_parts if @parts.empty?

  # 未完成的片段
  @todo_parts = @parts.reject { |p| p[:done] }

  # 多线程下载
  (1..@num_threads).map do
    logger.debug("#{@num_threads} Threads Downloads")

    Thread.new do
      logger.debug("Create Thread #{Thread.current.object_id}")

      loop do
        # 获取下一个未下载的片段
        p = sync_get_todo_part
        break unless p

        # 下载片段
        download_part(p)
      end
    end
  end.map(&:join)

  # 完成下载, 合并文件
  complete

  unless finish?
    File.delete(file_store) if File.exist?(file_store)
    raise DownloadError, 'File downloaded sha1 not match, deleted!'
  end
end

Private Instance Methods

complete() click to toggle source
# File lib/cos/download.rb, line 119
def complete
  # 返回100%的进度
  progress.call(1.to_f) if progress

  # 获取全部的分块
  parts = sync_get_all_parts

  # 合并分块文件
  File.open(@file_store, 'w') do |w|
    # 排序组合文件
    parts.sort{ |x, y| x[:number] <=> y[:number] }.each do |p|
      File.open(get_part_file(p)) do |r|
        w.write(r.read(READ_SIZE)) until r.eof?
      end
    end
  end

  # 下载完成, 删除checkpoint文件
  File.delete(cpt_file) unless options[:disable_cpt]
  # 删除分块文件
  parts.each{ |p| File.delete(get_part_file(p)) }

  logger.info("Done download, file: #{@file_store}")
end
divide_parts() click to toggle source

文件片段拆分

# File lib/cos/download.rb, line 205
def divide_parts
  logger.info("Begin divide parts, file: #{file_store}")

  object_size = @file_meta[:size]
  part_size   = @options[:part_size] || PART_SIZE
  num_parts   = (object_size - 1) / part_size + 1

  @parts = (1..num_parts).map do |i|
    {
        :number => i,
        :range => [(i - 1) * part_size, [i * part_size, object_size].min],
        :done => false
    }
  end

  checkpoint

  logger.info("Done divide parts, parts: #{@parts.size}")
end
download_part(p) click to toggle source

下载片段

# File lib/cos/download.rb, line 181
def download_part(p)
  logger.debug("Begin download slice: #{p}")

  part_file = get_part_file(p)

  url = cos_file.url

  # 下载
  # Range:bytes=0-11
  bucket.client.api.download(
      url,
      part_file,
      headers: {Range: "bytes=#{p[:range].at(0)}-#{p[:range].at(1) - 1}"},
      bucket: bucket.bucket_name
  )

  sync_update_part(p.merge(done: true))

  checkpoint

  logger.debug("Done download part: #{p}")
end
finish?() click to toggle source

是否完成下载并比对sha1

# File lib/cos/download.rb, line 115
def finish?
  @file_meta[:sha1].downcase == Util.file_sha1(@file_store)
end
get_part_file(p) click to toggle source

获取分块文件名

# File lib/cos/download.rb, line 247
def get_part_file(p)
  "#{@file_store}.part.#{p[:number]}"
end
initiate() click to toggle source
# File lib/cos/download.rb, line 164
def initiate
  logger.info('Begin initiate session')

  @session = "#{cos_file.bucket.bucket_name}-#{cos_file.path}-#{Time.now.to_i}"

  @file_meta = {
      :sha1  => cos_file.sha,
      :size  => cos_file.filesize
  }

  # 保存断点
  checkpoint

  logger.info("Done initiate session: #{@session}")
end
rebuild() click to toggle source

断点续传文件重建

# File lib/cos/download.rb, line 145
def rebuild
  logger.info("Begin rebuild session, checkpoint: #{cpt_file}")

  # 是否启用断点续传并且记录文件存在
  if options[:disable_cpt] || !File.exist?(cpt_file)
    # 初始化
    initiate
  else
    # 加载断点续传
    states = load_checkpoint(cpt_file)

    @session    = states[:session]
    @file_meta  = states[:file_meta]
    @parts      = states[:parts]
  end

  logger.info("Done rebuild session, Parts: #{@parts.count}")
end
sync_get_all_parts() click to toggle source

同步获取所有片段

# File lib/cos/download.rb, line 240
def sync_get_all_parts
  @all_mutex.synchronize {
    @parts.dup
  }
end
sync_get_todo_part() click to toggle source

同步获取下一片段

# File lib/cos/download.rb, line 226
def sync_get_todo_part
  @todo_mutex.synchronize {
    @todo_parts.shift
  }
end
sync_update_part(p) click to toggle source

同步更新片段

# File lib/cos/download.rb, line 233
def sync_update_part(p)
  @all_mutex.synchronize {
    @parts[p[:number] - 1] = p
  }
end