class Cassandra::Cluster::Client

@private

Constants

BATCH_TYPES
BOOTSTRAPPING_ERROR_CODE
CLIENT_CLOSED
CLIENT_NOT_CONNECTED
NOT_CONNECTED
NO_CONNECTIONS
OVERLOADED_ERROR_CODE
READ_TIMEOUT_ERROR_CODE
SELECT_SCHEMA_LOCAL
SELECT_SCHEMA_PEERS
SERVER_ERROR_CODE
UNAVAILABLE_ERROR_CODE
UNPREPARED_ERROR_CODE
WRITE_TIMEOUT_ERROR_CODE

Attributes

keyspace[R]

Public Class Methods

new(logger, cluster_registry, cluster_schema, io_reactor, connector, profile_manager, reconnection_policy, address_resolution_policy, connection_options, futures_factory, timestamp_generator) click to toggle source
   # File lib/cassandra/cluster/client.rb
# Builds a client in the +:idle+ state. Only stores the collaborators it is
# given and creates empty bookkeeping tables; nothing is connected here.
#
# @param logger receives info/debug/error messages
# @param cluster_registry source of known hosts (iterated with #each_host)
# @param cluster_schema schema object this client listens to
# @param io_reactor used to schedule reconnection timers
# @param connector opens connections to hosts (#connect)
# @param profile_manager answers #distance for a host
# @param reconnection_policy provides retry schedules (#schedule)
# @param address_resolution_policy stored as the address resolver
# @param connection_options protocol version and pool-size settings
# @param futures_factory creates promises and failed futures
# @param timestamp_generator optional; may be nil
def initialize(logger,
               cluster_registry,
               cluster_schema,
               io_reactor,
               connector,
               profile_manager,
               reconnection_policy,
               address_resolution_policy,
               connection_options,
               futures_factory,
               timestamp_generator)
  # Injected collaborators.
  @logger              = logger
  @registry            = cluster_registry
  @schema              = cluster_schema
  @reactor             = io_reactor
  @connector           = connector
  @profile_manager     = profile_manager
  @reconnection_policy = reconnection_policy
  @address_resolver    = address_resolution_policy
  @connection_options  = connection_options
  @futures             = futures_factory
  @timestamp_generator = timestamp_generator

  # Per-host and per-statement bookkeeping, all empty to begin with.
  @connections          = {}
  @prepared_statements  = {}
  @pending_connections  = {}
  # Looking up an unknown host lazily creates (and stores) an empty table.
  @preparing_statements = ::Hash.new { |table, host| table[host] = {} }

  @keyspace = nil
  @state    = :idle

  # Presumably MonitorMixin's initializer backing #synchronize - confirm
  # against the class header, which is outside this listing.
  mon_initialize
end

Public Instance Methods

batch(statement, options) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Sends a batch statement, trying hosts from the load-balancing plan.
#
# Batches need protocol version 2 or later; on older versions a failed
# future carrying a ClientError is returned straight away.
#
# @param statement the batch statement (its #type selects the batch type)
# @param options execution options (consistency, timeout, payload, ...)
# @return the future of the promise handed to #batch_by_plan, or the failed
#   future described above
def batch(statement, options)
  version = @connection_options.protocol_version

  if version < 2
    unsupported = Errors::ClientError.new(
      'Batch statements are not supported by the current version of ' \
      'Apache Cassandra'
    )
    return @futures.error(unsupported)
  end

  # Timestamps are only generated above protocol v2, payloads from v4 on.
  timestamp = (@timestamp_generator && version > 2) ? @timestamp_generator.next : nil
  payload   = version >= 4 ? options.payload : nil
  timeout   = options.timeout

  request = Protocol::BatchRequest.new(
    BATCH_TYPES[statement.type],
    options.consistency,
    options.trace?,
    options.serial_consistency,
    timestamp,
    payload
  )

  keyspace = @keyspace
  plan     = options.load_balancing_policy.plan(keyspace, statement, options)
  promise  = @futures.promise

  batch_by_plan(promise, keyspace, statement, options, request, plan, timeout)

  promise.future
end
close() click to toggle source
    # File lib/cassandra/cluster/client.rb
# Starts closing the client and returns a future for the shutdown.
#
# * idle client: returns CLIENT_NOT_CONNECTED
# * already closed/closing: returns the existing close future
# * otherwise: unregisters from registry and schema, closes all connections
#   (after the pending connect attempt settles, if one is in flight) and
#   invokes #closed when done.
def close
  previous_state = nil

  synchronize do
    case @state
    when :idle             then return CLIENT_NOT_CONNECTED
    when :closed, :closing then return @closed_future
    end

    previous_state = @state
    @state = :closing
  end

  @registry.remove_listener(self)
  @schema.remove_listener(self)

  shutdown =
    if previous_state == :connecting
      # Wait for the connect attempt to finish (whatever its outcome) first.
      @connected_future.recover.flat_map { close_connections }
    else
      close_connections
    end

  @closed_future = shutdown.map(self)
  @closed_future.on_complete(&method(:closed))
  @closed_future
end
connect() click to toggle source
    # File lib/cassandra/cluster/client.rb
 59 def connect
 60   connecting_hosts = ::Hash.new
 61 
 62   synchronize do
 63     return CLIENT_CLOSED     if @state == :closed || @state == :closing
 64     return @connected_future if @state == :connecting || @state == :connected
 65 
 66     @state = :connecting
 67     @registry.each_host do |host|
 68       distance = @profile_manager.distance(host)
 69 
 70       case distance
 71       when :ignore
 72         next
 73       when :local
 74         pool_size = @connection_options.connections_per_local_node
 75       when :remote
 76         pool_size = @connection_options.connections_per_remote_node
 77       else
 78         @logger.error("Not connecting to #{host.ip} - invalid load balancing " \
 79           'distance. Distance must be one of ' \
 80           "#{LoadBalancing::DISTANCES.inspect}, #{distance.inspect} given")
 81         next
 82       end
 83 
 84       connecting_hosts[host] = pool_size
 85       @pending_connections[host] = 0
 86       @preparing_statements[host] = {}
 87       @connections[host] = ConnectionPool.new
 88     end
 89   end
 90 
 91   @connected_future = begin
 92     @logger.info('Creating session')
 93     @registry.add_listener(self)
 94     @schema.add_listener(self)
 95 
 96     futures = connecting_hosts.map do |(host, pool_size)|
 97       f = connect_to_host(host, pool_size)
 98       f.recover do |error|
 99         FailedConnection.new(error, host)
