class Myreplicator::Transporter

Public Class Methods

backup_files(location, metadata_path, dump_path) click to toggle source
# File lib/transporter/transporter.rb, line 160
# Copies the metadata file and the dump file into the backup directory.
#
# location      - directory that receives both files. It is created (with
#                 any missing parents) first; without that, copying to a
#                 non-existent +location+ would write a plain file named
#                 +location+ and the second copy would overwrite the first.
# metadata_path - path of the metadata file to copy.
# dump_path     - path of the dump file to copy.
def self.backup_files location, metadata_path, dump_path
  FileUtils.mkdir_p(location)
  FileUtils.cp(metadata_path, location)
  FileUtils.cp(dump_path, location)
end
completed_files(export) click to toggle source

Gets all files that are ready to be exported from the server

# File lib/transporter/transporter.rb, line 187
# Lists the files reported by the get_done_files command on the source
# server and pairs each one with the active Export job whose table name
# appears in the file name.
#
# export - object providing ssh_to_source and source_schema.
#
# Returns an Array of {:file => name, :export => job} hashes. Files that
# match no job are left out. Returns [] when the remote command produces no
# output.
def self.completed_files export
  ssh = export.ssh_to_source
  done_files = ssh.exec!(get_done_files(export))
  return [] if done_files.blank?

  # Bind the schema as a query parameter instead of interpolating it into the
  # SQL string, so a quote in the value cannot break or alter the query.
  jobs = Export.where("active = 1 and source_schema = ?", export.source_schema)

  result = []
  done_files.split("\n").each do |file|
    matched = nil
    # No early exit: when several table names occur in the file name, the
    # last matching job is the one that gets used.
    jobs.each { |job| matched = job if file.include?(job.table_name) }
    result << {:file => file, :export => matched} if matched
  end

  Kernel.p "===== done_files ====="
  Kernel.p result
  result
end
download(export) click to toggle source

Connects to the server via SSH. Kicks off parallel download.

# File lib/transporter/transporter.rb, line 55
# Downloads every file listed by completed_files into the local tmp
# directory, one file at a time.
#
# For each entry the metadata (json) file is fetched first. When its state is
# "export_completed" the dump file it points to is fetched as well, both
# remote files are removed and, if metadata.store_in is set, the local copies
# are backed up. For any other state only the remote metadata file is removed.
#
# export - export passed to completed_files to find the files to download.
def self.download export
  tmp_dir = Myreplicator.tmp_path
  # mkdir_p also creates missing parent directories and does nothing when the
  # directory already exists.
  FileUtils.mkdir_p(tmp_dir)

  completed_files(export).each do |entry|
    # Every file carries its own export job; keep it in a separate local
    # rather than overwriting the +export+ argument.
    job = entry[:export]
    filename = entry[:file]

    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!

    Log.run(:job_type => "transporter", :name => "metadata_file",
            :file => filename, :export_id => job.id) do |_metadata_log|
      sftp = job.sftp_to_source
      json_file = Transporter.remote_path(job, filename)
      json_local_path = File.join(tmp_dir, filename)
      puts "Downloading #{json_file}"
      sftp.download!(json_file, json_local_path)

      metadata = Transporter.metadata_obj(json_local_path)
      dump_file = metadata.export_path
      puts metadata.state

      if metadata.state == "export_completed"
        Log.run(:job_type => "transporter", :name => "export_file",
                :file => dump_file, :export_id => job.id) do |_dump_log|
          puts "Downloading #{dump_file}"
          local_dump_file = File.join(tmp_dir, dump_file.split("/").last)
          sftp.download!(dump_file, local_dump_file)
          Transporter.remove!(job, json_file, dump_file)
          # Store a backup as well when the metadata asks for it
          unless metadata.store_in.blank?
            Transporter.backup_files(metadata.backup_path, json_local_path, local_dump_file)
          end
        end
      else
        # Export did not complete: only the remote metadata file is removed
        Transporter.remove!(job, json_file, "")
      end
    end
  end
end
download_file() click to toggle source

