class Cassandra::Cluster::Connector

@private

Constants

NO_CONNECTIONS
UNCLAIMED_TIMEOUT

Public Class Methods

new(logger, io_reactor, cluster_registry, connection_options, execution_options) click to toggle source
   # File lib/cassandra/cluster/connector.rb
# Creates a connector and stores its collaborators.
#
# @param logger             receives the debug/info/warn messages emitted here
# @param io_reactor         used to open connections and schedule timers
# @param cluster_registry   notified through +host_up+ / +host_down+
# @param connection_options connection settings (port, ssl, protocol version, ...)
# @param execution_options  supplies the request +timeout+
def initialize(logger, io_reactor, cluster_registry, connection_options, execution_options)
  @logger = logger
  @reactor = io_reactor
  @registry = cluster_registry
  @connection_options = connection_options
  @execution_options = execution_options

  # host => number of live connections (maintained by #connected / #disconnected)
  @connections = {}
  # host => connections opened by #refresh_status and not yet handed out by #connect
  @open_connections = {}

  # Presumably MonitorMixin's initializer backing +synchronize+ — the mixin
  # itself is declared outside this view.
  mon_initialize
end

Public Instance Methods

connect(host) click to toggle source
   # File lib/cassandra/cluster/connector.rb
# Returns a future that resolves to a connection to +host+.
#
# A connection left behind by #refresh_status is reused when one is
# available; otherwise a new connection is opened via #do_connect and the
# connector's bookkeeping callbacks are attached to it.
#
# @param host the host to connect to
# @return a future of the connection
def connect(host)
  synchronize do
    parked = @open_connections[host]
    if parked
      reused = parked.shift
      # Drop the per-host list once its last parked connection is taken.
      @open_connections.delete(host) if parked.empty?
      return Ione::Future.resolved(reused)
    end
  end

  future = do_connect(host)

  future.on_failure { |error| connection_error(host, error) }

  future.on_value do |connection|
    # Keep the per-host counter in sync when this connection goes away.
    connection.on_closed { |cause| disconnected(host, cause) }

    connected(host)
  end

  future
end
refresh_status(host) click to toggle source
    # File lib/cassandra/cluster/connector.rb
 # Checks whether +host+ is reachable and reports the outcome to the registry.
 #
 # When the connector already tracks at least one connection to +host+, the
 # host is reported up right away and an already-resolved future is returned.
 # Otherwise a probe connection is opened. On success it is parked in
 # +@open_connections+ so that a following #connect can claim it; if nobody
 # claims it within +UNCLAIMED_TIMEOUT+ it is closed again.
 #
 # @param host the host to probe
 # @return an Ione future on every path (resolved immediately when the host
 #   is already known to be connected, otherwise the connect future)
 def refresh_status(host)
   if synchronize { @connections[host] }
     @registry.host_up(host.ip)

     # Fully qualified like every other future returned by this class, so
     # callers always receive the same future type.
     return Ione::Future.resolved
   end

   @logger.debug("Checking if host #{host.ip} is up")
   f = do_connect(host)

   f.on_failure do |error|
     connection_error(host, error)
   end

   f.on_value do |connection|
     connection.on_closed do |cause|
       disconnected(host, cause)
     end

     # Park the probe connection so #connect can reuse it.
     synchronize do
       @open_connections[host] ||= []
       @open_connections[host] << connection
     end

     timer = @reactor.schedule_timer(UNCLAIMED_TIMEOUT)
     timer.on_value do
       close = false

       synchronize do
         open_connections = @open_connections[host]
         if open_connections
           # Array#delete returns nil when #connect has already claimed it.
           close = !open_connections.delete(connection).nil?
           @open_connections.delete(host) if open_connections.empty?
         end
       end

       # Close outside the lock, and only if the connection was still parked.
       connection.close if close
     end

     connected(host)
   end

   f
 end

Private Instance Methods

cannot_authenticate_error() click to toggle source
    # File lib/cassandra/cluster/connector.rb
