class Cassandra::Cluster

Cluster represents a Cassandra cluster. It serves as a {Cassandra::Session session} factory and a collection of metadata.

@see Cassandra::Cluster#connect Creating a new session
@see Cassandra::Cluster#each_host Getting all peers in the cluster
@see Cassandra::Cluster#each_keyspace Getting all existing keyspaces

Public Class Methods

new(logger, io_reactor, executor, control_connection, cluster_registry, cluster_schema, cluster_metadata, execution_options, connection_options, profile_manager, reconnection_policy, address_resolution_policy, connector, futures_factory, timestamp_generator) click to toggle source

@private

   # File lib/cassandra/cluster.rb
# @private
#
# Stores every collaborator passed in and registers a close callback on the
# control connection that tears down the profile manager for this cluster.
def initialize(logger,
               io_reactor,
               executor,
               control_connection,
               cluster_registry,
               cluster_schema,
               cluster_metadata,
               execution_options,
               connection_options,
               profile_manager,
               reconnection_policy,
               address_resolution_policy,
               connector,
               futures_factory,
               timestamp_generator)
  @logger, @io_reactor, @executor = logger, io_reactor, executor
  @control_connection = control_connection
  @registry, @schema, @metadata = cluster_registry, cluster_schema, cluster_metadata
  @execution_options, @connection_options = execution_options, connection_options
  @profile_manager, @reconnection_policy = profile_manager, reconnection_policy
  @address_resolver, @connector = address_resolution_policy, connector
  @futures, @timestamp_generator = futures_factory, timestamp_generator

  @control_connection.on_close do |_cause|
    # Best-effort cleanup: an error raised by teardown is swallowed so it
    # never escapes the close callback.
    begin
      @profile_manager.teardown(self)
    rescue StandardError
      nil
    end
  end
end

Public Instance Methods

close() click to toggle source

Synchronously closes all sessions managed by this cluster

@return [self] this cluster

@see Cassandra::Cluster#close_async

    # File lib/cassandra/cluster.rb
# Blocking counterpart of #close_async: starts the asynchronous close and
# waits on the returned future.
#
# @return the value produced by the future returned from #close_async
# @see Cassandra::Cluster#close_async
def close
  pending = close_async
  pending.get
end
close_async() click to toggle source

Asynchronously closes all sessions managed by this cluster

@return [Cassandra::Future<Cassandra::Cluster>] a future that resolves to self once closed
    # File lib/cassandra/cluster.rb
# Closes the control connection asynchronously and relays the outcome through
# a new promise. The executor is shut down once the close attempt completes,
# regardless of whether it succeeded.
#
# @return the future of the promise, fulfilled with self on success or broken
#   with the error reported by the control connection
def close_async
  closed  = @futures.promise
  closing = @control_connection.close_async

  closing.on_complete do |outcome|
    if outcome.resolved?
      closed.fulfill(self)
    else
      outcome.on_failure { |error| closed.break(error) }
    end

    # Runs on both the success and the failure path.
    @executor.shutdown
  end

  closed.future
end
connect(keyspace = nil) click to toggle source

Synchronously create a new session, optionally scoped to a keyspace

@param keyspace [String] optional keyspace to scope the session to

@raise [ArgumentError] if keyspace is not a String
@raise [Cassandra::Errors::NoHostsAvailable] when all hosts failed
@raise [Cassandra::Errors::AuthenticationError] when authentication fails
@raise [Cassandra::Errors::ProtocolError] when protocol negotiation fails
@raise [Cassandra::Error] other unexpected errors

@return [Cassandra::Session] a new session that can prepare and execute statements

@see Cassandra::Cluster#connect_async

    # File lib/cassandra/cluster.rb
# Blocking counterpart of #connect_async: forwards the optional keyspace and
# waits on the returned future.
#
# @param keyspace [String, nil] optional keyspace, passed through unchanged
# @return the value produced by the future returned from #connect_async
# @see Cassandra::Cluster#connect_async
def connect(keyspace = nil)
  pending = connect_async(keyspace)
  pending.get
end
connect_async(keyspace = nil) click to toggle source

Asynchronously create a new session, optionally scoped to a keyspace

