class Cassandra::Cluster::Connector
@private
Constants
- NO_CONNECTIONS
- UNCLAIMED_TIMEOUT
Public Class Methods
new(logger, io_reactor, cluster_registry, connection_options, execution_options)
click to toggle source
# File lib/cassandra/cluster/connector.rb 25 def initialize(logger, 26 io_reactor, 27 cluster_registry, 28 connection_options, 29 execution_options) 30 @logger = logger 31 @reactor = io_reactor 32 @registry = cluster_registry 33 @connection_options = connection_options 34 @execution_options = execution_options 35 @connections = ::Hash.new 36 @open_connections = ::Hash.new 37 38 mon_initialize 39 end
Public Instance Methods
connect(host)
click to toggle source
# File lib/cassandra/cluster/connector.rb 41 def connect(host) 42 synchronize do 43 open_connections = @open_connections[host] 44 if open_connections 45 connection = open_connections.shift 46 @open_connections.delete(host) if open_connections.empty? 47 return Ione::Future.resolved(connection) 48 end 49 end 50 51 f = do_connect(host) 52 53 f.on_failure do |error| 54 connection_error(host, error) 55 end 56 57 f.on_value do |connection| 58 connection.on_closed do |cause| 59 disconnected(host, cause) 60 end 61 62 connected(host) 63 end 64 65 f 66 end
refresh_status(host)
click to toggle source
# File lib/cassandra/cluster/connector.rb 68 def refresh_status(host) 69 if synchronize { @connections[host] } 70 @registry.host_up(host.ip) 71 72 return Future.resolved 73 end 74 75 @logger.debug("Checking if host #{host.ip} is up") 76 f = do_connect(host) 77 78 f.on_failure do |error| 79 connection_error(host, error) 80 end 81 82 f.on_value do |connection| 83 connection.on_closed do |cause| 84 disconnected(host, cause) 85 end 86 87 synchronize do 88 @open_connections[host] ||= [] 89 @open_connections[host] << connection 90 end 91 92 timer = @reactor.schedule_timer(UNCLAIMED_TIMEOUT) 93 timer.on_value do 94 close = false 95 96 synchronize do 97 open_connections = @open_connections[host] 98 if open_connections 99 close = !open_connections.delete(connection).nil? 100 @open_connections.delete(host) if open_connections.empty? 101 end 102 end 103 104 connection.close if close 105 end 106 107 connected(host) 108 end 109 110 f 111 end
Private Instance Methods
cannot_authenticate_error()
click to toggle source
# File lib/cassandra/cluster/connector.rb 235 def cannot_authenticate_error 236 Errors::AuthenticationError.new( 237 'Server requested authentication, but client was not configured to ' \ 238 'authenticate', 239 nil, 240 nil, 241 nil, 242 VOID_STATEMENT, 243 VOID_OPTIONS, 244 EMPTY_LIST, 245 :one, 246 0 247 ) 248 end
challenge_response_cycle(connection, authenticator, token)
click to toggle source
# File lib/cassandra/cluster/connector.rb 278 def challenge_response_cycle(connection, authenticator, token) 279 connection.send_request(Protocol::AuthResponseRequest.new(token), 280 @execution_options.timeout).flat_map do |r| 281 case r 282 when Protocol::AuthChallengeResponse 283 token = authenticator.challenge_response(r.token) 284 challenge_response_cycle(connection, authenticator, token) 285 when Protocol::AuthSuccessResponse 286 begin 287 authenticator.authentication_successful(r.token) 288 rescue 289 nil 290 end 291 ::Ione::Future.resolved(connection) 292 when Protocol::ErrorResponse 293 ::Ione::Future.failed( 294 r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :one, 0) 295 ) 296 else 297 ::Ione::Future.failed( 298 Errors::InternalError.new("Unexpected response #{r.inspect}") 299 ) 300 end 301 end 302 end
connected(host)
click to toggle source
# File lib/cassandra/cluster/connector.rb 304 def connected(host) 305 notify = false 306 307 synchronize do 308 connections = @connections[host] 309 310 if connections 311 @connections[host] = connections + 1 312 else 313 notify = true 314 315 @connections[host] = 1 316 end 317 end 318 319 @registry.host_up(host.ip) if notify 320 321 self 322 end
connection_error(host, error)
click to toggle source
# File lib/cassandra/cluster/connector.rb 353 def connection_error(host, error) 354 notify = false 355 356 synchronize do 357 notify = !error.nil? && !@connections.key?(host) 358 end 359 360 @logger.debug("Host #{host.ip} refused connection (#{error.class.name}: " \ 361 "#{error.message})") 362 363 if notify 364 @logger.warn("Host #{host.ip} refused all connections") 365 @registry.host_down(host.ip) 366 end 367 368 self 369 end
disconnected(host, error)
click to toggle source
# File lib/cassandra/cluster/connector.rb 324 def disconnected(host, error) 325 notify = false 326 327 synchronize do 328 connections = @connections[host] 329 330 return self unless connections 331 332 connections -= 1 333 334 if connections == 0 335 notify = !error.nil? 336 @connections.delete(host) 337 else 338 @connections[host] = connections 339 end 340 end 341 342 @logger.debug("Host #{host.ip} closed connection (#{error.class.name}: " \ 343 "#{error.message})") if error 344 345 if notify 346 @logger.warn("Host #{host.ip} closed all connections") 347 @registry.host_down(host.ip) 348 end 349 350 self 351 end
do_connect(host)
click to toggle source
# File lib/cassandra/cluster/connector.rb 118 def do_connect(host) 119 @reactor.connect(host.ip.to_s, 120 @connection_options.port, 121 timeout: @connection_options.connect_timeout, 122 ssl: @connection_options.ssl) do |connection| 123 raise Errors::ClientError, 'Not connected, reactor stopped' unless connection 124 125 connection.to_io.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 126 @connection_options.nodelay? ? 1 : 0) 127 128 Protocol::CqlProtocolHandler.new(connection, 129 @reactor, 130 @connection_options.protocol_version, 131 @connection_options.compressor, 132 @connection_options.heartbeat_interval, 133 @connection_options.idle_timeout, 134 @connection_options.requests_per_connection, 135 @connection_options.custom_type_handlers) 136 end.flat_map do |connection| 137 # connection is a CqlProtocolHandler 138 f = request_options(connection) 139 f = f.flat_map do |options| 140 compression = @connection_options.compression 141 supported_algorithms = options['COMPRESSION'] 142 143 if compression && !supported_algorithms.include?(compression) 144 @logger.warn("Compression with #{compression.inspect} is not supported " \ 145 "by host at #{host.ip}, supported algorithms are " \ 146 "#{supported_algorithms.inspect}") 147 compression = nil 148 end 149 150 supported_cql_versions = options['CQL_VERSION'] 151 cql_version = (supported_cql_versions && !supported_cql_versions.empty?) ? 152 supported_cql_versions.first : 153 '3.1.0' 154 155 startup_connection(host, connection, cql_version, compression) 156 end 157 f.fallback do |error| 158 case error 159 when Errors::ProtocolError 160 synchronize do 161 current_version = connection.protocol_version 162 if current_version > 1 && @connection_options.protocol_negotiable? 163 @logger.info("Host #{host.ip} doesn't support protocol version " \ 164 "#{current_version}, downgrading") 165 166 # This is tricky. We want to try with the next lower protocol version. 167 # However, the connection_options used for all connections may have 168 # already been updated due to other node connection failures. So, 169 # it may already have a lower protocol-version than our current-1. We 170 # don't want to accidentally raise it, so we update it to the min 171 # of itself and current-1. 172 @connection_options.protocol_version = 173 [@connection_options.protocol_version, current_version - 1].min 174 do_connect(host) 175 else 176 Ione::Future.failed(error) 177 end 178 end 179 when Errors::TimeoutError 180 future = Ione::CompletableFuture.new 181 connection.close(error).on_complete do |_| 182 future.fail(error) 183 end 184 future 185 else 186 Ione::Future.failed(error) 187 end 188 end 189 end.fallback do |error| 190 case error 191 when Error 192 Ione::Future.failed(error) 193 else 194 e = Errors::IOError.new(error.message) 195 e.set_backtrace(error.backtrace) 196 Ione::Future.failed(e) 197 end 198 end 199 end
request_options(connection)
click to toggle source
# File lib/cassandra/cluster/connector.rb 250 def request_options(connection) 251 connection.send_request(Protocol::OptionsRequest.new, 252 @execution_options.timeout).map do |r| 253 case r 254 when Protocol::SupportedResponse 255 r.options 256 when Protocol::ErrorResponse 257 raise r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :one, 0) 258 else 259 raise Errors::InternalError, "Unexpected response #{r.inspect}" 260 end 261 end 262 end
send_credentials(connection, credentials)
click to toggle source
# File lib/cassandra/cluster/connector.rb 264 def send_credentials(connection, credentials) 265 connection.send_request(Protocol::CredentialsRequest.new(credentials), 266 @execution_options.timeout).map do |r| 267 case r 268 when Protocol::ReadyResponse 269 connection 270 when Protocol::ErrorResponse 271 raise r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :one, 0) 272 else 273 raise Errors::InternalError, "Unexpected response #{r.inspect}" 274 end 275 end 276 end
startup_connection(host, connection, cql_version, compression)
click to toggle source
# File lib/cassandra/cluster/connector.rb 201 def startup_connection(host, connection, cql_version, compression) 202 connection.send_request(Protocol::StartupRequest.new(cql_version, compression), 203 @execution_options.timeout).flat_map do |r| 204 case r 205 when Protocol::AuthenticateResponse 206 if @connection_options.protocol_version == 1 207 credentials = @connection_options.credentials 208 if credentials 209 send_credentials(connection, credentials) 210 else 211 Ione::Future.failed(cannot_authenticate_error) 212 end 213 else 214 authenticator = @connection_options.create_authenticator(r.authentication_class, host) 215 if authenticator 216 challenge_response_cycle(connection, authenticator, authenticator.initial_response) 217 else 218 Ione::Future.failed(cannot_authenticate_error) 219 end 220 end 221 when Protocol::ReadyResponse 222 ::Ione::Future.resolved(connection) 223 when Protocol::ErrorResponse 224 ::Ione::Future.failed( 225 r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :one, 0) 226 ) 227 else 228 ::Ione::Future.failed( 229 Errors::InternalError.new("Unexpected response #{r.inspect}") 230 ) 231 end 232 end 233 end