class ODisk::Planner

The Planner collects input in the form of Digests from a Digester and a Fetcher and then determines what actions are necessary to synchronize a directory. The Planner then asks Copiers and Crypters to perform the synchronization operations.

Public Class Methods

new(options={}) click to toggle source
Calls superclass method
# File lib/odisk/planner.rb, line 11
def initialize(options={})
  super(options)
end
sync_steps(pd, ld, rd, master=nil) click to toggle source

master can be Step::LOCAL or Step::REMOTE and forces direction

# File lib/odisk/planner.rb, line 25
def self.sync_steps(pd, ld, rd, master=nil)
  steps = {}
  lh = {}
  rh = {}
  ph = {}
  ld.entries.each { |e| lh[e.name] = e }
  rd.entries.each { |e| rh[e.name] = e }
  pd.entries.each { |e| ph[e.name] = e } unless pd.nil?
  keys = lh.keys | rh.keys
  keys.each do |name|
    le = lh[name]
    re = rh[name]
    if re.nil?
      if Step::REMOTE == master
        steps[name] = Step.new(name, Step::LOCAL, Step::REMOVE)
      elsif le.is_a?(::ODisk::File)
        steps[name] = Step.new(name, Step::LOCAL, Step::COPY)
      elsif le.is_a?(::ODisk::Dir)
        steps[name] = Step.new(name, Step::LOCAL, Step::DIGEST)
      end
    elsif le.nil?
      if Step::LOCAL == master
        steps[name] = Step.new(name, Step::REMOTE, Step::REMOVE)
      elsif re.is_a?(::ODisk::File)
        steps[name] = Step.new(name, Step::REMOTE, Step::COPY)
      elsif re.is_a?(::ODisk::Dir)
        steps[name] = Step.new(name, Step::REMOTE, Step::DIGEST)
      end
    elsif le != re # both exist but are different
      # some helpful info
      if le.owner != re.owner
        ::Opee::Env.info("#{le.name} owner difference #{le.owner} != #{re.owner}")
      elsif le.group != re.group
        ::Opee::Env.info("#{le.name} group difference #{le.group} != #{re.group}")
      elsif le.mode != re.mode
        ::Opee::Env.info("#{le.name} mode difference #{le.mode} != #{re.mode}")
      elsif le.mtime.sec != re.mtime.sec
        ::Opee::Env.info("#{le.name} mtime (sec) difference #{le.mtime.sec} != #{re.mtime.sec}")
      elsif le.size != re.size
        ::Opee::Env.info("#{le.name} size difference #{le.size} != #{re.size}")
      end

      if le.class != re.class
        ::Opee::Env.error("Conflict syncing #{ld.top_path}/#{name}. Local and remote types do not match.")
        steps[name] = Step.new(name, Step::LOCAL, Step::ERROR)
      elsif le.removed
        ::Opee::Env.error("Unexpected digest entry for #{ld.top_path}/#{name}. The removed flag is sent in the local digest.")
      elsif re.removed
        steps[name] = Step.new(name, Step::LOCAL, Step::REMOVE)
      elsif le.is_a?(::ODisk::File) || le.is_a?(::ODisk::Link)
        op = le.is_a?(::ODisk::File) ? Step::COPY : Step::LINK
        if Step::LOCAL == master
          steps[name] = Step.new(name, Step::LOCAL, op) if le.is_a?(::ODisk::File)
        elsif Step::REMOTE == master
          steps[name] = Step.new(name, Step::REMOTE, op) if re.is_a?(::ODisk::File)
        elsif le.mtime > re.mtime
          pe = ph[name]
          if pe.nil? || pe.mtime == re.mtime
            # Don't know if the content or the stats changed so copy it.
            steps[name] = Step.new(name, Step::LOCAL, op) if le.is_a?(::ODisk::File)
          else
            ::Opee::Env.error("Conflict syncing #{ld.top_path}/#{name}. Both local and remote have changed.")
            steps[name] = Step.new(name, Step::LOCAL, Step::ERROR)
          end
        elsif le.mtime < re.mtime
          # Don't know if the content or the stats changed so copy it.
          steps[name] = Step.new(name, Step::REMOTE, op) if re.is_a?(::ODisk::File)
        else # same times but different can't be good
          ::Opee::Env.error("Conflict syncing #{ld.top_path}/#{name}. Both local and remote have changed.")
          steps[name] = Step.new(name, Step::LOCAL, Step::ERROR)
        end
      end
    end
  end
  steps.empty? ? nil : steps
end

Public Instance Methods

set_options(options) click to toggle source
Calls superclass method
# File lib/odisk/planner.rb, line 15
def set_options(options)
  super(options)
  @dir_queue = options[:dir_queue]
  @copy_queue = options[:copy_queue]
  @crypt_queue = options[:crypt_queue]
  @inputs = options[:inputs]
  @fixer = options[:fixer]
end

Private Instance Methods

