class Forklift::Patterns::Mysql

Public Class Methods

can_incremental_pipe?(source, from_table, destination, to_table, matcher=source.default_matcher)
# File lib/forklift/patterns/mysql_patterns.rb, line 76
# Decides whether `from_table` can be copied into `to_table` incrementally.
#
# Returns true only when:
#   * `from_table` exists in `source` and `to_table` exists in `destination`,
#   * both tables have the `matcher` column, and
#   * both tables have exactly the same set of column names (order ignored).
# Otherwise returns false.
def self.can_incremental_pipe?(source, from_table, destination, to_table, matcher=source.default_matcher)
  # Both sides must already exist before their columns can be compared.
  return false unless source.tables.include?(from_table) && destination.tables.include?(to_table)

  src_columns = source.columns(from_table, source.current_database)
  dst_columns = destination.columns(to_table, destination.current_database)

  # The matcher column drives incremental selection, so both tables need it.
  return false unless src_columns.include?(matcher) && dst_columns.include?(matcher)

  # Column sets must match in both directions.
  src_columns.all? { |column| dst_columns.include?(column) } &&
    dst_columns.all? { |column| src_columns.include?(column) }
end
incremental_pipe(source, from_table, destination, to_table, matcher=source.default_matcher, primary_key='id')
# File lib/forklift/patterns/mysql_patterns.rb, line 21
# Incrementally copies rows from `from_table` (in the source connection's
# current database) into `to_table` (in the destination connection's current
# database). `to_table` is first created with the same structure as
# `from_table` if it does not exist yet.
#
# Rows are chosen by comparing the `matcher` column against the newest
# `matcher` value already present in `to_table`. Any existing copies of those
# rows in `to_table` (matched on `primary_key`) are deleted first, so updated
# rows are replaced rather than duplicated.
#
# source      - connection whose current database holds `from_table`
# from_table  - name of the table to read from
# destination - connection whose current database holds `to_table`
# to_table    - name of the table to write to
# matcher     - timestamp-like column used to find new/updated rows
#               (defaults to source.default_matcher)
# primary_key - column used to identify rows to replace (default 'id')
def self.incremental_pipe(source, from_table, destination, to_table, matcher=source.default_matcher, primary_key='id')
  start = Time.new.to_i
  from_db = source.current_database
  to_db = destination.current_database
  source.forklift.logger.log("mysql incremental_pipe: `#{from_db}`.`#{from_table}` => `#{to_db}`.`#{to_table}`")
  source.q("create table if not exists `#{to_db}`.`#{to_table}` like `#{from_db}`.`#{from_table}`")

  # Count the number of rows in to_table before copying.
  original_count = source.count(to_table, to_db)

  # Newest matcher value already copied into to_table.
  latest_timestamp = source.max_timestamp(to_table, matcher, to_db)

  # Select rows at OR after the newest copied value. With a strict `>`, a row
  # written to from_table with the same matcher value as the newest copied row
  # (e.g. within the same second, if the column stores second precision), but
  # after the previous copy ran, would never be picked up. Boundary rows that
  # were already copied are deleted below and re-inserted, so they are not
  # duplicated. The literal is single-quoted so it remains a string literal
  # even when MySQL runs with the ANSI_QUOTES SQL mode.
  since_latest = "`#{matcher}` >= '#{latest_timestamp}'"

  # If to_table already has rows, remove any copies of rows that are about to
  # be (re-)inserted. Some selected ids are stale copies that were updated in
  # from_table; others are new and simply won't match anything in to_table.
  if original_count > 0
    source.read("select `#{primary_key}` from `#{from_db}`.`#{from_table}` where #{since_latest} order by `#{matcher}`") do |stale_rows|
      next if stale_rows.empty?

      stale_ids = stale_rows.map { |row| row[primary_key.to_sym] }.join(',')
      source.q("delete from `#{to_db}`.`#{to_table}` where `#{primary_key}` in (#{stale_ids})")
      source.forklift.logger.log("  ^ deleted up to #{stale_rows.length} stale rows from `#{to_db}`.`#{to_table}`")
    end
  end

  # Copy every selected row into to_table.
  destination.q("insert into `#{to_db}`.`#{to_table}` select * from `#{from_db}`.`#{from_table}` where #{since_latest} order by `#{matcher}`")
  delta = Time.new.to_i - start
  new_count = destination.count(to_table, to_db) - original_count
  source.forklift.logger.log("  ^ created #{new_count} new rows in #{delta}s")
end
mysql_optimistic_import(source, destination, matcher=source.default_matcher)

When you are copying data to and from MySQL: an implementation of “pipe” for remote databases.