235 def cannot_authenticate_error
236   Errors::AuthenticationError.new(
237     'Server requested authentication, but client was not configured to ' \
238       'authenticate',
239     nil,
240     nil,
241     nil,
242     VOID_STATEMENT,
243     VOID_OPTIONS,
244     EMPTY_LIST,
245     :quorum,
246     0
247   )
248 end
challenge_response_cycle(connection, authenticator, token) click to toggle source
    # File lib/cassandra/cluster/connector.rb
278 def challenge_response_cycle(connection, authenticator, token)
279   connection.send_request(Protocol::AuthResponseRequest.new(token),
280                           @execution_options.timeout).flat_map do |r|
281     case r
282     when Protocol::AuthChallengeResponse
283       token = authenticator.challenge_response(r.token)
284       challenge_response_cycle(connection, authenticator, token)
285     when Protocol::AuthSuccessResponse
286       begin
287         authenticator.authentication_successful(r.token)
288       rescue
289         nil
290       end
291       ::Ione::Future.resolved(connection)
292     when Protocol::ErrorResponse
293       ::Ione::Future.failed(
294         r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :quorum, 0)
295       )
296     else
297       ::Ione::Future.failed(
298         Errors::InternalError.new("Unexpected response #{r.inspect}")
299       )
300     end
301   end
302 end
connected(host) click to toggle source
    # File lib/cassandra/cluster/connector.rb
304 def connected(host)
305   notify = false
306 
307   synchronize do
308     connections = @connections[host]
309 
310     if connections
311       @connections[host] = connections + 1
312     else
313       notify = true
314 
315       @connections[host] = 1
316     end
317   end
318 
319   @registry.host_up(host.ip) if notify
320 
321   self
322 end
connection_error(host, error) click to toggle source
    # File lib/cassandra/cluster/connector.rb
353 def connection_error(host, error)
354   notify = false
355 
356   synchronize do
357     notify = !error.nil? && !@connections.key?(host)
358   end
359 
360   @logger.debug("Host #{host.ip} refused connection (#{error.class.name}: " \
361     "#{error.message})")
362 
363   if notify
364     @logger.warn("Host #{host.ip} refused all connections")
365     @registry.host_down(host.ip)
366   end
367 
368   self
369 end
disconnected(host, error) click to toggle source
    # File lib/cassandra/cluster/connector.rb
324 def disconnected(host, error)
325   notify = false
326 
327   synchronize do
328     connections = @connections[host]
329 
330     return self unless connections
331 
332     connections -= 1
333 
334     if connections == 0
335       notify = !error.nil?
336       @connections.delete(host)
337     else
338       @connections[host] = connections
339     end
340   end
341 
342   @logger.debug("Host #{host.ip} closed connection (#{error.class.name}: " \
343     "#{error.message})") if error
344 
345   if notify
346     @logger.warn("Host #{host.ip} closed all connections")
347     @registry.host_down(host.ip)
348   end
349 
350   self
351 end
do_connect(host) click to toggle source
    # File lib/cassandra/cluster/connector.rb
