class Cequel::Metal::Keyspace
Handle to a Cassandra keyspace (database). Keyspace
objects are factories for DataSet
instances and provide a handle to a Schema::Keyspace
instance.
Attributes
@return [Hash] A hash of additional options passed to Cassandra, if any
@return [Symbol] The client compression option
@return [Hash] configuration options for this keyspace
@return [Hash] credentials for connect to cassandra
@return [Symbol] the default consistency for queries in this keyspace @since 1.1.0
@return [Object] The error policy object in use by this keyspace
@return [Array<String>] list of hosts to connect to
@return [String] name of the keyspace
@return Integer port to connect to Cassandra nodes on
@return [Hash] SSL Configuration options
Public Class Methods
@api private @param configuration [Options] @option (see configure
) @see Cequel.connect
# File lib/cequel/metal/keyspace.rb, line 95 def initialize(configuration={}) @lock = Monitor.new configure(configuration) end
Combine a statement with bind vars into a fully-fledged CQL query. This will no longer be needed once the CQL driver supports bound values natively.
@param statement [String] CQL statement with ? placeholders for bind
vars
@param bind_vars [Array] bind variables corresponding to ? in the
statement
@return [String] CQL statement with quoted values in place of bind
variables
# File lib/cequel/metal/keyspace.rb, line 78 def self.sanitize(statement, bind_vars) each_bind_var = bind_vars.each statement.gsub('?') { Type.quote(each_bind_var.next) } end
Public Instance Methods
@param table_name [Symbol] the name of the table @return [DataSet] data set encapsulating table
# File lib/cequel/metal/keyspace.rb, line 164 def [](table_name) DataSet.new(table_name.to_sym, self) end
return true if Cassandra server version is known to include bug CASSANDRA-8733
# File lib/cequel/metal/keyspace.rb, line 289 def bug8733_version? version_file = File.expand_path('../../../../.cassandra-versions', __FILE__) @all_versions ||= File.read(version_file).split("\n").map(&:strip) # bug exists in versions 0.3.0-2.0.12 and 2.1.0-2.1.2 @bug8733_versions ||= @all_versions[0..@all_versions.index('2.0.12')] + @all_versions[@all_versions.index('2.1.0')..@all_versions.index('2.1.2')] @bug8733_versions.include?(cassandra_version) end
@return [String] Cassandra version number
# File lib/cequel/metal/keyspace.rb, line 275 def cassandra_version return @cassandra_version if @cassandra_version statement = <<-CQL SELECT release_version FROM system.local CQL log('CQL', statement) do @cassandra_version = client_without_keyspace.execute(statement).first['release_version'] end end
Clears all active connections
@return [void]
# File lib/cequel/metal/keyspace.rb, line 246 def clear_active_connections! synchronize do if defined? @client remove_instance_variable(:@client) end if defined? @client_without_keyspace remove_instance_variable(:@client_without_keyspace) end if defined? @cluster @cluster.close remove_instance_variable(:@cluster) end end end
@return [Cassandra::Session] the low-level client session provided by the
adapter
@api private
# File lib/cequel/metal/keyspace.rb, line 173 def client synchronize do @client ||= cluster.connect(name) end end
# File lib/cequel/metal/keyspace.rb, line 300 def cluster synchronize do @cluster ||= Cassandra.cluster(client_options) end end
Configure this keyspace from a hash of options
@param configuration [Options] configuration options @option configuration [String] :host ('127.0.0.1') hostname of
single Cassandra instance to connect to
@option configuration [Integer] :port (9042) port on which to connect
to all specified hosts
@option configuration [Integer] :max_retries maximum number of retries
on connection failure
@option configuration [Array<String>] :hosts list of Cassandra
instances to connect to (hostnames only)
@option configuration [String] :username user to auth with (leave blank
for no auth)
@option configuration [String] :password password to auth with (leave
blank for no auth)
@option configuration [String] :keyspace name of keyspace to connect to @option configuration [Boolean] :ssl enable/disable ssl/tls support @option configuration [String] :server_cert path to ssl server
certificate
@option configuration [String] :client_cert path to ssl client
certificate
@option configuration [String] :private_key path to ssl client private
key
@option configuration [String] :passphrase the passphrase for client
private key
@option configuration [String] :cassandra_error_policy A mixin for
handling errors from Cassandra
@option configuration [Hash] :cassandra_options A hash of arbitrary
options to pass to Cassandra
@return [void]
# File lib/cequel/metal/keyspace.rb, line 132 def configure(configuration = {}) if configuration.key?(:thrift) warn "Cequel no longer uses the Thrift transport to communicate " \ "with Cassandra. The :thrift option is deprecated and ignored." end @configuration = configuration @error_policy = extract_cassandra_error_policy(configuration) @cassandra_options = extract_cassandra_options(configuration) @hosts, @port = extract_hosts_and_port(configuration) @credentials = extract_credentials(configuration) @ssl_config = extract_ssl_config(configuration) @name = configuration[:keyspace] @default_consistency = configuration[:default_consistency].try(:to_sym) @client_compression = configuration[:client_compression].try(:to_sym) # reset the connections clear_active_connections! end
@return [Symbol] the default consistency for queries in this keyspace @since 1.1.0
# File lib/cequel/metal/keyspace.rb, line 265 def default_consistency @default_consistency || :quorum end
Execute a CQL query in this keyspace
If a connection error occurs, will retry a maximum number of time (default 3) before re-raising the original connection error.
@param statement [String] CQL string @param bind_vars [Object] values for bind variables @return [Enumerable] the results of the query
@see execute_with_options
# File lib/cequel/metal/keyspace.rb, line 192 def execute(statement, *bind_vars) execute_with_options(Statement.new(statement, bind_vars), { consistency: default_consistency }) end
Execute a CQL query in this keyspace with the given options
@param statement [String,Statement,Batch] statement to execute @param options [Options] options for statement execution @return [Enumerable] the results of the query
@since 1.1.0
# File lib/cequel/metal/keyspace.rb, line 205 def execute_with_options(statement, options={}) options[:consistency] ||= default_consistency cql, options = *case statement when Statement [prepare_statement(statement), {arguments: statement.bind_vars}.merge(options)] when Cassandra::Statements::Batch [statement, options] end log('CQL', statement) do error_policy.execute_stmt(self) do client.execute(cql, options) end end end
@return [Boolean] true if the keyspace exists
# File lib/cequel/metal/keyspace.rb, line 270 def exists? cluster.has_keyspace?(name) end
Wraps the prepare statement in the default retry strategy
@param statement [String,Statement] statement to prepare @return [Cassandra::Statement::Prepared] the prepared statement
# File lib/cequel/metal/keyspace.rb, line 229 def prepare_statement(statement) cql = case statement when Statement statement.cql else statement end error_policy.execute_stmt(self) do client.prepare(cql) end end
@return [Schema::Keyspace] schema object providing full read/write
access to database schema
# File lib/cequel/metal/keyspace.rb, line 156 def schema Schema::Keyspace.new(self) end
Private Instance Methods
# File lib/cequel/metal/keyspace.rb, line 331 def batch_manager synchronize { @batch_manager ||= BatchManager.new(self) } end
# File lib/cequel/metal/keyspace.rb, line 322 def client_options {hosts: hosts, port: port}.tap do |options| options.merge!(credentials) if credentials options.merge!(ssl_config) if ssl_config options.merge!(compression: client_compression) if client_compression options.merge!(cassandra_options) if cassandra_options end end
# File lib/cequel/metal/keyspace.rb, line 316 def client_without_keyspace synchronize do @client_without_keyspace ||= cluster.connect end end
# File lib/cequel/metal/keyspace.rb, line 378 def extract_cassandra_error_policy(configuration) value = configuration.fetch(:cassandra_error_policy, ::Cequel::Metal::Policy::CassandraError::ClearAndRetryPolicy) # Accept a class name as a string, create an instance of it if value.is_a?(String) value.constantize.new(configuration) # Accept a class, instantiate it elsif value.is_a?(Class) value.new(configuration) # Accept a value, assume it is a ready to use policy object else value end end
# File lib/cequel/metal/keyspace.rb, line 392 def extract_cassandra_options(configuration) configuration[:cassandra_options] end
# File lib/cequel/metal/keyspace.rb, line 363 def extract_credentials(configuration) configuration.slice(:username, :password).presence end
# File lib/cequel/metal/keyspace.rb, line 339 def extract_hosts_and_port(configuration) hosts, ports = [], Set[] ports << Integer(configuration[:port]) if configuration.key?(:port) host_or_hosts = configuration.fetch(:host, configuration.fetch(:hosts, '127.0.0.1')) Array.wrap(host_or_hosts).each do |host_port| host, port = host_port.split(':') hosts << host if port warn "Specifying a hostname as host:port is deprecated. Specify " \ "only the host IP or hostname in :hosts, and specify a " \ "port for all nodes using the :port option." ports << Integer(port) end end if ports.size > 1 fail ArgumentError, "All Cassandra nodes must listen on the same " \ "port; specified multiple ports #{ports.join(', ')}" end [hosts, ports.first || 9042] end
# File lib/cequel/metal/keyspace.rb, line 367 def extract_ssl_config(configuration) ssl_config = {} ssl_config[:ssl] = configuration.fetch(:ssl, nil) ssl_config[:server_cert] = configuration.fetch(:server_cert, nil) ssl_config[:client_cert] = configuration.fetch(:client_cert, nil) ssl_config[:private_key] = configuration.fetch(:private_key, nil) ssl_config[:passphrase] = configuration.fetch(:passphrase, nil) ssl_config.each { |key, value| ssl_config.delete(key) unless value } ssl_config end
# File lib/cequel/metal/keyspace.rb, line 335 def write_target current_batch || self end