class Tapsoob::Pull

Public Class Methods

factory(db, state) click to toggle source
# File lib/tapsoob/operation.rb, line 273
# Builds the stream object described by +state+.
#
# If the Sequel MySQL adapter is loaded and responds to
# +convert_invalid_date_time=+, that setting is switched to +:nil+ first.
#
# db    - database handle, forwarded untouched to the stream constructor.
# state - Hash of stream state; +:klass+ and +:table_name+ are read here.
#
# Returns an instance of the class named by +state[:klass]+ when that key is
# present; otherwise a DataStreamKeyed when
# Tapsoob::Utils.single_integer_primary_key reports a truthy value for the
# table, and a plain DataStream when it does not.
def self.factory(db, state)
  if defined?(Sequel::MySQL)
    Sequel::MySQL.convert_invalid_date_time = :nil if Sequel::MySQL.respond_to?(:convert_invalid_date_time=)
  end

  # NOTE(review): state[:klass] is handed to eval verbatim. If state can ever
  # originate from an untrusted source this executes arbitrary code; consider
  # a constant lookup restricted to the known stream classes.
  return eval(state[:klass]).new(db, state) if state.has_key?(:klass)

  unless Tapsoob::Utils.single_integer_primary_key(db, state[:table_name].to_sym)
    return DataStream.new(db, state)
  end

  DataStreamKeyed.new(db, state)
end

Public Instance Methods

fetch_tables_info() click to toggle source
# File lib/tapsoob/operation.rb, line 263
# Counts the rows of every table that survives apply_table_filter.
#
# Returns a Hash mapping each filtered table name to db[table_name].count.
def fetch_tables_info
  apply_table_filter(db.tables).each_with_object({}) do |table_name, counts|
    counts[table_name] = db[table_name].count
  end
end
file_prefix() click to toggle source
# File lib/tapsoob/operation.rb, line 154
# Prefix string for this operation; always the literal "pull".
def file_prefix
  'pull'
end
pull_data() click to toggle source
# File lib/tapsoob/operation.rb, line 190
# Pulls the data of every table returned by +tables+.
#
# Logs a summary line, then for each table builds a data stream through
# Tapsoob::DataStream.factory (seeded with default_chunksize and the table
# name) and hands it to pull_data_from_table. A ProgressBar sized by the
# table's count is created only when opts[:progress] is truthy; otherwise nil
# is passed as the progress argument.
def pull_data
  log.info "Receiving data"
  log.info "#{tables.size} tables, #{format_number(record_count)} records"

  tables.each do |name, rows|
    bar = ProgressBar.new(name.to_s, rows) if opts[:progress]
    stream = Tapsoob::DataStream.factory(
      db,
      { :chunksize => default_chunksize, :table_name => name },
      { :debug => opts[:debug] }
    )
    pull_data_from_table(stream, bar)
  end
end
pull_data_from_table(stream, progress) click to toggle source
# File lib/tapsoob/operation.rb, line 217
# Drains +stream+ chunk by chunk until stream.complete? is true.
#
# Each batch yielded by stream.fetch_database is printed to stdout as JSON
# when dump_path is nil, otherwise passed to Tapsoob::Utils.export_rows.
# After a successful fetch the stream's error flag is cleared and its state
# (stream.to_hash) is stored in stream_state.
#
# When Tapsoob::CorruptedData is raised the error is logged, stream.error is
# set to true and the fetch is attempted again; there is no retry limit.
# The process exits with status 0 if exiting? is true at the start of an
# iteration.
#
# stream   - object responding to fetch_database, table_name, error=,
#            to_hash and complete?.
# progress - progress bar (inc/finish) or nil/false to disable reporting.
#
# Once the stream is complete the progress bar is finished, the table name
# is appended to completed_tables and stream_state is reset to {}.
def pull_data_from_table(stream, progress)
  loop do
    begin
      exit 0 if exiting?

      # presumably the number of rows fetched; it is only used to advance
      # the progress bar below
      fetched = stream.fetch_database do |batch|
        if dump_path.nil?
          puts JSON.generate(batch)
        else
          Tapsoob::Utils.export_rows(dump_path, stream.table_name, batch)
        end
      end
      stream.error = false
      self.stream_state = stream.to_hash
    rescue Tapsoob::CorruptedData => err
      log.info "Corrupted Data Received #{err.message}, retrying..."
      stream.error = true
      # Skip the progress update and completion check; fetch again.
      next
    end

    progress.inc(fetched) if progress && !exiting?
    break if stream.complete?
  end

  progress.finish if progress
  completed_tables << stream.table_name.to_s
  self.stream_state = {}
