class DataMiner::Step::Import

A step that imports data from a remote source.

Create these by calling import inside a data_miner block.

@see DataMiner::ActiveRecordClassMethods#data_miner Overview of how to define data miner scripts inside of ActiveRecord models. @see DataMiner::Script#import Creating an import step by calling DataMiner::Script#import from inside a data miner script @see DataMiner::Attribute The Attribute class, which maps local columns and remote data fields from within an import step

Attributes

attributes[R]

The mappings of local columns to remote data source fields. @return [Array<DataMiner::Attribute>]

description[R]

Description of what this step does. @return [String]

limit[R]

Maximum number of rows to import. Defaults to infinity (no limit) when the :limit setting is not given. @return [Numeric]

listeners[R]

@private

random_skip[R]

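Number from zero to one giving the probability that any given row is skipped, i.e. roughly the fraction of rows to skip. When the :random_skip setting is not given this is nil and no rows are skipped. @return [Numeric, nil]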

Public Class Methods

new(script, description, settings, &blk) click to toggle source

@private

# File lib/data_miner/step/import.rb, line 35
# Build an import step and evaluate its definition block.
#
# @private
#
# @param script The data miner script this step belongs to (kept in @script).
# @param [String] description Description of what this step does.
# @param [Hash] settings Step settings; keys are stringified before use.
#   'validate', 'errata', 'limit' and 'random_skip' are read here, and a copy
#   of the whole hash (with 'streaming' forced to true) is kept for building
#   the RemoteTable later.
# @param blk Block evaluated against the new step (via instance_eval), so
#   that calls such as +key+ and +store+ inside it register attributes.
#
# @raise [ArgumentError] if the removed :table setting is given, or if
#   :errata is given but is not a Hash.
def initialize(script, description, settings, &blk)
  settings = settings.stringify_keys
  if settings.has_key?('table')
    raise ::ArgumentError, %{[data_miner] :table is no longer an allowed setting.}
  end
  errata_settings = settings['errata']
  if errata_settings and not errata_settings.is_a?(::Hash)
    raise ::ArgumentError, %{[data_miner] :errata must be a hash of initialization settings to Errata}
  end
  @script = script
  @attributes = ::ActiveSupport::OrderedHash.new
  @validate_query = !!settings['validate']
  @description = description
  # Test the value, not the key's presence: an explicit :errata => nil used to
  # pass the type check above and then blow up on nil.stringify_keys.
  if errata_settings
    errata_settings = errata_settings.stringify_keys
    # Must run after @script is assigned; +model+ is defined outside this method.
    errata_settings['responder'] ||= model
    settings['errata'] = errata_settings
  end
  @table_settings = settings.dup
  @table_settings['streaming'] = true
  @table_mutex = ::Mutex.new
  # No 'limit' setting means "import everything".
  @limit = settings.fetch('limit', ::Float::INFINITY)
  @random_skip = settings['random_skip']
  @listeners = []
  instance_eval(&blk)
end

Public Instance Methods

key(attr_name, attr_options = {}) click to toggle source

Store data into a model column AND use it as the key.

@see DataMiner::Attribute The actual Attribute class.

Enables idempotency. In other words, you can run the data miner script multiple times, get updated data, and not get duplicate rows.

@param [String] attr_name The name of the local model column. @param [optional, Hash] attr_options Options that will be passed to DataMiner::Attribute.new @option attr_options [*] anything Any option for DataMiner::Attribute.

@return [DataMiner::Attribute] The attribute created for this column (the return value of store).

# File lib/data_miner/step/import.rb, line 89
# Declare +attr_name+ as the key column and register it for storage.
#
# @param [String] attr_name The name of the local model column.
# @param [optional, Hash] attr_options Options handed on to +store+.
#
# @raise [RuntimeError] if the column has already been declared.
#
# @return whatever +store+ returns for this column.
def key(attr_name, attr_options = {})
  attr_name = attr_name.to_s
  already_declared = attributes.has_key?(attr_name)
  raise "You should only call store or key once for #{model.name}##{attr_name}" if already_declared
  # Remember which column identifies a row before delegating the storage part.
  @key = attr_name
  store(attr_name, attr_options)
end
register(step) click to toggle source
# File lib/data_miner/step/import.rb, line 111
# Add +step+ to this import's listeners, but only if the step says it
# targets this import.
#
# @param step Any object responding to +target?+.
#
# @return the listeners collection when the step was added, otherwise nil.
def register(step)
  return unless step.target?(self)
  listeners << step
end
start() click to toggle source

@private

# File lib/data_miner/step/import.rb, line 99
# Run the import using whichever save strategy applies, then drop the
# cached remote table.
#
# @private
#
# @return [nil]
def start
  if upsert_enabled?
    save_with_upsert
  else
    save_with_find_or_initialize
  end
  refresh
  nil
end
store(attr_name, attr_options = {}, &blk) click to toggle source

Store data into a model column.

@see DataMiner::Attribute The actual Attribute class.

@param [String] attr_name The name of the local model column. @param [optional, Hash] attr_options Options that will be passed to DataMiner::Attribute.new @option attr_options [*] anything Any option for DataMiner::Attribute.

@return [DataMiner::Attribute] The attribute created for this column.

# File lib/data_miner/step/import.rb, line 70
# Register a mapping from remote data to the local column +attr_name+.
#
# @param [String] attr_name The name of the local model column.
# @param [optional, Hash] attr_options Options passed to DataMiner::Attribute.new.
# @param blk Optional block, forwarded to DataMiner::Attribute.new.
#
# @raise [RuntimeError] if the column has already been declared.
#
# @return [DataMiner::Attribute] the newly created attribute.
def store(attr_name, attr_options = {}, &blk)
  attr_name = attr_name.to_s
  duplicate = attributes.has_key?(attr_name)
  raise "You should only call store or key once for #{model.name}##{attr_name}" if duplicate
  attribute = DataMiner::Attribute.new(self, attr_name, attr_options, &blk)
  attributes[attr_name] = attribute
end
validate?() click to toggle source

@private Whether to run ActiveRecord validations. Slows things down because Upsert isn’t used.

# File lib/data_miner/step/import.rb, line 107
# Whether this step was configured with a truthy 'validate' setting.
#
# @private
#
# @return [Boolean] true only when @validate_query is exactly +true+.
def validate?
  @validate_query.equal?(true)
end

Private Instance Methods

count_every() click to toggle source
# File lib/data_miner/step/import.rb, line 123
# How often (in rows) the save loops print a progress line to $stderr.
# Read once from the DATA_MINER_COUNT_EVERY environment variable and cached;
# -1 when the variable is not set.
#
# @return [Integer]
def count_every
  return @count_every if @count_every
  @count_every = ENV.fetch('DATA_MINER_COUNT_EVERY', -1).to_i
end
refresh() click to toggle source
# File lib/data_miner/step/import.rb, line 207
# Forget the cached remote table so that the next call to +table+ builds a
# fresh one.
#
# @return [nil]
def refresh
  @table = nil
end
save_with_find_or_initialize() click to toggle source
# File lib/data_miner/step/import.rb, line 158
# Import rows one at a time through the model class: find or build a record
# for each row, apply every attribute to it and save it with +save!+.
#
# Rows are randomly skipped when +random_skip+ is set, and importing stops
# once +limit+ rows have been imported. After each saved row every listener
# is notified; listeners whose +notify+ returns a falsy value are dropped.
#
# @return [nil] when the limit stopped the import early; otherwise whatever
#   +table.each+ returns.
def save_with_find_or_initialize
  count = 0
  table.each do |row|
    next if random_skip and random_skip > Kernel.rand
    $stderr.puts "#{count}..." if count_every > 0 and count % count_every == 0
    # +count+ is the number of rows imported so far, so stop as soon as it
    # reaches the limit (testing with > let one extra row through).
    break if count >= limit
    count += 1
    # With a key, look the record up by that column; without one, every row
    # becomes a brand-new record.
    record = @key ? model.send("find_or_initialize_by_#{@key}", attributes[@key].read(row)) : model.new
    attributes.each { |_, attr| attr.set_from_row record, row }
    record.save!
    listeners.select! do |listener|
      listener.notify self, count
    end
  end
end
save_with_upsert() click to toggle source
# File lib/data_miner/step/import.rb, line 127
# Import rows by streaming them through Upsert on a connection checked out
# of the model's connection pool.
#
# Rows are randomly skipped when +random_skip+ is set, and importing stops
# once +limit+ rows have been imported. After each row every listener is
# notified; listeners whose +notify+ returns a falsy value are dropped.
#
# The connection is always checked back in, even if reading the remote table
# or writing a row raises.
#
# @return whatever Upsert.stream returns (the only visible caller, +start+, ignores it).
def save_with_upsert
  pool = model.connection_pool
  c = pool.checkout
  begin
    # The key column goes into the selector, so it is left out of the document.
    attrs_except_key = attributes.except(@key).values
    count = 0
    Upsert.stream(c, model.table_name) do |upsert|
      table.each do |row|
        next if random_skip and random_skip > Kernel.rand
        $stderr.puts "#{count}..." if count_every > 0 and count % count_every == 0
        # +count+ is the number of rows imported so far, so stop as soon as it
        # reaches the limit (testing with > let one extra row through).
        break if count >= limit
        count += 1
        # NOTE(review): with no key the selector is { primary_key => nil },
        # presumably so that every row is inserted as new; confirm against Upsert.
        selector = @key ? { @key => attributes[@key].read(row) } : { model.primary_key => nil }
        document = attrs_except_key.each_with_object({}) do |attr, memo|
          attr.updates(row).each do |k, v|
            # Several attributes may feed one column; merge when the value
            # collected so far is a Hash, otherwise the last one wins.
            case memo[k]
            when ::Hash
              memo[k] = memo[k].merge v
            else
              memo[k] = v
            end
          end
        end
        upsert.row selector, document
        listeners.select! do |listener|
          listener.notify self, count
        end
      end
    end
  ensure
    pool.checkin c
  end
end
storing_primary_key?() click to toggle source
# File lib/data_miner/step/import.rb, line 196
# Whether one of the declared attributes is the model's primary key column.
# The answer is computed once and cached, including nil/false answers.
#
# @return [Boolean, nil] nil (or false) when the model reports no primary key.
def storing_primary_key?
  unless defined?(@storing_primary_key_query)
    pk = model.primary_key
    @storing_primary_key_query = pk && attributes.has_key?(pk)
  end
  @storing_primary_key_query
end
table() click to toggle source
# File lib/data_miner/step/import.rb, line 201
# The remote table for this step, built on first use from @table_settings
# and cached until +refresh+ clears it.
#
# An already-built table is returned without taking the lock; construction
# itself happens inside @table_mutex, where the cache is checked again.
#
# @return [RemoteTable]
def table
  return @table if @table
  @table_mutex.synchronize do
    @table ||= ::RemoteTable.new(@table_settings)
  end
end
table_has_autoincrementing_primary_key?() click to toggle source
# File lib/data_miner/step/import.rb, line 174
# Whether the model's table has an integer primary key that the database
# fills in automatically. The answer is computed once and cached.
#
# The check is adapter-specific: MySQL and PostgreSQL are asked through
# INFORMATION_SCHEMA.COLUMNS; SQLite is assumed to auto-increment (see FIXME).
# Any other adapter, a missing primary key, or a non-integer primary key
# yields nil.
#
# The connection checked out for the query is always checked back in, even
# if the query raises; in that case nothing is cached.
#
# @return [Boolean, nil]
def table_has_autoincrementing_primary_key?
  return @table_has_autoincrementing_primary_key_query if defined?(@table_has_autoincrementing_primary_key_query)
  pool = model.connection_pool
  c = pool.checkout
  answer = begin
    if (pk = model.primary_key) and model.columns_hash[pk].type == :integer
      case c.adapter_name
      when /mysql/i
        extra = c.select_value %{SELECT EXTRA FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = #{c.quote(c.current_database)} AND TABLE_NAME = #{c.quote(model.table_name)} AND COLUMN_NAME = #{c.quote(pk)}}
        extra.to_s.include?('auto_increment')
      when /postgres/i
        column_default = c.select_value %{SELECT COLUMN_DEFAULT FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = #{c.quote(model.table_name)} AND COLUMN_NAME = #{c.quote(pk)}}
        column_default.to_s.include?('nextval')
      when /sqlite/i
        # FIXME doesn't work
        # row = c.select_rows("PRAGMA table_info(#{model.quoted_table_name})").detect { |r| r[1] == pk }
        # row[2] == 'INTEGER' and row[3] == 1 and row[5] == 1
        true
      end
    end
  ensure
    pool.checkin c
  end
  @table_has_autoincrementing_primary_key_query = answer
end
upsert_enabled?() click to toggle source
# File lib/data_miner/step/import.rb, line 119
# Whether rows can be written through Upsert rather than record by record.
#
# Upsert is ruled out when validations were requested; otherwise it is used
# if the step stores the primary key itself or the table's primary key
# auto-increments.
#
# @return [Boolean, nil] false when validating, else the result of the two checks.
def upsert_enabled?
  return false if validate?
  storing_primary_key? || table_has_autoincrementing_primary_key?
end