100       end
101     end
102 
103     Ione::Future.all(*futures).map do |connections|
104       connections.flatten!
105       raise NO_HOSTS if connections.empty?
106 
107       failed_connections = connections.reject(&:connected?)
108 
109       # convert Cassandra::Protocol::CqlProtocolHandler to something with a real host
110       failed_connections.map! do |c|
111         if c.host.is_a?(String)
112           host = @registry.each_host.detect { |h| h.ip.to_s == c.host } || raise("Unable to find host #{c.host}")
113           FailedConnection.new(c.error, host)
114         else
115           c
116         end
117       end
118 
119       if failed_connections.size == connections.size
120         errors = {}
121         connections.each {|c| errors[c.host] = c.error unless c.error.nil?}
122         raise Errors::NoHostsAvailable.new(errors)
123       else
124         failed_connections.each do |f|
125           connect_to_host_with_retry(f.host,
126                                      connecting_hosts[f.host],
127                                      @reconnection_policy.schedule)
128         end
129       end
130 
131       self
132     end
133   end
134   @connected_future.on_complete(&method(:connected))
135   @connected_future
136 end
execute(statement, options) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Executes a bound (prepared) statement, trying hosts from the
# load-balancing plan.
#
# @param statement supplies params, params_types and result_metadata
# @param options execution options (consistency, paging, timeout, ...)
# @return the future of the promise handed to #execute_by_plan
def execute(statement, options)
  version = @connection_options.protocol_version

  # Timestamps are only generated above protocol v2, payloads from v4 on.
  timestamp = (@timestamp_generator && version > 2) ? @timestamp_generator.next : nil
  payload   = version >= 4 ? options.payload : nil
  timeout   = options.timeout

  result_metadata = statement.result_metadata
  # The statement id is passed as nil here; it is not known at this point.
  request = Protocol::ExecuteRequest.new(
    nil,
    statement.params_types,
    statement.params,
    result_metadata.nil?,
    options.consistency,
    options.serial_consistency,
    options.page_size,
    options.paging_state,
    options.trace?,
    timestamp,
    payload
  )

  promise  = @futures.promise
  keyspace = @keyspace
  plan     = options.load_balancing_policy.plan(keyspace, statement, options)

  execute_by_plan(promise, keyspace, statement, options, request, plan, timeout)

  promise.future
end
host_down(host) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Registry callback: forgets a host and closes its pooled connections.
#
# @param host the host that went down
# @return a resolved future when the host has no pool, otherwise a future
#   that maps to nil once every connection in the pool has been closed
def host_down(host)
  removed_pool = nil

  synchronize do
    return Ione::Future.resolved unless @connections.key?(host)

    # Keep the pending counter while connection attempts are still in flight.
    @pending_connections.delete(host) if @pending_connections[host] <= 0
    @preparing_statements.delete(host)
    removed_pool = @connections.delete(host)
  end

  return Ione::Future.resolved unless removed_pool

  closing = removed_pool.snapshot.map!(&:close)
  Ione::Future.all(*closing).map(nil)
end
host_found(host) click to toggle source

These methods must only be called from inside the reactor thread.

    # File lib/cassandra/cluster/client.rb
# Registry callback for a newly discovered host. Intentionally a no-op.
#
# @param host the discovered host (unused)
# @return [nil]
def host_found(host)
  nil
end
host_lost(host) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Registry callback for a host that was removed. Intentionally a no-op.
#
# @param host the removed host (unused)
# @return [nil]
def host_lost(host)
  nil
end
host_up(host) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Registry callback: (re)creates the bookkeeping for a host that came up and
# starts connecting to it.
#
# @param host the host that is now up
# @return a resolved future when the host is ignored or its distance is
#   invalid, otherwise the result of #connect_to_host_maybe_retry
def host_up(host)
  pool_size = 0

  synchronize do
    distance = @profile_manager.distance(host)
    return Ione::Future.resolved if distance == :ignore

    if distance == :local
      pool_size = @connection_options.connections_per_local_node
    elsif distance == :remote
      pool_size = @connection_options.connections_per_remote_node
    else
      @logger.error("Not connecting to #{host.ip} - " \
        'invalid load balancing distance. Distance must be one of ' \
        "#{LoadBalancing::DISTANCES.inspect}, #{distance.inspect} given")
      return Ione::Future.resolved
    end

    # An existing pending counter is kept; the other tables start afresh.
    @pending_connections[host] ||= 0
    @preparing_statements[host] = {}
    @connections[host] = ConnectionPool.new
  end

  connect_to_host_maybe_retry(host, pool_size)
end
inspect() click to toggle source
    # File lib/cassandra/cluster/client.rb
349 def inspect
350   "#<#{self.class.name}:0x#{object_id.to_s(16)}>"
351 end
keyspace_changed(keyspace) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Schema callback for an altered keyspace. Intentionally a no-op.
#
# @param keyspace the changed keyspace (unused)
# @return [nil]
def keyspace_changed(keyspace)
  nil
end
keyspace_created(keyspace) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Schema callback for a newly created keyspace. Intentionally a no-op.
#
# @param keyspace the created keyspace (unused)
# @return [nil]
def keyspace_created(keyspace)
  nil
end
keyspace_dropped(keyspace) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Schema callback: clears the current keyspace if it is the one dropped.
#
# @param keyspace the dropped keyspace (compared by #name)
# @return [nil]
def keyspace_dropped(keyspace)
  dropped_current = (@keyspace == keyspace.name)
  @keyspace = nil if dropped_current
  nil
