class Tapsoob::Push

Public Instance Methods

fetch_local_tables_info()
# File lib/tapsoob/operation.rb, line 485
def fetch_local_tables_info
  tables_with_counts = {}
  tbls = Dir.glob(File.join(dump_path, "data", "*")).map { |path| File.basename(path, ".json") }
  tbls.each do |table|
    if File.exist?(File.join(dump_path, "data", "#{table}.json"))
      data = JSON.parse(File.read(File.join(dump_path, "data", "#{table}.json")))
      tables_with_counts[table] = data["data"].size
    else
      tables_with_counts[table] = 0
    end
  end
  apply_table_filter(tables_with_counts)
end
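
A standalone sketch of the hash this builds, assuming a hypothetical dump at /tmp/app_dump whose file names and row counts are purely illustrative:

require "json"

# Assumed layout:
#   /tmp/app_dump/data/users.json => { "data" => [ ...1200 rows... ] }
#   /tmp/app_dump/data/posts.json => { "data" => [ ...340 rows... ] }
dump_path = "/tmp/app_dump"
counts = Dir.glob(File.join(dump_path, "data", "*.json")).each_with_object({}) do |path, h|
  table = File.basename(path, ".json")
  h[table] = JSON.parse(File.read(path)).fetch("data", []).size
end
counts # => { "users" => 1200, "posts" => 340 }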
file_prefix()
# File lib/tapsoob/operation.rb, line 316
def file_prefix
  "push"
end
local_tables_info()
# File lib/tapsoob/operation.rb, line 468
def local_tables_info
  opts[:local_tables_info] ||= fetch_local_tables_info
end
push_data()
# File lib/tapsoob/operation.rb, line 389
def push_data
  log.info "Sending data"

  log.info "#{tables.size} tables, #{format_number(record_count)} records"

  tables.each do |table_name, count|
    next unless File.exist?(File.join(dump_path, "data", "#{table_name}.json"))
    db[table_name.to_sym].truncate if @opts[:purge]
    stream = Tapsoob::DataStream.factory(db, {
      :table_name => table_name,
      :chunksize => default_chunksize
    }, {
      :"skip-duplicates" => opts[:"skip-duplicates"] || false,
      :"discard-identity" => opts[:"discard-identity"] || false,
      :purge => opts[:purge] || false,
      :debug => opts[:debug]
    })
    progress = ProgressBar.new(table_name.to_s, count)
    push_data_from_file(stream, progress)
  end
end
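
The second hash handed to DataStream.factory carries the push flags; the hyphenated keys are symbols written with the :"..." literal syntax. An illustrative opts hash exercising them (values are arbitrary):

opts = {
  :"skip-duplicates"  => true,   # drop duplicate rows instead of failing on them
  :"discard-identity" => false,  # when true, strip identity values so the target reassigns them
  :purge              => true,   # truncate each table before loading it
  :debug              => false
}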
push_data_from_file(stream, progress)
# File lib/tapsoob/operation.rb, line 411
def push_data_from_file(stream, progress)
  loop do
    if exiting?
      store_session
      exit 0
    end

    row_size = 0
    chunksize = stream.state[:chunksize]

    begin
      chunksize = Tapsoob::Utils.calculate_chunksize(chunksize) do |c|
        stream.state[:chunksize] = c.to_i
        encoded_data, row_size, elapsed_time = nil # predeclare so values set inside the timing blocks stay visible afterwards
        d1 = c.time_delta do
          encoded_data, row_size, elapsed_time = stream.fetch({ :type => "file", :source => dump_path })
        end
        break if stream.complete? # dump exhausted: nothing left to fetch

        data = nil
        d2 = c.time_delta do
          data = {
            :state    => stream.to_hash,
            :checksum => Tapsoob::Utils.checksum(encoded_data).to_s
          }
        end

        row_size = stream.fetch_data_in_database({ :encoded_data => encoded_data, :checksum => data[:checksum] })
        log.debug "row size: #{row_size}"
        self.stream_state = stream.to_hash

        c.idle_secs = (d1 + d2)

        elapsed_time
      end
    rescue Tapsoob::CorruptedData => e
      # retry the same data, it got corrupted somehow.
      next
    rescue Tapsoob::DuplicatePrimaryKeyError => e
      # verify the stream and retry the same chunk
      stream.verify_stream
      next
    end
    stream.state[:chunksize] = chunksize

    progress.inc(row_size)

    stream.increment(row_size)
    break if stream.complete?
  end

  progress.finish
  completed_tables << stream.table_name.to_s
  self.stream_state = {}
end
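
Tapsoob::Utils.calculate_chunksize (not shown here) retunes the chunk size from the timings the block reports back. A simplified, self-contained sketch of the idea, not the actual implementation:

# Grow the chunk when the last fetch beat the target time, shrink it
# when it was slower; clamp the result to keep the size reasonable.
def next_chunksize(current, elapsed_time, target: 3.0)
  (current * (target / elapsed_time)).to_i.clamp(10, 100_000)
end

next_chunksize(1_000, 0.5)  # => 6000 (fast fetch, chunk grows)
next_chunksize(1_000, 12.0) # => 250  (slow fetch, chunk shrinks)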
push_indexes()
# File lib/tapsoob/operation.rb, line 338
def push_indexes
  idxs = {}
  table_idxs = Dir.glob(File.join(dump_path, "indexes", "*.json")).map { |path| File.basename(path, '.json') }
  table_idxs.each do |table_idx|
    idxs[table_idx] = JSON.parse(File.read(File.join(dump_path, "indexes", "#{table_idx}.json")))
  end

  return unless idxs.size > 0

  log.info "Sending indexes"

  apply_table_filter(idxs).each do |table, indexes|
    next unless indexes.size > 0
    progress = ProgressBar.new(table, indexes.size)
    indexes.each do |idx|
      Tapsoob::Utils.load_indexes(database_url, idx)
      progress.inc(1)
    end
    progress.finish
  end
end
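
push_indexes expects one JSON file per table under indexes/. The payload shape depends on what the pull side dumped, so the array-of-definitions layout below is an assumption for illustration:

require "json"

# Assumed layout: /tmp/app_dump/indexes/users.json => [ <index definition>, ... ]
Dir.glob(File.join("/tmp/app_dump", "indexes", "*.json")).each do |path|
  defs = JSON.parse(File.read(path))
  puts "#{File.basename(path, '.json')}: #{defs.size} index(es)"
end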
push_partial_data()
# File lib/tapsoob/operation.rb, line 378
def push_partial_data
  return if stream_state == {}

  table_name = stream_state[:table_name]
  record_count = tables[table_name.to_s]
  log.info "Resuming #{table_name}, #{format_number(record_count)} records"
  progress = ProgressBar.new(table_name.to_s, record_count)
  stream = Tapsoob::DataStream.factory(db, stream_state)
  push_data_from_file(stream, progress)
end
push_reset_sequences()
# File lib/tapsoob/operation.rb, line 372
def push_reset_sequences
  log.info "Resetting sequences"

  Tapsoob::Utils.schema_bin(:reset_db_sequences, database_url)
end
push_schema()
# File lib/tapsoob/operation.rb, line 360
def push_schema
  log.info "Sending schema"

  progress = ProgressBar.new('Schema', tables.size)
  tables.each do |table, count|
    log.debug "Loading '#{table}' schema\n"
    Tapsoob::Utils.load_schema(dump_path, database_url, table)
    progress.inc(1)
  end
  progress.finish
end
record_count()
# File lib/tapsoob/operation.rb, line 481
def record_count
  @record_count ||= local_tables_info.values.sum
end
run()
# File lib/tapsoob/operation.rb, line 324
def run
  catch_errors do
    unless resuming?
      push_schema if !skip_schema?
      push_indexes if indexes_first? && !skip_schema?
    end
    setup_signal_trap
    push_partial_data if resuming?
    push_data
    push_indexes if !indexes_first? && !skip_schema?
    push_reset_sequences
  end
end
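
A hypothetical end-to-end invocation, assuming Push keeps the Operation initializer signature of (database_url, dump_path, opts); the URL, path, and flags are illustrative:

op = Tapsoob::Push.new(
  "postgres://user:pass@localhost/app_db", # target database
  "/tmp/app_dump",                         # dump produced by a prior pull
  :purge => true,
  :"skip-duplicates" => false
)
op.run # schema first (unless skipped), then data, then indexes and sequence resets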
tables()
# File lib/tapsoob/operation.rb, line 472
def tables
  h = {}
  local_tables_info.each do |table_name, count|
    next if completed_tables.include?(table_name.to_s)
    h[table_name.to_s] = count
  end
  h
end
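
With illustrative values, the effect is to drop tables already recorded as completed, which is what lets a resumed push skip finished work:

local_tables_info = { "users" => 1200, "posts" => 340 }
completed_tables  = ["users"]

remaining = local_tables_info.reject { |t, _| completed_tables.include?(t.to_s) }
remaining # => { "posts" => 340 }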
to_hash()
Calls superclass method Tapsoob::Operation#to_hash
# File lib/tapsoob/operation.rb, line 320
def to_hash
  super.merge(:local_tables_info => local_tables_info)
end