class Tapsoob::DataStream

Constants

DEFAULT_CHUNKSIZE

Attributes

db      [R]
options [R]
state   [R]

Public Class Methods

factory(db, state, opts)
# File lib/tapsoob/data_stream.rb, line 285
def self.factory(db, state, opts)
  if defined?(Sequel::MySQL) && Sequel::MySQL.respond_to?(:convert_invalid_date_time=)
    Sequel::MySQL.convert_invalid_date_time = :nil
  end

  if state.has_key?(:klass)
    return eval(state[:klass]).new(db, state, opts)
  end

  if Tapsoob::Utils.single_integer_primary_key(db, state[:table_name].to_sym)
    DataStreamKeyed.new(db, state, opts)
  else
    DataStream.new(db, state, opts)
  end
end
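
A minimal usage sketch (the connection URL, table name and options here are hypothetical): factory returns a DataStreamKeyed when the table has a single integer primary key, and a plain DataStream otherwise.

require "sequel"
require "tapsoob/data_stream"   # loads the file documented here (lib/tapsoob/data_stream.rb)

db     = Sequel.connect("sqlite://example.db")   # hypothetical source database
stream = Tapsoob::DataStream.factory(db, { :table_name => "users" }, {})
stream.state[:chunksize]                         # defaults to DEFAULT_CHUNKSIZE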
new(db, state, opts = {})
# File lib/tapsoob/data_stream.rb, line 10
def initialize(db, state, opts = {})
  @db = db
  @state = {
    :offset          => 0,
    :avg_chunksize   => 0,
    :num_chunksize   => 0,
    :total_chunksize => 0
  }.merge(state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE
  @options = opts
  @complete = false
end
parse_json(json)
# File lib/tapsoob/data_stream.rb, line 201
def self.parse_json(json)
  hash = JSON.parse(json).symbolize_keys
  hash[:state].symbolize_keys! if hash.has_key?(:state)
  hash
end
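
A hedged round trip of the stream state through JSON, e.g. to resume a transfer later; `stream` and `db` are carried over from the factory sketch above.

snapshot = stream.to_json                             # state plus :klass (see to_hash)
state    = Tapsoob::DataStream.parse_json(snapshot)   # keys re-symbolized
resumed  = Tapsoob::DataStream.factory(db, state, {}) # re-instantiates state[:klass]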

Public Instance Methods

complete?()
# File lib/tapsoob/data_stream.rb, line 143
def complete?
  @complete
end
encode_rows(rows)
# File lib/tapsoob/data_stream.rb, line 119
def encode_rows(rows)
  Tapsoob::Utils.base64encode(Marshal.dump(rows))
end
error()
# File lib/tapsoob/data_stream.rb, line 32
def error
  state[:error] || false
end
error=(val)
# File lib/tapsoob/data_stream.rb, line 28
def error=(val)
  state[:error] = val
end
fetch(opts = {})
# File lib/tapsoob/data_stream.rb, line 123
def fetch(opts = {})
  opts = (opts.empty? ? { :type => "database", :source => db.uri } : opts)

  log.debug "DataStream#fetch state -> #{state.inspect}"

  t1 = Time.now
  rows = (opts[:type] == "file" ? fetch_file(opts[:source]) : fetch_rows)
  encoded_data = encode_rows(rows)
  t2 = Time.now
  elapsed_time = t2 - t1

  if opts[:type] == "file"
    @complete = rows[:data] == [ ]
  else
    @complete = rows == { }
  end

  [encoded_data, (@complete ? 0 : rows[:data].size), elapsed_time]
end
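
fetch returns a three-element array: the encoded chunk, the number of rows in it (0 once the stream is complete) and the elapsed time. A short sketch; the dump directory is hypothetical.

encoded, row_count, elapsed = stream.fetch                                            # from the source database
encoded, row_count, elapsed = stream.fetch(:type => "file", :source => "/tmp/dump")   # from an on-disk dump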
fetch_chunksize()
# File lib/tapsoob/data_stream.rb, line 104
def fetch_chunksize
  chunksize = state[:chunksize]
  return chunksize if state[:num_chunksize] < max_chunksize_training
  return chunksize if state[:avg_chunksize] == 0
  return chunksize if state[:error]
  state[:avg_chunksize] > chunksize ? state[:avg_chunksize] : chunksize
end
fetch_data_in_database(params)
# File lib/tapsoob/data_stream.rb, line 186
def fetch_data_in_database(params)
  encoded_data = params[:encoded_data]

  rows = parse_encoded_data(encoded_data, params[:checksum])

  @complete = rows[:data] == [ ]

  unless @complete
    import_rows(rows)
    rows[:data].size
  else
    0
  end
end
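
A sketch of the import side: an encoded chunk produced by fetch, together with its checksum, is verified, decoded and written to the target table; `encoded` is assumed to come from the export side.

checksum = Tapsoob::Utils.checksum(encoded).to_s
imported = stream.fetch_data_in_database(:encoded_data => encoded, :checksum => checksum)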
fetch_database() { |rows| ... }
# File lib/tapsoob/data_stream.rb, line 147
def fetch_database
  params = fetch_from_database
  encoded_data = params[:encoded_data]
  json = params[:json]

  rows = parse_encoded_data(encoded_data, json[:checksum])

  @complete = rows == { }

  # update local state
  state.merge!(json[:state].merge(:chunksize => state[:chunksize]))

  unless @complete
    yield rows if block_given?
    state[:offset] += rows[:data].size
    rows[:data].size
  else
    0
  end
end
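
A hedged pull loop built on fetch_database: each decoded chunk is yielded to the block until the stream reports completion.

until stream.complete?
  stream.fetch_database do |rows|
    puts "pulled #{rows[:data].size} rows"   # rows carries at least :header and :data
  end
end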
fetch_file(dump_path)
# File lib/tapsoob/data_stream.rb, line 86
def fetch_file(dump_path)
  state[:chunksize] = fetch_chunksize
  ds = JSON.parse(File.read(File.join(dump_path, "data", "#{table_name}.json")))
  log.debug "DataStream#fetch_file"
  rows = {
    :table_name => ds["table_name"],
    :header     => ds["header"],
    :data       => ((@options[:"skip-duplicates"] ? ds["data"].uniq : ds["data"])[state[:offset], (state[:offset] + state[:chunksize])] || [ ]),
    :types      => ds["types"]
  }
  update_chunksize_stats
  rows
end
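
A sketch of reading one chunk from an on-disk dump; the method expects <dump_path>/data/<table_name>.json as written by a prior export, and the path here is made up.

rows = stream.fetch_file("/tmp/dump")
rows[:header]      # column names recorded in the dump
rows[:data].size   # rows returned for this chunk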
fetch_from_database()
# File lib/tapsoob/data_stream.rb, line 168
def fetch_from_database
  res = nil
  log.debug "DataStream#fetch_from_database state -> #{state.inspect}"
  state[:chunksize] = Tapsoob::Utils.calculate_chunksize(state[:chunksize]) do |c|
    state[:chunksize] = c.to_i
    encoded_data = fetch.first

    checksum = Tapsoob::Utils.checksum(encoded_data).to_s

    res = {
      :json         => { :checksum => checksum, :state => to_hash },
      :encoded_data => encoded_data
    }
  end

  res
end
fetch_rows()

Fetches the next chunk of rows from the source table. A running average of the chunksize is recorded over the first few chunks (see max_chunksize_training and update_chunksize_stats); fetch_chunksize reuses that average once training is complete.

# File lib/tapsoob/data_stream.rb, line 73
def fetch_rows
  state[:chunksize] = fetch_chunksize
  ds = table.order(*order_by).limit(state[:chunksize], state[:offset])
  log.debug "DataStream#fetch_rows SQL -> #{ds.sql}"
  rows = Tapsoob::Utils.format_data(ds.all,
    :string_columns => string_columns,
    :schema => db.schema(table_name),
    :table => table_name
  )
  update_chunksize_stats
  rows
end
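
A minimal sketch of pulling a single chunk straight from the source table and advancing the offset by hand (fetch_database normally does this for you); an empty hash marks the end of the table (see fetch).

rows = stream.fetch_rows
stream.increment(rows[:data].size) unless rows == {}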
import_rows(rows)
# File lib/tapsoob/data_stream.rb, line 221
def import_rows(rows)
  columns = rows[:header]
  data    = rows[:data]

  # Only import existing columns
  if table.columns.size != columns.size
    existing_columns        = table.columns.map(&:to_s)
    additional_columns      = columns - existing_columns
    additional_columns_idxs = additional_columns.map { |c| columns.index(c) }
    additional_columns_idxs.reverse.each do |idx|
      columns.delete_at(idx)
      rows[:types].delete_at(idx)
    end
    data.each_index { |didx| additional_columns_idxs.reverse.each { |idx| data[didx].delete_at(idx) } }
  end

  # Decode blobs
  if rows.has_key?(:types) && rows[:types].include?("blob")
    blob_indices = rows[:types].each_index.select { |idx| rows[:types][idx] == "blob" }
    data.each_index do |idx|
      blob_indices.each do |bi|
        data[idx][bi] = Sequel::SQL::Blob.new(Tapsoob::Utils.base64decode(data[idx][bi])) unless data[idx][bi].nil?
      end
    end
  end

  # Parse date/datetime/time columns
  if rows.has_key?(:types)
    %w(date datetime time).each do |type|
      if rows[:types].include?(type)
        type_indices = rows[:types].each_index.select { |idx| rows[:types][idx] == type }
        data.each_index do |idx|
          type_indices.each do |ti|
            data[idx][ti] = Sequel.send("string_to_#{type}".to_sym, data[idx][ti]) unless data[idx][ti].nil?
          end
        end
      end
    end
  end

  # Remove id column
  if @options[:"discard-identity"] && rows[:header].include?("id")
    columns = rows[:header] - ["id"]
    data    = data.map { |d| d[1..-1] }
  end

  table.import(columns, data, :commit_every => 100)
  state[:offset] += rows[:data].size
rescue Exception => ex
  case ex.message
  when /integer out of range/ then
    raise Tapsoob::InvalidData, <<-ERROR, []
\nDetected integer data that exceeds the maximum allowable size for an integer type.
This generally occurs when importing from SQLite due to the fact that SQLite does
not enforce maximum values on integer types.
    ERROR
  else raise ex
  end
end
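
A hedged example of the rows hash import_rows expects (the shape mirrors fetch_file above); the table, column names, values and type strings are all made up.

rows = {
  :table_name => "users",
  :header     => ["id", "name"],
  :data       => [[1, "Arthur"], [2, "Ford"]],
  :types      => ["integer", "varchar"]
}
stream.import_rows(rows)   # bulk insert via Sequel's import, committing every 100 rows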
increment(row_count)
# File lib/tapsoob/data_stream.rb, line 67
def increment(row_count)
  state[:offset] += row_count
end
log()
# File lib/tapsoob/data_stream.rb, line 23
def log
  Tapsoob.log.level = Logger::DEBUG if state[:debug]
  Tapsoob.log
end
max_chunksize_training()
# File lib/tapsoob/data_stream.rb, line 100
def max_chunksize_training
  20
end
order_by(name=nil)
# File lib/tapsoob/data_stream.rb, line 60
def order_by(name=nil)
  @order_by ||= begin
    name ||= table_name
    Tapsoob::Utils.order_by(db, name)
  end
end
parse_encoded_data(encoded_data, checksum)
# File lib/tapsoob/data_stream.rb, line 207
def parse_encoded_data(encoded_data, checksum)
  raise Tapsoob::CorruptedData.new("Checksum Failed") unless Tapsoob::Utils.valid_data?(encoded_data, checksum)

  begin
    return Marshal.load(Tapsoob::Utils.base64decode(encoded_data))
  rescue Object => e
    unless ENV['NO_DUMP_MARSHAL_ERRORS']
      puts "Error encountered loading data, wrote the data chunk to dump.#{Process.pid}.dat"
      File.open("dump.#{Process.pid}.dat", "w") { |f| f.write(encoded_data) }
    end
    raise e
  end
end
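
encode_rows and parse_encoded_data are symmetric (Marshal wrapped in Base64), with a checksum guard in between; a minimal round trip, assuming `rows` is any rows hash.

encoded  = stream.encode_rows(rows)
checksum = Tapsoob::Utils.checksum(encoded).to_s
decoded  = stream.parse_encoded_data(encoded, checksum)   # == rows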
string_columns()
# File lib/tapsoob/data_stream.rb, line 52
def string_columns
  @string_columns ||= Tapsoob::Utils.incorrect_blobs(db, table_name)
end
table()
# File lib/tapsoob/data_stream.rb, line 56
def table
  @table ||= db[table_name_sql]
end
table_name()
# File lib/tapsoob/data_stream.rb, line 36
def table_name
  state[:table_name].to_sym
end
table_name_sql()
# File lib/tapsoob/data_stream.rb, line 40
def table_name_sql
  table_name
end
to_hash()
# File lib/tapsoob/data_stream.rb, line 44
def to_hash
  state.merge(:klass => self.class.to_s)
end
to_json()
# File lib/tapsoob/data_stream.rb, line 48
def to_json
  JSON.generate(to_hash)
end
update_chunksize_stats()
# File lib/tapsoob/data_stream.rb, line 112
def update_chunksize_stats
  return if state[:num_chunksize] >= max_chunksize_training
  state[:total_chunksize] += state[:chunksize]
  state[:num_chunksize] += 1
  state[:avg_chunksize] = state[:total_chunksize] / state[:num_chunksize] rescue state[:chunksize]
end
verify_stream()
# File lib/tapsoob/data_stream.rb, line 281
def verify_stream
  state[:offset] = table.count
end
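
A sketch of resuming a push: verify_stream aligns the offset with what is already in the target table so a restarted transfer does not re-import rows.

stream.verify_stream
stream.state[:offset]   # now equal to table.count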