end
prepare(cql, options) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Prepares a CQL string, trying hosts from the load-balancing plan.
#
# @param cql the statement text to prepare
# @param options execution options (trace, payload, timeout, policy)
# @return the future of the promise handed to #send_request_by_plan
def prepare(cql, options)
  # Custom payloads are only sent from protocol v4 on.
  payload = @connection_options.protocol_version >= 4 ? options.payload : nil
  request = Protocol::PrepareRequest.new(cql, options.trace?, payload)
  timeout = options.timeout
  promise = @futures.promise

  keyspace = @keyspace
  # The plan is built for the VOID_STATEMENT placeholder, not for `cql`.
  plan = options.load_balancing_policy.plan(keyspace, VOID_STATEMENT, options)

  send_request_by_plan(promise, keyspace, VOID_STATEMENT, options, request, plan, timeout)

  promise.future
end
query(statement, options) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Runs a simple (unprepared) statement, trying hosts from the
# load-balancing plan.
#
# Statements with positional arguments are rejected on protocol version 1
# with a failed future carrying a ClientError.
#
# @param statement supplies cql, params, params_types and params_names
# @param options execution options (consistency, paging, timeout, ...)
# @return the future of the promise handed to #send_request_by_plan, or the
#   failed future described above
def query(statement, options)
  if !statement.params.empty? && @connection_options.protocol_version == 1
    unsupported = Errors::ClientError.new(
      'Positional arguments are not supported by the current version of ' \
      'Apache Cassandra'
    )
    return @futures.error(unsupported)
  end

  version = @connection_options.protocol_version
  # Timestamps are only generated above protocol v2, payloads from v4 on.
  timestamp = (@timestamp_generator && version > 2) ? @timestamp_generator.next : nil
  payload   = version >= 4 ? options.payload : nil

  request = Protocol::QueryRequest.new(
    statement.cql,
    statement.params,
    statement.params_types,
    options.consistency,
    options.serial_consistency,
    options.page_size,
    options.paging_state,
    options.trace?,
    statement.params_names,
    timestamp,
    payload
  )
  timeout = options.timeout
  promise = @futures.promise

  keyspace = @keyspace
  plan     = options.load_balancing_policy.plan(keyspace, statement, options)

  send_request_by_plan(promise, keyspace, statement, options, request, plan, timeout)

  promise.future
end

Private Instance Methods

batch_and_send_request_by_plan(host, connection, promise, keyspace, batch_statement, request, options, plan, timeout, errors, hosts, retries) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Fills `request` with the statements of a batch and sends it on `connection`.
#
# Plain statements are added as queries. Bound statements are added by their
# cached prepared id; those without a usable id are prepared on this host
# first and added once all preparations succeed. If preparing fails with a
# host error (or a timeout on an idempotent batch) the next host in the plan
# is tried, otherwise the promise is broken.
def batch_and_send_request_by_plan(host,
                                   connection,
                                   promise,
                                   keyspace,
                                   batch_statement,
                                   request,
                                   options,
                                   plan,
                                   timeout,
                                   errors,
                                   hosts,
                                   retries)
  # Sends the (by then fully populated) request on the chosen connection.
  send_batch = lambda do
    do_send_request_by_plan(host, connection, promise, keyspace,
                            batch_statement, options, request, plan,
                            timeout, errors, hosts, retries)
  end

  request.clear
  unprepared = {} # cql => bound statements still waiting for a prepared id

  batch_statement.statements.each do |statement|
    cql = statement.cql

    unless statement.is_a?(Statements::Bound)
      request.add_query(cql, statement.params, statement.params_types)
      next
    end

    # Use the cached prepared id optimistically, unless this host is in the
    # middle of preparing the same cql - executing by id would fail then.
    id = synchronize { @preparing_statements[host][cql] ? nil : @prepared_statements[cql] }

    if id
      request.add_prepared(id, statement.params, statement.params_types)
    else
      (unprepared[cql] ||= []) << statement
    end
  end

  return send_batch.call if unprepared.empty?

  pending      = unprepared.to_a
  preparations = pending.map { |cql, _| prepare_statement(host, connection, cql, timeout) }

  Ione::Future.all(*preparations).on_complete do |f|
    if f.resolved?
      ids = f.value
      # ids[i] belongs to the i-th cql in `pending`.
      pending.each_with_index do |(_, statements), index|
        statements.each do |bound|
          request.add_prepared(ids[index], bound.params, bound.params_types)
        end
      end

      send_batch.call
    else
      f.on_failure do |failure|
        retriable = failure.is_a?(Errors::HostError) ||
                    (failure.is_a?(Errors::TimeoutError) && batch_statement.idempotent?)

        if retriable
          errors ||= {}
          errors[host] = failure
          batch_by_plan(promise, keyspace, batch_statement, options, request,
                        plan, timeout, errors, hosts, retries)
        else
          promise.break(failure)
        end
      end
    end
  end
end
batch_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors = nil, hosts = [], retries = -1) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Picks the next host from `plan` and tries to send the batch through one of
# its connections, switching the connection's keyspace first if needed.
#
# Breaks `promise` with NoHostsAvailable once the plan is exhausted. A host
# without a pool, a retriable keyspace-switch failure or any raised error is
# recorded in `errors` and the next host is tried.
def batch_by_plan(promise,
                  keyspace,
                  statement,
                  options,
                  request,
                  plan,
                  timeout,
                  errors = nil,
                  hosts = [],
                  retries = -1)
  host = nil
  connection = nil

  # Records why the current host was given up on and moves to the next one.
  try_next_host = lambda do |reason|
    errors ||= {}
    errors[host] = reason
    batch_by_plan(promise, keyspace, statement, options, request, plan,
                  timeout, errors, hosts, retries)
  end

  # Hands the batch over to the chosen connection.
  dispatch = lambda do
    batch_and_send_request_by_plan(host, connection, promise, keyspace,
                                   statement, request, options, plan,
                                   timeout, errors, hosts, retries)
  end

  unless plan.has_next?
    promise.break(Errors::NoHostsAvailable.new(errors))
    return
  end

  host = plan.next
  hosts << host
  retries += 1

  pool = synchronize { @connections[host] }
  return try_next_host.call(NOT_CONNECTED) unless pool

  connection = pool.random_connection

  return dispatch.call unless keyspace && connection.keyspace != keyspace

  switch_keyspace(connection, keyspace, timeout).on_complete do |switched|
    if switched.resolved?
      dispatch.call
    else
      switched.on_failure do |failure|
        retriable = failure.is_a?(Errors::HostError) ||
                    (failure.is_a?(Errors::TimeoutError) && statement.idempotent?)

        if retriable
          try_next_host.call(failure)
        else
          promise.break(failure)
        end
      end
    end
  end