Code block that each thread calls. instance_exec is used to execute it under the Transporter class.

  1. Connects via SFTP

  2. Downloads metadata file first

  3. Gets dump file location from metadata

  4. Downloads dump file

# File lib/transporter/transporter.rb, line 122
# Builds the block that downloads a single export.
#
# The block takes one +params+ array, [export, filename]. It refreshes the
# ActiveRecord connection, downloads the metadata (json) file over SFTP and,
# when the metadata state is "export_completed", downloads the dump file,
# removes both remote files and backs the local copies up if
# metadata.store_in is set.
#
# NOTE(review): +tmp_dir+ is not defined on the class; the block presumably
# has to be run via instance_exec on a Transporter instance — confirm against
# the Parallelizer library.
#
# Returns the Proc.
def self.download_file
  Proc.new do |params|
    export, filename = params[0], params[1]

    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!

    Log.run(:job_type => "transporter", :name => "metadata_file",
            :file => filename, :export_id => export.id) do |_metadata_log|
      sftp = export.sftp_to_source
      remote_json = Transporter.remote_path(export, filename)
      local_json = File.join(tmp_dir, filename)
      puts "Downloading #{remote_json}"
      sftp.download!(remote_json, local_json)

      metadata = Transporter.metadata_obj(local_json)
      remote_dump = metadata.export_path
      puts metadata.state
      # Nothing more to do unless the export finished
      next unless metadata.state == "export_completed"

      Log.run(:job_type => "transporter", :name => "export_file",
              :file => remote_dump, :export_id => export.id) do |_dump_log|
        puts "Downloading #{remote_dump}"
        local_dump = File.join(tmp_dir, remote_dump.split("/").last)
        sftp.download!(remote_dump, local_dump)
        Transporter.remove!(export, remote_json, remote_dump)
        # Keep a backup as well when the metadata asks for one
        Transporter.backup_files(metadata.backup_path, local_json, local_dump) unless metadata.store_in.blank?
      end
    end
  end
end
get_done_files(export) click to toggle source

Command for listing the done files. Grep -s is used to suppress error messages.

# File lib/transporter/transporter.rb, line 245
# Builds the shell command that lists the *.json files containing
# "export_completed" in the configured ssh_tmp_dir of the export's source
# schema. Also prints the export for debugging.
#
# export - object responding to source_schema.
#
# Returns the command String.
def self.get_done_files export
  Kernel.p "===== export ====="
  Kernel.p export
  remote_dir = Myreplicator.configs[export.source_schema]["ssh_tmp_dir"]
  "cd #{remote_dir}; grep -ls export_completed *.json"
end
get_dump_path(json_path, metadata = nil) click to toggle source

Reads metadata file for the export path

# File lib/transporter/transporter.rb, line 229
# Returns the export path recorded in a metadata object.
#
# json_path - path of the metadata file; only read when +metadata+ is nil.
# metadata  - already loaded metadata object (optional).
def self.get_dump_path json_path, metadata = nil
  source = metadata.nil? ? Transporter.metadata_obj(json_path) : metadata
  source.export_path
end
junk_file?(metadata) click to toggle source

Returns true if the file should be deleted

# File lib/transporter/transporter.rb, line 168
# Tells whether the metadata describes an export in the "failed" or
# "ignored" state.
#
# metadata - object responding to state.
#
# Returns true for those two states, false otherwise.
def self.junk_file? metadata
  %w[failed ignored].include?(metadata.state)
end
metadata_obj(json_path) click to toggle source
# File lib/transporter/transporter.rb, line 221
# Wraps the metadata file at +json_path+ in a new ExportMetadata object.
#
# json_path - path handed to ExportMetadata as :metadata_path.
#
# Returns the ExportMetadata instance.
def self.metadata_obj json_path
  ExportMetadata.new(:metadata_path => json_path)
end
new(*args) click to toggle source
# File lib/transporter/transporter.rb, line 8
# Accepts any arguments. A trailing options hash is stripped from +args+ by
# extract_options! and is not used any further.
def initialize *args
  args.extract_options!
