class COS::Slice

分片大文件上传, 支持断点续传, 支持多线程

Constants

DEFAULT_SLICE_SIZE

默认分片大小: 3 MB

Attributes

cpt_file[RW]
offset[RW]
result[RW]
session[RW]
slice_size[RW]

Public Class Methods

new(opts = {}) click to toggle source
Calls superclass method COS::Checkpoint::new
# File lib/cos/slice.rb, line 20
# Builds the uploader and decides where the checkpoint file lives.
#
# opts is forwarded unchanged to the superclass constructor. The checkpoint
# path is options[:cpt_file] when given; otherwise it is the absolute path of
# the source file with a ".cpt" suffix appended.
def initialize(opts = {})
  super(opts)

  @cpt_file = options[:cpt_file]
  @cpt_file ||= "#{File.expand_path(file_src)}.cpt"
end

Public Instance Methods

checkpoint() click to toggle source

断点续传状态记录。示例 (states 的结构):

states = {
  :session => 'session',
  :offset => 0,
  :slice_size => 2048,
  :file => 'file',
  :file_meta => {
    :mtime => Time.now,
    :sha1 => 'file sha1',
    :size => 10000,
  },
  :parts => [
    {:number => 1, :range => [0, 100], :done => false},
    {:number => 2, :range => [100, 200], :done => true}
  ],
  :sha1 => 'checkpoint file sha1'
}
# File lib/cos/slice.rb, line 103
# Records the current upload state and reports progress.
#
# Steps, in order: verify the source file via ensure_file_not_changed, take a
# snapshot of the part list, invoke the progress callback (when one is set)
# with a completion ratio capped at 1.0, and write the state hash to the
# checkpoint file unless options[:disable_cpt] is truthy.
def checkpoint
  logger.debug("Make checkpoint, options[:disable_cpt]: #{options[:disable_cpt] == true}")

  ensure_file_not_changed

  snapshot = sync_get_all_parts
  states = {
    session:    session,
    slice_size: slice_size,
    offset:     offset,
    file:       file_src,
    file_meta:  @file_meta,
    parts:      snapshot
  }

  finished = snapshot.count { |part| part[:done] }

  # Progress callback: bytes accounted for divided by the total file size.
  if progress
    ratio = (offset + finished * slice_size).to_f / states[:file_meta][:size]
    ratio = 1.to_f if ratio > 1
    progress.call(ratio)
  end

  write_checkpoint(states, cpt_file) unless options[:disable_cpt]

  logger.debug("Upload Parts #{finished}/#{snapshot.count}")
end
upload() click to toggle source

开始上传

# File lib/cos/slice.rb, line 27
# Runs the upload and returns the file information.
#
# Returns early with whatever rebuild returns when that value is truthy
# (the upload is already complete). Otherwise the pending parts are uploaded
# by @num_threads worker threads, a final progress of 1.0 is reported, the
# checkpoint file is removed (unless options[:disable_cpt]) and result is
# returned.
def upload
  logger.info("Begin upload, file: #{file_src}, threads: #{@num_threads}")

  # Resume from a checkpoint or initialise a new session on the server;
  # a sha hit may complete the upload right here.
  finished_data = rebuild
  return finished_data if finished_data

  # Split the file into parts when none were restored.
  divide_parts if @parts.empty?

  # Parts that still have to be uploaded.
  @todo_parts = @parts.reject { |part| part[:done] }

  # NOTE: this is a process-wide setting, not limited to the workers below.
  Thread.abort_on_exception = true

  workers = @num_threads.times.map do
    Thread.new do
      loop do
        # Take the next pending part; stop when the queue is drained.
        part = sync_get_todo_part
        break if !part

        upload_part(part)
      end
    end
  end

  workers.each do |worker|
    begin
      worker.join
    rescue => error
      # An error after the upload has actually finished is ignored.
      next if finish?

      # Server error -288 requires a fresh initialisation, so the checkpoint
      # is discarded.
      if error.is_a?(ServerError) && error.error_code == -288
        File.delete(cpt_file) unless options[:disable_cpt]
      end
      workers.each(&:exit)
      raise error
    end
  end

  # Report 100% progress.
  progress.call(1.to_f) if progress

  # Upload finished: remove the checkpoint file.
  File.delete(cpt_file) unless options[:disable_cpt]

  logger.info("Done upload, file: #{@file_src}")

  # File information.
  result
