class MigrationBundler::Databases::CassandraDatabase

Attributes

client[R]
keyspace[R]

Public Class Methods

exception_class() click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 14
# Returns the exception class associated with this database.
# NOTE(review): presumably the base error raised by the Cql client and
# rescued by callers — confirm against the code that uses this.
#
# @return [Class] Cql::CqlError
def exception_class
  Cql::CqlError
end
migration_ext() click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 10
# Returns the migration extension for this database.
#
# @return [String] the literal ".cql", leading dot included
def migration_ext
  '.cql'
end
new(url) click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 19
# Connects a Cql client to the host/port named by +url+ and remembers the
# keyspace taken from the URL path.
#
# @param url [#host, #port, #path] connection URL (presumably a URI —
#   only these three readers are used here); also forwarded to +super+
def initialize(url)
  super(url)
  # Fall back to 9042 when the URL carries no explicit port.
  port = url.port || 9042
  connection_options = { host: url.host, port: port }
  @client = Cql::Client.connect(connection_options)
  # The path looks like "/keyspace"; everything after the first character
  # is kept as the keyspace name.
  @keyspace = url.path[1..-1]
end

Public Instance Methods

all_versions() click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 44
# Lists every version recorded in schema_migrations, in ascending order
# (the ordering is requested in the query itself).
#
# Switches the client to +keyspace+ before querying.
#
# @return [Array] the 'version' value of each returned row
def all_versions
  client.use(keyspace)
  result = client.execute("SELECT version FROM schema_migrations WHERE partition_key = 0 ORDER BY version ASC")
  result.map do |row|
    row['version']
  end
end
create_migrations_table() click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 63
# Creates the keyspace (SimpleStrategy, replication factor 1) and the
# schema_migrations table inside it. Both statements use IF NOT EXISTS.
#
# @return [Object] whatever the client returns for the CREATE TABLE call
def create_migrations_table
  create_keyspace = "CREATE KEYSPACE IF NOT EXISTS #{keyspace} WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' : 1};"
  create_table = "CREATE TABLE IF NOT EXISTS #{keyspace}.schema_migrations (partition_key INT, version VARINT, PRIMARY KEY (partition_key, version));"

  # The keyspace has to exist before the table can be created in it.
  client.execute(create_keyspace)
  client.execute(create_table)
end
current_version() click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 38
# Fetches the highest version recorded in schema_migrations (the query
# orders by version descending and keeps one row).
#
# Switches the client to +keyspace+ before querying.
#
# @return [Object, nil] the 'version' of the single returned row, or nil
#   when the query returns no rows
def current_version
  client.use(keyspace)
  newest = client.execute("SELECT version FROM schema_migrations WHERE partition_key = 0 ORDER BY version DESC LIMIT 1")
  return nil if newest.empty?

  newest.first['version']
end
drop(keyspaces = [keyspace]) click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 59
# Drops each of the given keyspaces, using IF EXISTS so missing ones are
# not an error at the CQL level.
#
# @param keyspaces [Array<String>] keyspace names; defaults to this
#   database's own keyspace
# @return [Array<String>] the +keyspaces+ collection that was iterated
def drop(keyspaces = [keyspace])
  keyspaces.each do |name|
    client.execute("DROP KEYSPACE IF EXISTS #{name}")
  end
end
dump_rows(table_name) click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 68
# Builds one CQL INSERT statement per row of +table_name+.
#
# Switches the client to +keyspace+, selects every row, and serializes
# each column value with serialize_value_of_type using the column type
# reported by the result metadata.
#
# @param table_name [String] table to read; interpolated into the CQL as-is
# @return [Array<String>] INSERT statements, one per row, in result order
def dump_rows(table_name)
  client.use(keyspace)
  result = client.execute("SELECT * FROM #{table_name}")

  # Column order comes from the result metadata, so every statement lists
  # its columns and values in the same order.
  column_names = result.metadata.map(&:column_name)
  column_list = column_names.join(', ')

  result.map do |row|
    serialized = column_names.map do |name|
      serialize_value_of_type(row[name], result.metadata[name].type)
    end
    "INSERT INTO #{table_name} (#{column_list}) VALUES (#{serialized.join(', ')});"
  end