rescue => e
  try_next_host.call(e)
end
close_connections() click to toggle source
    # File lib/cassandra/cluster/client.rb
# Closes every pooled connection and empties the pool table.
#
# @return a future that maps to this client once all closes have completed
def close_connections
  closing = []

  synchronize do
    @connections.each_value do |pool|
      pool.snapshot.each { |connection| closing << connection.close }
    end
    @connections.clear
  end

  Ione::Future.all(*closing).map(self)
end
closed(f) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Completion callback for the close future: marks the client as closed and
# logs the outcome.
#
# @param f the completed close future
def closed(f)
  synchronize do
    @state = :closed

    next @logger.info('Session closed') if f.resolved?

    f.on_failure do |failure|
      @logger.error("Session failed to close (#{failure.class.name}: #{failure.message})")
    end
  end
end
connect_to_host(host, pool_size) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Opens as many connections to `host` as are still missing from its pool.
#
# Returns NO_CONNECTIONS without connecting when the host has no pool, the
# pool is already full, or enough connection attempts are already pending.
# Otherwise returns a future that resolves to the new connections, or fails
# with the error of a failed attempt if any attempt failed (successful
# connections are still added to the pool in that case).
#
# @param host the host to connect to
# @param pool_size the desired number of connections for this host
def connect_to_host(host, pool_size)
  missing = 0

  synchronize do
    unless @connections.include?(host)
      @logger.info("Not connecting to #{host.ip} - host is currently down")
      return NO_CONNECTIONS
    end

    missing = pool_size - @connections[host].size

    if missing <= 0
      @logger.info("Not connecting to #{host.ip} - host is already fully connected")
      return NO_CONNECTIONS
    end

    # Attempts already in flight count towards the target size.
    missing -= @pending_connections[host]

    if missing <= 0
      @logger.info("Not connecting to #{host.ip} - " \
        'host is already pending connections')
      return NO_CONNECTIONS
    end

    @pending_connections[host] += missing
  end

  @logger.debug("Creating #{missing} request connections to #{host.ip}")
  attempts = Array.new(missing) do
    # A failed attempt becomes a FailedConnection value instead of failing
    # the combined future.
    @connector.connect(host).recover { |e| FailedConnection.new(e, host) }
  end

  Ione::Future.all(*attempts).flat_map do |connections|
    last_error = nil

    # Drop failed attempts in place, remembering the most recent error.
    connections.reject! do |candidate|
      next false if candidate.connected?

      last_error = candidate.error
      true
    end

    @logger.debug("Created #{connections.size} request connections to #{host.ip}")

    pool = nil

    synchronize do
      @pending_connections[host] -= missing

      if @connections.include?(host)
        pool = @connections[host]
      else
        # The host's pool was removed while we were connecting.
        @pending_connections.delete(host) unless @pending_connections[host] > 0
      end
    end

    if pool
      pool.add_connections(connections)

      connections.each do |connection|
        connection.on_closed do |cause|
          message = 'Request connection closed'
          message = "Request connection closed (#{cause.class.name}: #{cause.message})" if cause
          @logger.info(message)

          # Only closes that carry a cause trigger a reconnect.
          connect_to_host_maybe_retry(host, pool_size) if cause
        end
      end
    else
      connections.each(&:close)
    end

    last_error ? Ione::Future.failed(last_error) : Ione::Future.resolved(connections)
  end
end
connect_to_host_maybe_retry(host, pool_size) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Connects to `host`; if that fails, logs the error and falls back to the
# timed retry loop using a fresh schedule from the reconnection policy.
#
# @param host the host to connect to
# @param pool_size the desired number of connections for this host
# @return a future mapped to nil
def connect_to_host_maybe_retry(host, pool_size)
  attempt = connect_to_host(host, pool_size)

  guarded = attempt.fallback do |failure|
    @logger.error('Scheduling initial connection retry to ' \
      "#{host.ip} (#{failure.class.name}: #{failure.message})")
    connect_to_host_with_retry(host, pool_size, @reconnection_policy.schedule)
  end

  guarded.map(nil)
end
connect_to_host_with_retry(host, pool_size, schedule) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Waits for the next interval of `schedule`, then tries to connect to `host`.
# Each failure is logged and followed by another attempt with the same
# schedule.
#
# @param host the host to connect to
# @param pool_size the desired number of connections for this host
# @param schedule yields successive wait intervals via #next
# @return the future chained onto the reactor timer
def connect_to_host_with_retry(host, pool_size, schedule)
  delay = schedule.next

  @logger.debug("Reconnecting to #{host.ip} in #{delay} seconds")

  @reactor.schedule_timer(delay).flat_map do
    attempt = connect_to_host(host, pool_size)

    attempt.fallback do |failure|
      @logger.error('Scheduling connection retry to ' \
        "#{host.ip} (#{failure.class.name}: #{failure.message})")
      connect_to_host_with_retry(host, pool_size, schedule)
    end
  end
end
connected(f) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Completion callback for the connect future.
#
# On success the client becomes +:connected+. On failure it becomes
# +:defunct+, the error is logged and the client closes itself.
#
# @param f the completed connect future
def connected(f)
  succeeded = f.resolved?

  synchronize { @state = succeeded ? :connected : :defunct }

  return @logger.info('Session created') if succeeded

  f.on_failure do |failure|
    @logger.error("Session failed to connect (#{failure.class.name}: #{failure.message})")
  end

  close
end
do_send_request_by_plan(host, connection, promise, keyspace, statement, options, request, plan, timeout, errors, hosts, retries) click to toggle source
     # File lib/cassandra/cluster/client.rb