118 def do_connect(host)
119   @reactor.connect(host.ip.to_s,
120                    @connection_options.port,
121                    timeout: @connection_options.connect_timeout,
122                    ssl: @connection_options.ssl) do |connection|
123     raise Errors::ClientError, 'Not connected, reactor stopped' unless connection
124 
125     connection.to_io.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY,
126                                 @connection_options.nodelay? ? 1 : 0)
127 
128     Protocol::CqlProtocolHandler.new(connection,
129                                      @reactor,
130                                      @connection_options.protocol_version,
131                                      @connection_options.compressor,
132                                      @connection_options.heartbeat_interval,
133                                      @connection_options.idle_timeout,
134                                      @connection_options.requests_per_connection,
135                                      @connection_options.custom_type_handlers)
136   end.flat_map do |connection|
137     # connection is a CqlProtocolHandler
138     f = request_options(connection)
139     f = f.flat_map do |options|
140       compression = @connection_options.compression
141       supported_algorithms = options['COMPRESSION']
142 
143       if compression && !supported_algorithms.include?(compression)
144         @logger.warn("Compression with #{compression.inspect} is not supported " \
145           "by host at #{host.ip}, supported algorithms are " \
146           "#{supported_algorithms.inspect}")
147         compression = nil
148       end
149 
150       supported_cql_versions = options['CQL_VERSION']
151       cql_version = (supported_cql_versions && !supported_cql_versions.empty?) ?
152           supported_cql_versions.first :
153           '3.1.0'
154 
155       startup_connection(host, connection, cql_version, compression)
156     end
157     f.fallback do |error|
158       case error
159       when Errors::ProtocolError
160         synchronize do
161           current_version = connection.protocol_version
162           if current_version > 1 && @connection_options.protocol_negotiable?
163             @logger.info("Host #{host.ip} doesn't support protocol version " \
164               "#{current_version}, downgrading")
165 
166             # This is tricky. We want to try with the next lower protocol version.
167             # However, the connection_options used for all connections may have
168             # already been updated due to other node connection failures. So,
169             # it may already have a lower protocol-version than our current-1. We
170             # don't want to accidentally raise it, so we update it to the min
171             # of itself and current-1.
172             @connection_options.protocol_version =
173               [@connection_options.protocol_version, current_version - 1].min
174             do_connect(host)
175           else
176             Ione::Future.failed(error)
177           end
178         end
179       when Errors::TimeoutError
180         future = Ione::CompletableFuture.new
181         connection.close(error).on_complete do |_|
182           future.fail(error)
183         end
184         future
185       else
186         Ione::Future.failed(error)
187       end
188     end
189   end.fallback do |error|
190     case error
191     when Error
192       Ione::Future.failed(error)
193     else
194       e = Errors::IOError.new(error.message)
195       e.set_backtrace(error.backtrace)
196       Ione::Future.failed(e)
197     end
198   end
199 end
request_options(connection) click to toggle source
    # File lib/cassandra/cluster/connector.rb
250 def request_options(connection)
251   connection.send_request(Protocol::OptionsRequest.new,
252                           @execution_options.timeout).map do |r|
253     case r
254     when Protocol::SupportedResponse
255       r.options
256     when Protocol::ErrorResponse
257       raise r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :quorum, 0)
258     else
259       raise Errors::InternalError, "Unexpected response #{r.inspect}"
260     end
261   end
262 end
send_credentials(connection, credentials) click to toggle source
    # File lib/cassandra/cluster/connector.rb
264 def send_credentials(connection, credentials)
265   connection.send_request(Protocol::CredentialsRequest.new(credentials),
266                           @execution_options.timeout).map do |r|
267     case r
268     when Protocol::ReadyResponse
269       connection
270     when Protocol::ErrorResponse
271       raise r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :quorum, 0)
272     else
273       raise Errors::InternalError, "Unexpected response #{r.inspect}"
274     end
275   end
276 end
startup_connection(host, connection, cql_version, compression) click to toggle source
    # File lib/cassandra/cluster/connector.rb
201 def startup_connection(host, connection, cql_version, compression)
202   connection.send_request(Protocol::StartupRequest.new(cql_version, compression),
203                           @execution_options.timeout).flat_map do |r|
204     case r
205     when Protocol::AuthenticateResponse
206       if @connection_options.protocol_version == 1
207         credentials = @connection_options.credentials
208         if credentials
209           send_credentials(connection, credentials)
210         else
211           Ione::Future.failed(cannot_authenticate_error)
212         end
213       else
214         authenticator = @connection_options.create_authenticator(r.authentication_class, host)
215         if authenticator
216           challenge_response_cycle(connection, authenticator, authenticator.initial_response)
217         else
218           Ione::Future.failed(cannot_authenticate_error)
219         end
220       end
221     when Protocol::ReadyResponse
222       ::Ione::Future.resolved(connection)
223     when Protocol::ErrorResponse
224       ::Ione::Future.failed(
225         r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :quorum, 0)
226       )
227     else
228       ::Ione::Future.failed(
229         Errors::InternalError.new("Unexpected response #{r.inspect}")
230       )
231     end
232   end
233 end