class DataMiner::Step::Import
A step that imports data from a remote source.
Create these by calling import inside a data_miner block.
@see DataMiner::ActiveRecordClassMethods#data_miner Overview of how to define data miner scripts inside of ActiveRecord models.
@see DataMiner::Script#import Creating an import step by calling DataMiner::Script#import from inside a data miner script.
@see DataMiner::Attribute The Attribute class, which maps local columns and remote data fields from within an import step.
Attributes
attributes — The mappings of local columns to remote data source fields, keyed by local column name. @return [ActiveSupport::OrderedHash{String => DataMiner::Attribute}]
description — Description of what this step does. @return [String]
limit — Maximum number of rows to import. Defaults to infinity (no limit). @return [Numeric]
@private
random_skip — Number from zero to one giving the probability that any given row is skipped (e.g. 0.1 skips roughly 10% of rows). Defaults to nil, which skips no rows. @return [Numeric, nil]
Public Class Methods
@private
# File lib/data_miner/step/import.rb, line 35
# Builds an import step. The block is instance_eval'd on the new step so that
# #key and #store can be called directly inside it.
#
# @param script the script that owns this step (presumably a DataMiner::Script)
# @param description [String] what this step does
# @param settings [Hash] step options. 'validate', 'errata', 'limit' and
#   'random_skip' are read here. The whole hash (with errata normalized) is
#   also handed to RemoteTable, with 'streaming' forced to true.
# @raise [ArgumentError] if :table is given, or if :errata is given but is not a Hash
def initialize(script, description, settings, &blk)
  settings = settings.stringify_keys
  if settings.has_key?('table')
    raise ::ArgumentError, %{[data_miner] :table is no longer an allowed setting.}
  end
  errata_settings = settings['errata']
  if errata_settings and not errata_settings.is_a?(::Hash)
    raise ::ArgumentError, %{[data_miner] :errata must be a hash of initialization settings to Errata}
  end
  @script = script
  @attributes = ::ActiveSupport::OrderedHash.new
  @validate_query = !!settings['validate']
  @description = description
  # A nil/false :errata passes the check above. Only normalize when a Hash was
  # actually given; previously `errata: nil` crashed on nil.stringify_keys.
  # Runs after @script is set because #model (defined elsewhere) may depend on it.
  if errata_settings
    errata_settings = errata_settings.stringify_keys
    errata_settings['responder'] ||= model
    settings['errata'] = errata_settings
  end
  @table_settings = settings.dup
  @table_settings['streaming'] = true
  @table_mutex = ::Mutex.new
  @limit = settings.fetch 'limit', (1.0/0)
  @random_skip = settings['random_skip']
  @listeners = []
  instance_eval(&blk)