# Sends `request` on `connection` and routes the completed response future
# to #handle_response. Any error raised while sending breaks the promise.
#
# `retries` is written onto the request before it is sent. `errors` is
# defaulted to an empty hash before the response is handled.
def do_send_request_by_plan(host,
                            connection,
                            promise,
                            keyspace,
                            statement,
                            options,
                            request,
                            plan,
                            timeout,
                            errors,
                            hosts,
                            retries)
  request.retries = retries

  connection.send_request(request, timeout).on_complete do |response_future|
    errors ||= {}
    handle_response(response_future, host, connection, promise, keyspace,
                    statement, options, request, plan, timeout, errors,
                    hosts, retries)
  end
rescue => e
  promise.break(e)
end
execute_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors = nil, hosts = [], retries = -1) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Picks the next host from `plan` and tries to execute the statement through
# one of its connections, switching the connection's keyspace first if needed.
#
# Breaks `promise` with NoHostsAvailable once the plan is exhausted. A host
# without a pool, a retriable keyspace-switch failure or any raised error is
# recorded in `errors` and the next host is tried.
def execute_by_plan(promise,
                    keyspace,
                    statement,
                    options,
                    request,
                    plan,
                    timeout,
                    errors = nil,
                    hosts = [],
                    retries = -1)
  host = nil
  connection = nil

  # Records why the current host was given up on and moves to the next one.
  try_next_host = lambda do |reason|
    errors ||= {}
    errors[host] = reason
    execute_by_plan(promise, keyspace, statement, options, request, plan,
                    timeout, errors, hosts, retries)
  end

  # Hands the statement over to the chosen connection.
  dispatch = lambda do
    prepare_and_send_request_by_plan(host, connection, promise, keyspace,
                                     statement, options, request, plan,
                                     timeout, errors, hosts, retries)
  end

  unless plan.has_next?
    promise.break(Errors::NoHostsAvailable.new(errors))
    return
  end

  host = plan.next
  hosts << host
  retries += 1

  pool = synchronize { @connections[host] }
  return try_next_host.call(NOT_CONNECTED) unless pool

  connection = pool.random_connection

  return dispatch.call unless keyspace && connection.keyspace != keyspace

  switch_keyspace(connection, keyspace, timeout).on_complete do |switched|
    if switched.resolved?
      dispatch.call
    else
      switched.on_failure do |failure|
        retriable = failure.is_a?(Errors::HostError) ||
                    (failure.is_a?(Errors::TimeoutError) && statement.idempotent?)

        if retriable
          try_next_host.call(failure)
        else
          promise.break(failure)
        end
      end
    end
  end
rescue => e
  try_next_host.call(e)
end
handle_response(response_future, host, connection, promise, keyspace, statement, options, request, plan, timeout, errors, hosts, retries) click to toggle source
     # File lib/cassandra/cluster/client.rb