complete?(job, token) click to toggle source
# File lib/odisk/planner.rb, line 110
def complete?(job, token)
  result = !token.nil? && token.size == @inputs.size && token.sort == @inputs.sort
  ::Opee::Env.info("complete?(#{job.key()}, #{token}) => #{result}")
  #::Opee::Env.debug("#{Oj.dump(job, indent: 2)}")
  result
end
keep_going(job) click to toggle source
# File lib/odisk/planner.rb, line 117
def keep_going(job)
  path = ::File.join($local_top, job.path)
  odisk_dir = ::File.join(path, '.odisk')
  `mkdir -p "#{odisk_dir}"` unless ::File.directory?(odisk_dir) && !$dry_run
  if @copy_queue.nil? && @crypt_queue.nil? && !job.current_digest.nil? # digests_only
    if job.previous_digest.nil?
      job.current_digest.version = 1
    else
      job.current_digest.version = job.previous_digest.version + 1
      Oj.to_file(::File.join(odisk_dir, 'digest.old.json'), job.previous_digest, indent: 2)
    end
    Oj.to_file(::File.join(odisk_dir, 'digest.json'), job.current_digest, indent: 2)
    job.current_digest.entries.each do |e|
      @dir_queue.ask(:add, job.path.empty? ? e.name : ::File.join(job.path, e.name)) if e.is_a?(::ODisk::Dir)
    end
  elsif job.remote_digest.nil?
    process_new(job, odisk_dir)
  elsif ((job.current_digest.nil? || job.current_digest.empty?) &&
         (job.previous_digest.nil? || job.previous_digest.empty?))
    process_down(job, odisk_dir)
  else
    process_sync(job, odisk_dir)
  end
end
process_down(job, odisk_dir) click to toggle source
# File lib/odisk/planner.rb, line 177
def process_down(job, odisk_dir)
  job.new_digest = job.remote_digest
  Oj.to_file(::File.join(odisk_dir, 'digest.json'), job.new_digest, indent: 2)
  full_job_path = job.path.empty? ? $local_top : ::File.join($local_top, job.path)
  stat_job = StatJob.new(full_job_path, job.new_digest)
  job.new_digest.entries.each do |e|
    path = job.path.empty? ? e.name : ::File.join(job.path, e.name)
    local = ::File.join($local_top, path)
    case e
    when ::ODisk::Dir
      ::Dir.mkdir(local) unless $dry_run || ::File.directory?(local)
      @dir_queue.ask(:add, path)
    when ::ODisk::File
      remote = ::File.join($remote.dir, path)
      stat_job.add_mod(e.name)
      if $remote.encrypt?
        encrypt_path = (job.path.empty? ? 
                        ::File.join($local_top, '.odisk', e.name + '.gpg') :
                        ::File.join($local_top, job.path, '.odisk', e.name + '.gpg'))
        @copy_queue.add_method(:download, remote + '.gpg', encrypt_path, local)
      else
        @copy_queue.add_method(:download, remote, local, nil)
      end
    when ::ODisk::Link
      target = e.target
      target = ::File.join($local_top, e.target) unless '/' == target[0]
      ::Opee::Env.info("symlink \"#{local}\" -> \"#{target}\"}")
      ::File.symlink(target, local) unless $dry_run || ::File.exists?(local)
    end
  end
  @fixer.ask(:collect, stat_job, :planner) unless @fixer.nil?
end
process_new(job, odisk_dir) click to toggle source
# File lib/odisk/planner.rb, line 142
def process_new(job, odisk_dir)
  job.current_digest.version = 1
  job.new_digest = job.current_digest
  # determine update new_digest
  # write the digest files
  # TBD if they are the same then don't bother
  Oj.to_file(::File.join(odisk_dir, 'digest.old.json'), job.previous_digest, indent: 2) unless job.previous_digest.nil?
  Oj.to_file(::File.join(odisk_dir, 'digest.json'), job.new_digest, indent: 2)
  # get the transfers going for all files
  job.new_digest.entries.each do |e|
    path = job.path.empty? ? e.name : ::File.join(job.path, e.name)
    case e
    when ::ODisk::Dir
      @dir_queue.ask(:add, path)
    when ::ODisk::File
      local = ::File.join($local_top, path)
      remote = ::File.join($remote.dir, path)
      if $remote.encrypt?
        encrypt_path = (job.path.empty? ? 
                        ::File.join($local_top, '.odisk', e.name + '.gpg') :
                        ::File.join($local_top, job.path, '.odisk', e.name + '.gpg'))
        @crypt_queue.add_method(:encrypt, local, encrypt_path, remote + '.gpg')
      else
        @copy_queue.add_method(:upload, local, remote)
      end
    when ::ODisk::Link
      # nothing to do
    end
  end
  path = job.path.empty? ? ::File.join('.odisk', 'digest.json') : ::File.join(job.path, '.odisk', 'digest.json')
  local = ::File.join($local_top, path)
  remote = ::File.join($remote.dir, path)
  @copy_queue.add_method(:upload, local, remote)