@param keyspace [String] optional keyspace to scope session to

@return [Cassandra::Future<Cassandra::Session>] a future new session that can prepare and execute statements

@see Cassandra::Cluster#connect A list of possible exceptions that this future can be resolved with
    # File lib/cassandra/cluster.rb
# Builds a new client and session, connects the client, and optionally issues
# a `USE` statement for the given keyspace before handing the session out.
#
# @param keyspace [String, nil] optional keyspace to switch the session to
# @return a future from the futures factory: an error future carrying an
#   ArgumentError when keyspace is neither nil nor a String; otherwise the
#   future of a promise fulfilled with the session or broken with the
#   connection / `USE` failure
def connect_async(keyspace = nil)
  acceptable = keyspace.nil? || keyspace.is_a?(::String)
  unless acceptable
    return @futures.error(::ArgumentError.new("keyspace must be a string, #{keyspace.inspect} given"))
  end

  client = Client.new(
    @logger,
    @registry,
    @schema,
    @io_reactor,
    @connector,
    @profile_manager,
    @reconnection_policy,
    @address_resolver,
    @connection_options,
    @futures,
    @timestamp_generator
  )
  session = Session.new(client, @execution_options, @futures, @profile_manager)
  promise = @futures.promise

  client.connect.on_complete do |connected|
    if !connected.resolved?
      # Connection failed: forward the error to the caller's future.
      connected.on_failure { |error| promise.break(error) }
    elsif keyspace
      # The session is only handed out once the keyspace switch has succeeded.
      switched = session.execute_async("USE #{Util.escape_name(keyspace)}")

      switched.on_success { promise.fulfill(session) }
      switched.on_failure { |error| promise.break(error) }
    else
      promise.fulfill(session)
    end
  end

  promise.future
end
each_execution_profile(&block) click to toggle source

Yield or enumerate each execution profile defined in this cluster.

@overload each_execution_profile

@yieldparam name [String, Symbol] name of current profile
@yieldparam profile [Cassandra::Execution::Profile] current profile
@return [Cassandra::Cluster] self

@overload each_execution_profile

@return [Hash<String, Cassandra::Execution::Profile>] a hash of profiles keyed on name
    # File lib/cassandra/cluster.rb
# Iterates over the profile manager's profiles when a block is given;
# otherwise returns a copy of the profiles hash.
#
# @yield each name/profile pair, via Hash#each_pair
# @return self when a block is given, otherwise a duplicate of the profiles hash
def each_execution_profile(&block)
  # Hand out a copy so callers cannot add or remove profiles in the
  # profile manager through the returned hash.
  return @profile_manager.profiles.dup unless block_given?

  @profile_manager.profiles.each_pair(&block)
  self
end
Also aliased as: execution_profiles
each_host(&block) click to toggle source

Yield or enumerate each member of this cluster.

@overload each_host

@yieldparam host [Cassandra::Host] current host
@return [Cassandra::Cluster] self

@overload each_host

@return [Array<Cassandra::Host>] a list of hosts
    # File lib/cassandra/cluster.rb
# Delegates to the registry's each_host. When the registry answers with
# itself, this cluster is returned instead so the registry is not exposed;
# any other result is passed through unchanged.
#
# @return self, or whatever the registry returned
def each_host(&block)
  outcome = @registry.each_host(&block)
  outcome == @registry ? self : outcome
end
Also aliased as: hosts
each_keyspace(&block) click to toggle source

Yield or enumerate each keyspace defined in this cluster.

@overload each_keyspace

@yieldparam keyspace [Cassandra::Keyspace] current keyspace
@return [Cassandra::Cluster] self

@overload each_keyspace

@return [Array<Cassandra::Keyspace>] a list of keyspaces
    # File lib/cassandra/cluster.rb
# Delegates to the schema's each_keyspace. When the schema answers with
# itself, this cluster is returned instead so the schema is not exposed;
# any other result is passed through unchanged.
#
# @return self, or whatever the schema returned
def each_keyspace(&block)
  outcome = @schema.each_keyspace(&block)
  outcome == @schema ? self : outcome
end
Also aliased as: keyspaces
execution_profile(name) click to toggle source

