class Tapsoob::Push
Public Instance Methods
fetch_local_tables_info()
click to toggle source
# File lib/tapsoob/operation.rb, line 485
# Builds a hash of table name => record count from the entries found in
# "<dump_path>/data".
#
# Every entry in that directory contributes one key: its basename with a
# trailing ".json" removed. When "<name>.json" exists, the count is the size
# of the "data" member of the parsed JSON document; otherwise the count is 0.
#
# Returns whatever apply_table_filter returns for the collected hash.
def fetch_local_tables_info
  tables_with_counts = {}
  data_dir = File.join(dump_path, "data")

  Dir.glob(File.join(data_dir, "*")).each do |path|
    table = File.basename(path, ".json")
    json_file = File.join(data_dir, "#{table}.json")

    # File.exist? is used because File.exists? was deprecated for years and
    # removed in Ruby 3.2, where calling it raises NoMethodError.
    tables_with_counts[table] =
      if File.exist?(json_file)
        JSON.parse(File.read(json_file))["data"].size
      else
        0
      end
  end

  apply_table_filter(tables_with_counts)
end
file_prefix()
click to toggle source
# File lib/tapsoob/operation.rb, line 316
# Returns the fixed string "push".
def file_prefix
  'push'
end
local_tables_info()
click to toggle source
# File lib/tapsoob/operation.rb, line 468
# Returns the table info stored under opts[:local_tables_info], computing it
# with fetch_local_tables_info and storing it there when the slot is nil or
# false.
def local_tables_info
  options = opts
  cached = options[:local_tables_info]
  return cached if cached

  options[:local_tables_info] = fetch_local_tables_info
end
push_data()
click to toggle source
# File lib/tapsoob/operation.rb, line 389
# Pushes the dumped rows of every pending table into the database.
#
# For each entry of `tables` (name => count):
# * the table is skipped when "<dump_path>/data/<name>.json" is missing;
# * the target table is truncated first when the :purge option is set;
# * a Tapsoob::DataStream is built for the table and handed, together with a
#   ProgressBar sized to `count`, to push_data_from_file.
def push_data
  log.info "Sending data"
  log.info "#{tables.size} tables, #{format_number(record_count)} records"

  tables.each do |table_name, count|
    # File.exist? is used because File.exists? was removed in Ruby 3.2 and
    # raises NoMethodError there.
    next unless File.exist?(File.join(dump_path, "data", "#{table_name}.json"))

    db[table_name.to_sym].truncate if @opts[:purge]

    stream = Tapsoob::DataStream.factory(db, {
      :table_name => table_name,
      :chunksize => default_chunksize
    }, {
      :"skip-duplicates" => opts[:"skip-duplicates"] || false,
      :"discard-identity" => opts[:"discard-identity"] || false,
      :purge => opts[:purge] || false,
      :debug => opts[:debug]
    })
    progress = ProgressBar.new(table_name.to_s, count)
    push_data_from_file(stream, progress)
  end
end
push_data_from_file(stream, progress)
click to toggle source
# File lib/tapsoob/operation.rb, line 411
# Feeds the dump file for one table into the database chunk by chunk.
#
# stream   - data stream object for the table; must respond to state, fetch,
#            complete?, to_hash, fetch_data_in_database, verify_stream,
#            increment and table_name.
# progress - progress reporter; receives inc(row_size) per chunk and finish
#            once the stream is complete.
#
# Each iteration fetches one chunk from the dump at dump_path, loads it into
# the database and records the stream state in stream_state so the push can
# be resumed. When exiting? is true the session is stored and the process
# exits with status 0. After the stream completes, the table name is appended
# to completed_tables and stream_state is reset to {}.
def push_data_from_file(stream, progress)
  loop do
    if exiting?
      store_session
      exit 0
    end

    row_size = 0
    chunksize = stream.state[:chunksize]

    begin
      chunksize = Tapsoob::Utils.calculate_chunksize(chunksize) do |c|
        stream.state[:chunksize] = c.to_i
        encoded_data, row_size, elapsed_time = nil
        d1 = c.time_delta do
          encoded_data, row_size, elapsed_time = stream.fetch({ :type => "file", :source => dump_path })
        end

        # `break` leaves calculate_chunksize early; the call then evaluates
        # to nil, which is what gets assigned to chunksize above.
        break if stream.complete?

        data = nil
        d2 = c.time_delta do
          data = {
            :state => stream.to_hash,
            :checksum => Tapsoob::Utils.checksum(encoded_data).to_s
          }
        end

        row_size = stream.fetch_data_in_database({ :encoded_data => encoded_data, :checksum => data[:checksum] })
        log.debug "row size: #{row_size}"
        self.stream_state = stream.to_hash

        c.idle_secs = (d1 + d2)

        # Value of the block handed back to calculate_chunksize.
        elapsed_time
      end
    rescue Tapsoob::CorruptedData
      # retry the same data, it got corrupted somehow.
      next
    rescue Tapsoob::DuplicatePrimaryKeyError
      # verify the stream and retry it. `stream` must remain the stream
      # object: the previous code rebound it to a JSON String here, so the
      # retry failed on `stream.state` with NoMethodError.
      stream.verify_stream
      next
    end

    stream.state[:chunksize] = chunksize

    progress.inc(row_size)

    stream.increment(row_size)
    break if stream.complete?
  end

  progress.finish
  completed_tables << stream.table_name.to_s
  self.stream_state = {}
