class Cassandra::Cluster
Cluster
represents a cassandra cluster. It serves as a {Cassandra::Session session factory} factory and a collection of metadata.
@see Cassandra::Cluster#connect
Creating a new session @see Cassandra::Cluster#each_host
Getting all peers in the cluster @see Cassandra::Cluster#each_keyspace
Getting all existing keyspaces
Public Class Methods
@private
# File lib/cassandra/cluster.rb 30 def initialize(logger, 31 io_reactor, 32 executor, 33 control_connection, 34 cluster_registry, 35 cluster_schema, 36 cluster_metadata, 37 execution_options, 38 connection_options, 39 profile_manager, 40 reconnection_policy, 41 address_resolution_policy, 42 connector, 43 futures_factory, 44 timestamp_generator) 45 @logger = logger 46 @io_reactor = io_reactor 47 @executor = executor 48 @control_connection = control_connection 49 @registry = cluster_registry 50 @schema = cluster_schema 51 @metadata = cluster_metadata 52 @execution_options = execution_options 53 @connection_options = connection_options 54 @profile_manager = profile_manager 55 @reconnection_policy = reconnection_policy 56 @address_resolver = address_resolution_policy 57 @connector = connector 58 @futures = futures_factory 59 @timestamp_generator = timestamp_generator 60 61 @control_connection.on_close do |_cause| 62 begin 63 @profile_manager.teardown(self) 64 rescue 65 nil 66 end 67 end 68 end
Public Instance Methods
Synchronously closes all sessions managed by this cluster
@return [self] this cluster
@see Cassandra::Cluster#close_async
# File lib/cassandra/cluster.rb 306 def close 307 close_async.get 308 end
Asynchronously closes all sessions managed by this cluster
@return [Cassandra::Future<Cassandra::Cluster>] a future that resolves to
self once closed
# File lib/cassandra/cluster.rb 285 def close_async 286 promise = @futures.promise 287 288 @control_connection.close_async.on_complete do |f| 289 if f.resolved? 290 promise.fulfill(self) 291 else 292 f.on_failure {|e| promise.break(e)} 293 end 294 295 @executor.shutdown 296 end 297 298 promise.future 299 end
Synchronously create a new session, optionally scoped to a keyspace
@param keyspace [String] optional keyspace to scope the session to
@raise [ArgumentError] if keyspace is not a String @raise [Cassandra::Errors::NoHostsAvailable] when all hosts failed @raise [Cassandra::Errors::AuthenticationError] when authentication fails @raise [Cassandra::Errors::ProtocolError] when protocol negotiation fails @raise [Cassandra::Error] other unexpected errors
@return [Cassandra::Session] a new session that can prepare and execute
statements
@see Cassandra::Cluster#connect_async
# File lib/cassandra/cluster.rb 277 def connect(keyspace = nil) 278 connect_async(keyspace).get 279 end
Asynchronously create a new session, optionally scoped to a keyspace
@param keyspace [String] optional keyspace to scope session to
@return [Cassandra::Future<Cassandra::Session>] a future new session that
can prepare and execute statements
@see Cassandra::Cluster#connect
A list of possible exceptions that this
future can be resolved with
# File lib/cassandra/cluster.rb 226 def connect_async(keyspace = nil) 227 if !keyspace.nil? && !keyspace.is_a?(::String) 228 return @futures.error(::ArgumentError.new("keyspace must be a string, #{keyspace.inspect} given")) 229 end 230 231 client = Client.new(@logger, 232 @registry, 233 @schema, 234 @io_reactor, 235 @connector, 236 @profile_manager, 237 @reconnection_policy, 238 @address_resolver, 239 @connection_options, 240 @futures, 241 @timestamp_generator) 242 session = Session.new(client, @execution_options, @futures, @profile_manager) 243 promise = @futures.promise 244 245 client.connect.on_complete do |f| 246 if f.resolved? 247 if keyspace 248 f = session.execute_async("USE #{Util.escape_name(keyspace)}") 249 250 f.on_success {promise.fulfill(session)} 251 f.on_failure {|e| promise.break(e)} 252 else 253 promise.fulfill(session) 254 end 255 else 256 f.on_failure {|e| promise.break(e)} 257 end 258 end 259 260 promise.future 261 end
Yield or enumerate each execution profile defined in this cluster @overload each_execution_profile
@yieldparam name [String, Symbol] name of current profile @yieldparam profile [Cassandra::Execution::Profile] current profile @return [Cassandra::Cluster] self
@overload each_execution_profile
@return [Hash<String, Cassandra::Execution::Profile>] a hash of profiles keyed on name
# File lib/cassandra/cluster.rb 177 def each_execution_profile(&block) 178 if block_given? 179 @profile_manager.profiles.each_pair(&block) 180 self 181 else 182 # Return a dup of the hash to prevent the user from adding/removing profiles from the profile-manager. 183 @profile_manager.profiles.dup 184 end 185 end
Yield or enumerate each member of this cluster @overload each_host
@yieldparam host [Cassandra::Host] current host @return [Cassandra::Cluster] self
@overload each_host
@return [Array<Cassandra::Host>] a list of hosts
# File lib/cassandra/cluster.rb 112 def each_host(&block) 113 r = @registry.each_host(&block) 114 return self if r == @registry 115 r 116 end
Yield or enumerate each keyspace defined in this cluster @overload each_keyspace
@yieldparam keyspace [Cassandra::Keyspace] current keyspace @return [Cassandra::Cluster] self
@overload each_keyspace
@return [Array<Cassandra::Keyspace>] a list of keyspaces
# File lib/cassandra/cluster.rb 136 def each_keyspace(&block) 137 r = @schema.each_keyspace(&block) 138 return self if r == @schema 139 r 140 end
@param name [String] Name of profile to retrieve @return [Cassandra::Execution::Profile] execution profile of the given name
# File lib/cassandra/cluster.rb 166 def execution_profile(name) 167 @profile_manager.profiles[name] 168 end
@private
# File lib/cassandra/cluster.rb 311 def inspect 312 "#<#{self.class.name}:0x#{object_id.to_s(16)} " \ 313 "name=#{name.inspect}, " \ 314 "port=#{@connection_options.port}, " \ 315 "protocol_version=#{@connection_options.protocol_version}, " \ 316 "execution_profiles=#{@profile_manager.profiles.inspect}, " \ 317 "hosts=#{hosts.inspect}, " \ 318 "keyspaces=#{keyspaces.inspect}>" 319 end
@return [Integer] Cassandra
native protocol port
# File lib/cassandra/cluster.rb 155 def port 156 @connection_options.port 157 end
@return [Integer] the version of the native protocol used in communication with nodes
# File lib/cassandra/cluster.rb 160 def protocol_version 161 @connection_options.protocol_version 162 end
Synchronously refresh schema metadata
@return [nil] nothing @raise [Cassandra::Errors::ClientError] when cluster is disconnected @raise [Cassandra::Error] other unexpected errors
@see Cassandra::Cluster#refresh_schema_async
# File lib/cassandra/cluster.rb 213 def refresh_schema 214 refresh_schema_async.get 215 end
@!method refresh_schema_async
Trigger an asynchronous schema metadata refresh @return [Cassandra::Future<nil>] a future that will be fulfilled when schema metadata has been refreshed
# File lib/cassandra/cluster.rb 192 def refresh_schema_async 193 promise = @futures.promise 194 @control_connection.send(:refresh_maybe_retry, :schema).on_complete do |f| 195 if f.resolved? 196 promise.fulfill(nil) 197 else 198 f.on_failure do |e| 199 promise.break(e) 200 end 201 end 202 end 203 promise.future 204 end
Register a cluster state listener. State listener will start receiving notifications about topology and schema changes
@param listener [Cassandra::Listener] cluster state listener @return [self]
# File lib/cassandra/cluster.rb 89 def register(listener) 90 @registry.add_listener(listener) 91 @schema.add_listener(listener) 92 self 93 end
Unregister a cluster state listener. State listener will stop receiving notifications about topology and schema changes
@param listener [Cassandra::Listener] cluster state listener @return [self]
# File lib/cassandra/cluster.rb 100 def unregister(listener) 101 @registry.remove_listener(listener) 102 @schema.remove_listener(listener) 103 self 104 end