end
pull_indexes() click to toggle source
# File lib/tapsoob/operation.rb, line 289
# Exports the index definitions of every table that passes the table filter.
#
# The raw index dump comes from Tapsoob::Utils.schema_bin and is parsed as
# JSON when it is present and at least two characters long; anything else is
# treated as an empty Hash. Tables with no indexes are skipped. Each index is
# passed to Tapsoob::Utils.export_indexes and any non-nil result is printed.
# A ProgressBar is always created per table here, regardless of
# opts[:progress].
def pull_indexes
  log.info "Receiving indexes"

  payload = Tapsoob::Utils.schema_bin(:indexes_individual, database_url)
  parsed =
    if payload && payload.length >= 2
      JSON.parse(payload)
    else
      {}
    end

  apply_table_filter(parsed).each do |table, table_indexes|
    next unless table_indexes.size > 0

    bar = ProgressBar.new(table, table_indexes.size)
    table_indexes.each do |index_def|
      exported = Tapsoob::Utils.export_indexes(dump_path, table, index_def)
      puts exported if exported
      bar.inc(1)
    end
    bar.finish
  end
end
pull_partial_data() click to toggle source
# File lib/tapsoob/operation.rb, line 205
# Resumes the table recorded in stream_state, if any.
#
# Does nothing when stream_state equals {}. Otherwise logs the table being
# resumed with its count from +tables+, rebuilds the stream from stream_state
# via Tapsoob::DataStream.factory and passes it to pull_data_from_table. A
# ProgressBar is created only when opts[:progress] is truthy.
def pull_partial_data
  return if stream_state == {}

  resumed_table = stream_state[:table_name]
  remaining = tables[resumed_table.to_s]
  log.info "Resuming #{resumed_table}, #{format_number(remaining)} records"

  bar = ProgressBar.new(resumed_table.to_s, remaining) if opts[:progress]
  pull_data_from_table(Tapsoob::DataStream.factory(db, stream_state), bar)
end
pull_reset_sequences() click to toggle source
# File lib/tapsoob/operation.rb, line 307
# Runs the :reset_db_sequences schema command against database_url via
# Tapsoob::Utils.schema_bin and prints whatever it returns, if anything.
def pull_reset_sequences
  log.info "Resetting sequences"

  result = Tapsoob::Utils.schema_bin(:reset_db_sequences, database_url)
  puts(result) if result
end
pull_schema() click to toggle source
# File lib/tapsoob/operation.rb, line 176
# Dumps and exports the schema of every table returned by +tables+.
#
# For each table the schema is produced by Tapsoob::Schema.dump_table, logged
# at debug level, and passed to Tapsoob::Utils.export_schema; a non-nil
# result of the export is printed. A 'Schema' ProgressBar sized by the number
# of tables is advanced once per table and finished at the end.
def pull_schema
  log.info "Receiving schema"

  bar = ProgressBar.new('Schema', tables.size)
  tables.each_key do |name|
    dumped = Tapsoob::Schema.dump_table(database_url, name)
    log.debug "Table: #{name}\n#{dumped}\n"
    exported = Tapsoob::Utils.export_schema(dump_path, name, dumped)
    puts exported if exported
    bar.inc(1)
  end
  bar.finish
end
record_count() click to toggle source
# File lib/tapsoob/operation.rb, line 255
# Total number of records across every table in tables_info.
#
# Returns the sum of the per-table counts, or 0 when tables_info is empty.
# The explicit 0 seed matters: inject(:+) without a seed returns nil for an
# empty collection, which callers then pass on to format_number.
def record_count
  tables_info.values.inject(0, :+)
end
run() click to toggle source
# File lib/tapsoob/operation.rb, line 162
# Executes the whole pull inside catch_errors.
#
# Fresh run (not resuming): pulls the schema unless skip_schema? is true, and
# pulls indexes up front when indexes_first? is true and the schema is not
# skipped. Then, in every case: installs the signal trap, resumes a partial
# table when resuming?, pulls the data, pulls indexes afterwards when they
# were not requested first (and the schema is not skipped), and finally
# resets sequences.
def run
  catch_errors do
    unless resuming?
      pull_schema unless skip_schema?
      pull_indexes if indexes_first? && !skip_schema?
    end

    setup_signal_trap
    pull_partial_data if resuming?
    pull_data
    pull_indexes unless indexes_first? || skip_schema?
    pull_reset_sequences
  end
end
tables() click to toggle source
# File lib/tapsoob/operation.rb, line 246
# Tables that still have to be pulled.
#
# Returns a new Hash of table name (as a String) => count, built from
# tables_info with every table already listed in completed_tables left out.
def tables
  tables_info.each_with_object({}) do |(table_name, count), pending|
    key = table_name.to_s
    pending[key] = count unless completed_tables.include?(key)
  end
end
tables_info() click to toggle source
# File lib/tapsoob/operation.rb, line 259
# Per-table record counts, cached in opts[:tables_info].
#
# Returns the cached value when it is truthy; otherwise calls
# fetch_tables_info, stores the result under opts[:tables_info] and returns it.
def tables_info
  cache = opts
  cache[:tables_info] || (cache[:tables_info] = fetch_tables_info)
end
to_hash() click to toggle source
Calls superclass method Tapsoob::Operation#to_hash
# File lib/tapsoob/operation.rb, line 158
# Serializes the operation: the superclass hash plus a :remote_tables_info
# entry.
#
# NOTE(review): remote_tables_info is not defined among the methods visible
# here (only tables_info is) — confirm it exists on the superclass, otherwise
# this raises NameError.
def to_hash
  inherited = super
  inherited.merge(:remote_tables_info => remote_tables_info)
end