end
parallel_download(files) click to toggle source

Gathers all files that need to be downloaded. Gives the queue to the parallelizer library to download in parallel.

# File lib/transporter/transporter.rb, line 103
# Queues one download job per file on a Parallelizer and runs it.
#
# files - Array of hashes with :export and :file keys.
#
# Each queued job carries [export, file] as :params and a fresh block from
# download_file as :block. Returns whatever Parallelizer#run returns.
def self.parallel_download files
  parallelizer = Parallelizer.new(:klass => "Myreplicator::Transporter")

  files.each do |entry|
    puts entry[:file]
    parallelizer.queue << {
      :params => [entry[:export], entry[:file]],
      :block => download_file
    }
  end

  parallelizer.run
end
perform() click to toggle source

Main method provided for Resque. Reconnection is provided for Resque workers.

# File lib/transporter/transporter.rb, line 23
# Class-level entry point of the transporter job; simply delegates to
# transfer. Presumably called by Resque, since schedule registers this class
# under the "myreplicator_transporter" queue — confirm against the Resque setup.
def self.perform
  transfer # Kick off the transfer of completed export files
end
remote_path(export, filename) click to toggle source

Returns the path of a dump file on the remote server

# File lib/transporter/transporter.rb, line 237
# Joins the configured ssh_tmp_dir of the export's source schema with
# +filename+.
#
# export   - object responding to source_schema.
# filename - file name to append to the directory.
#
# Returns the joined path String.
def self.remote_path export, filename
  schema_config = Myreplicator.configs[export.source_schema]
  File.join(schema_config["ssh_tmp_dir"], filename)
end
remove!(export, json_file, dump_file) click to toggle source
# File lib/transporter/transporter.rb, line 178
# Removes the given metadata and dump files on the export's source server by
# running "rm" over SSH. The command is echoed to stdout first.
#
# export    - object responding to ssh_to_source.
# json_file - remote path of the metadata file.
# dump_file - remote path of the dump file (callers pass "" when there is
#             none to remove).
#
# Returns the result of the SSH exec! call.
def self.remove! export, json_file, dump_file
  connection = export.ssh_to_source
  command = "rm #{json_file} #{dump_file}"
  puts command
  connection.exec!(command)
end
transfer() click to toggle source

Connects to all unique database servers. Downloads export files concurrently from multiple sources.

# File lib/transporter/transporter.rb, line 42
# Runs download once per source schema: active exports are grouped by
# source_schema and each resulting record is handed to download. The grouped
# collection is also printed for debugging.
#
# Returns the grouped collection.
def self.transfer
  active_exports = Myreplicator::Export.where("active = 1")
  unique_jobs = active_exports.group("source_schema")
  Kernel.p "===== unique_jobs ====="
  Kernel.p unique_jobs
  unique_jobs.each { |job| download(job) }
end

Public Instance Methods

schedule(cron) click to toggle source

Schedules the transport job in Resque

# File lib/transporter/transporter.rb, line 30
# Registers the transporter with Resque under the name
# "myreplicator_transporter".
#
# cron - cron expression stored under the :cron key of the schedule.
#
# Returns the result of Resque.set_schedule.
def schedule cron
  job_config = {
    :cron => cron,
    :class => "Myreplicator::Transporter",
    :queue => "myreplicator_transporter"
  }
  Resque.set_schedule("myreplicator_transporter", job_config)
end
tmp_dir() click to toggle source
# File lib/transporter/transporter.rb, line 12
# Returns the local directory used for downloaded files, creating it when it
# does not exist yet.
#
# The path comes from Myreplicator.tmp_path and is memoized in @tmp_dir.
# FileUtils.mkdir_p replaces the former File.directory? check followed by
# Dir.mkdir: it creates missing parent directories and does not raise when
# the directory already exists (for example when another worker created it
# between the check and the mkdir).
def tmp_dir
  @tmp_dir ||= Myreplicator.tmp_path
  FileUtils.mkdir_p(@tmp_dir)
  @tmp_dir
end