# File lib/forklift/patterns/mysql_patterns.rb, line 94
# Copies every table of `source` into `destination`.
#
# For each table:
#   * if the `matcher` column exists on both sides and `destination` already
#     has the table, only rows since destination's newest `matcher` value are
#     read from `source` and written to `destination`;
#   * otherwise the destination table (if any) is dropped and every row is
#     copied from `source`.
#
# The caller's `matcher` is used consistently: it decides which path a table
# takes AND is the column the incremental read and high-water lookup use.
def self.mysql_optimistic_import(source, destination, matcher=source.default_matcher)
  source.tables.each do |table|
    incremental = source.columns(table).include?(matcher) &&
                  destination.tables.include?(table) &&
                  destination.columns(table).include?(matcher)

    if incremental
      since = destination.max_timestamp(table, matcher)
      source.read_since(table, since, matcher) { |data| destination.write(data, table) }
    else
      destination.drop!(table) if destination.tables.include?(table)
      # Backtick-quote the table name, as every other query in this class does,
      # so reserved words or unusual characters in table names don't break it.
      source.read("select * from `#{table}`") { |data| destination.write(data, table) }
    end
  end
end
optimistic_pipe(source, from_table, destination, to_table, matcher=source.default_matcher, primary_key='id')
# File lib/forklift/patterns/mysql_patterns.rb, line 60
# Copies `from_table` into `to_table`, preferring an incremental copy.
#
# When can_incremental_pipe? reports the two tables are compatible,
# incremental_pipe is attempted. If it raises, the failure is logged and a
# full pipe (rebuild of `to_table`) is performed instead. When the tables are
# not compatible, a full pipe runs directly.
#
# Only StandardError is rescued: interrupts, SystemExit and other
# non-StandardError exceptions propagate instead of silently kicking off a
# full rebuild.
#
# Returns whatever the pipe method that ran last returns.
def self.optimistic_pipe(source, from_table, destination, to_table, matcher=source.default_matcher, primary_key='id')
  unless self.can_incremental_pipe?(source, from_table, destination, to_table, matcher)
    return pipe(source, from_table, destination, to_table)
  end

  begin
    incremental_pipe(source, from_table, destination, to_table, matcher, primary_key)
  rescue StandardError => e
    source.forklift.logger.log("! incremental_pipe failure on #{from_table} => #{to_table}: #{e} ")
    source.forklift.logger.log("! falling back to pipe...")
    pipe(source, from_table, destination, to_table)
  end
end
pipe(source, from_table, destination, to_table, tmp_table="_forklift_tmp")
# File lib/forklift/patterns/mysql_patterns.rb, line 5
# Fully copies `from_table` (source's current database) into `to_table`
# (destination's current database).
#
# The data is first loaded into a scratch table named `tmp_table` inside the
# destination database. The old `to_table` is then dropped and the scratch
# table is renamed into its place. All statements are issued through
# `source`; the final row count is read through `destination`.
def self.pipe(source, from_table, destination, to_table, tmp_table="_forklift_tmp")
  started_at = Time.new.to_i
  src_db = source.current_database
  dst_db = destination.current_database
  source.forklift.logger.log("mysql pipe: `#{src_db}`.`#{from_table}` => `#{dst_db}`.`#{to_table}`")

  origin  = "`#{src_db}`.`#{from_table}`"
  scratch = "`#{dst_db}`.`#{tmp_table}`"
  target  = "`#{dst_db}`.`#{to_table}`"

  # Order matters: build the scratch copy completely before replacing target.
  [
    "drop table if exists #{scratch}",
    "create table #{scratch} like #{origin}",
    "insert into #{scratch} select * from #{origin}",
    "drop table if exists #{target}",
    "rename table #{scratch} to #{target}"
  ].each { |statement| source.q(statement) }

  elapsed = Time.new.to_i - started_at
  source.forklift.logger.log("  ^ moved #{destination.count(to_table, dst_db)} rows in #{elapsed}s")
end
write_high_water_mark(db, time, matcher=db.default_matcher)

The high-water-mark method stubs a row into every table that has a `default_matcher` column, pretending to have a record from `time`. This enables partial forklift runs, which will only extract data “later than X”. TODO: assumes all columns have a default NULL setting.

# File lib/forklift/patterns/mysql_patterns.rb, line 110
# Writes one stub row into every table of `db` that has the `matcher` column,
# so later incremental runs treat `time` as the newest already-copied value.
#
# Column values of the stub row are chosen from the column's type string:
#   * the matcher column, datetime and timestamp columns -> time.to_s(:db)
#   * date columns                                      -> date part of time.to_s(:db)
#   * text columns                                      -> "~~stub~~"
#   * varchar columns                                   -> :"~~stub~~" (a Symbol)
#   * float / int / decimal columns                     -> 0
#   * anything else                                     -> "NULL"
#
# `time` must respond to to_s(:db) (the ActiveSupport DB formatting).
# TODO: assumes all columns accept these stub values / a default NULL setting.
def self.write_high_water_mark(db, time, matcher=db.default_matcher)
  db.tables.each do |table|
    columns, types = db.columns(table, db.current_database, true)
    next unless columns.include?(matcher)

    stamp = time.to_s(:db)
    row = columns.zip(types).each_with_object({}) do |(column, type), stub|
      stub[column] =
        if column == matcher
          stamp
        else
          # Order matters: "datetime" must be matched before the broader /date/.
          case type
          when /text/ then "~~stub~~"
          when /varchar/ then "~~stub~~".to_sym
          when /float/, /int/, /decimal/ then 0
          when /datetime/, /timestamp/ then stamp
          when /date/ then stamp.split(" ").first
          else "NULL"
          end
        end
    end
    db.write([row], table)
  end
end