class NoSE::Loader::MysqlLoader

Load data from a MySQL database into a backend

Public Class Methods

new(workload = nil, backend = nil) click to toggle source
# File lib/nose/loader/mysql.rb, line 15
def initialize(workload = nil, backend = nil)
  @logger = Logging.logger['nose::loader::mysqlloader']

  @workload = workload
  @backend = backend
end

Public Instance Methods

load(indexes, config, show_progress = false, limit = nil, skip_existing = true) click to toggle source

Load a generated set of indexes with data from MySQL

# File lib/nose/loader/mysql.rb, line 23
def load(indexes, config, show_progress = false, limit = nil,
         skip_existing = true)
  indexes.map!(&:to_id_graph).uniq! if @backend.by_id_graph

  # XXX Assuming backend is thread-safe
  Parallel.each(indexes, in_threads: 2) do |index|
    load_index index, config, show_progress, limit, skip_existing
  end
end
workload(config) click to toggle source

Read all tables in the database and construct a workload object

# File lib/nose/loader/mysql.rb, line 34
def workload(config)
  client = new_client config

  workload = Workload.new
  results = if @array_options
              client.query('SHOW TABLES').each(**@array_options)
            else
              client.query('SHOW TABLES').each
            end

  results.each do |table, *|
    # TODO: Handle foreign keys
    workload << entity_for_table(client, table)
  end

  workload
end

Private Instance Methods

entity_for_table(client, table) click to toggle source

Generate an entity definition from a given table @return [Entity]

# File lib/nose/loader/mysql.rb, line 160
def entity_for_table(client, table)
  entity = Entity.new table
  count = client.query("SELECT COUNT(*) FROM #{table}").first
  entity.count = count.is_a?(Hash) ? count.values.first : count

  describe = if @array_options
               client.query("DESCRIBE #{table}").each(**@array_options)
             else
               client.query("DESCRIBE #{table}").each
             end

  describe.each do |name, type, _, key|
    field_class = key == 'PRI' ? Fields::IDField : field_class(type)
    entity << field_class.new(name)
  end

  entity
end
field_class(type) click to toggle source

Produce the Ruby class used to represent a MySQL type @return [Class]

# File lib/nose/loader/mysql.rb, line 181
def field_class(type)
  case type
  when /datetime/
    Fields::DateField
  when /float/
    Fields::FloatField
  when /text/
    # TODO: Get length
    Fields::StringField
  when /varchar\(([0-9]+)\)/
    # TODO: Use length
    Fields::StringField
  when /(tiny)?int/
    Fields::IntegerField
  end
end
hash_from_row(row, fields) click to toggle source

Construct a hash from the given row returned by the client @return [Hash]

# File lib/nose/loader/mysql.rb, line 104
def hash_from_row(row, fields)
  row_hash = {}
  fields.each_with_index do |field, i|
    value = field.class.value_from_string row[i]
    row_hash[field.id] = value
  end

  row_hash
end
index_sql(index, limit = nil) click to toggle source

Construct a SQL statement to fetch the data to populate this index @return [String]

# File lib/nose/loader/mysql.rb, line 144
def index_sql(index, limit = nil)
  # Get all the necessary fields
  fields, select = index_sql_select index

  # Construct the join condition
  tables = index_sql_tables index

  query = "SELECT #{select.join ', '} FROM #{tables}"
  query += " LIMIT #{limit}" unless limit.nil?

  @logger.debug query
  [query, fields]
end
index_sql_select(index) click to toggle source

Get all the fields selected by this index

# File lib/nose/loader/mysql.rb, line 115
def index_sql_select(index)
  fields = index.hash_fields.to_a + index.order_fields + index.extra.to_a

  [fields, fields.map do |field|
    "#{field.parent.name}.#{field.name} AS " \
    "#{field.parent.name}_#{field.name}"
  end]
end
index_sql_tables(index) click to toggle source

Get the list of tables along with the join condition for a query to fetch index data @return [String]

# File lib/nose/loader/mysql.rb, line 127
def index_sql_tables(index)
  # Create JOIN statements
  tables = index.graph.entities.map(&:name).join ' JOIN '
  return tables if index.graph.size == 1

  tables << ' WHERE '
  tables << index.path.each_cons(2).map do |_prev_key, key|
    key = key.reverse if key.relationship == :many
    "#{key.parent.name}.#{key.name}=" \
      "#{key.entity.name}.#{key.entity.id_field.name}"
  end.join(' AND ')

  tables
end
load_index(index, config, show_progress, limit, skip_existing) click to toggle source

Load a single index into the backend @return [void]

# File lib/nose/loader/mysql.rb, line 73
def load_index(index, config, show_progress, limit, skip_existing)
  client = new_client config

  # Skip this index if it's not empty
  if skip_existing && !@backend.index_empty?(index)
    @logger.info "Skipping index #{index.inspect}" if show_progress
    return
  end
  @logger.info index.inspect if show_progress

  sql, fields = index_sql index, limit
  results = if @query_options
              client.query(sql, **@query_options)
            else
              client.query(sql).map { |row| hash_from_row row, fields }
            end

  result_chunk = []
  results.each do |result|
    result_chunk.push result
    next if result_chunk.length < 1000

    @backend.index_insert_chunk index, result_chunk
    result_chunk = []
  end
  @backend.index_insert_chunk index, result_chunk \
    unless result_chunk.empty?
end
new_client(config) click to toggle source

Create a new client from the given configuration

# File lib/nose/loader/mysql.rb, line 55
def new_client(config)
  if Object.const_defined?(:Mysql2)
    @query_options = { stream: true, cache_rows: false }
    @array_options = { as: :array }
    Mysql2::Client.new host: config[:host],
                       username: config[:username],
                       password: config[:password],
                       database: config[:database]
  else
    @query_options = false
    @array_options = false
    Mysql.connect config[:host], config[:username], config[:password],
                  config[:database]
  end
end