class Backup::Syncer::Cloud::Base

Constants

MUTEX

Attributes

max_retries[RW]

Number of times to retry failed operations.

Default: 10

retry_waitsec[RW]

Time in seconds to pause before each retry.

Default: 30

thread_count[RW]

Number of threads to use for concurrency.

Default: 0 (no concurrency)

Public Class Methods

new(syncer_id = nil, &block) click to toggle source
Calls superclass method Backup::Syncer::Base::new
# File lib/backup/syncer/cloud/base.rb, line 29
def initialize(syncer_id = nil, &block)
  super
  instance_eval(&block) if block_given?

  @thread_count   ||= 0
  @max_retries    ||= 10
  @retry_waitsec  ||= 30

  @path ||= 'backups'
  @path = path.sub(/^\//, '')
end

Public Instance Methods

perform!() click to toggle source
# File lib/backup/syncer/cloud/base.rb, line 41
def perform!
  log!(:started)
  @transfer_count = 0
  @unchanged_count = 0
  @skipped_count = 0
  @orphans = thread_count > 0 ? Queue.new : []

  directories.each {|dir| sync_directory(dir) }
  orphans_result = process_orphans

  Logger.info "\nSummary:"
  Logger.info "\s\sTransferred Files: #{ @transfer_count }"
  Logger.info "\s\s#{ orphans_result }"
  Logger.info "\s\sUnchanged Files: #{ @unchanged_count }"
  Logger.warn "\s\sSkipped Files: #{ @skipped_count }" if @skipped_count > 0
  log!(:finished)
end

Private Instance Methods

process_orphans() click to toggle source
# File lib/backup/syncer/cloud/base.rb, line 146
def process_orphans
  if @orphans.empty?
    return mirror ? 'Deleted Files: 0' : 'Orphaned Files: 0'
  end

  if @orphans.is_a?(Queue)
    @orphans = @orphans.size.times.map { @orphans.shift }
  end

  if mirror
    Logger.info @orphans.map {|path|
      "\s\s[removing] '#{ path }'"
    }.join("\n")

    begin
      cloud_io.delete(@orphans)
      "Deleted Files: #{ @orphans.count }"
    rescue => err
      Logger.warn Error.wrap(err, 'Delete Operation Failed')
      "Attempted to Delete: #{ @orphans.count } " +
      "(See log messages for actual results)"
    end
  else
    Logger.info @orphans.map {|path|
      "\s\s[orphaned] '#{ path }'"
    }.join("\n")
    "Orphaned Files: #{ @orphans.count }"
  end
end
sync_directory(dir) click to toggle source
# File lib/backup/syncer/cloud/base.rb, line 61
def sync_directory(dir)
  remote_base = path.empty? ? File.basename(dir) :
                              File.join(path, File.basename(dir))
  Logger.info "Gathering remote data for '#{ remote_base }'..."
  remote_files = get_remote_files(remote_base)

  Logger.info("Gathering local data for '#{ File.expand_path(dir) }'...")
  local_files = LocalFile.find(dir, excludes)

  relative_paths = (local_files.keys | remote_files.keys).sort
  if relative_paths.empty?
    Logger.info 'No local or remote files found'
  else
    Logger.info 'Syncing...'
    sync_block = Proc.new do |relative_path|
      local_file  = local_files[relative_path]
      remote_md5  = remote_files[relative_path]
      remote_path = File.join(remote_base, relative_path)
      sync_file(local_file, remote_path, remote_md5)
    end

    if thread_count > 0
      sync_in_threads(relative_paths, sync_block)
    else
      relative_paths.each(&sync_block)
    end
  end
end
sync_file(local_file, remote_path, remote_md5) click to toggle source

If an exception is raised in multiple threads, only the exception raised in the first thread that Thread#join is called on will be handled. So all exceptions are logged first with their details, then a generic exception is raised.

# File lib/backup/syncer/cloud/base.rb, line 120
        def sync_file(local_file, remote_path, remote_md5)
          if local_file && File.exist?(local_file.path)
            if local_file.md5 == remote_md5
              MUTEX.synchronize { @unchanged_count += 1 }
            else
              Logger.info("\s\s[transferring] '#{ remote_path }'")
              begin
                cloud_io.upload(local_file.path, remote_path)
                MUTEX.synchronize { @transfer_count += 1 }
              rescue CloudIO::FileSizeError => err
                MUTEX.synchronize { @skipped_count += 1 }
                Logger.warn Error.wrap(err, "Skipping '#{ remote_path }'")
              rescue => err
                Logger.error(err)
                raise Error, <<-EOS
                  Syncer Failed!
                  See the Retry [info] and [error] messages (if any)
                  for details on each failed operation.
                EOS
              end
            end
          elsif remote_md5
            @orphans << remote_path
          end
        end
sync_in_threads(relative_paths, sync_block) click to toggle source
# File lib/backup/syncer/cloud/base.rb, line 90
def sync_in_threads(relative_paths, sync_block)
  queue = Queue.new
  queue << relative_paths.shift until relative_paths.empty?
  num_threads = [thread_count, queue.size].min
  Logger.info "\s\sUsing #{ num_threads } Threads"
  threads = num_threads.times.map do
    Thread.new do
      loop do
        path = queue.shift(true) rescue nil
        path ? sync_block.call(path) : break
      end
    end
  end

  # abort if any thread raises an exception
  while threads.any?(&:alive?)
    if threads.any? {|thr| thr.status.nil? }
      threads.each(&:kill)
      Thread.pass while threads.any?(&:alive?)
      break
    end
    sleep num_threads * 0.1
  end
  threads.each(&:join)
end