class NoSE::Backend::CassandraBackend
A backend which communicates with Cassandra via CQL
Public Class Methods
NoSE::Backend::Backend::new
# File lib/nose/backend/cassandra.rb, line 12 def initialize(model, indexes, plans, update_plans, config) super @hosts = config[:hosts] @port = config[:port] @keyspace = config[:keyspace] @generator = Cassandra::Uuid::Generator.new end
Public Instance Methods
Check if a given index exists in the target database
# File lib/nose/backend/cassandra.rb, line 85 def drop_index(index) client.execute "DROP TABLE \"#{index.key}\"" end
Generate a random UUID
# File lib/nose/backend/cassandra.rb, line 22 def generate_id @generator.uuid end
Check if the given index is empty
# File lib/nose/backend/cassandra.rb, line 73 def index_empty?(index) query = "SELECT COUNT(*) FROM \"#{index.key}\" LIMIT 1" client.execute(query).first.values.first.zero? end
Check if a given index exists in the target database
# File lib/nose/backend/cassandra.rb, line 79 def index_exists?(index) client @cluster.keyspace(@keyspace).has_table? index.key end
Insert
a chunk of rows into an index @return [Array<Array<Cassandra::Uuid>>]
# File lib/nose/backend/cassandra.rb, line 51 def index_insert_chunk(index, chunk) fields = index.all_fields.to_a prepared = "INSERT INTO \"#{index.key}\" (" \ "#{field_names fields}" \ ") VALUES (#{(['?'] * fields.length).join ', '})" prepared = client.prepare prepared ids = [] client.execute(client.batch do |batch| chunk.each do |row| index_row = index_row(row, fields) ids << (index.hash_fields.to_a + index.order_fields).map do |field| index_row[fields.index field] end batch.add prepared, arguments: index_row end end) ids end
Sample a number of values from the given index
# File lib/nose/backend/cassandra.rb, line 90 def index_sample(index, count) field_list = index.all_fields.map { |f| "\"#{f.id}\"" } query = "SELECT #{field_list.join ', '} " \ "FROM \"#{index.key}\" LIMIT #{count}" rows = client.execute(query).rows # XXX Ignore null values for now # fail if rows.any? { |row| row.values.any?(&:nil?) } rows end
Produce the DDL necessary for column families for the given indexes and optionally execute them against the server
# File lib/nose/backend/cassandra.rb, line 28 def indexes_ddl(execute = false, skip_existing = false, drop_existing = false) Enumerator.new do |enum| @indexes.map do |index| ddl = index_cql index enum.yield ddl begin drop_index(index) if drop_existing && index_exists?(index) client.execute(ddl) if execute rescue Cassandra::Errors::AlreadyExistsError => exc next if skip_existing new_exc = IndexAlreadyExists.new exc.message new_exc.set_backtrace exc.backtrace raise new_exc end end end end
Private Instance Methods
Return the datatype to use in Cassandra for a given field @return [Symbol]
# File lib/nose/backend/cassandra.rb, line 160 def cassandra_type(field_class) case [field_class] when [Fields::IntegerField] :int when [Fields::FloatField] :float when [Fields::StringField] :text when [Fields::DateField] :timestamp when [Fields::IDField], [Fields::ForeignKeyField] :uuid end end
Get a Cassandra client, connecting if not done already
# File lib/nose/backend/cassandra.rb, line 151 def client return @client unless @client.nil? @cluster = Cassandra.cluster hosts: @hosts, port: @port, timeout: nil @client = @cluster.connect @keyspace end
Get a comma-separated list of field names with optional types @return [String]
# File lib/nose/backend/cassandra.rb, line 142 def field_names(fields, types = false) fields.map do |field| name = "\"#{field.id}\"" name += ' ' + cassandra_type(field.class).to_s if types name end.join ', ' end
Produce the CQL to create the definition for a given index @return [String]
# File lib/nose/backend/cassandra.rb, line 128 def index_cql(index) ddl = "CREATE COLUMNFAMILY \"#{index.key}\" (" \ "#{field_names index.all_fields, true}, " \ "PRIMARY KEY((#{field_names index.hash_fields})" cluster_key = index.order_fields ddl += ", #{field_names cluster_key}" unless cluster_key.empty? ddl += '));' ddl end
Produce an array of fields in the correct order for a CQL insert @return [Array]
# File lib/nose/backend/cassandra.rb, line 106 def index_row(row, fields) fields.map do |field| value = row[field.id] if field.is_a?(Fields::IDField) value = case value when Numeric Cassandra::Uuid.new value.to_i when String Cassandra::Uuid.new value when nil Cassandra::Uuid::Generator.new.uuid else value end end value end end