class Backup::CloudIO::CloudFiles

Constants

MAX_FILE_SIZE  (largest single non-SLO object upload: 5 GiB)
MAX_SLO_SIZE   (largest total SLO upload: 5 GiB * 1000 segments)
SEGMENT_BUFFER (read-buffer size used when streaming segment data)

Attributes

api_key[R]
auth_url[R]
container[R]
days_to_keep[R]
fog_options[R]
region[R]
segment_size[R]
segments_container[R]
servicenet[R]
username[R]

Public Class Methods

new(options = {})
Calls superclass method Backup::CloudIO::Base::new
# File lib/backup/cloud_io/cloud_files.rb, line 19
def initialize(options = {})
  super

  @username           = options[:username]
  @api_key            = options[:api_key]
  @auth_url           = options[:auth_url]
  @region             = options[:region]
  @servicenet         = options[:servicenet]
  @container          = options[:container]
  @segments_container = options[:segments_container]
  @segment_size       = options[:segment_size]
  @days_to_keep       = options[:days_to_keep]
  @fog_options        = options[:fog_options]
end
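
A construction sketch; all values below are illustrative placeholders, not defaults:

  cloud_io = Backup::CloudIO::CloudFiles.new(
    :username           => 'myuser',
    :api_key            => 'xxxxxxxxxx',
    :auth_url           => 'https://identity.api.rackspacecloud.com/v2.0',
    :region             => 'dfw',
    :servicenet         => false,
    :container          => 'my_backups',
    :segments_container => 'my_backups_segments',
    :segment_size       => 5,   # MiB; 0 disables SLO (segmented) uploads
    :days_to_keep       => nil, # when set, objects receive an X-Delete-At header
    :fog_options        => {}
  )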

Public Instance Methods

delete(objects_or_names)

Delete non-SLO object(s) from the container.

  • Called by the Storage (with objects) and the Syncer (with names)

  • Deletes up to 10,000 objects per request.

  • Missing objects will be ignored.

# File lib/backup/cloud_io/cloud_files.rb, line 104
def delete(objects_or_names)
  names = Array(objects_or_names).dup
  names.map!(&:name) if names.first.is_a?(Object)

  until names.empty?
    _names = names.slice!(0, 10000)
    with_retries('DELETE Multiple Objects') do
      resp = connection.delete_multiple_objects(container, _names)
      resp_status = resp.body['Response Status']
      raise Error, <<-EOS unless resp_status == '200 OK'
        #{ resp_status }
        The server returned the following:
        #{ resp.body.inspect }
      EOS
    end
  end
end
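
A usage sketch (cloud_io as constructed above; object names are illustrative). Either plain names or the objects returned by #objects may be passed:

  cloud_io.delete([
    'backups/2016.01.01/archive.tar-aa',
    'backups/2016.01.01/archive.tar-ab'
  ])
  # or: cloud_io.delete(cloud_io.objects('backups/2016.01.01'))
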
delete_slo(objects)

Delete SLO object(s) from the container.

  • Used only by the Storage. The Syncer cannot use SLOs.

  • Removes the SLO manifest object and all associated segments.

  • Missing segments will be ignored.

# File lib/backup/cloud_io/cloud_files.rb, line 127
def delete_slo(objects)
  Array(objects).each do |object|
    with_retries("DELETE SLO Manifest '#{ container }/#{ object.name }'") do
      resp = connection.delete_static_large_object(container, object.name)
      resp_status = resp.body['Response Status']
      raise Error, <<-EOS unless resp_status == '200 OK'
        #{ resp_status }
        The server returned the following:
        #{ resp.body.inspect }
      EOS
    end
  end
end
head_object(object)

Used by Object to fetch metadata if needed.

# File lib/backup/cloud_io/cloud_files.rb, line 91
def head_object(object)
  resp = nil
  with_retries("HEAD '#{ container }/#{ object.name }'") do
    resp = connection.head_object(container, object.name)
  end
  resp
end
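
For example, the returned response exposes the object's headers. Checking the standard Swift SLO marker header is one assumed use (header name per the OpenStack Swift API, not confirmed by this file):

  resp = cloud_io.head_object(object)
  slo  = !resp.headers['X-Static-Large-Object'].nil?
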
objects(prefix)

Returns all objects in the container with the given prefix.

  • get_container returns a maximum of 10,000 objects per request.

  • Returns objects sorted using a SQLite binary collating function.

  • If marker is given, only objects after the marker are in the response.

# File lib/backup/cloud_io/cloud_files.rb, line 69
def objects(prefix)
  objects = []
  resp = nil
  prefix = prefix.chomp('/')
  opts = { :prefix => prefix + '/' }

  create_containers

  while resp.nil? || resp.body.count == 10000
    opts.merge!(:marker => objects.last.name) unless objects.empty?
    with_retries("GET '#{ container }/#{ prefix }/*'") do
      resp = connection.get_container(container, opts)
    end
    resp.body.each do |obj_data|
      objects << Object.new(self, obj_data)
    end
  end

  objects
end
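
A usage sketch (the prefix is illustrative):

  cloud_io.objects('backups/2016.01.01').each do |object|
    puts object.name
  end
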
upload(src, dest)

The Syncer may call this method in multiple threads, but #objects is always called before this occurs.

# File lib/backup/cloud_io/cloud_files.rb, line 36
def upload(src, dest)
  create_containers

  file_size = File.size(src)
  segment_bytes = segment_size * 1024**2
  if segment_bytes > 0 && file_size > segment_bytes
    raise FileSizeError, <<-EOS if file_size > MAX_SLO_SIZE
      File Too Large
      File: #{ src }
      Size: #{ file_size }
      Max SLO Size is #{ MAX_SLO_SIZE } (5 GiB * 1000 segments)
    EOS

    segment_bytes = adjusted_segment_bytes(segment_bytes, file_size)
    segments = upload_segments(src, dest, segment_bytes, file_size)
    upload_manifest(dest, segments)
  else
    raise FileSizeError, <<-EOS if file_size > MAX_FILE_SIZE
      File Too Large
      File: #{ src }
      Size: #{ file_size }
      Max File Size is #{ MAX_FILE_SIZE } (5 GiB)
    EOS

    put_object(src, dest)
  end
end
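
A usage sketch (paths are illustrative). Whether the file is stored as a single object or as an SLO depends on its size and the configured #segment_size:

  cloud_io.upload('/tmp/backups/archive.tar', 'backups/2016.01.01/archive.tar')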

Private Instance Methods

adjusted_segment_bytes(segment_bytes, file_size)
# File lib/backup/cloud_io/cloud_files.rb, line 257
def adjusted_segment_bytes(segment_bytes, file_size)
  return segment_bytes if file_size / segment_bytes.to_f <= 1000

  mb = orig_mb = segment_bytes / 1024**2
  mb += 1 until file_size / (1024**2 * mb).to_f <= 1000
  Logger.warn Error.new(<<-EOS)
    Segment Size Adjusted
    Your original #segment_size of #{ orig_mb } MiB has been adjusted
    to #{ mb } MiB in order to satisfy the limit of 1000 segments.
    To enforce your chosen #segment_size, you should use the Splitter.
    e.g. split_into_chunks_of #{ mb * 1000 } (#segment_size * 1000)
  EOS
  1024**2 * mb
end
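
For example, a 200 GiB file (204,800 MiB) with a 100 MiB #segment_size would need 2048 segments, so the segment size is raised to 205 MiB, which brings the count back within the 1000-segment limit.
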
connection()
# File lib/backup/cloud_io/cloud_files.rb, line 143
def connection
  @connection ||= Fog::Storage.new({
    :provider             => 'Rackspace',
    :rackspace_username   => username,
    :rackspace_api_key    => api_key,
    :rackspace_auth_url   => auth_url,
    :rackspace_region     => region,
    :rackspace_servicenet => servicenet
  }.merge(fog_options || {}))
end
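
Because fog_options is merged last, it can override or add to these settings. For example, passing Excon connection options through Fog (the keys shown are standard Fog options, used here only as an illustration):

  # given as options[:fog_options] when constructing this CloudIO
  { :connection_options => { :connect_timeout => 30 } }
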
create_containers()
# File lib/backup/cloud_io/cloud_files.rb, line 154
def create_containers
  return if @containers_created
  @containers_created = true

  with_retries('Create Containers') do
    connection.put_container(container)
    connection.put_container(segments_container) if segments_container
  end
end
delete_at()
# File lib/backup/cloud_io/cloud_files.rb, line 252
def delete_at
  return unless days_to_keep
  @delete_at ||= (Time.now.utc + days_to_keep * 60**2 * 24).to_i
end
headers()

If :days_to_keep was set, each object will be scheduled for deletion. This includes non-SLO objects, the SLO manifest and all segments.

# File lib/backup/cloud_io/cloud_files.rb, line 246
def headers
  headers = {}
  headers.merge!('X-Delete-At' => delete_at) if delete_at
  headers
end
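
For example, with days_to_keep set to 30, #delete_at resolves to the Unix epoch time 30 days from now (30 * 24 * 60**2 = 2,592,000 seconds ahead), and the hash would look like the following (epoch value illustrative):

  { 'X-Delete-At' => 1484092800 }
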
put_object(src, dest)
# File lib/backup/cloud_io/cloud_files.rb, line 164
def put_object(src, dest)
  opts = headers.merge('ETag' => Digest::MD5.file(src).hexdigest)
  with_retries("PUT '#{ container }/#{ dest }'") do
    File.open(src, 'r') do |file|
      connection.put_object(container, dest, file, opts)
    end
  end
end
segment_md5(file, segment_bytes)
# File lib/backup/cloud_io/cloud_files.rb, line 220
def segment_md5(file, segment_bytes)
  md5 = Digest::MD5.new
  offset = 0
  while offset <= segment_bytes - SEGMENT_BUFFER
    data = file.read(SEGMENT_BUFFER)
    break unless data
    offset += data.size
    md5 << data
  end
  md5.hexdigest
end
upload_manifest(dest, segments)

Each segment’s ETag and byte_size will be verified once uploaded. The request will raise an exception if verification fails or segments are not found. However, each segment’s ETag was already verified when the segments were uploaded, so this should only retry failed requests.

# File lib/backup/cloud_io/cloud_files.rb, line 236
def upload_manifest(dest, segments)
  Logger.info "\s\sStoring SLO Manifest '#{ container }/#{ dest }'"

  with_retries("PUT SLO Manifest '#{ container }/#{ dest }'") do
    connection.put_static_obj_manifest(container, dest, segments, headers)
  end
end
upload_segments(src, dest, segment_bytes, file_size)

Each segment is uploaded using chunked transfer encoding, read in SEGMENT_BUFFER-sized chunks, and each segment’s MD5 is sent to verify the transfer. Each segment’s MD5 and byte_size will also be verified when the SLO manifest object is uploaded.

# File lib/backup/cloud_io/cloud_files.rb, line 177
def upload_segments(src, dest, segment_bytes, file_size)
  total_segments = (file_size / segment_bytes.to_f).ceil
  progress = (0.1..0.9).step(0.1).map {|n| (total_segments * n).floor }
  Logger.info "\s\sUploading #{ total_segments } SLO Segments..."

  segments = []
  File.open(src, 'r') do |file|
    segment_number = 0
    until file.eof?
      segment_number += 1
      object = "#{ dest }/#{ segment_number.to_s.rjust(4, '0') }"
      pos = file.pos
      md5 = segment_md5(file, segment_bytes)
      opts = headers.merge('ETag' => md5)

      with_retries("PUT '#{ segments_container }/#{ object }'") do
        file.seek(pos)
        offset = 0
        connection.put_object(segments_container, object, nil, opts) do
          # block is called to stream data until it returns ''
          data = ''
          if offset <= segment_bytes - SEGMENT_BUFFER
            data = file.read(SEGMENT_BUFFER).to_s # nil => ''
            offset += data.size
          end
          data
        end
      end

      segments << {
        :path => "#{ segments_container }/#{ object }",
        :etag => md5,
        :size_bytes => file.pos - pos
      }

      if i = progress.rindex(segment_number)
        Logger.info "\s\s...#{ i + 1 }0% Complete..."
      end
    end
  end
  segments
end
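
The returned array is what #upload_manifest passes to put_static_obj_manifest. An illustrative entry (names and values are placeholders):

  [
    {
      :path       => 'my_backups_segments/backups/2016.01.01/archive.tar/0001',
      :etag       => '0123456789abcdef0123456789abcdef',
      :size_bytes => 5242880
    }
    # ...one hash per segment, in upload order
  ]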