@param name [String] Name of profile to retrieve
@return [Cassandra::Execution::Profile] execution profile of the given name

    # File lib/cassandra/cluster.rb
# Looks up a single entry in the profile manager's profiles by key.
#
# @param name key under which the profile is stored
# @return the value stored under name in the profiles collection
def execution_profile(name)
  known_profiles = @profile_manager.profiles
  known_profiles[name]
end
execution_profiles(&block)
Alias for: each_execution_profile
hosts(&block)
Alias for: each_host
inspect() click to toggle source

@private

    # File lib/cassandra/cluster.rb
# @private
#
# Builds a one-line description made of the class name, the hexadecimal
# object id and the cluster's name, port, protocol version, execution
# profiles, hosts and keyspaces.
#
# @return [String]
def inspect
  identity = "#{self.class.name}:0x#{object_id.to_s(16)}"
  details = [
    "name=#{name.inspect}",
    "port=#{@connection_options.port}",
    "protocol_version=#{@connection_options.protocol_version}",
    "execution_profiles=#{@profile_manager.profiles.inspect}",
    "hosts=#{hosts.inspect}",
    "keyspaces=#{keyspaces.inspect}"
  ]

  "#<#{identity} #{details.join(', ')}>"
end
keyspaces(&block)
Alias for: each_keyspace
port() click to toggle source

@return [Integer] Cassandra native protocol port

    # File lib/cassandra/cluster.rb
# Reads the port from the connection options.
#
# @return the port held by the connection options
def port
  options = @connection_options
  options.port
end
protocol_version() click to toggle source

@return [Integer] the version of the native protocol used in communication with nodes

    # File lib/cassandra/cluster.rb
# Reads the protocol version from the connection options.
#
# @return the protocol version held by the connection options
def protocol_version
  options = @connection_options
  options.protocol_version
end
refresh_schema() click to toggle source

Synchronously refresh schema metadata

@return [nil] nothing
@raise [Cassandra::Errors::ClientError] when cluster is disconnected
@raise [Cassandra::Error] other unexpected errors

@see Cassandra::Cluster#refresh_schema_async

    # File lib/cassandra/cluster.rb
# Blocking counterpart of #refresh_schema_async: starts the asynchronous
# refresh and waits on the returned future.
#
# @return the value produced by the future returned from #refresh_schema_async
# @see Cassandra::Cluster#refresh_schema_async
def refresh_schema
  pending = refresh_schema_async
  pending.get
end
refresh_schema_async() click to toggle source

@!method refresh_schema_async

Trigger an asynchronous schema metadata refresh
@return [Cassandra::Future<nil>] a future that will be fulfilled when
  schema metadata has been refreshed
    # File lib/cassandra/cluster.rb
# Asks the control connection to refresh the schema and relays the outcome
# through a new promise.
#
# @return the future of the promise, fulfilled with nil on success or broken
#   with the error reported by the control connection
def refresh_schema_async
  refreshed = @futures.promise

  # `send` is used because refresh_maybe_retry is invoked as a non-public
  # method of the control connection.
  refreshing = @control_connection.send(:refresh_maybe_retry, :schema)
  refreshing.on_complete do |outcome|
    if outcome.resolved?
      refreshed.fulfill(nil)
    else
      outcome.on_failure { |error| refreshed.break(error) }
    end
  end

  refreshed.future
end
register(listener) click to toggle source

Register a cluster state listener. State listener will start receiving notifications about topology and schema changes

@param listener [Cassandra::Listener] cluster state listener
@return [self]

   # File lib/cassandra/cluster.rb
# Adds the listener to the registry first and then to the schema.
#
# @param listener the listener to add to both sources
# @return [self]
def register(listener)
  [@registry, @schema].each { |source| source.add_listener(listener) }
  self
end
unregister(listener) click to toggle source

Unregister a cluster state listener. State listener will stop receiving notifications about topology and schema changes

@param listener [Cassandra::Listener] cluster state listener
@return [self]

    # File lib/cassandra/cluster.rb
# Removes the listener from the registry first and then from the schema.
#
# @param listener the listener to remove from both sources
# @return [self]
def unregister(listener)
  [@registry, @schema].each { |source| source.remove_listener(listener) }
  self
end