class Cassandra::Cluster::Client
@private
Constants
- BATCH_TYPES
- BOOTSTRAPPING_ERROR_CODE
- CLIENT_CLOSED
- CLIENT_NOT_CONNECTED
- NOT_CONNECTED
- NO_CONNECTIONS
- OVERLOADED_ERROR_CODE
- READ_TIMEOUT_ERROR_CODE
- SELECT_SCHEMA_LOCAL
- SELECT_SCHEMA_PEERS
- SERVER_ERROR_CODE
- UNAVAILABLE_ERROR_CODE
- UNPREPARED_ERROR_CODE
- WRITE_TIMEOUT_ERROR_CODE
Attributes
keyspace[R]
Public Class Methods
new(logger, cluster_registry, cluster_schema, io_reactor, connector, profile_manager, reconnection_policy, address_resolution_policy, connection_options, futures_factory, timestamp_generator)
click to toggle source
# File lib/cassandra/cluster/client.rb 27 def initialize(logger, 28 cluster_registry, 29 cluster_schema, 30 io_reactor, 31 connector, 32 profile_manager, 33 reconnection_policy, 34 address_resolution_policy, 35 connection_options, 36 futures_factory, 37 timestamp_generator) 38 @logger = logger 39 @registry = cluster_registry 40 @schema = cluster_schema 41 @reactor = io_reactor 42 @connector = connector 43 @profile_manager = profile_manager 44 @reconnection_policy = reconnection_policy 45 @address_resolver = address_resolution_policy 46 @connection_options = connection_options 47 @futures = futures_factory 48 @connections = ::Hash.new 49 @prepared_statements = ::Hash.new 50 @preparing_statements = ::Hash.new {|hash, host| hash[host] = {}} 51 @pending_connections = ::Hash.new 52 @keyspace = nil 53 @state = :idle 54 @timestamp_generator = timestamp_generator 55 56 mon_initialize 57 end
Public Instance Methods
batch(statement, options)
click to toggle source
# File lib/cassandra/cluster/client.rb 320 def batch(statement, options) 321 if @connection_options.protocol_version < 2 322 return @futures.error( 323 Errors::ClientError.new( 324 'Batch statements are not supported by the current version of ' \ 325 'Apache Cassandra' 326 ) 327 ) 328 end 329 330 timestamp = @timestamp_generator.next if @timestamp_generator && @connection_options.protocol_version > 2 331 payload = nil 332 payload = options.payload if @connection_options.protocol_version >= 4 333 timeout = options.timeout 334 request = Protocol::BatchRequest.new(BATCH_TYPES[statement.type], 335 options.consistency, 336 options.trace?, 337 options.serial_consistency, 338 timestamp, 339 payload) 340 keyspace = @keyspace 341 plan = options.load_balancing_policy.plan(keyspace, statement, options) 342 promise = @futures.promise 343 344 batch_by_plan(promise, keyspace, statement, options, request, plan, timeout) 345 346 promise.future 347 end
close()
click to toggle source
# File lib/cassandra/cluster/client.rb 138 def close 139 state = nil 140 141 synchronize do 142 return CLIENT_NOT_CONNECTED if @state == :idle 143 return @closed_future if @state == :closed || @state == :closing 144 145 state = @state 146 @state = :closing 147 end 148 149 @closed_future = begin 150 @registry.remove_listener(self) 151 @schema.remove_listener(self) 152 153 f = if state == :connecting 154 @connected_future.recover.flat_map { close_connections } 155 else 156 close_connections 157 end 158 159 f.map(self) 160 end 161 @closed_future.on_complete(&method(:closed)) 162 @closed_future 163 end
connect()
click to toggle source
# File lib/cassandra/cluster/client.rb 59 def connect 60 connecting_hosts = ::Hash.new 61 62 synchronize do 63 return CLIENT_CLOSED if @state == :closed || @state == :closing 64 return @connected_future if @state == :connecting || @state == :connected 65 66 @state = :connecting 67 @registry.each_host do |host| 68 distance = @profile_manager.distance(host) 69 70 case distance 71 when :ignore 72 next 73 when :local 74 pool_size = @connection_options.connections_per_local_node 75 when :remote 76 pool_size = @connection_options.connections_per_remote_node 77 else 78 @logger.error("Not connecting to #{host.ip} - invalid load balancing " \ 79 'distance. Distance must be one of ' \ 80 "#{LoadBalancing::DISTANCES.inspect}, #{distance.inspect} given") 81 next 82 end 83 84 connecting_hosts[host] = pool_size 85 @pending_connections[host] = 0 86 @preparing_statements[host] = {} 87 @connections[host] = ConnectionPool.new 88 end 89 end 90 91 @connected_future = begin 92 @logger.info('Creating session') 93 @registry.add_listener(self) 94 @schema.add_listener(self) 95 96 futures = connecting_hosts.map do |(host, pool_size)| 97 f = connect_to_host(host, pool_size) 98 f.recover do |error| 99 FailedConnection.new(error, host) 100 end 101 end 102 103 Ione::Future.all(*futures).map do |connections| 104 connections.flatten! 105 raise NO_HOSTS if connections.empty? 106 107 failed_connections = connections.reject(&:connected?) 108 109 # convert Cassandra::Protocol::CqlProtocolHandler to something with a real host 110 failed_connections.map! do |c| 111 if c.host.is_a?(String) 112 host = @registry.each_host.detect { |h| h.ip.to_s == c.host } || raise("Unable to find host #{c.host}") 113 FailedConnection.new(c.error, host) 114 else 115 c 116 end 117 end 118 119 if failed_connections.size == connections.size 120 errors = {} 121 connections.each {|c| errors[c.host] = c.error unless c.error.nil?} 122 raise Errors::NoHostsAvailable.new(errors) 123 else 124 failed_connections.each do |f| 125 connect_to_host_with_retry(f.host, 126 connecting_hosts[f.host], 127 @reconnection_policy.schedule) 128 end 129 end 130 131 self 132 end 133 end 134 @connected_future.on_complete(&method(:connected)) 135 @connected_future 136 end
execute(statement, options)
click to toggle source
# File lib/cassandra/cluster/client.rb 293 def execute(statement, options) 294 timestamp = @timestamp_generator.next if @timestamp_generator && @connection_options.protocol_version > 2 295 payload = nil 296 payload = options.payload if @connection_options.protocol_version >= 4 297 timeout = options.timeout 298 result_metadata = statement.result_metadata 299 request = Protocol::ExecuteRequest.new(nil, 300 statement.params_types, 301 statement.params, 302 result_metadata.nil?, 303 options.consistency, 304 options.serial_consistency, 305 options.page_size, 306 options.paging_state, 307 options.trace?, 308 timestamp, 309 payload) 310 promise = @futures.promise 311 312 keyspace = @keyspace 313 plan = options.load_balancing_policy.plan(keyspace, statement, options) 314 315 execute_by_plan(promise, keyspace, statement, options, request, plan, timeout) 316 317 promise.future 318 end
host_down(host)
click to toggle source
# File lib/cassandra/cluster/client.rb 201 def host_down(host) 202 pool = nil 203 204 synchronize do 205 return Ione::Future.resolved unless @connections.key?(host) 206 207 @pending_connections.delete(host) unless @pending_connections[host] > 0 208 @preparing_statements.delete(host) 209 pool = @connections.delete(host) 210 end 211 212 if pool 213 Ione::Future.all(*pool.snapshot.map!(&:close)).map(nil) 214 else 215 Ione::Future.resolved 216 end 217 end
host_found(host)
click to toggle source
These methods shall be called from inside reactor thread only
# File lib/cassandra/cluster/client.rb 166 def host_found(host) 167 nil 168 end
host_lost(host)
click to toggle source
# File lib/cassandra/cluster/client.rb 170 def host_lost(host) 171 nil 172 end
host_up(host)
click to toggle source
# File lib/cassandra/cluster/client.rb 174 def host_up(host) 175 pool_size = 0 176 177 synchronize do 178 distance = @profile_manager.distance(host) 179 case distance 180 when :ignore 181 return Ione::Future.resolved 182 when :local 183 pool_size = @connection_options.connections_per_local_node 184 when :remote 185 pool_size = @connection_options.connections_per_remote_node 186 else 187 @logger.error("Not connecting to #{host.ip} - " \ 188 'invalid load balancing distance. Distance must be one of ' \ 189 "#{LoadBalancing::DISTANCES.inspect}, #{distance.inspect} given") 190 return Ione::Future.resolved 191 end 192 193 @pending_connections[host] ||= 0 194 @preparing_statements[host] = {} 195 @connections[host] = ConnectionPool.new 196 end 197 198 connect_to_host_maybe_retry(host, pool_size) 199 end
inspect()
click to toggle source
# File lib/cassandra/cluster/client.rb 349 def inspect 350 "#<#{self.class.name}:0x#{object_id.to_s(16)}>" 351 end
keyspace_changed(keyspace)
click to toggle source
# File lib/cassandra/cluster/client.rb 222 def keyspace_changed(keyspace) 223 end
keyspace_created(keyspace)
click to toggle source
# File lib/cassandra/cluster/client.rb 219 def keyspace_created(keyspace) 220 end
keyspace_dropped(keyspace)
click to toggle source
# File lib/cassandra/cluster/client.rb 225 def keyspace_dropped(keyspace) 226 @keyspace = nil if @keyspace == keyspace.name 227 nil 228 end
prepare(cql, options)
click to toggle source
# File lib/cassandra/cluster/client.rb 271 def prepare(cql, options) 272 payload = nil 273 payload = options.payload if @connection_options.protocol_version >= 4 274 request = Protocol::PrepareRequest.new(cql, options.trace?, payload) 275 timeout = options.timeout 276 promise = @futures.promise 277 278 keyspace = @keyspace 279 statement = VOID_STATEMENT 280 plan = options.load_balancing_policy.plan(keyspace, statement, options) 281 282 send_request_by_plan(promise, 283 keyspace, 284 statement, 285 options, 286 request, 287 plan, 288 timeout) 289 290 promise.future 291 end
query(statement, options)
click to toggle source
# File lib/cassandra/cluster/client.rb 230 def query(statement, options) 231 if !statement.params.empty? && @connection_options.protocol_version == 1 232 return @futures.error( 233 Errors::ClientError.new( 234 'Positional arguments are not supported by the current version of ' \ 235 'Apache Cassandra' 236 ) 237 ) 238 end 239 240 timestamp = @timestamp_generator.next if @timestamp_generator && @connection_options.protocol_version > 2 241 payload = nil 242 payload = options.payload if @connection_options.protocol_version >= 4 243 request = Protocol::QueryRequest.new(statement.cql, 244 statement.params, 245 statement.params_types, 246 options.consistency, 247 options.serial_consistency, 248 options.page_size, 249 options.paging_state, 250 options.trace?, 251 statement.params_names, 252 timestamp, 253 payload) 254 timeout = options.timeout 255 promise = @futures.promise 256 257 keyspace = @keyspace 258 plan = options.load_balancing_policy.plan(keyspace, statement, options) 259 260 send_request_by_plan(promise, 261 keyspace, 262 statement, 263 options, 264 request, 265 plan, 266 timeout) 267 268 promise.future 269 end
Private Instance Methods
batch_and_send_request_by_plan(host, connection, promise, keyspace, batch_statement, request, options, plan, timeout, errors, hosts, retries)
click to toggle source
# File lib/cassandra/cluster/client.rb 833 def batch_and_send_request_by_plan(host, 834 connection, 835 promise, 836 keyspace, 837 batch_statement, 838 request, 839 options, 840 plan, 841 timeout, 842 errors, 843 hosts, 844 retries) 845 request.clear 846 unprepared = Hash.new {|hash, cql| hash[cql] = []} 847 848 batch_statement.statements.each do |statement| 849 cql = statement.cql 850 851 if statement.is_a?(Statements::Bound) 852 # Get the prepared statement id for this statement from our cache if possible. We are optimistic 853 # that the statement has previously been prepared on all hosts, so the id will be valid. However, if 854 # we're in the midst of preparing the statement on the given host, we know that executing with the id 855 # will fail. So, act like we don't have the prepared-statement id in that case. 856 857 id = synchronize { @preparing_statements[host][cql] ? nil : @prepared_statements[cql] } 858 859 if id 860 request.add_prepared(id, statement.params, statement.params_types) 861 else 862 unprepared[cql] << statement 863 end 864 else 865 request.add_query(cql, statement.params, statement.params_types) 866 end 867 end 868 869 if unprepared.empty? 870 do_send_request_by_plan(host, 871 connection, 872 promise, 873 keyspace, 874 batch_statement, 875 options, 876 request, 877 plan, 878 timeout, 879 errors, 880 hosts, 881 retries) 882 else 883 to_prepare = unprepared.to_a 884 futures = to_prepare.map do |cql, _| 885 prepare_statement(host, connection, cql, timeout) 886 end 887 888 Ione::Future.all(*futures).on_complete do |f| 889 if f.resolved? 890 prepared_ids = f.value 891 to_prepare.each_with_index do |(_, statements), i| 892 statements.each do |statement| 893 request.add_prepared(prepared_ids[i], 894 statement.params, 895 statement.params_types) 896 end 897 end 898 899 do_send_request_by_plan(host, 900 connection, 901 promise, 902 keyspace, 903 batch_statement, 904 options, 905 request, 906 plan, 907 timeout, 908 errors, 909 hosts, 910 retries) 911 else 912 f.on_failure do |e| 913 if e.is_a?(Errors::HostError) || 914 (e.is_a?(Errors::TimeoutError) && batch_statement.idempotent?) 915 errors ||= {} 916 errors[host] = e 917 batch_by_plan(promise, 918 keyspace, 919 batch_statement, 920 options, 921 request, 922 plan, 923 timeout, 924 errors, 925 hosts, 926 retries) 927 else 928 promise.break(e) 929 end 930 end 931 end 932 end 933 end 934 end
batch_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors = nil, hosts = [], retries = -1)
click to toggle source
# File lib/cassandra/cluster/client.rb 729 def batch_by_plan(promise, 730 keyspace, 731 statement, 732 options, 733 request, 734 plan, 735 timeout, 736 errors = nil, 737 hosts = [], 738 retries = -1) 739 unless plan.has_next? 740 promise.break(Errors::NoHostsAvailable.new(errors)) 741 return 742 end 743 744 hosts << host = plan.next 745 retries += 1 746 pool = nil 747 synchronize { pool = @connections[host] } 748 749 unless pool 750 errors ||= {} 751 errors[host] = NOT_CONNECTED 752 return batch_by_plan(promise, 753 keyspace, 754 statement, 755 options, 756 request, 757 plan, 758 timeout, 759 errors, 760 hosts, 761 retries) 762 end 763 764 connection = pool.random_connection 765 766 if keyspace && connection.keyspace != keyspace 767 switch = switch_keyspace(connection, keyspace, timeout) 768 switch.on_complete do |s| 769 if s.resolved? 770 batch_and_send_request_by_plan(host, 771 connection, 772 promise, 773 keyspace, 774 statement, 775 request, 776 options, 777 plan, 778 timeout, 779 errors, 780 hosts, 781 retries) 782 else 783 s.on_failure do |e| 784 if e.is_a?(Errors::HostError) || 785 (e.is_a?(Errors::TimeoutError) && statement.idempotent?) 786 errors ||= {} 787 errors[host] = e 788 batch_by_plan(promise, 789 keyspace, 790 statement, 791 options, 792 request, 793 plan, 794 timeout, 795 errors, 796 hosts, 797 retries) 798 else 799 promise.break(e) 800 end 801 end 802 end 803 end 804 else 805 batch_and_send_request_by_plan(host, 806 connection, 807 promise, 808 keyspace, 809 statement, 810 request, 811 options, 812 plan, 813 timeout, 814 errors, 815 hosts, 816 retries) 817 end 818 rescue => e 819 errors ||= {} 820 errors[host] = e 821 batch_by_plan(promise, 822 keyspace, 823 statement, 824 options, 825 request, 826 plan, 827 timeout, 828 errors, 829 hosts, 830 retries) 831 end
close_connections()
click to toggle source
# File lib/cassandra/cluster/client.rb 422 def close_connections 423 futures = [] 424 synchronize do 425 @connections.each do |_host, connections| 426 connections.snapshot.each do |c| 427 futures << c.close 428 end 429 end.clear 430 end 431 432 Ione::Future.all(*futures).map(self) 433 end
closed(f)
click to toggle source
# File lib/cassandra/cluster/client.rb 408 def closed(f) 409 synchronize do 410 @state = :closed 411 412 if f.resolved? 413 @logger.info('Session closed') 414 else 415 f.on_failure do |e| 416 @logger.error("Session failed to close (#{e.class.name}: #{e.message})") 417 end 418 end 419 end 420 end
connect_to_host(host, pool_size)
click to toggle source
# File lib/cassandra/cluster/client.rb 458 def connect_to_host(host, pool_size) 459 size = 0 460 461 synchronize do 462 unless @connections.include?(host) 463 @logger.info("Not connecting to #{host.ip} - host is currently down") 464 return NO_CONNECTIONS 465 end 466 467 pool = @connections[host] 468 size = pool_size - pool.size 469 470 if size <= 0 471 @logger.info("Not connecting to #{host.ip} - host is already fully connected") 472 return NO_CONNECTIONS 473 end 474 475 size -= @pending_connections[host] 476 477 if size <= 0 478 @logger.info("Not connecting to #{host.ip} - " \ 479 'host is already pending connections') 480 return NO_CONNECTIONS 481 end 482 483 @pending_connections[host] += size 484 end 485 486 @logger.debug("Creating #{size} request connections to #{host.ip}") 487 futures = size.times.map do 488 @connector.connect(host).recover do |e| 489 FailedConnection.new(e, host) 490 end 491 end 492 493 Ione::Future.all(*futures).flat_map do |connections| 494 error = nil 495 496 connections.reject! do |connection| 497 if connection.connected? 498 false 499 else 500 error = connection.error 501 true 502 end 503 end 504 505 @logger.debug("Created #{connections.size} request connections to #{host.ip}") 506 507 pool = nil 508 509 synchronize do 510 @pending_connections[host] -= size 511 512 if @connections.include?(host) 513 pool = @connections[host] 514 else 515 @pending_connections.delete(host) unless @pending_connections[host] > 0 516 end 517 end 518 519 if pool 520 pool.add_connections(connections) 521 522 connections.each do |connection| 523 connection.on_closed do |cause| 524 if cause 525 @logger.info('Request connection closed ' \ 526 "(#{cause.class.name}: #{cause.message})") 527 else 528 @logger.info('Request connection closed') 529 end 530 connect_to_host_maybe_retry(host, pool_size) if cause 531 end 532 end 533 else 534 connections.each(&:close) 535 end 536 537 if error 538 Ione::Future.failed(error) 539 else 540 Ione::Future.resolved(connections) 541 end 542 end 543 end
connect_to_host_maybe_retry(host, pool_size)
click to toggle source
# File lib/cassandra/cluster/client.rb 435 def connect_to_host_maybe_retry(host, pool_size) 436 connect_to_host(host, pool_size).fallback do |e| 437 @logger.error('Scheduling initial connection retry to ' \ 438 "#{host.ip} (#{e.class.name}: #{e.message})") 439 connect_to_host_with_retry(host, pool_size, @reconnection_policy.schedule) 440 end.map(nil) 441 end
connect_to_host_with_retry(host, pool_size, schedule)
click to toggle source
# File lib/cassandra/cluster/client.rb 443 def connect_to_host_with_retry(host, pool_size, schedule) 444 interval = schedule.next 445 446 @logger.debug("Reconnecting to #{host.ip} in #{interval} seconds") 447 448 f = @reactor.schedule_timer(interval) 449 f.flat_map do 450 connect_to_host(host, pool_size).fallback do |e| 451 @logger.error('Scheduling connection retry to ' \ 452 "#{host.ip} (#{e.class.name}: #{e.message})") 453 connect_to_host_with_retry(host, pool_size, schedule) 454 end 455 end 456 end
connected(f)
click to toggle source
# File lib/cassandra/cluster/client.rb 388 def connected(f) 389 if f.resolved? 390 synchronize do 391 @state = :connected 392 end 393 394 @logger.info('Session created') 395 else 396 synchronize do 397 @state = :defunct 398 end 399 400 f.on_failure do |e| 401 @logger.error("Session failed to connect (#{e.class.name}: #{e.message})") 402 end 403 404 close 405 end 406 end
do_send_request_by_plan(host, connection, promise, keyspace, statement, options, request, plan, timeout, errors, hosts, retries)
click to toggle source
# File lib/cassandra/cluster/client.rb 1040 def do_send_request_by_plan(host, 1041 connection, 1042 promise, 1043 keyspace, 1044 statement, 1045 options, 1046 request, 1047 plan, 1048 timeout, 1049 errors, 1050 hosts, 1051 retries) 1052 request.retries = retries 1053 1054 f = connection.send_request(request, timeout) 1055 f.on_complete do |response_future| 1056 errors ||= {} 1057 handle_response(response_future, 1058 host, 1059 connection, 1060 promise, 1061 keyspace, 1062 statement, 1063 options, 1064 request, 1065 plan, 1066 timeout, 1067 errors, 1068 hosts, 1069 retries) 1070 end 1071 rescue => e 1072 promise.break(e) 1073 end
execute_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors = nil, hosts = [], retries = -1)
click to toggle source
# File lib/cassandra/cluster/client.rb 545 def execute_by_plan(promise, 546 keyspace, 547 statement, 548 options, 549 request, 550 plan, 551 timeout, 552 errors = nil, 553 hosts = [], 554 retries = -1) 555 unless plan.has_next? 556 promise.break(Errors::NoHostsAvailable.new(errors)) 557 return 558 end 559 560 hosts << host = plan.next 561 retries += 1 562 563 pool = nil 564 synchronize { pool = @connections[host] } 565 566 unless pool 567 errors ||= {} 568 errors[host] = NOT_CONNECTED 569 return execute_by_plan(promise, 570 keyspace, 571 statement, 572 options, 573 request, 574 plan, 575 timeout, 576 errors, 577 hosts, 578 retries) 579 end 580 581 connection = pool.random_connection 582 583 if keyspace && connection.keyspace != keyspace 584 switch = switch_keyspace(connection, keyspace, timeout) 585 switch.on_complete do |s| 586 if s.resolved? 587 prepare_and_send_request_by_plan(host, 588 connection, 589 promise, 590 keyspace, 591 statement, 592 options, 593 request, 594 plan, 595 timeout, 596 errors, 597 hosts, 598 retries) 599 else 600 s.on_failure do |e| 601 if e.is_a?(Errors::HostError) || 602 (e.is_a?(Errors::TimeoutError) && statement.idempotent?) 603 errors ||= {} 604 errors[host] = e 605 execute_by_plan(promise, 606 keyspace, 607 statement, 608 options, 609 request, 610 plan, 611 timeout, 612 errors, 613 hosts, 614 retries) 615 else 616 promise.break(e) 617 end 618 end 619 end 620 end 621 else 622 prepare_and_send_request_by_plan(host, 623 connection, 624 promise, 625 keyspace, 626 statement, 627 options, 628 request, 629 plan, 630 timeout, 631 errors, 632 hosts, 633 retries) 634 end 635 rescue => e 636 errors ||= {} 637 errors[host] = e 638 execute_by_plan(promise, 639 keyspace, 640 statement, 641 options, 642 request, 643 plan, 644 timeout, 645 errors, 646 hosts, 647 retries) 648 end
handle_response(response_future, host, connection, promise, keyspace, statement, options, request, plan, timeout, errors, hosts, retries)
click to toggle source
# File lib/cassandra/cluster/client.rb 1075 def handle_response(response_future, 1076 host, 1077 connection, 1078 promise, 1079 keyspace, 1080 statement, 1081 options, 1082 request, 1083 plan, 1084 timeout, 1085 errors, 1086 hosts, 1087 retries) 1088 if response_future.resolved? 1089 r = response_future.value 1090 1091 begin 1092 decision = nil 1093 1094 case r 1095 when Protocol::UnavailableErrorResponse 1096 decision = options.retry_policy.unavailable(statement, 1097 r.consistency, 1098 r.required, 1099 r.alive, 1100 retries) 1101 when Protocol::WriteTimeoutErrorResponse 1102 decision = options.retry_policy.write_timeout(statement, 1103 r.consistency, 1104 r.write_type, 1105 r.blockfor, 1106 r.received, 1107 retries) 1108 when Protocol::ReadTimeoutErrorResponse 1109 decision = options.retry_policy.read_timeout(statement, 1110 r.consistency, 1111 r.blockfor, 1112 r.received, 1113 r.data_present, 1114 retries) 1115 when Protocol::UnpreparedErrorResponse 1116 cql = nil 1117 if statement.is_a?(Cassandra::Statements::Batch) 1118 # Find the prepared statement with the prepared-statement-id reported by the node. 1119 unprepared_child = statement.statements.select do |s| 1120 (s.is_a?(Cassandra::Statements::Prepared) || s.is_a?(Cassandra::Statements::Bound)) && s.id == r.id 1121 end.first 1122 cql = unprepared_child ? unprepared_child.cql : nil 1123 else 1124 # This is a normal statement, so we have everything we need. 1125 cql = statement.cql 1126 synchronize { @preparing_statements[host].delete(cql) } 1127 end 1128 1129 prepare = prepare_statement(host, connection, cql, timeout) 1130 prepare.on_complete do |_| 1131 if prepare.resolved? 1132 request.id = prepare.value unless request.is_a?(Cassandra::Protocol::BatchRequest) 1133 do_send_request_by_plan(host, 1134 connection, 1135 promise, 1136 keyspace, 1137 statement, 1138 options, 1139 request, 1140 plan, 1141 timeout, 1142 errors, 1143 hosts, 1144 retries) 1145 else 1146 prepare.on_failure do |e| 1147 if e.is_a?(Errors::HostError) || 1148 (e.is_a?(Errors::TimeoutError) && statement.idempotent?) 1149 errors[host] = e 1150 execute_by_plan(promise, 1151 keyspace, 1152 statement, 1153 options, 1154 request, 1155 plan, 1156 timeout, 1157 errors, 1158 hosts, 1159 retries) 1160 else 1161 promise.break(e) 1162 end 1163 end 1164 end 1165 end 1166 when Protocol::ErrorResponse 1167 error = r.to_error(keyspace, 1168 statement, 1169 options, 1170 hosts, 1171 request.consistency, 1172 retries) 1173 1174 if error.is_a?(Errors::HostError) || 1175 (error.is_a?(Errors::TimeoutError) && statement.idempotent?) 1176 errors[host] = error 1177 1178 case request 1179 when Protocol::QueryRequest, Protocol::PrepareRequest 1180 send_request_by_plan(promise, 1181 keyspace, 1182 statement, 1183 options, 1184 request, 1185 plan, 1186 timeout, 1187 errors, 1188 hosts, 1189 retries) 1190 when Protocol::ExecuteRequest 1191 execute_by_plan(promise, 1192 keyspace, 1193 statement, 1194 options, 1195 request, 1196 plan, 1197 timeout, 1198 errors, 1199 hosts, 1200 retries) 1201 when Protocol::BatchRequest 1202 batch_by_plan(promise, 1203 keyspace, 1204 statement, 1205 options, 1206 request, 1207 plan, 1208 timeout, 1209 errors, 1210 hosts, 1211 retries) 1212 end 1213 else 1214 promise.break(error) 1215 end 1216 when Protocol::SetKeyspaceResultResponse 1217 @keyspace = r.keyspace 1218 promise.fulfill(Cassandra::Results::Void.new(r.custom_payload, 1219 r.warnings, 1220 r.trace_id, 1221 keyspace, 1222 statement, 1223 options, 1224 hosts, 1225 request.consistency, 1226 retries, 1227 self, 1228 @futures)) 1229 when Protocol::PreparedResultResponse 1230 cql = request.cql 1231 synchronize do 1232 @prepared_statements[cql] = r.id 1233 @preparing_statements[host].delete(cql) 1234 end 1235 1236 metadata = r.metadata 1237 pk_idx = r.pk_idx 1238 pk_idx ||= @schema.get_pk_idx(metadata) 1239 1240 promise.fulfill( 1241 Statements::Prepared.new(r.id, 1242 r.custom_payload, 1243 r.warnings, 1244 cql, 1245 metadata, 1246 r.result_metadata, 1247 pk_idx, 1248 r.trace_id, 1249 keyspace, 1250 statement, 1251 options, 1252 hosts, 1253 request.consistency, 1254 retries, 1255 self, 1256 @connection_options) 1257 ) 1258 when Protocol::RawRowsResultResponse 1259 r.materialize(statement.result_metadata) 1260 promise.fulfill( 1261 Results::Paged.new(r.custom_payload, 1262 r.warnings, 1263 r.rows, 1264 r.paging_state, 1265 r.trace_id, 1266 keyspace, 1267 statement, 1268 options, 1269 hosts, 1270 request.consistency, 1271 retries, 1272 self, 1273 @futures) 1274 ) 1275 when Protocol::RowsResultResponse 1276 promise.fulfill( 1277 Results::Paged.new(r.custom_payload, 1278 r.warnings, 1279 r.rows, 1280 r.paging_state, 1281 r.trace_id, 1282 keyspace, 1283 statement, 1284 options, 1285 hosts, 1286 request.consistency, 1287 retries, 1288 self, 1289 @futures) 1290 ) 1291 when Protocol::SchemaChangeResultResponse 1292 if r.change == 'DROPPED' && 1293 r.target == Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE 1294 @schema.delete_keyspace(r.keyspace) 1295 end 1296 1297 @logger.debug('Waiting for schema to propagate to all hosts after a change') 1298 wait_for_schema_agreement(connection, 1299 @reconnection_policy.schedule).on_complete do |f| 1300 unless f.resolved? 1301 f.on_failure do |e| 1302 @logger.error( 1303 "Schema agreement failure (#{e.class.name}: #{e.message})" 1304 ) 1305 end 1306 end 1307 promise.fulfill( 1308 Results::Void.new(r.custom_payload, 1309 r.warnings, 1310 r.trace_id, 1311 keyspace, 1312 statement, 1313 options, 1314 hosts, 1315 request.consistency, 1316 retries, 1317 self, 1318 @futures) 1319 ) 1320 end 1321 else 1322 promise.fulfill(Results::Void.new(r.custom_payload, 1323 r.warnings, 1324 r.trace_id, 1325 keyspace, 1326 statement, 1327 options, 1328 hosts, 1329 request.consistency, 1330 retries, 1331 self, 1332 @futures)) 1333 end 1334 1335 if decision 1336 case decision 1337 when Retry::Decisions::Retry 1338 request.consistency = decision.consistency 1339 do_send_request_by_plan(host, 1340 connection, 1341 promise, 1342 keyspace, 1343 statement, 1344 options, 1345 request, 1346 plan, 1347 timeout, 1348 errors, 1349 hosts, 1350 retries + 1) 1351 when Retry::Decisions::TryNextHost 1352 errors[host] = r.to_error(keyspace, 1353 statement, 1354 options, 1355 hosts, 1356 request.consistency, 1357 retries) 1358 case request 1359 when Protocol::QueryRequest, Protocol::PrepareRequest 1360 send_request_by_plan(promise, 1361 keyspace, 1362 statement, 1363 options, 1364 request, 1365 plan, 1366 timeout, 1367 errors, 1368 hosts, 1369 retries) 1370 when Protocol::ExecuteRequest 1371 execute_by_plan(promise, 1372 keyspace, 1373 statement, 1374 options, 1375 request, 1376 plan, 1377 timeout, 1378 errors, 1379 hosts, 1380 retries) 1381 when Protocol::BatchRequest 1382 batch_by_plan(promise, 1383 keyspace, 1384 statement, 1385 options, 1386 request, 1387 plan, 1388 timeout, 1389 errors, 1390 hosts, 1391 retries) 1392 else 1393 promise.break(e) 1394 end 1395 when Retry::Decisions::Ignore 1396 promise.fulfill( 1397 Results::Void.new(r.custom_payload, 1398 r.warnings, 1399 nil, 1400 keyspace, 1401 statement, 1402 options, 1403 hosts, 1404 request.consistency, 1405 retries, 1406 self, 1407 @futures) 1408 ) 1409 when Retry::Decisions::Reraise 1410 promise.break( 1411 r.to_error(keyspace, 1412 statement, 1413 options, 1414 hosts, 1415 request.consistency, 1416 retries) 1417 ) 1418 else 1419 promise.break( 1420 r.to_error(keyspace, 1421 statement, 1422 options, 1423 hosts, 1424 request.consistency, 1425 retries) 1426 ) 1427 end 1428 end 1429 rescue => e 1430 promise.break(e) 1431 end 1432 else 1433 response_future.on_failure do |ex| 1434 if ex.is_a?(Errors::HostError) || 1435 (ex.is_a?(Errors::TimeoutError) && statement.idempotent?) 1436 1437 errors[host] = ex 1438 case request 1439 when Protocol::QueryRequest, Protocol::PrepareRequest 1440 send_request_by_plan(promise, 1441 keyspace, 1442 statement, 1443 options, 1444 request, 1445 plan, 1446 timeout, 1447 errors, 1448 hosts, 1449 retries) 1450 when Protocol::ExecuteRequest 1451 execute_by_plan(promise, 1452 keyspace, 1453 statement, 1454 options, 1455 request, 1456 plan, 1457 timeout, 1458 errors, 1459 hosts, 1460 retries) 1461 when Protocol::BatchRequest 1462 batch_by_plan(promise, 1463 keyspace, 1464 statement, 1465 options, 1466 request, 1467 plan, 1468 timeout, 1469 errors, 1470 hosts, 1471 retries) 1472 else 1473 promise.break(ex) 1474 end 1475 else 1476 promise.break(ex) 1477 end 1478 end 1479 end 1480 end
peer_ip(data)
click to toggle source
# File lib/cassandra/cluster/client.rb 1520 def peer_ip(data) 1521 ip = data['rpc_address'] 1522 ip = data['peer'] if ip == '0.0.0.0' 1523 1524 @address_resolver.resolve(ip) 1525 end
prepare_and_send_request_by_plan(host, connection, promise, keyspace, statement, options, request, plan, timeout, errors, hosts, retries)
click to toggle source
# File lib/cassandra/cluster/client.rb 650 def prepare_and_send_request_by_plan(host, 651 connection, 652 promise, 653 keyspace, 654 statement, 655 options, 656 request, 657 plan, 658 timeout, 659 errors, 660 hosts, 661 retries) 662 cql = statement.cql 663 664 # Get the prepared statement id for this statement from our cache if possible. We are optimistic 665 # that the statement has previously been prepared on all hosts, so the id will be valid. However, if 666 # we're in the midst of preparing the statement on the given host, we know that executing with the id 667 # will fail. So, act like we don't have the prepared-statement id in that case. 668 669 id = synchronize { @preparing_statements[host][cql] ? nil : @prepared_statements[cql] } 670 671 if id 672 request.id = id 673 do_send_request_by_plan(host, 674 connection, 675 promise, 676 keyspace, 677 statement, 678 options, 679 request, 680 plan, 681 timeout, 682 errors, 683 hosts, 684 retries) 685 else 686 prepare = prepare_statement(host, connection, cql, timeout) 687 prepare.on_complete do |_| 688 if prepare.resolved? 689 request.id = prepare.value 690 do_send_request_by_plan(host, 691 connection, 692 promise, 693 keyspace, 694 statement, 695 options, 696 request, 697 plan, 698 timeout, 699 errors, 700 hosts, 701 retries) 702 else 703 prepare.on_failure do |e| 704 if e.is_a?(Errors::HostError) || 705 (e.is_a?(Errors::TimeoutError) && statement.idempotent?) 706 errors ||= {} 707 errors[host] = e 708 execute_by_plan(promise, 709 keyspace, 710 statement, 711 options, 712 request, 713 plan, 714 timeout, 715 errors, 716 hosts, 717 retries) 718 else 719 promise.break(e) 720 end 721 end 722 end 723 end 724 end 725 rescue => e 726 promise.break(e) 727 end
prepare_statement(host, connection, cql, timeout)
click to toggle source
# File lib/cassandra/cluster/client.rb 1566 def prepare_statement(host, connection, cql, timeout) 1567 synchronize do 1568 pending = @preparing_statements[host] 1569 1570 return pending[cql] if pending.key?(cql) 1571 end 1572 1573 request = Protocol::PrepareRequest.new(cql, false) 1574 1575 f = connection.send_request(request, timeout).map do |r| 1576 case r 1577 when Protocol::PreparedResultResponse 1578 id = r.id 1579 synchronize do 1580 @prepared_statements[cql] = id 1581 @preparing_statements[host].delete(cql) 1582 end 1583 id 1584 when Protocol::ErrorResponse 1585 raise r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :quorum, 0) 1586 else 1587 raise Errors::InternalError, "Unexpected response #{r.inspect}" 1588 end 1589 end 1590 1591 synchronize do 1592 @preparing_statements[host][cql] = f 1593 end 1594 1595 f 1596 end
send_request_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors = nil, hosts = [], retries = -1)
click to toggle source
# File lib/cassandra/cluster/client.rb 936 def send_request_by_plan(promise, 937 keyspace, 938 statement, 939 options, 940 request, 941 plan, 942 timeout, 943 errors = nil, 944 hosts = [], 945 retries = -1) 946 unless plan.has_next? 947 promise.break(Errors::NoHostsAvailable.new(errors)) 948 return 949 end 950 951 hosts << host = plan.next 952 retries += 1 953 pool = nil 954 synchronize { pool = @connections[host] } 955 956 unless pool 957 errors ||= {} 958 errors[host] = NOT_CONNECTED 959 return send_request_by_plan(promise, 960 keyspace, 961 statement, 962 options, 963 request, 964 plan, 965 timeout, 966 errors, 967 hosts, 968 retries) 969 end 970 971 connection = pool.random_connection 972 973 if keyspace && connection.keyspace != keyspace 974 switch = switch_keyspace(connection, keyspace, timeout) 975 switch.on_complete do |s| 976 if s.resolved? 977 do_send_request_by_plan(host, 978 connection, 979 promise, 980 keyspace, 981 statement, 982 options, 983 request, 984 plan, 985 timeout, 986 errors, 987 hosts, 988 retries) 989 else 990 s.on_failure do |e| 991 if e.is_a?(Errors::HostError) || 992 (e.is_a?(Errors::TimeoutError) && statement.idempotent?) 993 errors ||= {} 994 errors[host] = e 995 send_request_by_plan(promise, 996 keyspace, 997 statement, 998 options, 999 request, 1000 plan, 1001 timeout, 1002 errors, 1003 hosts, 1004 retries) 1005 else 1006 promise.break(e) 1007 end 1008 end 1009 end 1010 end 1011 else 1012 do_send_request_by_plan(host, 1013 connection, 1014 promise, 1015 keyspace, 1016 statement, 1017 options, 1018 request, 1019 plan, 1020 timeout, 1021 errors, 1022 hosts, 1023 retries) 1024 end 1025 rescue => e 1026 errors ||= {} 1027 errors[host] = e 1028 send_request_by_plan(promise, 1029 keyspace, 1030 statement, 1031 options, 1032 request, 1033 plan, 1034 timeout, 1035 errors, 1036 hosts, 1037 retries) 1038 end
send_select_request(connection, request)
click to toggle source
# File lib/cassandra/cluster/client.rb 1598 def send_select_request(connection, request) 1599 connection.send_request(request).map do |r| 1600 case r 1601 when Protocol::RowsResultResponse 1602 r.rows 1603 when Protocol::ErrorResponse 1604 raise r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :quorum, 0) 1605 else 1606 raise Errors::InternalError, "Unexpected response #{r.inspect}" 1607 end 1608 end 1609 end
switch_keyspace(connection, keyspace, timeout)
click to toggle source
# File lib/cassandra/cluster/client.rb 1527 def switch_keyspace(connection, keyspace, timeout) 1528 pending_keyspace = connection[:pending_keyspace] 1529 pending_switch = connection[:pending_switch] 1530 1531 return pending_switch || Ione::Future.resolved if pending_keyspace == keyspace 1532 1533 request = Protocol::QueryRequest.new("USE #{Util.escape_name(keyspace)}", 1534 EMPTY_LIST, 1535 EMPTY_LIST, 1536 :quorum) 1537 1538 f = connection.send_request(request, timeout).map do |r| 1539 case r 1540 when Protocol::SetKeyspaceResultResponse 1541 @keyspace = r.keyspace 1542 nil 1543 when Protocol::ErrorResponse 1544 raise r.to_error(nil, 1545 Statements::Simple.new("USE #{Util.escape_name(keyspace)}"), 1546 VOID_OPTIONS, 1547 EMPTY_LIST, 1548 :quorum, 1549 0) 1550 else 1551 raise Errors::InternalError, "Unexpected response #{r.inspect}" 1552 end 1553 end 1554 1555 connection[:pending_keyspace] = keyspace 1556 connection[:pending_switch] = f 1557 1558 f.on_complete do |_f| 1559 connection[:pending_switch] = nil 1560 connection[:pending_keyspace] = nil 1561 end 1562 1563 f 1564 end
wait_for_schema_agreement(connection, schedule)
click to toggle source
# File lib/cassandra/cluster/client.rb 1482 def wait_for_schema_agreement(connection, schedule) 1483 peers_future = send_select_request(connection, SELECT_SCHEMA_PEERS) 1484 local_future = send_select_request(connection, SELECT_SCHEMA_LOCAL) 1485 1486 Ione::Future.all(peers_future, local_future).flat_map do |(peers, local)| 1487 versions = ::Set.new 1488 1489 unless local.empty? 1490 host = @registry.host(connection.host) 1491 1492 if host && @profile_manager.distance(host) != :ignore 1493 versions << version = local.first['schema_version'] 1494 @logger.debug("Host #{host.ip} schema version is #{version}") 1495 end 1496 end 1497 1498 peers.each do |row| 1499 host = @registry.host(peer_ip(row)) 1500 next unless host && @profile_manager.distance(host) != :ignore 1501 1502 versions << version = row['schema_version'] 1503 @logger.debug("Host #{host.ip} schema version is #{version}") 1504 end 1505 1506 if versions.one? 1507 @logger.debug('All hosts have the same schema') 1508 Ione::Future.resolved 1509 else 1510 interval = schedule.next 1511 @logger.info('Hosts have different schema versions: ' \ 1512 "#{versions.to_a.inspect}, retrying in #{interval} seconds") 1513 @reactor.schedule_timer(interval).flat_map do 1514 wait_for_schema_agreement(connection, schedule) 1515 end 1516 end 1517 end 1518 end