class Myreplicator::Loader

Public Class Methods

cleanup(metadata) click to toggle source

Deletes the metadata file and the extracted dump file

# File lib/loader/loader.rb, line 334
def self.cleanup metadata
  puts "Cleaning up..."
  e1 = nil
  e2 = nil
  begin
    FileUtils.rm metadata.metadata_filepath(tmp_dir) # json file
  rescue Exception => e
    e1 = e
    puts e.message
  end
  begin
    FileUtils.rm metadata.destination_filepath(tmp_dir) # dump file
  rescue Exception => e
    e2 = e
    puts e.message
  end
  if (!e1.blank?)
    raise Exceptions::LoaderError.new("#{e1.message}")
  end
  if (!e2.blank?)
    raise Exceptions::LoaderError.new("#{e2.message}")
  end
end
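
Example: a minimal usage sketch, assuming a metadata object built from a JSON file under tmp_dir (the path below is hypothetical):

  metadata = Myreplicator::ExportMetadata.new(:metadata_path => "/tmp/myreplicator/users_export.json")
  Myreplicator::Loader.cleanup metadata # removes both the .json file and the dump file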
clear_older_files(metadata) click to toggle source

Clears files that are older than the passed metadata file. Note: This method is provided to ensure that no old incremental files ever get loaded after the schema change algorithm has been applied

# File lib/loader/loader.rb, line 404
def self.clear_older_files metadata
  files = Loader.metadata_files
  #Kernel.p "===== clear old files ====="
  #Kernel.p metadata
  #Kernel.p files
  max_date = DateTime.strptime metadata.export_time
  files.each do |m|
    if metadata.export_id == m.export_id
      if max_date > DateTime.strptime(m.export_time)
        Loader.cleanup m if metadata.filepath != m.filepath
      end 
    end
  end     
end
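
Example: a rough sketch of driving this with the newest metadata file for an export, so that stale incrementals are removed (assumes export_time parses with DateTime.strptime, as in the method itself):

  files  = Myreplicator::Loader.metadata_files
  latest = files.max_by { |m| DateTime.strptime(m.export_time) }
  Myreplicator::Loader.clear_older_files latest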
group_incrementals(incrementals) click to toggle source

Groups all incremental files for the same table together. Returns an array of arrays. NOTE: Each array should be processed in the same thread to avoid collisions

# File lib/loader/loader.rb, line 210
def self.group_incrementals incrementals
  groups = [] # array of all grouped incrementals

  incrementals.each do |metadata|
    group = [metadata]
    incrementals.delete(metadata)

    # look for same loads
    incrementals.each do |md| 
      if metadata.equals(md)
        group << md
        incrementals.delete(md) # remove from main array
      end
    end
    
    groups << group
  end
  return groups
end
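
Example: a sketch of the expected shape of the result, assuming equals matches incrementals that target the same table:

  incrementals = Myreplicator::Loader.metadata_files.select { |m| m.export_type == "incremental" }
  groups = Myreplicator::Loader.group_incrementals incrementals
  groups.each do |group|
    puts group.map(&:filename).inspect # each group holds files for a single table
  end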
incremental_load(metadata) click to toggle source

Loads data incrementally. Uses the values specified in the metadata object

# File lib/loader/loader.rb, line 268
def self.incremental_load metadata
  exp = Myreplicator::Export.find(metadata.export_id)
  #Loader.unzip(metadata.filename)
  #metadata.zipped = false
  
  options = {:table_name => exp.table_name, 
    :db => exp.destination_schema,
    :filepath => metadata.destination_filepath(tmp_dir), 
    :source_schema => exp.source_schema,      
    :fields_terminated_by => "\\0",
    :lines_terminated_by => "\\n"}
  
  case metadata.export_to 
  when "vertica"
    Loader.load_to_vertica options, metadata, exp
  when "mysql"
    cmd = ImportSql.load_data_infile(options)
    puts cmd
    result = `#{cmd}` # execute
    unless result.nil?
      if result.size > 0
        raise Exceptions::LoaderError.new("Incremental Load #{metadata.filename} Failed!\n#{result}") 
      end
    end
  end #case
end
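
Example: callers normally guard this with transfer_completed? and clean up afterwards, as incremental_loads does; a minimal sketch:

  if Myreplicator::Loader.transfer_completed? metadata
    Myreplicator::Loader.incremental_load metadata
    Myreplicator::Loader.cleanup metadata
  end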
incremental_loads(incrementals) click to toggle source

Loads all incremental files. Ensures that multiple loads to the same table happen sequentially.

# File lib/loader/loader.rb, line 179
def self.incremental_loads incrementals
  groups = Loader.group_incrementals incrementals
  procs = []
  groups.each do |group|
    procs << Proc.new {
      group.each do |metadata|
        Myreplicator::Log.run(:job_type => "loader", 
                :name => "incremental_import", 
                :file => metadata.filename, 
                :export_id => metadata.export_id) do |log|

          if Myreplicator::Loader.transfer_completed? metadata            
            Myreplicator::Loader.incremental_load metadata
            Myreplicator::Loader.cleanup metadata
          end

        end
      end # group
    }
  end # groups
  
  return procs
end
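
Example: the returned procs are meant to be handed to parallel_load, so each group of same-table files runs sequentially inside one worker; a sketch:

  incrementals = Myreplicator::Loader.metadata_files.select { |m| m.export_type == "incremental" }
  procs = Myreplicator::Loader.incremental_loads incrementals
  Myreplicator::Loader.parallel_load procs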
initial_load(metadata) click to toggle source

Creates table and loads data

# File lib/loader/loader.rb, line 233
def self.initial_load metadata
  exp = Myreplicator::Export.find(metadata.export_id)
  #Kernel.p "===== unzip ====="
  #Loader.unzip(metadata.filename)
  #metadata.zipped = false
  filename = metadata.filename
  if filename.split('.').last == 'gz'
    filepath = metadata.destination_filepath(tmp_dir)
    cmd = "gunzip #{filepath}"
    system(cmd)
    unzip_file = File.join(tmp_dir, filename.split('.')[0..-2].join('.'))
    cmd = Myreplicator::ImportSql.initial_load(:db => exp.destination_schema,
                                     :filepath => unzip_file.to_s)
    puts cmd
    result = `#{cmd} 2>&1` # execute
    cmd2 = "gzip #{unzip_file.to_s}"
    system(cmd2)
  else
    cmd = Myreplicator::ImportSql.initial_load(:db => exp.destination_schema,
                                     :filepath => metadata.destination_filepath(tmp_dir))
    puts cmd
    result = `#{cmd} 2>&1` # execute
  end
  
  unless result.nil?
    if result.size > 0
      raise Exceptions::LoaderError.new("Initial Load #{metadata.filename} Failed!\n#{result}") 
    end
  end
end
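
Example: a minimal sketch, assuming the metadata describes a completed initial export; note that for "vertica" exports, initial_loads and load call incremental_load instead:

  if Myreplicator::Loader.transfer_completed? metadata
    Myreplicator::Loader.initial_load metadata
    Myreplicator::Loader.cleanup metadata
  end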
initial_loads(initials) click to toggle source

Loads all new tables, with multiple files loaded concurrently

# File lib/loader/loader.rb, line 148
def self.initial_loads initials
  procs = []

  initials.each do |metadata| 
    procs << Proc.new {
      Myreplicator::Log.run(:job_type => "loader", 
              :name => "#{metadata.export_type}_import", 
              :file => metadata.filename, 
              :export_id => metadata.export_id) do |log|

        if Myreplicator::Loader.transfer_completed? metadata
          if metadata.export_to == "vertica"
            Myreplicator::Loader.incremental_load metadata
          else
            Myreplicator::Loader.initial_load metadata
          end
          Myreplicator::Loader.cleanup metadata
        end

      end
    }
  end

  return procs
end
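
Example: as with incremental_loads, the procs are intended for parallel_load; a sketch:

  initials = Myreplicator::Loader.metadata_files.select { |m| m.export_type == "initial" }
  Myreplicator::Loader.parallel_load(Myreplicator::Loader.initial_loads(initials))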
load() click to toggle source

Kicks off all initial loads first and then all incrementals. Looks at metadata files stored locally. Note: Initial loads are performed sequentially

# File lib/loader/loader.rb, line 50
def self.load
  initials = []
  incrementals = []
  all_files = Myreplicator::Loader.metadata_files
  
  files_to_metadata = {}
  @redis = Redis.new(:host => Settings[:redis][:host], :port => Settings[:redis][:port])
  @load_set = "myreplicator_load_set"
  @load_hash = "myreplicator_load_hash"
  
  # clear out |k,v| entries for files that have already been deleted
  @redis.hgetall(@load_hash).size
  @redis.hgetall(@load_hash).each do |k, v|
    if @redis.hget(@load_hash, k) == '1'
      @redis.hdel(@load_hash, k)
    end
  end
  
  # check if there is any other running loader, if not then reset the load_hash
  cmd = "ps aux | grep 'Processing myreplicator_load'"
  result = `#{cmd} 2>&1`
  if (result.split("Processing myreplicator_load since").size <= 2)
    @redis.hgetall(@load_hash).each do |k, v|
      @redis.hdel(@load_hash, k)
    end
  end
  
  # making the hash for mapping filepath to metadata object
  all_files.each do |m|
    if Myreplicator::Loader.transfer_completed? m
      if !(@redis.hexists(@load_hash, m.filepath))
        @redis.hset(@load_hash, m.filepath, 0)
        @redis.sadd(@load_set, m.filepath)
      end
      files_to_metadata[m.filepath] = m
    else
      # for the fun of commenting: do nothing
    end
  end
  
  # processing the files in "queue"
  while @redis.smembers(@load_set).size > 0
    filepath = @redis.spop(@load_set)
          
    metadata = files_to_metadata[filepath]
    if metadata.blank?
      next
    end
    # init load
    if metadata.export_type == "initial"
      if Myreplicator::Loader.transfer_completed? metadata
        Myreplicator::Log.run(:job_type => "loader",
        :name => "#{metadata.export_type}_import",
        :file => metadata.filename,
        :export_id => metadata.export_id) do |log|
          if metadata.export_to == "vertica"
            Myreplicator::Loader.incremental_load metadata
          else
            Myreplicator::Loader.initial_load metadata
          end
          Myreplicator::Loader.cleanup metadata
        end
        @redis.hset(@load_hash, metadata.filepath, 1)
      else #transporter not done yet, return the file to @load_set
        @redis.sadd(@load_set, metadata.filepath)
      end
      
    else #if metadata.export_type == "incremental" # incremental load
      if Myreplicator::Loader.transfer_completed? metadata
        Myreplicator::Log.run(:job_type => "loader",
        :name => "incremental_import",
        :file => metadata.filename,
        :export_id => metadata.export_id) do |log|
          Myreplicator::Loader.incremental_load metadata
          Myreplicator::Loader.cleanup metadata
        end
        @redis.hset(@load_hash, metadata.filepath, 1)
      else #transporter not done yet, return the file to @load_set
        @redis.sadd(@load_set, metadata.filepath)
      end
    end
    sleep(2)
  end # end while
end
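
Example: load reads whatever metadata JSON files are present in tmp_dir, so a direct call is enough once files have been transported; a sketch, assuming Settings[:redis][:host] and Settings[:redis][:port] point at a reachable Redis:

  Myreplicator::Loader.load # processes every completed transfer found in tmp_dir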
load_to_vertica(options, metadata, exp) click to toggle source

Loads data into Vertica

# File lib/loader/loader.rb, line 298
def self.load_to_vertica options, metadata, exp
  options = {:table_name => exp.table_name, 
    :db => ActiveRecord::Base.configurations["vertica"]["database"],
    :filepath => metadata.destination_filepath(tmp_dir), 
    :source_schema => exp.source_schema, :export_id => metadata.export_id,
    :metadata => metadata
  }
  
  options[:destination_schema] = exp.destination_schema
  
  result = Myreplicator::VerticaLoader.load options
  
  ##TO DO: Handle unsuccessful vertica loads here

end
metadata_files() click to toggle source
# File lib/loader/loader.rb, line 383
def self.metadata_files
  files = []
  Dir.glob(File.join(tmp_dir, "*.json")).each do |json_file|
    files << Myreplicator::ExportMetadata.new(:metadata_path => json_file)
  end
  result = []
  #Kernel.p files
  files.each do |file|
    job = Export.where("id = #{file.export_id}").first
    #if job.state == "transport_completed"
    result << file
    #end
  end
  return result
end
mysql_table_definition(options) click to toggle source
# File lib/loader/loader.rb, line 419
def self.mysql_table_definition options
  sql = "SELECT table_schema, table_name, column_name, is_nullable, data_type, column_type, column_key "
  sql += "FROM INFORMATION_SCHEMA.COLUMNS where table_name = '#{options[:table]}' "
  sql += "and table_schema = '#{options[:source_schema]}';"
  
  puts sql
  
  desc = Myreplicator::DB.exec_sql(options[:source_schema], sql)
  puts desc
  return desc
end
new(*args) click to toggle source
# File lib/loader/loader.rb, line 9
def initialize *args
  options = args.extract_options!
end
parallel_load(procs) click to toggle source
# File lib/loader/loader.rb, line 135
def self.parallel_load procs
  p = Parallelizer.new(:klass => "Myreplicator::Loader")
  procs.each do |proc|
    p.queue << {:params => [], :block => proc}
  end
  
  p.run
end
perform(*args) click to toggle source

Main method provided for Resque. Reconnection is provided for Resque workers

# File lib/loader/loader.rb, line 22
def self.perform *args
  options = args.extract_options!
  id = options[:id]
  if id.blank?  
    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!
    load # Kick off the load process
  else
    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!
    load_id(id)
  end
end
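
Example: a sketch of kicking this off through Resque, assuming the class is wired to a Resque queue (the commented-out call in load_id below uses the same pattern):

  Resque.enqueue(Myreplicator::Loader) # the worker calls perform, which reconnects and runs load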
tmp_dir() click to toggle source
# File lib/loader/loader.rb, line 13
def self.tmp_dir
  #@tmp_dir ||= File.join(Myreplicator.app_root,"tmp", "myreplicator")
  @tmp_dir ||= Myreplicator.tmp_path
end
transfer_completed?(metadata) click to toggle source

Returns true if the transfer of the file being loaded is completed

# File lib/loader/loader.rb, line 318
def self.transfer_completed? metadata
  #Kernel.p "===== transfer_completed? metadata ====="
  #Kernel.p ({:export_id => metadata.export_id,
  #                        :file => metadata.export_path,
  #:job_type => "transporter"})
  if Log.completed?(:export_id => metadata.export_id,
                    :file => metadata.export_path,
                    :job_type => "transporter")
    return true
  end
  return false
end
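
Example: a small sketch of the usual guard, mirroring how load and the *_loads helpers use it:

  Myreplicator::Loader.metadata_files.each do |metadata|
    next unless Myreplicator::Loader.transfer_completed? metadata
    # safe to load: the transporter has logged this file as completed
  end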
unzip(filename) click to toggle source

Unzips the file. Checks whether the file exists or has already been unzipped

# File lib/loader/loader.rb, line 362
def self.unzip filename
  cmd = "cd #{tmp_dir}; gunzip #{filename}"
  passed = false
  if File.exist?(File.join(tmp_dir,filename))
    result = `#{cmd}`
    unless result.nil? 
      puts result
      unless result.length > 0
        passed = true
      end
    else
      passed = true
    end
  elsif File.exist?(File.join(tmp_dir,filename.gsub(".gz","")))
    puts "File already unzipped"
    passed = true
  end

  raise Exceptions::LoaderError.new("Unzipping #{filename} Failed!") unless passed
end

Public Instance Methods

load_id(id) click to toggle source

Runs the loader for a single export object

# File lib/loader/loader.rb, line 39
def load_id id
  
  #Resque.enqueue(Myreplicator::Loader, id)
  #Resque.enqueue(Myreplicator::Export,342)
end