end

Private Instance Methods

divide_parts() click to toggle source

文件片段拆分

# File lib/cos/slice.rb, line 249
# Splits the not-yet-uploaded tail of the file into parts.
#
# Starting at offset, the file is cut into chunks of slice_size bytes; each
# part is a hash with a 1-based :number, a :range of [start, stop] byte
# positions (stop is capped at the file size) and :done => false. The result
# is stored in @parts and a checkpoint is taken afterwards.
def divide_parts
  logger.info("Begin divide parts, file: #{file_src}")

  total_bytes = File.size(file_src)
  remaining = total_bytes - offset
  # Ceiling division written with integer arithmetic.
  part_count = (remaining - 1) / slice_size + 1

  @parts = 1.upto(part_count).map do |number|
    start = offset + (number - 1) * slice_size
    stop = [start + slice_size, total_bytes].min
    { number: number, range: [start, stop], done: false }
  end

  checkpoint

  logger.info("Done divide parts, parts: #{@parts.size}")
end
ensure_file_not_changed() click to toggle source

确保上传中文件没有变化

# File lib/cos/slice.rb, line 289
# Verifies that the source file is still the one recorded in @file_meta.
#
# An unchanged modification time is accepted immediately. Otherwise the
# file's SHA1 is recomputed and compared with the recorded one.
#
# Raises FileInconsistentError when both the mtime and the SHA1 differ.
def ensure_file_not_changed
  same_mtime = File.mtime(file_src) == @file_meta[:mtime]
  return if same_mtime

  # The mtime differs; fall back to comparing content hashes.
  return unless @file_meta[:sha1] != Util.file_sha1(file_src)

  raise FileInconsistentError, 'The file to upload is changed'
end
finish?() click to toggle source

是否完成上传

# File lib/cos/slice.rb, line 134
# Tells whether the upload has completed: true only when result is set and
# carries a non-nil :access_url.
def finish?
  !result.nil? && !result[:access_url].nil?
end
initiate() click to toggle source

初始化分块上传

# File lib/cos/slice.rb, line 169
# Initialises a sliced-upload session on the server.
#
# Posts an 'upload_slice' request carrying the file's SHA1 and size. When the
# response already contains :access_url (a previous upload finished, or the
# server matched the content), that response is returned as is. Otherwise
# the session, slice size and offset from the response are stored, the file
# metadata is recorded, a checkpoint is taken and false is returned.
def initiate
  logger.info('Begin initiate session')

  bucket        = config.get_bucket(options[:bucket])
  sign          = http.signature.multiple(bucket)
  resource_path = Util.get_resource_path(config.app_id, bucket, path, file_name)
  file_size     = File.size(file_src)
  file_sha1     = Util.file_sha1(file_src)

  requested_slice = options[:slice_size] || DEFAULT_SLICE_SIZE
  payload = {
    :op         => 'upload_slice',
    :slice_size => requested_slice,
    :sha        => file_sha1,
    :filesize   => file_size,
    :biz_attr   => options[:biz_attr],
    :session    => session,
    :multipart  => true
  }

  response = http.post(resource_path, {}, sign, payload)

  # Already uploaded earlier, or completed instantly by a content match.
  return response if response[:access_url]

  @session, @slice_size, @offset = response[:session], response[:slice_size], response[:offset]

  @file_meta = {
    mtime: File.mtime(file_src),
    sha1:  file_sha1,
    size:  file_size
  }

  # Persist the freshly initialised state.
  checkpoint

  logger.info("Done initiate session: #{@session}")

  false
end
rebuild() click to toggle source

