class Myreplicator::Loader
Public Class Methods
Deletes the metadata file and the extract (dump) file
# File lib/loader/loader.rb, line 334
def self.cleanup metadata
  puts "Cleaning up..."
  e1 = nil
  e2 = nil
  begin
    FileUtils.rm metadata.metadata_filepath(tmp_dir) # json file
  rescue Exception => e
    e1 = e
    puts e.message
  end
  begin
    FileUtils.rm metadata.destination_filepath(tmp_dir) # dump file
  rescue Exception => e
    e2 = e
    puts e.message
  end
  if (!e1.blank?)
    raise Exceptions::LoaderError.new("#{e1.message}")
  end
  if (!e2.blank?)
    raise Exceptions::LoaderError.new("#{e2.message}")
  end
end
Clears files that are older than the passed metadata file. Note: This method is provided to ensure no old incremental files ever get loaded after the schema change algorithm has been applied.
# File lib/loader/loader.rb, line 404
def self.clear_older_files metadata
  files = Loader.metadata_files
  #Kernel.p "===== clear old files ====="
  #Kernel.p metadata
  #Kernel.p files
  max_date = DateTime.strptime metadata.export_time
  files.each do |m|
    if metadata.export_id == m.export_id
      if max_date > DateTime.strptime(m.export_time)
        Loader.cleanup m if metadata.filepath != m.filepath
      end
    end
  end
end
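A hedged usage sketch (the JSON filename is hypothetical): after a schema change has been applied, calling this with the newest metadata file removes stale incrementals for the same export.

  metadata = Myreplicator::ExportMetadata.new(
    :metadata_path => File.join(Myreplicator::Loader.tmp_dir, "users_20130101.json"))
  # Removes files that share this export_id but have an earlier export_time
  Myreplicator::Loader.clear_older_files(metadata)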
Groups all incremental files for the same table together. Returns an array of arrays. NOTE: Each array should be processed in the same thread to avoid collisions.
# File lib/loader/loader.rb, line 210
def self.group_incrementals incrementals
  groups = [] # array of all grouped incrementals
  incrementals.each do |metadata|
    group = [metadata]
    incrementals.delete(metadata)
    # look for same loads
    incrementals.each do |md|
      if metadata.equals(md)
        group << md
        incrementals.delete(md) # remove from main array
      end
    end
    groups << group
  end
  return groups
end
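A usage sketch, assuming ExportMetadata#equals treats two files for the same export as equal (as the implementation above implies); the table names in the comment are illustrative.

  incrementals = Myreplicator::Loader.metadata_files.select { |m| m.export_type == "incremental" }
  groups = Myreplicator::Loader.group_incrementals(incrementals)
  # e.g. [[users_1, users_2], [orders_1]] -- each inner array must be loaded by a single thread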
Loads data incrementally. Uses the values specified in the metadata object.
# File lib/loader/loader.rb, line 268
def self.incremental_load metadata
  exp = Myreplicator::Export.find(metadata.export_id)
  #Loader.unzip(metadata.filename)
  #metadata.zipped = false
  options = {:table_name => exp.table_name,
             :db => exp.destination_schema,
             :filepath => metadata.destination_filepath(tmp_dir),
             :source_schema => exp.source_schema,
             :fields_terminated_by => "\\0",
             :lines_terminated_by => "\\n"}

  case metadata.export_to
  when "vertica"
    Loader.load_to_vertica options, metadata, exp
  when "mysql"
    cmd = ImportSql.load_data_infile(options)
    puts cmd
    result = `#{cmd}` # execute
    unless result.nil?
      if result.size > 0
        raise Exceptions::LoaderError.new("Incremental Load #{metadata.filename} Failed!\n#{result}")
      end
    end
  end #case
end
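A minimal sketch of loading a single incremental file, guarded the same way the batch methods guard it; the metadata path is hypothetical.

  metadata = Myreplicator::ExportMetadata.new(
    :metadata_path => File.join(Myreplicator::Loader.tmp_dir, "orders_incremental_42.json"))
  if Myreplicator::Loader.transfer_completed?(metadata)
    Myreplicator::Loader.incremental_load(metadata)
    Myreplicator::Loader.cleanup(metadata)
  end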
Loads all incremental files. Ensures that multiple loads to the same table happen sequentially.
# File lib/loader/loader.rb, line 179
def self.incremental_loads incrementals
  groups = Loader.group_incrementals incrementals
  procs = []
  groups.each do |group|
    procs << Proc.new {
      group.each do |metadata|
        Myreplicator::Log.run(:job_type => "loader",
                              :name => "incremental_import",
                              :file => metadata.filename,
                              :export_id => metadata.export_id) do |log|
          if Myreplicator::Loader.transfer_completed? metadata
            Myreplicator::Loader.incremental_load metadata
            Myreplicator::Loader.cleanup metadata
          end
        end
      end # group
    }
  end # groups
  return procs
end
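A sketch of how the returned procs might be executed via parallel_load (documented below): each proc loads one table's files sequentially while different tables run in parallel.

  incrementals = Myreplicator::Loader.metadata_files.select { |m| m.export_type == "incremental" }
  procs = Myreplicator::Loader.incremental_loads(incrementals)
  Myreplicator::Loader.parallel_load(procs)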
Creates table and loads data
# File lib/loader/loader.rb, line 233
def self.initial_load metadata
  exp = Myreplicator::Export.find(metadata.export_id)
  #Kernel.p "===== unzip ====="
  #Loader.unzip(metadata.filename)
  #metadata.zipped = false
  filename = metadata.filename

  if filename.split('.').last == 'gz'
    filepath = metadata.destination_filepath(tmp_dir)
    cmd = "gunzip #{filepath}"
    system(cmd)
    unzip_file = File.join(tmp_dir, filename.split('.')[0..-2].join('.'))
    cmd = Myreplicator::ImportSql.initial_load(:db => exp.destination_schema,
                                               :filepath => unzip_file.to_s)
    puts cmd
    result = `#{cmd} 2>&1` # execute
    cmd2 = "gzip #{unzip_file.to_s}"
    system(cmd2)
  else
    cmd = Myreplicator::ImportSql.initial_load(:db => exp.destination_schema,
                                               :filepath => metadata.destination_filepath(tmp_dir))
    puts cmd
    result = `#{cmd} 2>&1` # execute
  end

  unless result.nil?
    if result.size > 0
      raise Exceptions::LoaderError.new("Initial Load #{metadata.filename} Failed!\n#{result}")
    end
  end
end
Loads all new tables concurrently, multiple files at a time.
# File lib/loader/loader.rb, line 148
def self.initial_loads initials
  procs = []
  initials.each do |metadata|
    procs << Proc.new {
      Myreplicator::Log.run(:job_type => "loader",
                            :name => "#{metadata.export_type}_import",
                            :file => metadata.filename,
                            :export_id => metadata.export_id) do |log|
        if Myreplicator::Loader.transfer_completed? metadata
          if metadata.export_to == "vertica"
            Myreplicator::Loader.incremental_load metadata
          else
            Myreplicator::Loader.initial_load metadata
          end
          Myreplicator::Loader.cleanup metadata
        end
      end
    }
  end
  return procs
end
Kicks off all initial loads first, then all incrementals. Looks at metadata files stored locally. Note: Initial loads are run sequentially.
# File lib/loader/loader.rb, line 50
def self.load
  initials = []
  incrementals = []
  all_files = Myreplicator::Loader.metadata_files
  files_to_metadata = {}

  @redis = Redis.new(:host => Settings[:redis][:host], :port => Settings[:redis][:port])
  @load_set = "myreplicator_load_set"
  @load_hash = "myreplicator_load_hash"

  # clear out |k,v| of files that have already been loaded and deleted
  @redis.hgetall(@load_hash).size
  @redis.hgetall(@load_hash).each do |k, v|
    if @redis.hget(@load_hash, k) == '1'
      @redis.hdel(@load_hash, k)
    end
  end

  # check if there is any other running loader; if not, reset the load_hash
  cmd = "ps aux | grep 'Processing myreplicator_load'"
  result = `#{cmd} 2>&1`
  if (result.split("Processing myreplicator_load since").size <= 2)
    @redis.hgetall(@load_hash).each do |k, v|
      @redis.hdel(@load_hash, k)
    end
  end

  # build the hash mapping filepath to metadata object
  all_files.each do |m|
    if Myreplicator::Loader.transfer_completed? m
      if !(@redis.hexists(@load_hash, m.filepath))
        @redis.hset(@load_hash, m.filepath, 0)
        @redis.sadd(@load_set, m.filepath)
      end
      files_to_metadata[m.filepath] = m
    else
      # transfer not completed yet: do nothing
    end
  end

  # process the files in the "queue"
  while @redis.smembers(@load_set).size > 0
    filepath = @redis.spop(@load_set)
    metadata = files_to_metadata[filepath]
    if metadata.blank?
      next
    end

    if metadata.export_type == "initial"
      # initial load
      if Myreplicator::Loader.transfer_completed? metadata
        Myreplicator::Log.run(:job_type => "loader",
                              :name => "#{metadata.export_type}_import",
                              :file => metadata.filename,
                              :export_id => metadata.export_id) do |log|
          if metadata.export_to == "vertica"
            Myreplicator::Loader.incremental_load metadata
          else
            Myreplicator::Loader.initial_load metadata
          end
          Myreplicator::Loader.cleanup metadata
        end
        @redis.hset(@load_hash, metadata.filepath, 1)
      else # transporter not done yet, return the file to @load_set
        @redis.sadd(@load_set, metadata.filepath)
      end
    else # metadata.export_type == "incremental"
      # incremental load
      if Myreplicator::Loader.transfer_completed? metadata
        Myreplicator::Log.run(:job_type => "loader",
                              :name => "incremental_import",
                              :file => metadata.filename,
                              :export_id => metadata.export_id) do |log|
          Myreplicator::Loader.incremental_load metadata
          Myreplicator::Loader.cleanup metadata
        end
        @redis.hset(@load_hash, metadata.filepath, 1)
      else # transporter not done yet, return the file to @load_set
        @redis.sadd(@load_set, metadata.filepath)
      end
    end
    sleep(2)
  end # end while
end
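For reference, a minimal sketch of the Redis bookkeeping used above (key names taken from the source; the file path is hypothetical): the set acts as the work queue and the hash tracks each file's load state.

  redis = Redis.new(:host => Settings[:redis][:host], :port => Settings[:redis][:port])
  path  = "/tmp/myreplicator/users_1.tsv"
  redis.hset("myreplicator_load_hash", path, 0)    # 0 = pending
  redis.sadd("myreplicator_load_set", path)        # enqueue the file
  claimed = redis.spop("myreplicator_load_set")    # claim a file to load
  redis.hset("myreplicator_load_hash", claimed, 1) # 1 = loaded; cleared on the next run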
Loads data into Vertica
# File lib/loader/loader.rb, line 298
def self.load_to_vertica options, metadata, exp
  options = {:table_name => exp.table_name,
             :db => ActiveRecord::Base.configurations["vertica"]["database"],
             :filepath => metadata.destination_filepath(tmp_dir),
             :source_schema => exp.source_schema,
             :export_id => metadata.export_id,
             :metadata => metadata}
  options[:destination_schema] = exp.destination_schema

  result = Myreplicator::VerticaLoader.load options

  ## TODO: Handle unsuccessful vertica loads here
end
# File lib/loader/loader.rb, line 383
def self.metadata_files
  files = []
  Dir.glob(File.join(tmp_dir, "*.json")).each do |json_file|
    files << Myreplicator::ExportMetadata.new(:metadata_path => json_file)
  end

  result = []
  #Kernel.p files
  files.each do |file|
    job = Export.where("id = #{file.export_id}").first
    #if job.state == "transport_completed"
    result << file
    #end
  end
  return result
end
# File lib/loader/loader.rb, line 419
def self.mysql_table_definition options
  sql = "SELECT table_schema, table_name, column_name, is_nullable, data_type, column_type, column_key "
  sql += "FROM INFORMATION_SCHEMA.COLUMNS where table_name = '#{options[:table]}' "
  sql += "and table_schema = '#{options[:source_schema]}';"
  puts sql

  desc = Myreplicator::DB.exec_sql(options[:source_schema], sql)
  puts desc
  return desc
end
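A hypothetical call, passing the same option keys the method reads (:table and :source_schema); the schema and table names are illustrative.

  desc = Myreplicator::Loader.mysql_table_definition(
    :table => "users", :source_schema => "app_production")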
# File lib/loader/loader.rb, line 9
def initialize *args
  options = args.extract_options!
end
# File lib/loader/loader.rb, line 135
def self.parallel_load procs
  p = Parallelizer.new(:klass => "Myreplicator::Loader")
  procs.each do |proc|
    p.queue << {:params => [], :block => proc}
  end
  p.run
end
Main method provided for Resque. Reconnection is provided for Resque workers.
# File lib/loader/loader.rb, line 22
def self.perform *args
  options = args.extract_options!
  id = options[:id]
  if id.blank?
    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!
    load # Kick off the load process
  else
    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!
    load_id(id)
  end
end
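A hedged example of enqueueing the loader through Resque (this assumes the class declares a Resque queue; the export id is hypothetical):

  Resque.enqueue(Myreplicator::Loader)            # load everything found locally
  Resque.enqueue(Myreplicator::Loader, :id => 42) # load a single export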
# File lib/loader/loader.rb, line 13
def self.tmp_dir
  #@tmp_dir ||= File.join(Myreplicator.app_root, "tmp", "myreplicator")
  @tmp_dir ||= Myreplicator.tmp_path
end
Returns true if the transfer of the file being loaded is completed
# File lib/loader/loader.rb, line 318
def self.transfer_completed? metadata
  #Kernel.p "===== transfer_completed? metadata ====="
  #Kernel.p ({:export_id => metadata.export_id,
  #           :file => metadata.export_path,
  #           :job_type => "transporter"})
  if Log.completed?(:export_id => metadata.export_id,
                    :file => metadata.export_path,
                    :job_type => "transporter")
    return true
  end
  return false
end
Unzips the file. Checks whether the file exists or has already been unzipped.
# File lib/loader/loader.rb, line 362
def self.unzip filename
  cmd = "cd #{tmp_dir}; gunzip #{filename}"
  passed = false

  if File.exist?(File.join(tmp_dir, filename))
    result = `#{cmd}`
    unless result.nil?
      puts result
      unless result.length > 0
        passed = true
      end
    else
      passed = true
    end
  elsif File.exist?(File.join(tmp_dir, filename.gsub(".gz", "")))
    puts "File already unzipped"
    passed = true
  end

  raise Exceptions::LoaderError.new("Unzipping #{filename} Failed!") unless passed
end
Public Instance Methods
Runs the loader for a single export object
# File lib/loader/loader.rb, line 39
def load_id id
  #Resque.enqueue(Myreplicator::Loader, id)
  #Resque.enqueue(Myreplicator::Export, 342)
end