# Settles +promise+ based on the outcome of a request that was sent to +host+.
#
# When +response_future+ failed, an Errors::HostError (or an Errors::TimeoutError
# for an idempotent statement) is recorded in +errors+ and the request is
# re-dispatched through the *_by_plan method matching the request type; any
# other failure breaks +promise+.
#
# When +response_future+ resolved, the response is handled by type:
# * unavailable / write-timeout / read-timeout error responses ask
#   +options.retry_policy+ for a decision, which is then applied;
# * an unprepared error response re-prepares the statement on +connection+ and
#   resends the request;
# * any other error response is converted with +to_error+ and is either
#   re-dispatched (same rule as above) or used to break +promise+;
# * result responses fulfill +promise+ with the matching result object.
#
# Any StandardError raised while handling a resolved response breaks +promise+.
#
# @param response_future future for the response (must answer +resolved?+,
#   +value+ and +on_failure+)
# @param host the host the request was sent to; used as the key in +errors+
# @param connection the connection the request was sent on
# @param promise the promise to fulfill or break
# @param keyspace keyspace passed through to results, errors and retries
# @param statement the statement being executed
# @param options execution options; +retry_policy+ is read from it
# @param request the protocol request; its +consistency+ (and +id+) may be
#   updated before a resend
# @param plan the plan handed on to the *_by_plan methods
# @param timeout request timeout handed on to resends and re-prepares
# @param errors [Hash, nil] per-host errors collected so far
# @param hosts hosts attempted so far
# @param retries number of retries performed so far
def handle_response(response_future,
                    host,
                    connection,
                    promise,
                    keyspace,
                    statement,
                    options,
                    request,
                    plan,
                    timeout,
                    errors,
                    hosts,
                    retries)
  # send_request_by_plan defaults +errors+ to nil and hands it on unchanged, so
  # make sure the retry paths below always have a Hash to record into.
  errors ||= {}

  unless response_future.resolved?
    response_future.on_failure do |ex|
      if ex.is_a?(Errors::HostError) ||
         (ex.is_a?(Errors::TimeoutError) && statement.idempotent?)
        errors[host] = ex
        retry_request_on_next_host(ex, promise, keyspace, statement, options, request,
                                   plan, timeout, errors, hosts, retries)
      else
        promise.break(ex)
      end
    end
    return
  end

  r = response_future.value

  begin
    decision = nil

    case r
    when Protocol::UnavailableErrorResponse
      decision = options.retry_policy.unavailable(statement,
                                                  r.consistency,
                                                  r.required,
                                                  r.alive,
                                                  retries)
    when Protocol::WriteTimeoutErrorResponse
      decision = options.retry_policy.write_timeout(statement,
                                                    r.consistency,
                                                    r.write_type,
                                                    r.blockfor,
                                                    r.received,
                                                    retries)
    when Protocol::ReadTimeoutErrorResponse
      decision = options.retry_policy.read_timeout(statement,
                                                   r.consistency,
                                                   r.blockfor,
                                                   r.received,
                                                   r.data_present,
                                                   retries)
    when Protocol::UnpreparedErrorResponse
      if statement.is_a?(Cassandra::Statements::Batch)
        # Find the prepared statement with the prepared-statement-id reported by the node.
        unprepared_child = statement.statements.find do |s|
          (s.is_a?(Cassandra::Statements::Prepared) || s.is_a?(Cassandra::Statements::Bound)) &&
            s.id == r.id
        end
        cql = unprepared_child ? unprepared_child.cql : nil
      else
        # This is a normal statement, so we have everything we need.
        cql = statement.cql
        synchronize { @preparing_statements[host].delete(cql) }
      end

      prepare = prepare_statement(host, connection, cql, timeout)
      prepare.on_complete do |_|
        if prepare.resolved?
          # A batch request carries the ids of its children itself, so only
          # non-batch requests get the fresh id assigned.
          request.id = prepare.value unless request.is_a?(Cassandra::Protocol::BatchRequest)
          do_send_request_by_plan(host,
                                  connection,
                                  promise,
                                  keyspace,
                                  statement,
                                  options,
                                  request,
                                  plan,
                                  timeout,
                                  errors,
                                  hosts,
                                  retries)
        else
          prepare.on_failure do |prepare_error|
            if prepare_error.is_a?(Errors::HostError) ||
               (prepare_error.is_a?(Errors::TimeoutError) && statement.idempotent?)
              errors[host] = prepare_error
              # Dispatch on the request type so that a batch goes back through
              # batch_by_plan rather than execute_by_plan.
              retry_request_on_next_host(prepare_error, promise, keyspace, statement, options,
                                         request, plan, timeout, errors, hosts, retries)
            else
              promise.break(prepare_error)
            end
          end
        end
      end
    when Protocol::ErrorResponse
      error = r.to_error(keyspace,
                         statement,
                         options,
                         hosts,
                         request.consistency,
                         retries)

      if error.is_a?(Errors::HostError) ||
         (error.is_a?(Errors::TimeoutError) && statement.idempotent?)
        errors[host] = error
        retry_request_on_next_host(error, promise, keyspace, statement, options, request,
                                   plan, timeout, errors, hosts, retries)
      else
        promise.break(error)
      end
    when Protocol::SetKeyspaceResultResponse
      @keyspace = r.keyspace
      promise.fulfill(Cassandra::Results::Void.new(r.custom_payload,
                                                   r.warnings,
                                                   r.trace_id,
                                                   keyspace,
                                                   statement,
                                                   options,
                                                   hosts,
                                                   request.consistency,
                                                   retries,
                                                   self,
                                                   @futures))
    when Protocol::PreparedResultResponse
      cql = request.cql
      synchronize do
        @prepared_statements[cql] = r.id
        @preparing_statements[host].delete(cql)
      end

      metadata = r.metadata
      # Fall back to the schema when the response carries no pk_idx.
      pk_idx = r.pk_idx || @schema.get_pk_idx(metadata)

      promise.fulfill(
        Statements::Prepared.new(r.id,
                                 r.custom_payload,
                                 r.warnings,
                                 cql,
                                 metadata,
                                 r.result_metadata,
                                 pk_idx,
                                 r.trace_id,
                                 keyspace,
                                 statement,
                                 options,
                                 hosts,
                                 request.consistency,
                                 retries,
                                 self,
                                 @connection_options)
      )
    when Protocol::RawRowsResultResponse, Protocol::RowsResultResponse
      # Raw rows are materialized with the statement's result metadata before
      # they are read.
      r.materialize(statement.result_metadata) if r.is_a?(Protocol::RawRowsResultResponse)
      promise.fulfill(
        Results::Paged.new(r.custom_payload,
                           r.warnings,
                           r.rows,
                           r.paging_state,
                           r.trace_id,
                           keyspace,
                           statement,
                           options,
                           hosts,
                           request.consistency,
                           retries,
                           self,
                           @futures)
      )
    when Protocol::SchemaChangeResultResponse
      if r.change == 'DROPPED' &&
         r.target == Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
        @schema.delete_keyspace(r.keyspace)
      end

      @logger.debug('Waiting for schema to propagate to all hosts after a change')
      wait_for_schema_agreement(connection,
                                @reconnection_policy.schedule).on_complete do |f|
        # A failed agreement check is only logged; the promise is fulfilled either way.
        unless f.resolved?
          f.on_failure do |agreement_error|
            @logger.error(
              "Schema agreement failure (#{agreement_error.class.name}: #{agreement_error.message})"
            )
          end
        end
        promise.fulfill(
          Results::Void.new(r.custom_payload,
                            r.warnings,
                            r.trace_id,
                            keyspace,
                            statement,
                            options,
                            hosts,
                            request.consistency,
                            retries,
                            self,
                            @futures)
        )
      end
    else
      promise.fulfill(Results::Void.new(r.custom_payload,
                                        r.warnings,
                                        r.trace_id,
                                        keyspace,
                                        statement,
                                        options,
                                        hosts,
                                        request.consistency,
                                        retries,
                                        self,
                                        @futures))
    end

    # Only the three retry-policy branches above set a decision.
    if decision
      case decision
      when Retry::Decisions::Retry
        request.consistency = decision.consistency
        do_send_request_by_plan(host,
                                connection,
                                promise,
                                keyspace,
                                statement,
                                options,
                                request,
                                plan,
                                timeout,
                                errors,
                                hosts,
                                retries + 1)
      when Retry::Decisions::TryNextHost
        error = r.to_error(keyspace,
                           statement,
                           options,
                           hosts,
                           request.consistency,
                           retries)
        errors[host] = error
        retry_request_on_next_host(error, promise, keyspace, statement, options, request,
                                   plan, timeout, errors, hosts, retries)
      when Retry::Decisions::Ignore
        promise.fulfill(
          Results::Void.new(r.custom_payload,
                            r.warnings,
                            nil,
                            keyspace,
                            statement,
                            options,
                            hosts,
                            request.consistency,
                            retries,
                            self,
                            @futures)
        )
      else
        # Retry::Decisions::Reraise and any decision type not handled above
        # surface the server error to the caller.
        promise.break(
          r.to_error(keyspace,
                     statement,
                     options,
                     hosts,
                     request.consistency,
                     retries)
        )
      end
    end
  rescue => e
    promise.break(e)
  end
end

# Re-dispatches +request+ through the *_by_plan method that matches its type,
# or breaks +promise+ with +error+ when no dispatcher exists for that type.
def retry_request_on_next_host(error,
                               promise,
                               keyspace,
                               statement,
                               options,
                               request,
                               plan,
                               timeout,
                               errors,
                               hosts,
                               retries)
  plan_args = [promise, keyspace, statement, options, request, plan, timeout, errors, hosts, retries]

  case request
  when Protocol::QueryRequest, Protocol::PrepareRequest
    send_request_by_plan(*plan_args)
  when Protocol::ExecuteRequest
    execute_by_plan(*plan_args)
  when Protocol::BatchRequest
    batch_by_plan(*plan_args)
  else
    promise.break(error)
  end