断点续传文件重建

# File lib/cos/slice.rb, line 139
# Restores the upload session from the checkpoint file, or initialises a new
# one on the server.
#
# When checkpoints are disabled (options[:disable_cpt]) or no checkpoint file
# exists, the session is created via initiate; a truthy value from initiate
# (upload already complete) is returned directly.
#
# Otherwise the saved state is loaded. The saved file metadata is restored
# first and checked against the file on disk with ensure_file_not_changed,
# so a file modified since the checkpoint was written is rejected before any
# other saved state is adopted. (The previous check read a :file_sha1 key
# that checkpoint never writes, and compared it with @file_meta before it
# had been restored.)
#
# Returns false when the upload still has to proceed.
# Raises FileInconsistentError when the file differs from the checkpoint.
def rebuild
  logger.info("Begin rebuild session, checkpoint: #{cpt_file}")

  # Use the checkpoint only when enabled and its file exists.
  if options[:disable_cpt] || !File.exist?(cpt_file)
    # Initialise from the server.
    data = initiate
    return data if data
  else
    # Load the saved state.
    states = load_checkpoint(cpt_file)

    # Validate against the metadata recorded in the checkpoint itself.
    @file_meta = states[:file_meta]
    ensure_file_not_changed

    @session    = states[:session]
    @parts      = states[:parts]
    @slice_size = states[:slice_size]
    @offset     = states[:offset]
  end

  logger.info("Done rebuild session, Parts: #{@parts.count}")

  false
end
sync_get_all_parts() click to toggle source

同步获取所有片段

# File lib/cos/slice.rb, line 282
# Returns a shallow copy of the part list, taken while holding @all_mutex.
def sync_get_all_parts
  @all_mutex.synchronize do
    snapshot = @parts.dup
    snapshot
  end
end
sync_get_todo_part() click to toggle source

同步获取下一片段

# File lib/cos/slice.rb, line 268
# Removes and returns the next pending part while holding @todo_mutex;
# returns nil once the pending list is empty.
def sync_get_todo_part
  @todo_mutex.synchronize do
    next_part = @todo_parts.shift
    next_part
  end
end
sync_update_part(p) click to toggle source

同步更新片段

# File lib/cos/slice.rb, line 275
# Stores part p in @parts at the slot given by its 1-based :number, while
# holding @all_mutex. Returns p.
def sync_update_part(p)
  @all_mutex.synchronize do
    slot = p[:number] - 1
    @parts[slot] = p
  end
end
upload_part(p) click to toggle source

上传块

# File lib/cos/slice.rb, line 212
# Uploads a single part.
#
# The byte range p[:range] of the source file is copied into a temporary
# file, which is posted as an 'upload_slice' request together with its SHA1
# and starting offset. A response carrying :access_url is kept in @result.
# The temporary file is always closed and removed. Afterwards the part is
# marked done and a checkpoint is taken.
def upload_part(p)
  logger.debug("Begin upload slice: #{p}")

  bucket        = config.get_bucket(options[:bucket])
  sign          = http.signature.multiple(bucket)
  resource_path = Util.get_resource_path(config.app_id, bucket, path, file_name)
  temp_file     = Tempfile.new("#{session}-#{p[:number]}")

  begin
    # Copy the part's bytes from the source file into the temporary file.
    length = p[:range].at(1) - p[:range].at(0)
    IO.copy_stream(file_src, temp_file, length, p[:range].at(0))

    payload = {
      :op          => 'upload_slice',
      :sha         => Util.file_sha1(temp_file),
      :offset      => p[:range].at(0),
      :session     => session,
      :filecontent => temp_file,
      :multipart   => true
    }

    response = http.post(resource_path, {}, sign, payload)
    @result = response if response[:access_url]
  ensure
    # Always dispose of the temporary file.
    temp_file.close
    temp_file.unlink
  end

  finished_part = p.merge(done: true)
  sync_update_part(finished_part)

  checkpoint

  logger.debug("Done upload part: #{p}")
end