end
execute_migration(cql) click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 55
# Runs a migration script by executing its statements one at a time.
#
# The script is split on ';', but semicolons that appear inside string
# literals, quoted identifiers, dollar-quoted strings or comments do not
# end a statement. A plain String#split(';') would cut such statements in
# half. Fragments that are blank after stripping are skipped; each
# remaining fragment is passed to the client unstripped and without its
# trailing ';'.
#
# @param cql [String] one or more CQL statements separated by ';'
# @return [Array<String>] the statement fragments found in +cql+
def execute_migration(cql)
  # Spans whose contents must never be treated as a statement separator.
  opaque_spans = [
    /'(?:[^']|'')*'/,   # string literal; '' is an escaped quote
    /"(?:[^"]|"")*"/,   # quoted identifier; "" is an escaped quote
    /\$\$.*?\$\$/m,     # dollar-quoted string literal
    /--[^\n]*/,         # line comment
    %r{//[^\n]*},       # line comment
    %r{/\*.*?\*/}m      # block comment
  ]
  # An unterminated quote or comment opener matches none of the spans and
  # is consumed by [^;] instead, so malformed input splits on every ';'
  # exactly as a naive split would.
  statement_pattern = /(?:#{Regexp.union(opaque_spans)}|[^;])+/m

  cql.scan(statement_pattern).each do |statement|
    client.execute(statement) unless statement.strip.empty?
  end
end
insert_version(version) click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 50
# Records +version+ in schema_migrations under partition key 0.
#
# Switches the client to +keyspace+ first; the version is passed to the
# client as a bound value for the '?' placeholder.
#
# @param version [Object] the version value to store
# @return [Object] whatever the client returns for the INSERT
def insert_version(version)
  client.use(keyspace)
  insert_cql = "INSERT INTO schema_migrations (partition_key, version) VALUES (0, ?)"
  client.execute(insert_cql, version)
end
migrations_table?() click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 26
# Reports whether a schema_migrations table is registered for +keyspace+.
#
# Switches the client to the 'system' keyspace and looks the table up in
# schema_columnfamilies. The client is left on 'system' afterwards.
#
# @return [Boolean] true when the lookup returns at least one row
def migrations_table?
  client.use('system')
  lookup = "SELECT columnfamily_name FROM schema_columnfamilies WHERE keyspace_name='#{keyspace}' AND columnfamily_name='schema_migrations'"
  matches = client.execute(lookup)
  !matches.empty?
end
origin_version() click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 32
# Fetches the lowest version recorded in schema_migrations (the query
# orders by version ascending and keeps one row).
#
# Switches the client to +keyspace+ before querying.
#
# @return [Object, nil] the 'version' of the single returned row, or nil
#   when the query returns no rows
def origin_version
  client.use(keyspace)
  earliest = client.execute("SELECT version FROM schema_migrations WHERE partition_key = 0 ORDER BY version ASC LIMIT 1")
  earliest.first['version'] unless earliest.empty?
end
serialize_value_of_type(value, type) click to toggle source
# File lib/migration_bundler/databases/cassandra_database.rb, line 87
# Renders +value+ as a CQL literal suitable for an INSERT statement.
#
# Handling by case:
# * nil (for any type, including collections and blobs) becomes NULL
# * [:list, T] becomes [a, b, ...]
# * [:set, T] becomes {a, b, ...}
# * [:map, K, V] becomes {k: v, ...}
# * an Enumerable value with a non-collection type becomes "a, b, ..."
# * :blob becomes a 0x-prefixed hex string
# * String is single-quoted, with embedded single quotes doubled
# * Cql::TimeUuid becomes its string form
# * anything else is returned unchanged for the caller to interpolate
#
# @param value [Object] the column value read from a row
# @param type [Symbol, Array] column type; collections are arrays whose
#   first element is :list, :set or :map followed by the element type(s)
# @return [String, Object] the CQL literal, or +value+ itself for scalars
#   that need no quoting
def serialize_value_of_type(value, type)
  # A null column is NULL whatever its declared type; checking first also
  # keeps the blob branch from calling unpack on nil.
  return 'NULL' if value.nil?

  collection_kind = type.is_a?(Array) ? type.first : nil
  case collection_kind
  when :list
    elements = Array(value).map { |element| serialize_value_of_type(element, type.last) }
    "[#{elements.join(', ')}]"
  when :set
    elements = Array(value).map { |element| serialize_value_of_type(element, type.last) }
    "{#{elements.join(', ')}}"
  when :map
    key_type = type[1]
    entry_type = type[2]
    entries = value.map do |key, entry|
      "#{serialize_value_of_type(key, key_type)}: #{serialize_value_of_type(entry, entry_type)}"
    end
    "{#{entries.join(', ')}}"
  else
    if value.kind_of?(Enumerable)
      value.map { |element| serialize_value_of_type(element, type) }.join(', ')
    elsif type == :blob
      '0x' + value.unpack('H*').first
    else
      case value
      when String
        # CQL escapes a single quote inside a string literal by doubling it.
        escaped = value.gsub("'", "''")
        "'#{escaped}'"
      when Cql::TimeUuid
        value.to_s
      else
        value
      end
    end
  end
end