end
private :retry_request_on_next_host
peer_ip(data) click to toggle source
     # File lib/cassandra/cluster/client.rb
# Chooses the address to use for a peer row and passes it through the address
# resolver.
#
# The row's 'rpc_address' is used unless it equals '0.0.0.0', in which case the
# row's 'peer' value is used instead.
#
# @param data a row answering +[]+ with 'rpc_address' and 'peer'
# @return whatever +@address_resolver.resolve+ returns for the chosen address
def peer_ip(data)
  rpc_address = data['rpc_address']
  address     = rpc_address == '0.0.0.0' ? data['peer'] : rpc_address

  @address_resolver.resolve(address)
end
prepare_and_send_request_by_plan(host, connection, promise, keyspace, statement, options, request, plan, timeout, errors, hosts, retries) click to toggle source
    # File lib/cassandra/cluster/client.rb
# Sends +request+ to +host+ with a prepared-statement id, preparing the
# statement on that host first when no usable id is cached.
#
# A cached id is used only when no prepare for the same CQL is currently
# recorded for +host+. Otherwise the statement is prepared via
# +prepare_statement+ and the request is sent once the id is known. A prepare
# failure that is an Errors::HostError (or an Errors::TimeoutError for an
# idempotent statement) is recorded in +errors+ and handed to
# +execute_by_plan+; any other failure breaks +promise+, as does any
# StandardError raised here.
def prepare_and_send_request_by_plan(host,
                                     connection,
                                     promise,
                                     keyspace,
                                     statement,
                                     options,
                                     request,
                                     plan,
                                     timeout,
                                     errors,
                                     hosts,
                                     retries)
  cql = statement.cql

  # Be optimistic that a cached id is valid on every host. The exception is a
  # prepare that is still recorded for this host: executing with the id is
  # known to fail then, so behave as if no id were cached.
  cached_id = synchronize do
    @preparing_statements[host][cql] ? nil : @prepared_statements[cql]
  end

  # Stamps the id on the request and hands it to the sender.
  send_with_id = lambda do |statement_id|
    request.id = statement_id
    do_send_request_by_plan(host, connection, promise, keyspace, statement, options,
                            request, plan, timeout, errors, hosts, retries)
  end

  if cached_id
    send_with_id.call(cached_id)
  else
    prepare = prepare_statement(host, connection, cql, timeout)
    prepare.on_complete do |_|
      if prepare.resolved?
        send_with_id.call(prepare.value)
      else
        prepare.on_failure do |failure|
          retryable = failure.is_a?(Errors::HostError) ||
                      (failure.is_a?(Errors::TimeoutError) && statement.idempotent?)

          if retryable
            errors ||= {}
            errors[host] = failure
            execute_by_plan(promise, keyspace, statement, options, request,
                            plan, timeout, errors, hosts, retries)
          else
            promise.break(failure)
          end
        end
      end
    end
  end
rescue => e
  promise.break(e)
end
prepare_statement(host, connection, cql, timeout) click to toggle source
     # File lib/cassandra/cluster/client.rb
# Prepares +cql+ on +host+ over +connection+, sharing one request between
# callers that ask for the same host and CQL while it is in flight.
#
# On a prepared result the id is cached in +@prepared_statements+. The entry
# recorded in +@preparing_statements+ is removed as soon as the request
# completes, whether it succeeded or failed, so a failed prepare is not handed
# back to later callers and the next call issues a fresh request.
#
# @param host the host the statement is prepared on; key into
#   +@preparing_statements+
# @param connection connection used to send the prepare request
# @param cql the CQL text to prepare
# @param timeout timeout handed to +connection.send_request+
# @return a future that resolves to the prepared-statement id, or fails with the
#   converted server error, an Errors::InternalError for an unexpected response,
#   or whatever failure +connection.send_request+ reports
def prepare_statement(host, connection, cql, timeout)
  synchronize do
    pending = @preparing_statements[host]

    return pending[cql] if pending.key?(cql)
  end

  request = Protocol::PrepareRequest.new(cql, false)

  f = connection.send_request(request, timeout).map do |r|
    case r
    when Protocol::PreparedResultResponse
      id = r.id
      synchronize do
        @prepared_statements[cql] = id
        @preparing_statements[host].delete(cql)
      end
      id
    when Protocol::ErrorResponse
      raise r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :one, 0)
    else
      raise Errors::InternalError, "Unexpected response #{r.inspect}"
    end
  end

  synchronize do
    @preparing_statements[host][cql] = f
  end

  # Registered after the entry is stored so that a request which already
  # completed, or which fails later, does not leave a stale future behind.
  # Only our own future is removed, never one stored by a newer call.
  f.on_complete do |_|
    synchronize do
      pending = @preparing_statements[host]
      pending.delete(cql) if pending[cql].equal?(f)
    end
  end

  f