end
Public Instance Methods
Store data into a model column AND use it as the key.
@see DataMiner::Attribute The actual Attribute class.
Enables idempotency. In other words, you can run the data miner script multiple times, get updated data, and not get duplicate rows.
@param [String] attr_name The name of the local model column. @param [optional, Hash] attr_options Options that will be passed to DataMiner::Attribute.new
@option attr_options [*] anything Any option for DataMiner::Attribute.
@return [DataMiner::Attribute] The attribute created for this column (the value returned by #store).
# File lib/data_miner/step/import.rb, line 89
# Declares +attr_name+ as this step's lookup key, then stores it like any
# other column via #store. Raises if the column was already declared.
# Returns whatever #store returns (the new attribute).
def key(attr_name, attr_options = {})
  column = attr_name.to_s
  raise "You should only call store or key once for #{model.name}##{column}" if attributes.has_key?(column)
  @key = column
  store column, attr_options
end
# File lib/data_miner/step/import.rb, line 111
# Adds +step+ to the listeners notified while importing, but only when the
# step says it targets this import step. Returns nil when it does not.
def register(step)
  return unless step.target?(self)
  listeners << step
end
@private
# File lib/data_miner/step/import.rb, line 99
# Runs the import, bulk-writing with Upsert when #upsert_enabled? allows it and
# saving records one by one otherwise, then drops the cached remote table.
# Always returns nil.
def start
  if upsert_enabled?
    save_with_upsert
  else
    save_with_find_or_initialize
  end
  refresh
  nil
end
Store data into a model column.
@see DataMiner::Attribute The actual Attribute class.
@param [String] attr_name The name of the local model column. @param [optional, Hash] attr_options Options that will be passed to DataMiner::Attribute.new
@option attr_options [*] anything Any option for DataMiner::Attribute.
@return [DataMiner::Attribute] The attribute created for this column.
# File lib/data_miner/step/import.rb, line 70
# Maps the local column +attr_name+ to remote data by registering a new
# DataMiner::Attribute (built with this step, the column name, the options and
# any block). Raises if the column was already declared via #store or #key.
# Returns the new attribute.
def store(attr_name, attr_options = {}, &blk)
  column = attr_name.to_s
  raise "You should only call store or key once for #{model.name}##{column}" if attributes.has_key?(column)
  attribute = DataMiner::Attribute.new(self, column, attr_options, &blk)
  attributes[column] = attribute
end
@private Whether to run ActiveRecord validations. Slows things down because Upsert isn’t used.
# File lib/data_miner/step/import.rb, line 107
# True only when the 'validate' flag captured at construction is exactly true.
# Anything else (false, nil) means "skip validations".
def validate?
  @validate_query.equal?(true)
end
Private Instance Methods
# File lib/data_miner/step/import.rb, line 123
# Progress-report interval in rows, read once from DATA_MINER_COUNT_EVERY and
# memoized. Defaults to -1. The save methods only print when this is > 0, so
# the default disables progress output.
def count_every
  return @count_every if @count_every
  @count_every = ENV.fetch('DATA_MINER_COUNT_EVERY', -1).to_i
end
# File lib/data_miner/step/import.rb, line 207
# Forgets the memoized remote table so the next #table call builds a new one.
# Returns nil (the value of the assignment).
def refresh
  @table = nil
end
# File lib/data_miner/step/import.rb, line 158
# Imports rows one record at a time. For each row it either finds or
# initializes a record by the key column (when #key was used) or builds a new
# record, copies every attribute from the row, and calls save!. After each
# saved row every listener gets #notify; listeners whose #notify returns
# false/nil are dropped (select!).
def save_with_find_or_initialize
  count = 0
  table.each do |row|
    # Stop as soon as +limit+ rows have been saved. The old `count > limit`
    # check ran before the increment and let limit + 1 rows through. Checking
    # first also avoids reading further rows only to random-skip them.
    break if count >= limit
    next if random_skip and random_skip > Kernel.rand
    $stderr.puts "#{count}..." if count_every > 0 and count % count_every == 0
    count += 1
    record = @key ? model.send("find_or_initialize_by_#{@key}", attributes[@key].read(row)) : model.new
    attributes.each { |_, attr| attr.set_from_row record, row }
    record.save!
    listeners.select! do |listener|
      listener.notify self, count
    end
  end
end
# File lib/data_miner/step/import.rb, line 127
# Imports rows in bulk through Upsert on a connection checked out from the
# model's pool. Each row becomes one upsert whose selector is the key column
# (or the primary key set to nil when no key was declared) and whose document
# merges the updates of every non-key attribute. When two attributes update
# the same column and the existing value is a Hash, the new updates are merged
# into it. Listeners are notified after each row; those whose #notify returns
# false/nil are dropped.
def save_with_upsert
  c = model.connection_pool.checkout
  begin
    attrs_except_key = attributes.except(@key).values
    count = 0
    Upsert.stream(c, model.table_name) do |upsert|
      table.each do |row|
        # Stop once +limit+ rows are written. The old `count > limit` check
        # ran before the increment and let limit + 1 rows through.
        break if count >= limit
        next if random_skip and random_skip > Kernel.rand
        $stderr.puts "#{count}..." if count_every > 0 and count % count_every == 0
        count += 1
        selector = @key ? { @key => attributes[@key].read(row) } : { model.primary_key => nil }
        document = attrs_except_key.each_with_object({}) do |attr, memo|
          attr.updates(row).each do |k, v|
            memo[k] = memo[k].is_a?(::Hash) ? memo[k].merge(v) : v
          end
        end
        upsert.row selector, document
        listeners.select! do |listener|
          listener.notify self, count
        end
      end
    end
  ensure
    # Always return the connection, even if reading or writing a row raised.
    model.connection_pool.checkin c
  end
end
# File lib/data_miner/step/import.rb, line 196
# Memoized: whether one of the declared attributes is the model's primary-key
# column. Falsy when the model has no primary key. Uses defined? instead of
# ||= so that false/nil answers are cached as well.
def storing_primary_key?
  unless defined?(@storing_primary_key_query)
    pk = model.primary_key
    @storing_primary_key_query = pk && attributes.has_key?(pk)
  end
  @storing_primary_key_query
end
# File lib/data_miner/step/import.rb, line 201
# Lazily builds the RemoteTable for this step from @table_settings and caches
# it. Once built it is returned without locking. The build itself happens
# under @table_mutex and re-checks @table, so concurrent first callers end up
# sharing one instance.
def table
  return @table if @table
  @table_mutex.synchronize { @table ||= ::RemoteTable.new(@table_settings) }
end
# File lib/data_miner/step/import.rb, line 174
# Memoized: whether the model's integer primary key is filled in by the
# database.
#   * MySQL: looks for 'auto_increment' in INFORMATION_SCHEMA.COLUMNS.EXTRA.
#   * PostgreSQL: looks for 'nextval' in the column default.
#   * SQLite: assumed true (a real check is still a FIXME).
# Returns nil for other adapters or when there is no integer primary key.
# NOTE(review): the PostgreSQL query does not filter by schema, so a same-named
# table in another schema could be matched — confirm whether that matters.
def table_has_autoincrementing_primary_key?
  return @table_has_autoincrementing_primary_key_query if defined?(@table_has_autoincrementing_primary_key_query)
  c = model.connection_pool.checkout
  answer = begin
    if (pk = model.primary_key) and model.columns_hash[pk].type == :integer
      case c.adapter_name
      when /mysql/i
        extra = c.select_value %{SELECT EXTRA FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = #{c.quote(c.current_database)} AND TABLE_NAME = #{c.quote(model.table_name)} AND COLUMN_NAME = #{c.quote(pk)}}
        extra.to_s.include?('auto_increment')
      when /postgres/i
        column_default = c.select_value %{SELECT COLUMN_DEFAULT FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = #{c.quote(model.table_name)} AND COLUMN_NAME = #{c.quote(pk)}}
        column_default.to_s.include?('nextval')
      when /sqlite/i
        # FIXME doesn't work
        # row = c.select_rows("PRAGMA table_info(#{model.quoted_table_name})").detect { |r| r[1] == pk }
        # row[2] == 'INTEGER' and row[3] == 1 and row[5] == 1
        true
      end
    end
  ensure
    # Previously a failing query leaked the checked-out connection.
    model.connection_pool.checkin c
  end
  @table_has_autoincrementing_primary_key_query = answer
end
# File lib/data_miner/step/import.rb, line 119
# Upsert is used only when validations are off and each row can be addressed
# without ActiveRecord: either the primary key is one of the stored columns, or
# the database generates it.
def upsert_enabled?
  return false if validate?
  storing_primary_key? || table_has_autoincrementing_primary_key?
end