class Tapsoob::DataStream
Constants
- DEFAULT_CHUNKSIZE
Attributes
- db [R]
- options [R]
- state [R]
Public Class Methods
factory(db, state, opts)
  # File lib/tapsoob/data_stream.rb, line 285
  def self.factory(db, state, opts)
    if defined?(Sequel::MySQL) && Sequel::MySQL.respond_to?(:convert_invalid_date_time=)
      Sequel::MySQL.convert_invalid_date_time = :nil
    end

    if state.has_key?(:klass)
      return eval(state[:klass]).new(db, state, opts)
    end

    if Tapsoob::Utils.single_integer_primary_key(db, state[:table_name].to_sym)
      DataStreamKeyed.new(db, state, opts)
    else
      DataStream.new(db, state, opts)
    end
  end
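A minimal usage sketch, assuming an open Sequel connection (the connection URL and table name below are placeholders): factory returns a DataStreamKeyed when the table has a single integer primary key, and a plain DataStream otherwise.

  require 'sequel'
  require 'tapsoob/data_stream'

  db     = Sequel.connect("postgres://user:pass@localhost/mydb") # placeholder URL
  state  = { :table_name => :users, :chunksize => 1000 }
  stream = Tapsoob::DataStream.factory(db, state, {})
  # => DataStreamKeyed if :users has a single integer primary key,
  #    DataStream otherwise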
new(db, state, opts = {})
  # File lib/tapsoob/data_stream.rb, line 10
  def initialize(db, state, opts = {})
    @db = db
    @state = {
      :offset          => 0,
      :avg_chunksize   => 0,
      :num_chunksize   => 0,
      :total_chunksize => 0
    }.merge(state)
    @state[:chunksize] ||= DEFAULT_CHUNKSIZE
    @options = opts
    @complete = false
  end
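For illustration (reusing the placeholder db handle from the factory example), constructing a stream directly shows the bookkeeping defaults being merged in, with :chunksize falling back to DEFAULT_CHUNKSIZE:

  stream = Tapsoob::DataStream.new(db, { :table_name => :users })
  stream.state[:offset]    # => 0
  stream.state[:chunksize] # => DEFAULT_CHUNKSIZE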
parse_json(json)
  # File lib/tapsoob/data_stream.rb, line 201
  def self.parse_json(json)
    hash = JSON.parse(json).symbolize_keys
    hash[:state].symbolize_keys! if hash.has_key?(:state)
    hash
  end
Public Instance Methods
complete?()
  # File lib/tapsoob/data_stream.rb, line 143
  def complete?
    @complete
  end
encode_rows(rows)
  # File lib/tapsoob/data_stream.rb, line 119
  def encode_rows(rows)
    Tapsoob::Utils.base64encode(Marshal.dump(rows))
  end
error()
  # File lib/tapsoob/data_stream.rb, line 32
  def error
    state[:error] || false
  end
error=(val)
  # File lib/tapsoob/data_stream.rb, line 28
  def error=(val)
    state[:error] = val
  end
fetch(opts = {})
  # File lib/tapsoob/data_stream.rb, line 123
  def fetch(opts = {})
    opts = (opts.empty? ? { :type => "database", :source => db.uri } : opts)

    log.debug "DataStream#fetch state -> #{state.inspect}"

    t1 = Time.now
    rows = (opts[:type] == "file" ? fetch_file(opts[:source]) : fetch_rows)
    encoded_data = encode_rows(rows)
    t2 = Time.now
    elapsed_time = t2 - t1

    if opts[:type] == "file"
      @complete = rows[:data] == [ ]
    else
      @complete = rows == { }
    end

    [encoded_data, (@complete ? 0 : rows[:data].size), elapsed_time]
  end
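A hedged sketch of draining a table chunk by chunk with the default database source; note that fetch itself does not advance the offset, so the caller pairs it with increment:

  loop do
    encoded, row_count, _elapsed = stream.fetch
    break if stream.complete?
    # ship or store encoded somewhere, then advance the offset
    stream.increment(row_count)
  end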
fetch_chunksize()
  # File lib/tapsoob/data_stream.rb, line 104
  def fetch_chunksize
    chunksize = state[:chunksize]
    return chunksize if state[:num_chunksize] < max_chunksize_training
    return chunksize if state[:avg_chunksize] == 0
    return chunksize if state[:error]
    state[:avg_chunksize] > chunksize ? state[:avg_chunksize] : chunksize
  end
fetch_data_in_database(params)
  # File lib/tapsoob/data_stream.rb, line 186
  def fetch_data_in_database(params)
    encoded_data = params[:encoded_data]

    rows = parse_encoded_data(encoded_data, params[:checksum])

    @complete = rows[:data] == [ ]

    unless @complete
      import_rows(rows)
      rows[:data].size
    else
      0
    end
  end
fetch_database() { |rows| ... }
  # File lib/tapsoob/data_stream.rb, line 147
  def fetch_database
    params = fetch_from_database
    encoded_data = params[:encoded_data]
    json = params[:json]

    rows = parse_encoded_data(encoded_data, json[:checksum])
    @complete = rows == { }

    # update local state
    state.merge!(json[:state].merge(:chunksize => state[:chunksize]))

    unless @complete
      yield rows if block_given?
      state[:offset] += rows[:data].size
      rows[:data].size
    else
      0
    end
  end
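A sketch of the block form, which decodes each chunk and advances the offset internally (store below is a hypothetical consumer, not part of tapsoob):

  total = 0
  until stream.complete?
    total += stream.fetch_database do |rows|
      store(rows) # hypothetical: rows holds :table_name, :header, :data, :types
    end
  end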
fetch_file(dump_path)
  # File lib/tapsoob/data_stream.rb, line 86
  def fetch_file(dump_path)
    state[:chunksize] = fetch_chunksize
    ds = JSON.parse(File.read(File.join(dump_path, "data", "#{table_name}.json")))
    log.debug "DataStream#fetch_file"
    rows = {
      :table_name => ds["table_name"],
      :header     => ds["header"],
      # Array slice takes (start, length), so the chunksize is the length
      :data       => ((@options[:"skip-duplicates"] ? ds["data"].uniq : ds["data"])[state[:offset], state[:chunksize]] || [ ]),
      :types      => ds["types"]
    }
    update_chunksize_stats
    rows
  end
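fetch_file is normally reached through fetch with a file source. The layout it reads (the dump path below is a placeholder) is one JSON document per table under data/, with the keys the method picks out above:

  # <dump_path>/data/users.json
  #   { "table_name": "users", "header": [...], "data": [...], "types": [...] }

  encoded, row_count, _ = stream.fetch(:type => "file", :source => "/path/to/dump")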
fetch_from_database()
  # File lib/tapsoob/data_stream.rb, line 168
  def fetch_from_database
    res = nil
    log.debug "DataStream#fetch_from_database state -> #{state.inspect}"
    state[:chunksize] = Tapsoob::Utils.calculate_chunksize(state[:chunksize]) do |c|
      state[:chunksize] = c.to_i
      encoded_data = fetch.first

      checksum = Tapsoob::Utils.checksum(encoded_data).to_s

      res = {
        :json         => { :checksum => checksum, :state => to_hash },
        :encoded_data => encoded_data
      }
    end

    res
  end
fetch_rows()
Fetches the next chunk of rows from the table, ordered by order_by and bounded by the current offset and chunksize. A record of the average chunksize is kept during the first few chunks (see max_chunksize_training) so that later fetches can grow back toward that average after the chunksize has been shrunk.

  # File lib/tapsoob/data_stream.rb, line 73
  def fetch_rows
    state[:chunksize] = fetch_chunksize
    ds = table.order(*order_by).limit(state[:chunksize], state[:offset])
    log.debug "DataStream#fetch_rows SQL -> #{ds.sql}"
    rows = Tapsoob::Utils.format_data(ds.all,
      :string_columns => string_columns,
      :schema         => db.schema(table_name),
      :table          => table_name
    )
    update_chunksize_stats
    rows
  end
import_rows(rows)
  # File lib/tapsoob/data_stream.rb, line 221
  def import_rows(rows)
    columns = rows[:header]
    data    = rows[:data]

    # Only import existing columns
    if table.columns.size != columns.size
      existing_columns = table.columns.map(&:to_s)
      additional_columns = columns - existing_columns
      additional_columns_idxs = additional_columns.map { |c| columns.index(c) }
      additional_columns_idxs.reverse.each do |idx|
        columns.delete_at(idx)
        rows[:types].delete_at(idx)
      end
      data.each_index { |didx| additional_columns_idxs.reverse.each { |idx| data[didx].delete_at(idx) } }
    end

    # Decode blobs
    if rows.has_key?(:types) && rows[:types].include?("blob")
      blob_indices = rows[:types].each_index.select { |idx| rows[:types][idx] == "blob" }
      data.each_index do |idx|
        blob_indices.each do |bi|
          data[idx][bi] = Sequel::SQL::Blob.new(Tapsoob::Utils.base64decode(data[idx][bi])) unless data[idx][bi].nil?
        end
      end
    end

    # Parse date/datetime/time columns
    if rows.has_key?(:types)
      %w(date datetime time).each do |type|
        if rows[:types].include?(type)
          type_indices = rows[:types].each_index.select { |idx| rows[:types][idx] == type }
          data.each_index do |idx|
            type_indices.each do |ti|
              data[idx][ti] = Sequel.send("string_to_#{type}".to_sym, data[idx][ti]) unless data[idx][ti].nil?
            end
          end
        end
      end
    end

    # Remove id column
    if @options[:"discard-identity"] && rows[:header].include?("id")
      columns = rows[:header] - ["id"]
      data    = data.map { |d| d[1..-1] }
    end

    table.import(columns, data, :commit_every => 100)
    state[:offset] += rows[:data].size
  rescue Exception => ex
    case ex.message
    when /integer out of range/ then
      raise Tapsoob::InvalidData, <<-ERROR, []
\nDetected integer data that exceeds the maximum allowable size for an integer type.
This generally occurs when importing from SQLite due to the fact that SQLite does
not enforce maximum values on integer types.
      ERROR
    else
      raise ex
    end
  end
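A hedged sketch of the payload import_rows expects (column names and values are invented; the :types array is what drives the blob and date/time decoding above):

  stream.import_rows(
    :table_name => "users",
    :header     => ["id", "name", "created_at"],
    :types      => ["integer", "string", "datetime"],
    :data       => [
      [1, "Ada",   "2024-01-01 12:00:00"],
      [2, "Grace", "2024-01-02 09:30:00"]
    ]
  )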
increment(row_count)
  # File lib/tapsoob/data_stream.rb, line 67
  def increment(row_count)
    state[:offset] += row_count
  end
log()
  # File lib/tapsoob/data_stream.rb, line 23
  def log
    Tapsoob.log.level = Logger::DEBUG if state[:debug]
    Tapsoob.log
  end
max_chunksize_training()
  # File lib/tapsoob/data_stream.rb, line 100
  def max_chunksize_training
    20
  end
order_by(name=nil)
  # File lib/tapsoob/data_stream.rb, line 60
  def order_by(name=nil)
    @order_by ||= begin
      name ||= table_name
      Tapsoob::Utils.order_by(db, name)
    end
  end
parse_encoded_data(encoded_data, checksum)
  # File lib/tapsoob/data_stream.rb, line 207
  def parse_encoded_data(encoded_data, checksum)
    raise Tapsoob::CorruptedData.new("Checksum Failed") unless Tapsoob::Utils.valid_data?(encoded_data, checksum)

    begin
      return Marshal.load(Tapsoob::Utils.base64decode(encoded_data))
    rescue Object => e
      unless ENV['NO_DUMP_MARSHAL_ERRORS']
        puts "Error encountered loading data, wrote the data chunk to dump.#{Process.pid}.dat"
        File.open("dump.#{Process.pid}.dat", "w") { |f| f.write(encoded_data) }
      end
      raise e
    end
  end
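encode_rows and parse_encoded_data are symmetric, with the checksum guarding the trip; a minimal round trip:

  rows    = { :table_name => "users", :header => ["id"], :data => [[1]], :types => ["integer"] }
  encoded = stream.encode_rows(rows)
  sum     = Tapsoob::Utils.checksum(encoded).to_s
  stream.parse_encoded_data(encoded, sum) # => rows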
string_columns()
  # File lib/tapsoob/data_stream.rb, line 52
  def string_columns
    @string_columns ||= Tapsoob::Utils.incorrect_blobs(db, table_name)
  end
table()
  # File lib/tapsoob/data_stream.rb, line 56
  def table
    @table ||= db[table_name_sql]
  end
table_name()
  # File lib/tapsoob/data_stream.rb, line 36
  def table_name
    state[:table_name].to_sym
  end
table_name_sql()
  # File lib/tapsoob/data_stream.rb, line 40
  def table_name_sql
    table_name
  end
to_hash()
  # File lib/tapsoob/data_stream.rb, line 44
  def to_hash
    state.merge(:klass => self.class.to_s)
  end
to_json()
  # File lib/tapsoob/data_stream.rb, line 48
  def to_json
    JSON.generate(to_hash)
  end
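to_json and parse_json form the round trip that lets factory resume an interrupted stream: :klass rides along in the serialized state, so factory revives the same class.

  saved   = stream.to_json
  state   = Tapsoob::DataStream.parse_json(saved)
  resumed = Tapsoob::DataStream.factory(db, state, {})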
update_chunksize_stats()
  # File lib/tapsoob/data_stream.rb, line 112
  def update_chunksize_stats
    return if state[:num_chunksize] >= max_chunksize_training
    state[:total_chunksize] += state[:chunksize]
    state[:num_chunksize] += 1
    state[:avg_chunksize] = state[:total_chunksize] / state[:num_chunksize] rescue state[:chunksize]
  end
verify_stream()
  # File lib/tapsoob/data_stream.rb, line 281
  def verify_stream
    state[:offset] = table.count
  end