class Purview::Databases::Base

Attributes

name[R]
opts[R]
tables[R]

Public Class Methods

new(name, opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 6
# Builds a database object and registers its tables.
#
# Stores `name` as a symbol and `opts` as given. The tables returned by
# `default_tables` followed by those from `tables_opt` are collected into a
# Set; each table that is newly added gets its `database` set to this
# instance, while duplicates are skipped and left untouched.
#
# @param name [#to_sym] the database name
# @param opts [Hash] options (`tables_opt` reads `:tables` from it)
def initialize(name, opts={})
  @name = name.to_sym
  @opts = opts
  registered = Set.new
  (default_tables + tables_opt).each do |table|
    # `add?` returns nil for a table that is already in the set.
    next unless registered.add?(table)
    table.database = self
  end
  @tables = registered
end

Public Instance Methods

baseline_table(table, timestamp=Time.now) click to toggle source
# File lib/purview/databases/base.rb, line 16
# Repeatedly syncs `table`, one window at a time, until the synced window
# reaches `timestamp`, holding the table lock for the whole run.
#
# @param table [Object] a table registered with this database
# @param timestamp [Time] the point in time to catch up to
# @return the table's name as resolved by `table_name`
# @raise [Purview::Exceptions::CouldNotBaselineTable] when
#   `table_initialized?` is false for the table
def baseline_table(table, timestamp=Time.now)
  ensure_table_valid_for_database(table)
  raise Purview::Exceptions::CouldNotBaselineTable.new(table) \
    unless table_initialized?(table)
  table_name = table_name(table)
  with_context_logging("`baseline_table` for: #{table_name}") do
    starting_timestamp = timestamp
    with_table_locked(table, starting_timestamp) do
      loop do
        last_window = sync_table_without_lock(table, timestamp)
        # `next_window` clamps a window's max to the timestamp, so the max
        # can equal but never exceed it; a strict `>` would never break and
        # the loop would only end by raising `NoWindowForTable`. Compare at
        # second granularity, as `next_window` does.
        break if last_window.max.to_i >= starting_timestamp.to_i
      end
    end
  end
  table_name
end
create_index(index, opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 33
# Creates `index` by executing the statement built by `create_index_sql`.
#
# The table name and index name are resolved from `opts[:table]` and
# `opts[:index]` via `extract_table_opts` / `extract_index_opts`. The
# statement runs inside `with_new_or_existing_connection(opts)`.
#
# @param index [Object] an index whose table is registered with this database
# @param opts [Hash] options forwarded to the helpers above
# @return the resolved index name
def create_index(index, opts={})
  ensure_index_valid_for_database(index)
  table_opts = extract_table_opts(opts)
  target_table = table_name(index.table, table_opts)
  index_opts = extract_index_opts(opts)
  resolved_name = index_name(target_table, index, index_opts)
  with_context_logging("`create_index` for: #{resolved_name}") do
    with_new_or_existing_connection(opts) do |connection|
      sql = create_index_sql(target_table, resolved_name, index, index_opts)
      connection.execute(sql)
    end
  end
  resolved_name
end
create_table(table, opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 58
# Creates `table`, and its indices when `table_opts[:create_indices]` is set.
#
# Metadata for the table is ensured first. The CREATE statement comes from
# the method named by `create_table_sql_method_name` and runs inside
# `with_new_or_existing_connection(opts)`; indices are created on that same
# connection against the resolved table name.
#
# @param table [Object] a table registered with this database
# @param opts [Hash] options; `opts[:table]` supplies the table options
# @return the resolved table name
def create_table(table, opts={})
  ensure_table_valid_for_database(table)
  ensure_table_metadata_exists_for_table(table)
  table_opts = extract_table_opts(opts)
  resolved_name = table_name(table, table_opts)
  with_context_logging("`create_table` for: #{resolved_name}") do
    with_new_or_existing_connection(opts) do |connection|
      sql_builder = create_table_sql_method_name(table, table_opts)
      connection.execute(send(sql_builder, resolved_name, table, table_opts))
      if table_opts[:create_indices]
        table.indices.each do |index|
          create_index(index, :connection => connection, :table => { :name => resolved_name })
        end
      end
    end
  end
  resolved_name
end
disable_table(table, timestamp=Time.now) click to toggle source
# File lib/purview/databases/base.rb, line 87
# Executes `disable_table_sql(table)` on a new connection.
#
# @param table [Object] a table registered with this database
# @param timestamp [Time] accepted but not read by this method
# @return the table's name as resolved by `table_name`
# @raise [Purview::Exceptions::CouldNotDisableTable] when `zero?` holds for
#   the statement's `rows_affected`
def disable_table(table, timestamp=Time.now)
  ensure_table_valid_for_database(table)
  resolved_name = table_name(table)
  with_context_logging("`disable_table` for: #{resolved_name}") do
    with_new_connection do |connection|
      result = connection.execute(disable_table_sql(table))
      if zero?(result.rows_affected)
        raise Purview::Exceptions::CouldNotDisableTable.new(table)
      end
    end
  end
  resolved_name
end
drop_index(index, opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 101
# Drops `index` by executing the statement built by `drop_index_sql`.
#
# Table and index names are resolved from `opts[:table]` and `opts[:index]`
# exactly as in `create_index`. The statement runs inside
# `with_new_or_existing_connection(opts)`.
#
# @param index [Object] an index whose table is registered with this database
# @param opts [Hash] options forwarded to the helpers above
# @return the resolved index name
def drop_index(index, opts={})
  ensure_index_valid_for_database(index)
  table_opts = extract_table_opts(opts)
  target_table = table_name(index.table, table_opts)
  index_opts = extract_index_opts(opts)
  resolved_name = index_name(target_table, index, index_opts)
  with_context_logging("`drop_index` for: #{resolved_name}") do
    with_new_or_existing_connection(opts) do |connection|
      sql = drop_index_sql(target_table, resolved_name, index, index_opts)
      connection.execute(sql)
    end
  end
  resolved_name
end
drop_table(table, opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 126
# Drops `table` by executing the statement built by `drop_table_sql`.
#
# The table's metadata is removed first via
# `ensure_table_metadata_absent_for_table`. The DROP statement runs on a new
# connection.
#
# @param table [Object] a table registered with this database
# @param opts [Hash] options; `opts[:table]` supplies the table options
# @return the resolved table name
def drop_table(table, opts={})
  ensure_table_valid_for_database(table)
  ensure_table_metadata_absent_for_table(table)
  table_opts = extract_table_opts(opts)
  resolved_name = table_name(table, table_opts)
  with_context_logging("`drop_table` for: #{resolved_name}") do
    with_new_connection do |connection|
      sql = drop_table_sql(resolved_name, table, table_opts)
      connection.execute(sql)
    end
  end
  resolved_name
end
enable_table(table, timestamp=Time.now) click to toggle source
# File lib/purview/databases/base.rb, line 145
# Executes `enable_table_sql(table, timestamp)` on a new connection.
#
# @param table [Object] a table registered with this database
# @param timestamp [Time] forwarded to `enable_table_sql`
# @return the table's name as resolved by `table_name`
# @raise [Purview::Exceptions::CouldNotEnableTable] when `zero?` holds for
#   the statement's `rows_affected`
def enable_table(table, timestamp=Time.now)
  ensure_table_valid_for_database(table)
  resolved_name = table_name(table)
  with_context_logging("`enable_table` for: #{resolved_name}") do
    with_new_connection do |connection|
      result = connection.execute(enable_table_sql(table, timestamp))
      if zero?(result.rows_affected)
        raise Purview::Exceptions::CouldNotEnableTable.new(table)
      end
    end
  end
  resolved_name
end
initialize_table(table, timestamp=Time.now) click to toggle source
# File lib/purview/databases/base.rb, line 159
# Executes `initialize_table_sql(table, timestamp)` on a new connection.
#
# @param table [Object] a table registered with this database
# @param timestamp [Time] forwarded to `initialize_table_sql`
# @return the table's name as resolved by `table_name`
# @raise [Purview::Exceptions::CouldNotInitializeTable] when `zero?` holds
#   for the statement's `rows_affected`
def initialize_table(table, timestamp=Time.now)
  ensure_table_valid_for_database(table)
  resolved_name = table_name(table)
  with_context_logging("`initialize_table` for: #{resolved_name}") do
    with_new_connection do |connection|
      result = connection.execute(initialize_table_sql(table, timestamp))
      if zero?(result.rows_affected)
        raise Purview::Exceptions::CouldNotInitializeTable.new(table)
      end
    end
  end
  resolved_name
end
lock_table(table, timestamp=Time.now) click to toggle source
# File lib/purview/databases/base.rb, line 173
# Executes `lock_table_sql(table, timestamp)` on a new connection.
#
# @param table [Object] a table registered with this database
# @param timestamp [Time] forwarded to `lock_table_sql`
# @return the table's name as resolved by `table_name`
# @raise [Purview::Exceptions::CouldNotLockTable] when `zero?` holds for the
#   statement's `rows_affected`
def lock_table(table, timestamp=Time.now)
  ensure_table_valid_for_database(table)
  resolved_name = table_name(table)
  with_context_logging("`lock_table` for: #{resolved_name}") do
    with_new_connection do |connection|
      result = connection.execute(lock_table_sql(table, timestamp))
      if zero?(result.rows_affected)
        raise Purview::Exceptions::CouldNotLockTable.new(table)
      end
    end
  end
  resolved_name
end
sync() click to toggle source
# File lib/purview/databases/base.rb, line 187
# Syncs whichever table `with_next_table` yields, under the table lock.
#
# The timestamp comes from `with_timestamp` and is used both to pick the
# table and to run the sync.
def sync
  with_context_logging('`sync`') do
    with_timestamp do |timestamp|
      with_next_table(timestamp) { |table| sync_table_with_lock(table, timestamp) }
    end
  end
end
sync_table(table, timestamp=Time.now) click to toggle source
# File lib/purview/databases/base.rb, line 197
# Syncs one window of `table` under the table lock.
#
# @param table [Object] a table registered with this database
# @param timestamp [Time] forwarded to `sync_table_with_lock`
# @return the table's name as resolved by `table_name`
# @raise [Purview::Exceptions::CouldNotSyncTable] unless the table is both
#   initialized and enabled
def sync_table(table, timestamp=Time.now)
  ensure_table_valid_for_database(table)
  syncable = table_initialized?(table) && table_enabled?(table)
  raise Purview::Exceptions::CouldNotSyncTable.new(table) unless syncable
  resolved_name = table_name(table)
  with_context_logging("`sync_table` for: #{resolved_name}") do
    sync_table_with_lock(table, timestamp)
  end
  resolved_name
end
table_metadata(table) click to toggle source
# File lib/purview/databases/base.rb, line 208
# Reads every metadata column for `table` into a
# `Purview::Structs::TableMetadata`.
#
# One value per column of `table_metadata_table` is fetched with
# `get_table_metadata_value` on a single new connection, keyed by column name.
#
# @param table [Object] a table registered with this database
# @return [Purview::Structs::TableMetadata]
def table_metadata(table)
  ensure_table_valid_for_database(table)
  metadata = nil
  resolved_name = table_name(table)
  with_context_logging("`table_metadata` for: #{resolved_name}") do
    with_new_connection do |connection|
      values = table_metadata_table.columns.each_with_object({}) do |column, memo|
        memo[column.name] = get_table_metadata_value(connection, table, column)
      end
      metadata = Purview::Structs::TableMetadata.new(values)
    end
  end
  metadata
end
unlock_table(table) click to toggle source
# File lib/purview/databases/base.rb, line 229
# Executes `unlock_table_sql(table)` on a new connection.
#
# @param table [Object] a table registered with this database
# @return the table's name as resolved by `table_name`
# @raise [Purview::Exceptions::CouldNotUnlockTable] when `zero?` holds for
#   the statement's `rows_affected`
def unlock_table(table)
  ensure_table_valid_for_database(table)
  resolved_name = table_name(table)
  with_context_logging("`unlock_table` for: #{resolved_name}") do
    with_new_connection do |connection|
      result = connection.execute(unlock_table_sql(table))
      if zero?(result.rows_affected)
        raise Purview::Exceptions::CouldNotUnlockTable.new(table)
      end
    end
  end
  resolved_name
end

Private Instance Methods

column_definition(column) click to toggle source
# File lib/purview/databases/base.rb, line 254
# Builds the SQL column definition fragment for `column`.
#
# The fragment is assembled in this order: name, type, optional `(limit)`,
# optional ` PRIMARY KEY`, ` NULL` or ` NOT NULL`, optional ` DEFAULT ...`.
#
# @param column [Object] a column responding to `name`
# @return [String] a newly allocated definition string
def column_definition(column)
  # Start from a fresh, unfrozen copy: `to_s` on a String returns the
  # receiver itself, so appending to it directly would rewrite the column's
  # own name (and raise on a frozen name).
  definition = String.new(column.name.to_s)
  definition << " #{type(column)}"
  column_limit = limit(column)
  definition << "(#{column_limit})" if column_limit
  definition << ' PRIMARY KEY' if primary_key?(column)
  definition << " #{nullable?(column) ? 'NULL' : 'NOT NULL'}"
  column_default = default(column)
  definition << " DEFAULT #{column_default}" if column_default
  definition
end
column_definitions(index_or_table) click to toggle source
# File lib/purview/databases/base.rb, line 273
# Maps each column of `index_or_table` through `column_definition`.
#
# @param index_or_table [Object] anything responding to `columns`
# @return the mapped definitions, in column order
def column_definitions(index_or_table)
  index_or_table.columns.map(&method(:column_definition))
end
column_names(index_or_table) click to toggle source
# File lib/purview/databases/base.rb, line 269
# Collects the `name` of each column of `index_or_table`.
#
# @param index_or_table [Object] anything responding to `columns`
# @return the column names, in column order
def column_names(index_or_table)
  index_or_table.columns.map { |column| column.name }
end
connection_type() click to toggle source
# File lib/purview/databases/base.rb, line 277
# Abstract hook: this base implementation always raises.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def connection_type
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "connection_type" method]
end
create_index_sql(table_name, index_name, index, index_opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 281
# Abstract hook: this base implementation always raises. `create_index`
# passes the result of this method to `connection.execute`.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def create_index_sql(table_name, index_name, index, index_opts={})
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "create_index_sql" method]
end
create_table_sql(table_name, table, table_opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 285
# Abstract hook: this base implementation always raises. `create_table`
# may dispatch to this method (by name) and executes what it returns.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def create_table_sql(table_name, table, table_opts={})
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "create_table_sql" method]
end
create_table_sql_method_name(table, table_opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 289
# Picks the name of the SQL builder that `create_table` dispatches to.
#
# @param table [Object] unused here; kept for overriding implementations
# @param table_opts [Hash] table options; only `:temporary` is read
# @return [Symbol] `:create_temporary_table_sql` when `table_opts[:temporary]`
#   is truthy, otherwise `:create_table_sql`
def create_table_sql_method_name(table, table_opts={})
  # Branch explicitly: interpolating `false && '_temporary'` would yield the
  # text "false" and produce the bogus name `:createfalse_table_sql`.
  table_opts[:temporary] ? :create_temporary_table_sql : :create_table_sql
end
create_temporary_table_sql(table_name, table, table_opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 293
# Abstract hook: this base implementation always raises. `create_table`
# may dispatch to this method (by name) and executes what it returns.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def create_temporary_table_sql(table_name, table, table_opts={})
  message = %Q[All "#{Base}(s)" must override the "create_temporary_table_sql" method]
  raise RuntimeError, message
end
database_host() click to toggle source
# File lib/purview/databases/base.rb, line 297
# Returns the `:database_host` entry of `opts` (nil when not supplied,
# assuming `opts` is a plain Hash).
def database_host
  opts[:database_host]
end
database_name() click to toggle source
# File lib/purview/databases/base.rb, line 301
# Returns the database name, which is simply this object's `name`.
def database_name
  name
end
database_password() click to toggle source
# File lib/purview/databases/base.rb, line 305
# Returns the `:database_password` entry of `opts` (nil when not supplied,
# assuming `opts` is a plain Hash).
def database_password
  opts[:database_password]
end
database_port() click to toggle source
# File lib/purview/databases/base.rb, line 309
# Returns the `:database_port` entry of `opts` (nil when not supplied,
# assuming `opts` is a plain Hash).
def database_port
  opts[:database_port]
end
database_username() click to toggle source
# File lib/purview/databases/base.rb, line 313
# Returns the `:database_username` entry of `opts` (nil when not supplied,
# assuming `opts` is a plain Hash).
def database_username
  opts[:database_username]
end
default(column) click to toggle source
# File lib/purview/databases/base.rb, line 317
# Resolves the default for `column`.
#
# @param column [Object] a column responding to `default` and `type`
# @return the column's own `default` when truthy, otherwise the entry of
#   `default_map` for the column's type
def default(column)
  explicit = column.default
  return explicit if explicit
  default_map[column.type]
end
default_map() click to toggle source
# File lib/purview/databases/base.rb, line 321
# Per-type fallback defaults consulted by `default`, keyed by column type.
# Empty in this base implementation.
def default_map
  {}
end
default_tables() click to toggle source
# File lib/purview/databases/base.rb, line 325
# Tables registered ahead of the ones from `tables_opt` when the database is
# constructed. Empty in this base implementation.
def default_tables
  []
end
dialect_type() click to toggle source
# File lib/purview/databases/base.rb, line 329
# Abstract hook: this base implementation always raises.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def dialect_type
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "dialect_type" method]
end
disable_table_sql(table) click to toggle source
# File lib/purview/databases/base.rb, line 333
# Abstract hook: this base implementation always raises. `disable_table`
# passes the result of this method to `connection.execute`.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def disable_table_sql(table)
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "disable_table_sql" method]
end
drop_index_sql(table_name, index_name, index, index_opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 337
# Abstract hook: this base implementation always raises. `drop_index`
# passes the result of this method to `connection.execute`.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def drop_index_sql(table_name, index_name, index, index_opts={})
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "drop_index_sql" method]
end
drop_table_sql(table_name, table, table_opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 341
# Abstract hook: this base implementation always raises. `drop_table`
# passes the result of this method to `connection.execute`.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def drop_table_sql(table_name, table, table_opts={})
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "drop_table_sql" method]
end
enable_table_sql(table, timestamp) click to toggle source
# File lib/purview/databases/base.rb, line 345
# Abstract hook: this base implementation always raises. `enable_table`
# passes the result of this method to `connection.execute`.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def enable_table_sql(table, timestamp)
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "enable_table_sql" method]
end
ensure_index_valid_for_database(index) click to toggle source
# File lib/purview/databases/base.rb, line 349
# Validates that `index` is present and that its table belongs to this
# database (delegating the latter to `ensure_table_valid_for_database`).
#
# @param index [Object, nil] the index to validate
# @raise [ArgumentError] when `index` is nil or false
def ensure_index_valid_for_database(index)
  raise ArgumentError, 'Must provide an `Index`' unless index
  ensure_table_valid_for_database(index.table)
end
ensure_table_metadata_absent_for_table(table) click to toggle source
# File lib/purview/databases/base.rb, line 355
# On one new connection, executes the statement from
# `ensure_table_metadata_table_exists_sql` and then the one from
# `ensure_table_metadata_absent_for_table_sql(table)`.
#
# @param table [Object] the table whose metadata should be absent
def ensure_table_metadata_absent_for_table(table)
  with_new_connection do |connection|
    metadata_table_sql = ensure_table_metadata_table_exists_sql
    connection.execute(metadata_table_sql)
    absent_sql = ensure_table_metadata_absent_for_table_sql(table)
    connection.execute(absent_sql)
  end
end
ensure_table_metadata_absent_for_table_sql(table) click to toggle source
# File lib/purview/databases/base.rb, line 362
# Abstract hook: this base implementation always raises.
# `ensure_table_metadata_absent_for_table` executes what this returns.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def ensure_table_metadata_absent_for_table_sql(table)
  message = %Q[All "#{Base}(s)" must override the "ensure_table_metadata_absent_for_table_sql" method]
  raise RuntimeError, message
end
ensure_table_metadata_exists_for_table(table) click to toggle source
# File lib/purview/databases/base.rb, line 366
# On one new connection, executes the statement from
# `ensure_table_metadata_table_exists_sql` and then the one from
# `ensure_table_metadata_exists_for_table_sql(table)`.
#
# @param table [Object] the table whose metadata should exist
def ensure_table_metadata_exists_for_table(table)
  with_new_connection do |connection|
    metadata_table_sql = ensure_table_metadata_table_exists_sql
    connection.execute(metadata_table_sql)
    exists_sql = ensure_table_metadata_exists_for_table_sql(table)
    connection.execute(exists_sql)
  end
end
ensure_table_metadata_exists_for_table_sql(table) click to toggle source
# File lib/purview/databases/base.rb, line 373
# Abstract hook: this base implementation always raises.
# `ensure_table_metadata_exists_for_table` executes what this returns.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def ensure_table_metadata_exists_for_table_sql(table)
  message = %Q[All "#{Base}(s)" must override the "ensure_table_metadata_exists_for_table_sql" method]
  raise RuntimeError, message
end
ensure_table_metadata_table_exists_sql() click to toggle source
# File lib/purview/databases/base.rb, line 377
# Abstract hook: this base implementation always raises. Both
# `ensure_table_metadata_*_for_table` methods execute what this returns
# before their own statement.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def ensure_table_metadata_table_exists_sql
  message = %Q[All "#{Base}(s)" must override the "ensure_table_metadata_table_exists_sql" method]
  raise RuntimeError, message
end
ensure_table_valid_for_database(table) click to toggle source
# File lib/purview/databases/base.rb, line 381
# Validates that `table` is present and is one of this database's `tables`.
#
# @param table [Object, nil] the table to validate
# @return [nil] when the table is valid
# @raise [ArgumentError] when `table` is nil or false
# @raise [Purview::Exceptions::WrongDatabaseForTable] when `tables` does not
#   include the table
def ensure_table_valid_for_database(table)
  raise ArgumentError, 'Must provide a `Table`' unless table
  return if tables.include?(table)
  raise Purview::Exceptions::WrongDatabaseForTable.new(table)
end
extract_index_opts(opts) click to toggle source
# File lib/purview/databases/base.rb, line 388
# Pulls the index-specific options out of `opts`.
#
# @param opts [Hash] options passed to an index operation
# @return `opts[:index]` when truthy, otherwise a new empty Hash
def extract_index_opts(opts)
  index_opts = opts[:index]
  return index_opts if index_opts
  {}
end
extract_table_opts(opts) click to toggle source
# File lib/purview/databases/base.rb, line 392
# Pulls the table-specific options out of `opts`.
#
# @param opts [Hash] options passed to a table or index operation
# @return `opts[:table]` when truthy, otherwise `{ :create_indices => true }`
def extract_table_opts(opts)
  table_opts = opts[:table]
  return table_opts if table_opts
  { :create_indices => true }
end
get_table_metadata_value(connection, table, column) click to toggle source
# File lib/purview/databases/base.rb, line 396
# Fetches one metadata value for `table` and parses it with the column's type.
#
# Executes `get_table_metadata_value_sql(table, column)` and reads
# `column.name` from the first returned row.
#
# @param connection [Object] connection to execute on
# @param table [Object] the table whose metadata is read
# @param column [Object] the metadata column (responds to `name` and `type`)
# @return the parsed value, or the raw value unchanged when it is nil/false
# @raise [Purview::Exceptions::CouldNotFindMetadataForTable] when no row is
#   returned
def get_table_metadata_value(connection, table, column)
  result = connection.execute(get_table_metadata_value_sql(table, column))
  row = result.rows[0]
  raise Purview::Exceptions::CouldNotFindMetadataForTable.new(table) unless row
  raw = row[column.name]
  raw ? column.type.parse(raw) : raw
end
index_name(table_name, index, index_opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 404
# Resolves the name of `index` on `table_name`.
#
# @param table_name the name of the table the index belongs to
# @param index [Object] the index (its columns come from `column_names`)
# @param index_opts [Hash] index options; only `:name` is read
# @return `index_opts[:name]` when truthy, otherwise
#   `index_<table>_on_<col>_and_<col>...`
def index_name(table_name, index, index_opts={})
  explicit = index_opts[:name]
  return explicit if explicit
  "index_#{table_name}_on_#{column_names(index).join('_and_')}"
end
initialize_table_sql(table, timestamp) click to toggle source
# File lib/purview/databases/base.rb, line 411
# Abstract hook: this base implementation always raises. `initialize_table`
# passes the result of this method to `connection.execute`.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def initialize_table_sql(table, timestamp)
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "initialize_table_sql" method]
end
limit(column) click to toggle source
# File lib/purview/databases/base.rb, line 415
# Resolves the size limit for `column`.
#
# @param column [Object] a column responding to `type` and `limit`
# @return [nil] when the column's type is listed in `limitless_types`;
#   otherwise the column's own `limit` when truthy, else the entry of
#   `limit_map` for the column's type
def limit(column)
  return nil if limitless_types.include?(column.type)
  column.limit || limit_map[column.type]
end
limit_map() click to toggle source
# File lib/purview/databases/base.rb, line 419
# Per-type fallback limits consulted by `limit`, keyed by column type.
# Empty in this base implementation.
def limit_map
  {}
end
limitless_types() click to toggle source
# File lib/purview/databases/base.rb, line 423
# Column types for which `limit` always returns nil.
# Empty in this base implementation.
def limitless_types
  []
end
lock_table_sql(table, timestamp) click to toggle source
# File lib/purview/databases/base.rb, line 427
# Abstract hook: this base implementation always raises. `lock_table`
# passes the result of this method to `connection.execute`.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def lock_table_sql(table, timestamp)
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "lock_table_sql" method]
end
next_table(connection, timestamp) click to toggle source
# File lib/purview/databases/base.rb, line 431
# Finds the next table to sync.
#
# Executes `next_table_sql(timestamp)`, reads the table-name column from the
# first row, parses it with that column's type, and resolves the resulting
# name to one of this database's registered `tables`.
#
# Callers (`with_next_table` -> `sync`) go on to call `table.name`,
# `table.sync` and `tables.include?(table)` on the result, so a table object
# must be returned here rather than the bare name value.
#
# @param connection [Object] connection to execute on
# @param timestamp [Time] forwarded to `next_table_sql`
# @return the registered table whose name matches, or nil when there is no
#   row, no name value, or no registered table with that name
def next_table(connection, timestamp)
  row = connection.execute(next_table_sql(timestamp)).rows[0]
  return nil unless row
  name_column = table_metadata_table.table_name_column
  raw_name = row[name_column.name]
  return nil unless raw_name
  # Compare as strings so symbol- and string-named tables both match.
  wanted = name_column.type.parse(raw_name).to_s
  tables.find { |table| table.name.to_s == wanted }
end
next_table_sql(timestamp) click to toggle source
# File lib/purview/databases/base.rb, line 437
# Abstract hook: this base implementation always raises. `next_table`
# passes the result of this method to `connection.execute`.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def next_table_sql(timestamp)
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "next_table_sql" method]
end
next_window(connection, table, timestamp) click to toggle source
# File lib/purview/databases/base.rb, line 441
# Computes the next window to pull for `table`.
#
# The window starts at the table's stored max-timestamp-pulled value and
# spans `table.window_size`, with its end capped at `timestamp`.
#
# @param connection [Object] connection used to read the metadata value
# @param table [Object] a table responding to `window_size`
# @param timestamp [Time] the upper bound for the window
# @return [Purview::Structs::Window, nil] nil when the start is not before
#   `timestamp` (compared via `to_i`)
def next_window(connection, table, timestamp)
  window_start = get_table_metadata_value(
    connection,
    table,
    table_metadata_table.max_timestamp_pulled_column
  )
  window_end = window_start + table.window_size
  return nil if window_start.to_i >= timestamp.to_i
  window_end = timestamp if window_end > timestamp
  Purview::Structs::Window.new(:min => window_start, :max => window_end)
end
nullable?(column) click to toggle source
# File lib/purview/databases/base.rb, line 455
# Returns whatever `column.nullable?` returns; `column_definition` uses it to
# choose between ` NULL` and ` NOT NULL`.
def nullable?(column)
  column.nullable?
end
primary_key?(column) click to toggle source
# File lib/purview/databases/base.rb, line 459
# Returns whatever `column.primary_key?` returns; `column_definition` uses it
# to decide whether to append ` PRIMARY KEY`.
def primary_key?(column)
  column.primary_key?
end
set_table_metadata_value(connection, table, column, value) click to toggle source
# File lib/purview/databases/base.rb, line 463
# Writes one metadata value for `table` by executing
# `set_table_metadata_value_sql(table, column, value)`.
#
# @param connection [Object] connection to execute on
# @param table [Object] the table whose metadata is written
# @param column [Object] the metadata column
# @param value the value to store
# @raise [Purview::Exceptions::CouldNotUpdateMetadataForTable] when `zero?`
#   holds for the statement's `rows_affected`
def set_table_metadata_value(connection, table, column, value)
  result = connection.execute(set_table_metadata_value_sql(table, column, value))
  if zero?(result.rows_affected)
    raise Purview::Exceptions::CouldNotUpdateMetadataForTable.new(table)
  end
end
sync_table_with_lock(table, timestamp) click to toggle source
# File lib/purview/databases/base.rb, line 470
# Runs `sync_table_without_lock` while the table is locked via
# `with_table_locked`.
#
# @param table [Object] the table to sync
# @param timestamp [Time] forwarded to the lock and to the sync
# @return whatever `sync_table_without_lock` returned (nil if the locked
#   block never ran)
def sync_table_with_lock(table, timestamp)
  synced_window = nil
  with_table_locked(table, timestamp) { synced_window = sync_table_without_lock(table, timestamp) }
  synced_window
end
sync_table_without_lock(table, timestamp) click to toggle source
# File lib/purview/databases/base.rb, line 478
# Syncs the next window of `table` without taking the table lock itself.
#
# The window comes from `with_next_window`. On a new connection, inside
# `with_transaction`, the table is synced for that window and two metadata
# values are then written: the last-pulled-at column (set to `timestamp`) and
# the max-timestamp-pulled column (set to the window's max).
#
# @param table [Object] the table to sync
# @param timestamp [Time] used to pick the window and recorded as last-pulled-at
# @return the window that was synced (nil if the innermost block never
#   completed)
def sync_table_without_lock(table, timestamp)
  last_window = nil
  with_next_window(table, timestamp) do |window|
    with_new_connection do |connection|
      with_transaction(connection) do
        # Pull the data first; the metadata writes below run in the same
        # `with_transaction` block, after the sync call returns.
        table.sync(connection, window)
        set_table_metadata_value(
          connection,
          table,
          table_metadata_table.last_pulled_at_column,
          timestamp
        )
        # `next_window` reads this value back as the start of the next window.
        set_table_metadata_value(
          connection,
          table,
          table_metadata_table.max_timestamp_pulled_column,
          window.max
        )
        last_window = window
      end
    end
  end
  last_window
end
table_disabled?(table) click to toggle source
# File lib/purview/databases/base.rb, line 503
# Negation of `table_enabled?`.
#
# @param table [Object] the table to check
# @return [Boolean]
def table_disabled?(table)
  table_enabled?(table) ? false : true
end
table_enabled?(table) click to toggle source
# File lib/purview/databases/base.rb, line 507
# Whether the enabled-at metadata value for `table` is set.
#
# Reads the value on a new connection via `get_table_metadata_value` and
# coerces it to a boolean.
#
# @param table [Object] the table to check
def table_enabled?(table)
  with_new_connection do |connection|
    enabled_at = get_table_metadata_value(
      connection,
      table,
      table_metadata_table.enabled_at_column
    )
    enabled_at ? true : false
  end
end
table_initialized?(table) click to toggle source
# File lib/purview/databases/base.rb, line 517
# Whether the max-timestamp-pulled metadata value for `table` is set.
#
# Reads the value on a new connection via `get_table_metadata_value` and
# coerces it to a boolean.
#
# @param table [Object] the table to check
def table_initialized?(table)
  with_new_connection do |connection|
    max_pulled = get_table_metadata_value(
      connection,
      table,
      table_metadata_table.max_timestamp_pulled_column
    )
    max_pulled ? true : false
  end
end
table_locked?(table) click to toggle source
# File lib/purview/databases/base.rb, line 527
# Whether the locked-at metadata value for `table` is set.
#
# Reads the value on a new connection via `get_table_metadata_value` and
# coerces it to a boolean.
#
# @param table [Object] the table to check
def table_locked?(table)
  with_new_connection do |connection|
    locked_at = get_table_metadata_value(
      connection,
      table,
      table_metadata_table.locked_at_column
    )
    locked_at ? true : false
  end
end
table_metadata_table() click to toggle source
# File lib/purview/databases/base.rb, line 537
# Lazily builds and memoizes the `Purview::Tables::TableMetadata` for this
# database.
#
# @return [Purview::Tables::TableMetadata] the same instance on every call
def table_metadata_table
  return @table_metadata_table if @table_metadata_table
  @table_metadata_table = Purview::Tables::TableMetadata.new(self)
end
table_name(table, table_opts={}) click to toggle source
# File lib/purview/databases/base.rb, line 541
# Resolves the name to use for `table`.
#
# @param table [Object] a table responding to `name`
# @param table_opts [Hash] table options; only `:name` is read
# @return `table_opts[:name]` when truthy, otherwise `table.name`
def table_name(table, table_opts={})
  override = table_opts[:name]
  return override if override
  table.name
end
table_unlocked?(table) click to toggle source
# File lib/purview/databases/base.rb, line 545
# Negation of `table_locked?`.
#
# @param table [Object] the table to check
# @return [Boolean]
def table_unlocked?(table)
  table_locked?(table) ? false : true
end
tables_by_name() click to toggle source
# File lib/purview/databases/base.rb, line 549
# Lazily builds and memoizes a Hash of this database's tables keyed by
# `table.name`. A later table with the same name replaces an earlier one.
#
# @return [Hash] the same Hash instance on every call
def tables_by_name
  @tables_by_name ||= tables.each_with_object({}) do |table, by_name|
    by_name[table.name] = table
  end
end
tables_opt() click to toggle source
# File lib/purview/databases/base.rb, line 557
# Tables supplied through the `:tables` option.
#
# @return `opts[:tables]` when truthy, otherwise a new empty Array
def tables_opt
  configured = opts[:tables]
  return configured if configured
  []
end
type(column) click to toggle source
# File lib/purview/databases/base.rb, line 561
# Looks up the SQL type name for `column` by its `type` in `type_map`;
# `column_definition` interpolates the result after the column name.
def type(column)
  type_map[column.type]
end
type_map() click to toggle source
# File lib/purview/databases/base.rb, line 565
# Maps each `Purview::Types` class to the SQL type name used in column
# definitions. A new Hash is built on every call.
#
# @return [Hash{Class => String}]
def type_map
  types = Purview::Types
  {
    types::Boolean => 'boolean',
    types::Date => 'date',
    types::Float => 'float',
    types::Integer => 'integer',
    types::String => 'varchar',
    types::Text => 'text',
    types::Time => 'time',
    types::Timestamp => 'timestamp',
  }
end
unlock_table_sql(table) click to toggle source
# File lib/purview/databases/base.rb, line 578
# Abstract hook: this base implementation always raises. `unlock_table`
# passes the result of this method to `connection.execute`.
#
# @raise [RuntimeError] always, naming the method that must be overridden
def unlock_table_sql(table)
  raise RuntimeError, %Q[All "#{Base}(s)" must override the "unlock_table_sql" method]
end
with_next_table(timestamp) { |table| ... } click to toggle source
# File lib/purview/databases/base.rb, line 582
# Yields the table returned by `next_table`, looked up on a new connection.
# The block runs while that connection is still open.
#
# @param timestamp [Time] forwarded to `next_table`
# @yieldparam table the value returned by `next_table`
# @raise [Purview::Exceptions::NoTable] when `next_table` returns nil/false
def with_next_table(timestamp)
  with_new_connection do |connection|
    candidate = next_table(connection, timestamp)
    if candidate
      yield candidate
    else
      raise Purview::Exceptions::NoTable.new
    end
  end
end
with_next_window(table, timestamp) { |window| ... } click to toggle source
# File lib/purview/databases/base.rb, line 591
# Yields the window returned by `next_window`, computed on a new connection.
# The block runs while that connection is still open.
#
# @param table [Object] the table to compute the window for
# @param timestamp [Time] forwarded to `next_window`
# @yieldparam window the value returned by `next_window`
# @raise [Purview::Exceptions::NoWindowForTable] when `next_window` returns
#   nil/false
def with_next_window(table, timestamp)
  with_new_connection do |connection|
    candidate = next_window(connection, table, timestamp)
    if candidate
      yield candidate
    else
      raise Purview::Exceptions::NoWindowForTable.new(table)
    end
  end
end
with_table_locked(table, timestamp) { || ... } click to toggle source
# File lib/purview/databases/base.rb, line 604
# Locks `table`, yields, and always unlocks afterwards.
#
# The unlock only runs once the lock has actually been acquired: if
# `lock_table` raises, that error propagates untouched and `unlock_table` is
# not called, so a failed lock attempt cannot release a lock this call never
# took or replace the original error with an unlock failure.
#
# @param table [Object] the table to lock
# @param timestamp [Time] forwarded to `lock_table`
# @return the value of the block
def with_table_locked(table, timestamp)
  lock_table(table, timestamp)
  begin
    yield
  ensure
    unlock_table(table)
  end
end