end
process_sync(job, odisk_dir) click to toggle source
# File lib/odisk/planner.rb, line 210
    def process_sync(job, odisk_dir)
      dirs = []
      if Step::REMOTE == $master
        job.remote_digest.entries.each { |e| dirs << e.name unless !e.is_a?(::ODisk::Dir) || dirs.include?(e.name) }
      elsif Step::LOCAL == $master
        job.current_digest.entries.each { |e| dirs << e.name unless !e.is_a?(::ODisk::Dir) || dirs.include?(e.name) }
      else
        job.current_digest.entries.each { |e| dirs << e.name unless !e.is_a?(::ODisk::Dir) || dirs.include?(e.name) }
        job.remote_digest.entries.each do |e|
          next unless e.is_a?(::ODisk::Dir)
          if e.removed
            dirs.delete(e.name)
          else
            dirs << e.name unless dirs.include?(e.name)
          end
        end
      end
      dirs.each do |dir|
        path = job.path.empty? ? dir : ::File.join(job.path, dir)
        local = ::File.join($local_top, path)
        ::Dir.mkdir(local) unless $dry_run || ::File.directory?(local)
        @dir_queue.ask(:add, path)
      end
      steps = self.class.sync_steps(job.previous_digest, job.current_digest, job.remote_digest, $master)
      #puts "*** steps for #{job.path}: #{steps}"
      return if steps.nil?

      Oj.to_file(::File.join(odisk_dir, 'digest.old.json'), job.previous_digest, indent: 2) unless job.previous_digest.nil?
      nrh = {} # fill with new digest entries for remote
      nlh = {} # fill with new digest entries for local
      full_job_path = job.path.empty? ? $local_top : ::File.join($local_top, job.path)
      stat_job = StatJob.new(full_job_path, nil)
      job.remote_digest.entries.each { |e| nrh[e.name] = e }
      job.current_digest.entries.each { |e| nlh[e.name] = e }
=begin
      if job.previous_digest.nil?
        job.current_digest.entries.each { |e| nlh[e.name] = e }
      else
        job.previous_digest.entries.each { |e| nlh[e.name] = e }
      end
=end
      steps.values.each do |s|
        stat_job.add_mod(s.name) unless Step::REMOVE == s.op || Step::DIGEST == s.op
        case s.op
        when Step::COPY
          path = job.path.empty? ? s.name : ::File.join(job.path, s.name)
          local = ::File.join($local_top, path)
          remote = ::File.join($remote.dir, path)
          encrypt_path = (job.path.empty? ? 
                          ::File.join($local_top, '.odisk', s.name + '.gpg') :
                          ::File.join($local_top, job.path, '.odisk', s.name + '.gpg'))
          if Step::REMOTE == s.master
            if $remote.encrypt?
              @copy_queue.add_method(:download, remote + '.gpg', encrypt_path, local)
            else
              @copy_queue.add_method(:download, remote, local, nil)
            end
            e = job.remote_digest[s.name]
          else
            if $remote.encrypt?
              @crypt_queue.add_method(:encrypt, local, encrypt_path, remote + '.gpg')
            else
              @copy_queue.add_method(:upload, local, remote)
            end
            e = job.current_digest[s.name]
          end
          nrh[e.name] = e
          nlh[e.name] = e
        when Step::REMOVE
          path = job.path.empty? ? s.name : ::File.join(job.path, s.name)
          if Step::REMOTE == s.master
            remote = ::File.join($remote.dir, path)
            @copy_queue.add_method(:remove_remote, remote)
            nrh.delete(s.name)
          else
            local = ::File.join($local_top, path)
            @copy_queue.add_method(:remove_local, local)
            nlh.delete(s.name)
          end
        when Step::DIGEST
          e = (Step::REMOTE == s.master) ? job.remote_digest[s.name] : job.current_digest[s.name]
          nrh[e.name] = e
          nlh[e.name] = e
        when Step::LINK
          # Handled in StatFixer
        when Step::ERROR
          # TBD
        end
      end
      nrd = Digest.new(job.remote_digest.top_path)
      nld = Digest.new(job.current_digest.top_path)
      # fill in digest entries from hashs
      nrd.entries = nrh.values
      nld.entries = nlh.values

      job.new_digest = nrd
      remote_digest_path = ::File.join(odisk_dir, 'digest.remote.json')
      Oj.to_file(::File.join(odisk_dir, 'digest.json'), nld, indent: 2)
      Oj.to_file(remote_digest_path, job.new_digest, indent: 2)
      path = job.path.empty? ? ::File.join('.odisk', 'digest.json') : ::File.join(job.path, '.odisk', 'digest.json')
      @copy_queue.add_method(:upload, remote_digest_path, ::File.join($remote.dir, path)) unless Step::REMOTE == $master

      stat_job.digest = nld
      @fixer.ask(:collect, stat_job, :planner) unless @fixer.nil?
    end
update_token(job, token, path_id) click to toggle source
# File lib/odisk/planner.rb, line 104
def update_token(job, token, path_id)
  token = [] if token.nil?
  token << path_id
  token
end