end
push_indexes()
click to toggle source
# File lib/tapsoob/operation.rb, line 338
# Loads the index definitions stored as "<dump_path>/indexes/<table>.json".
#
# All index files are parsed first; nothing is logged or sent when none are
# found. The parsed definitions are passed through apply_table_filter, and
# every index of every remaining table is loaded with
# Tapsoob::Utils.load_indexes while a per-table ProgressBar is advanced.
def push_indexes
  index_dir = File.join(dump_path, "indexes")
  idxs = Dir.glob(File.join(index_dir, "*.json")).each_with_object({}) do |path, found|
    name = File.basename(path, '.json')
    found[name] = JSON.parse(File.read(File.join(index_dir, "#{name}.json")))
  end

  return if idxs.empty?

  log.info "Sending indexes"

  apply_table_filter(idxs).each do |table, indexes|
    next if indexes.size.zero?

    bar = ProgressBar.new(table, indexes.size)
    indexes.each do |idx|
      Tapsoob::Utils.load_indexes(database_url, idx)
      bar.inc(1)
    end
    bar.finish
  end
end
push_partial_data()
click to toggle source
# File lib/tapsoob/operation.rb, line 378
# Resumes the push of the table recorded in stream_state.
#
# Does nothing when stream_state is an empty hash. Otherwise the saved state
# is turned back into a data stream and pushed with push_data_from_file,
# using the record count that `tables` holds for that table as the progress
# total.
def push_partial_data
  return if stream_state == {}

  resumed_table = stream_state[:table_name]
  total = tables[resumed_table.to_s]
  log.info "Resuming #{resumed_table}, #{format_number(total)} records"

  bar = ProgressBar.new(resumed_table.to_s, total)
  push_data_from_file(Tapsoob::DataStream.factory(db, stream_state), bar)
end
push_reset_sequences()
click to toggle source
# File lib/tapsoob/operation.rb, line 372
# Logs the step and invokes Tapsoob::Utils.schema_bin with
# :reset_db_sequences for database_url.
def push_reset_sequences
  log.info("Resetting sequences")

  Tapsoob::Utils.schema_bin(:reset_db_sequences, database_url)
end
push_schema()
click to toggle source
# File lib/tapsoob/operation.rb, line 360
# Loads the schema of every pending table via Tapsoob::Utils.load_schema,
# advancing a "Schema" progress bar by one per table.
def push_schema
  log.info "Sending schema"

  bar = ProgressBar.new('Schema', tables.size)
  tables.each_key do |table|
    log.debug "Loading '#{table}' schema\n"
    Tapsoob::Utils.load_schema(dump_path, database_url, table)
    bar.inc(1)
  end
  bar.finish
end
record_count()
click to toggle source
# File lib/tapsoob/operation.rb, line 481
# Sum of all counts in local_tables_info, memoized in @record_count.
def record_count
  @record_count ||= local_tables_info.values.reduce(0, :+)
end
run()
click to toggle source
# File lib/tapsoob/operation.rb, line 324
# Runs the whole push inside catch_errors.
#
# Order of steps:
# 1. On a fresh (non-resumed) run: schema, then indexes when indexes_first?
#    is set; both are skipped when skip_schema? is true.
# 2. Signal trap setup.
# 3. On a resumed run: the partially pushed table.
# 4. Table data.
# 5. Indexes, when they were not sent first and the schema is not skipped.
# 6. Sequence reset.
def run
  catch_errors do
    unless resuming?
      push_schema unless skip_schema?
      push_indexes if indexes_first? && !skip_schema?
    end

    setup_signal_trap
    push_partial_data if resuming?
    push_data
    push_indexes unless indexes_first? || skip_schema?
    push_reset_sequences
  end
end
tables()
click to toggle source
# File lib/tapsoob/operation.rb, line 472
# Returns a fresh hash of table name (String) => count containing every
# entry of local_tables_info whose name is not listed in completed_tables.
def tables
  local_tables_info.each_with_object({}) do |(name, rows), pending|
    key = name.to_s
    pending[key] = rows unless completed_tables.include?(key)
  end
end
to_hash()
click to toggle source
Calls superclass method
Tapsoob::Operation#to_hash
# File lib/tapsoob/operation.rb, line 320
# Returns the superclass hash merged with a :local_tables_info entry holding
# local_tables_info.
def to_hash
  inherited = super
  inherited.merge(:local_tables_info => local_tables_info)
end