end
send_request_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors = nil, hosts = [], retries = -1) click to toggle source
     # File lib/cassandra/cluster/client.rb
 936 def send_request_by_plan(promise,
 937                          keyspace,
 938                          statement,
 939                          options,
 940                          request,
 941                          plan,
 942                          timeout,
 943                          errors = nil,
 944                          hosts = [],
 945                          retries = -1)
 946   unless plan.has_next?
 947     promise.break(Errors::NoHostsAvailable.new(errors))
 948     return
 949   end
 950 
 951   hosts << host = plan.next
 952   retries += 1
 953   pool = nil
 954   synchronize { pool = @connections[host] }
 955 
 956   unless pool
 957     errors ||= {}
 958     errors[host] = NOT_CONNECTED
 959     return send_request_by_plan(promise,
 960                                 keyspace,
 961                                 statement,
 962                                 options,
 963                                 request,
 964                                 plan,
 965                                 timeout,
 966                                 errors,
 967                                 hosts,
 968                                 retries)
 969   end
 970 
 971   connection = pool.random_connection
 972 
 973   if keyspace && connection.keyspace != keyspace
 974     switch = switch_keyspace(connection, keyspace, timeout)
 975     switch.on_complete do |s|
 976       if s.resolved?
 977         do_send_request_by_plan(host,
 978                                 connection,
 979                                 promise,
 980                                 keyspace,
 981                                 statement,
 982                                 options,
 983                                 request,
 984                                 plan,
 985                                 timeout,
 986                                 errors,
 987                                 hosts,
 988                                 retries)
 989       else
 990         s.on_failure do |e|
 991           if e.is_a?(Errors::HostError) ||
 992              (e.is_a?(Errors::TimeoutError) && statement.idempotent?)
 993             errors ||= {}
 994             errors[host] = e
 995             send_request_by_plan(promise,
 996                                  keyspace,
 997                                  statement,
 998                                  options,
 999                                  request,
1000                                  plan,
1001                                  timeout,
1002                                  errors,
1003                                  hosts,
1004                                  retries)
1005           else
1006             promise.break(e)
1007           end
1008         end
1009       end
1010     end
1011   else
1012     do_send_request_by_plan(host,
1013                             connection,
1014                             promise,
1015                             keyspace,
1016                             statement,
1017                             options,
1018                             request,
1019                             plan,
1020                             timeout,
1021                             errors,
1022                             hosts,
1023                             retries)
1024   end
1025 rescue => e
1026   errors ||= {}
1027   errors[host] = e
1028   send_request_by_plan(promise,
1029                        keyspace,
1030                        statement,
1031                        options,
1032                        request,
1033                        plan,
1034                        timeout,
1035                        errors,
1036                        hosts,
1037                        retries)
1038 end
send_select_request(connection, request) click to toggle source
     # File lib/cassandra/cluster/client.rb
# Sends +request+ on +connection+ and maps the response to its rows.
#
# @return a future that resolves to the rows of a rows result and fails with
#   the converted server error for an error response, or with
#   Errors::InternalError for any other response
def send_select_request(connection, request)
  connection.send_request(request).map do |response|
    next response.rows if response.is_a?(Protocol::RowsResultResponse)

    unless response.is_a?(Protocol::ErrorResponse)
      raise Errors::InternalError, "Unexpected response #{response.inspect}"
    end

    raise response.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :one, 0)
  end
end
switch_keyspace(connection, keyspace, timeout) click to toggle source
     # File lib/cassandra/cluster/client.rb
# Switches +connection+ to +keyspace+ by sending a USE query.
#
# When a switch to the same keyspace is already recorded on the connection,
# that pending future is returned (or an already-resolved future when none is
# stored). Otherwise the switch is recorded on the connection while the request
# is in flight and cleared again once it completes. On success +@keyspace+ is
# updated from the response.
#
# @return a future that resolves to nil on success and fails with the converted
#   server error or an Errors::InternalError for an unexpected response
def switch_keyspace(connection, keyspace, timeout)
  in_flight_keyspace = connection[:pending_keyspace]
  in_flight_switch   = connection[:pending_switch]

  if in_flight_keyspace == keyspace
    return in_flight_switch || Ione::Future.resolved
  end

  use_request = Protocol::QueryRequest.new("USE #{Util.escape_name(keyspace)}",
                                           EMPTY_LIST,
                                           EMPTY_LIST,
                                           :one)

  switch = connection.send_request(use_request, timeout).map do |response|
    if response.is_a?(Protocol::SetKeyspaceResultResponse)
      @keyspace = response.keyspace
      nil
    elsif response.is_a?(Protocol::ErrorResponse)
      use_statement = Statements::Simple.new("USE #{Util.escape_name(keyspace)}")
      raise response.to_error(nil, use_statement, VOID_OPTIONS, EMPTY_LIST, :one, 0)
    else
      raise Errors::InternalError, "Unexpected response #{response.inspect}"
    end
  end

  connection[:pending_keyspace] = keyspace
  connection[:pending_switch]   = switch

  # Forget the in-flight switch once it has completed, successfully or not.
  switch.on_complete do |_done|
    [:pending_switch, :pending_keyspace].each { |slot| connection[slot] = nil }
  end

  switch
end
wait_for_schema_agreement(connection, schedule) click to toggle source
     # File lib/cassandra/cluster/client.rb
# Checks whether the local host and its peers report the same schema version,
# and re-checks after a delay taken from +schedule+ until they do.
#
# Only hosts known to +@registry+ whose distance (per +@profile_manager+) is
# not :ignore are taken into account.
#
# @param connection connection used for both schema queries
# @param schedule object whose +next+ yields the delay before the next check
# @return a future; it is the resolved future when exactly one distinct version
#   was seen, otherwise the future of the rescheduled check
def wait_for_schema_agreement(connection, schedule)
  peers_future = send_select_request(connection, SELECT_SCHEMA_PEERS)
  local_future = send_select_request(connection, SELECT_SCHEMA_LOCAL)

  Ione::Future.all(peers_future, local_future).flat_map do |(peers, local)|
    versions = ::Set.new

    # Looks the address up and returns the host only when it should be counted.
    counted_host = lambda do |address|
      candidate = @registry.host(address)
      candidate if candidate && @profile_manager.distance(candidate) != :ignore
    end

    # Records the schema version carried by +row+ for +host+.
    record = lambda do |host, row|
      version = row['schema_version']
      versions << version
      @logger.debug("Host #{host.ip} schema version is #{version}")
    end

    unless local.empty?
      local_host = counted_host.call(connection.host)
      record.call(local_host, local.first) if local_host
    end

    peers.each do |row|
      peer_host = counted_host.call(peer_ip(row))
      record.call(peer_host, row) if peer_host
    end

    if versions.one?
      @logger.debug('All hosts have the same schema')
      next Ione::Future.resolved
    end

    interval = schedule.next
    @logger.info('Hosts have different schema versions: ' \
      "#{versions.to_a.inspect}, retrying in #{interval} seconds")
    @reactor.schedule_timer(interval).flat_map do
      wait_for_schema_agreement(connection, schedule)
    end
  end
end