class Cassandra::Cluster::ControlConnection
@private
Constants
- SELECT_LOCAL
- SELECT_PEERS
- SELECT_PEER_QUERY
Public Class Methods
new(logger, io_reactor, cluster_registry, cluster_schema, cluster_metadata, load_balancing_policy, reconnection_policy, address_resolution_policy, connector, connection_options, schema_fetcher)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Stores the collaborators handed in by the cluster and puts the control
# connection into its initial :closed state.
#
# @param logger receives the debug/info/error/fatal messages of this class
# @param io_reactor started/stopped with the connection; schedules timers
# @param cluster_registry told about found, lost and up hosts
# @param cluster_schema updated with refreshed schema objects
# @param cluster_metadata updated from the connected host's row
# @param load_balancing_policy supplies query plans and host distances
# @param reconnection_policy supplies retry schedules
# @param address_resolution_policy resolves node addresses
# @param connector opens connections and refreshes host status
# @param connection_options read for the schema synchronization settings
# @param schema_fetcher fetches schema objects over a connection
def initialize(logger, io_reactor, cluster_registry, cluster_schema,
               cluster_metadata, load_balancing_policy,
               reconnection_policy, address_resolution_policy, connector,
               connection_options, schema_fetcher)
  @logger                = logger
  @io_reactor            = io_reactor
  @registry              = cluster_registry
  @schema                = cluster_schema
  @metadata              = cluster_metadata
  @load_balancing_policy = load_balancing_policy
  @reconnection_policy   = reconnection_policy
  @address_resolver      = address_resolution_policy
  @connector             = connector
  @connection_options    = connection_options
  @connection            = nil
  @schema_fetcher        = schema_fetcher

  # host => timer of the pending status check (false when none is pending)
  @refreshing_statuses = ::Hash.new(false)
  @status              = :closed
  @refreshing_hosts    = false
  # address => true while a single-host refresh is in flight
  @refreshing_host     = ::Hash.new(false)
  @closed_promise      = Ione::Promise.new

  # Full schema refresh state. @pending_schema_refresh is set while a refresh
  # is requested during another one; initialise it with the rest of the state
  # instead of leaving it undefined until the first refresh.
  @refresh_schema_future  = nil
  @pending_schema_refresh = false

  # Queued schema change events and the timers that batch them.
  @schema_changes        = ::Array.new
  @schema_refresh_timer  = nil
  @schema_refresh_window = nil

  mon_initialize
end
Public Instance Methods
close_async()
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Starts closing the control connection by stopping the IO reactor.
#
# Calling it while already closing or closed just hands back the same future.
#
# @return the future of the promise fulfilled by #connection_closed
def close_async
  synchronize do
    shutting_down = @status == :closing || @status == :closed
    return @closed_promise.future if shutting_down

    @status = :closing
  end

  stopped = @io_reactor.stop

  # The promise is fulfilled whether the reactor stopped cleanly or not.
  stopped.on_value(&method(:connection_closed))
  stopped.on_failure(&method(:connection_closed))

  @closed_promise.future
end
connect_async()
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Starts the IO reactor and connects to the first reachable host of a fresh
# load balancing plan.
#
# @return a resolved future when already connecting or connected, otherwise
#   the future of the connection attempt
def connect_async
  synchronize do
    return Ione::Future.resolved if [:connecting, :connected].include?(@status)

    @status = :connecting
  end

  @io_reactor.start.flat_map do
    connect_to_first_available(
      @load_balancing_policy.plan(nil, VOID_STATEMENT, VOID_OPTIONS)
    )
  end
end
connection_closed(cause)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Callback used by #close_async once the reactor has stopped; fulfills the
# "closed" promise. The cause is accepted but not used.
def connection_closed(cause)
  @closed_promise.fulfill
end
host_down(host)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Begins periodically re-checking a host that went down.
#
# Nothing is scheduled when a check is already pending for the host or when
# the load balancing policy ignores it.
#
# @return a resolved future when nothing was scheduled, nil otherwise
def host_down(host)
  schedule = nil
  timer = nil

  synchronize do
    check_pending = @refreshing_statuses[host]
    if check_pending || @load_balancing_policy.distance(host) == :ignore
      return Ione::Future.resolved
    end

    schedule = @reconnection_policy.schedule
    timeout = schedule.next

    @logger.debug("Starting to continuously refresh status of #{host.ip} in " \
                  "#{timeout} seconds")

    timer = @io_reactor.schedule_timer(timeout)
    @refreshing_statuses[host] = timer
  end

  timer.on_value do
    # A failed check schedules the next one using the same schedule.
    refresh_host_status(host).fallback do |_e|
      refresh_host_status_with_retry(timer, host, schedule)
    end
  end

  nil
end
host_found(host)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Intentionally empty: nothing is done here when a host is found.
def host_found(host); end
host_lost(host)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Forgets a lost host: drops its pending status check and cancels the timer.
#
# @return nil
def host_lost(host)
  synchronize do
    pending_timer = @refreshing_statuses.delete(host)
    @io_reactor.cancel_timer(pending_timer) if pending_timer
  end

  nil
end
host_up(host)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Handles a host coming back up.
#
# Cancels the pending status check for the host. If there is currently no
# control connection, we are not shutting down and the host is not ignored by
# the load balancing policy, a new connection attempt is started.
#
# @return the future of the connection attempt, or a resolved future
def host_up(host)
  synchronize do
    pending_timer = @refreshing_statuses.delete(host)
    @io_reactor.cancel_timer(pending_timer) if pending_timer

    shutting_down = @status == :closing || @status == :closed
    skip_connect = @connection || shutting_down ||
                   @load_balancing_policy.distance(host) == :ignore

    unless skip_connect
      plan = @load_balancing_policy.plan(nil, VOID_STATEMENT, VOID_OPTIONS)
      return connect_to_first_available(plan)
    end
  end

  Ione::Future.resolved
end
inspect()
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Short representation: class name plus the object id in hexadecimal.
#
# @return [String]
def inspect
  hex_id = object_id.to_s(16)
  "#<#{self.class.name}:0x#{hex_id}>"
end
on_close(&block)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Registers a block on the "closed" future, for both its value and its failure.
def on_close(&block)
  @closed_promise.future.on_value(&block)
  @closed_promise.future.on_failure(&block)
end
Private Instance Methods
connect_to_first_available(plan, errors = nil)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Walks a load balancing plan until a host accepts the control connection.
#
# For the host that connects, the chain refreshes its metadata, registers for
# events, refreshes peers and (when schema synchronization is enabled)
# refreshes the schema. Host and timeout errors move on to the next host of
# the plan; any other error fails the returned future.
#
# @param plan the plan to take hosts from (has_next? / next)
# @param errors [Hash, nil] host => error collected from earlier attempts
# @return a future that fails with Errors::NoHostsAvailable when the plan
#   is exhausted
def connect_to_first_available(plan, errors = nil)
  unless plan.has_next?
    if errors.nil? && synchronize { @refreshing_statuses.empty? }
      @logger.fatal(<<-MSG)
Control connection failed and is unlikely to recover.

        This usually means that all hosts are ignored by current load
        balancing policy, most likely because they changed datacenters.
        Reconnections attempts will continue getting scheduled to
        repeat this message in the logs.
      MSG
    end

    return Ione::Future.failed(Errors::NoHostsAvailable.new(errors))
  end

  host = plan.next
  @logger.debug("Connecting to #{host.ip}")

  f = connect_to_host(host).flat_map do |connection|
    synchronize do
      @status = :connected
      @connection = connection

      connection.on_closed do |cause|
        reconnect = false

        synchronize do
          # Ignore the close of a connection that has already been replaced.
          if connection == @connection
            if @status == :closing
              @status = :closed
            else
              @status = :reconnecting
              reconnect = true
            end

            detail = cause ? " (#{cause.class.name}: #{cause.message})" : ''
            @logger.info("Control connection closed#{detail}")

            @connection = nil
          end
        end

        reconnect_async(@reconnection_policy.schedule) if reconnect
      end
    end

    refresh_maybe_retry(:metadata)
  end

  f = f.flat_map { register_async }
  f = f.flat_map { refresh_peers_async_maybe_retry }
  f = f.flat_map { refresh_maybe_retry(:schema) } if @connection_options.synchronize_schema?

  f = f.fallback do |error|
    @logger.debug("Connection to #{host.ip} failed " \
                  "(#{error.class.name}: #{error.message})")

    case error
    when Errors::HostError, Errors::TimeoutError
      # Remember why this host failed and try the next one in the plan.
      errors ||= {}
      errors[host] = error
      connect_to_first_available(plan, errors)
    else
      Ione::Future.failed(error)
    end
  end

  f.on_complete do |connection_future|
    @logger.info('Control connection established') if connection_future.resolved?
  end

  f
end
connect_to_host(host)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Delegates opening a connection to the given host to the connector.
#
# @return whatever @connector.connect returns (used as a future by callers)
def connect_to_host(host)
  @connector.connect(host)
end
handle_schema_change(change)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Queues a schema change event and (re)arms the timers that batch such events.
#
# Two timers are involved: a delay timer that is restarted on every call and a
# window timer that is started only when none is running. Whichever fires
# first cancels the other and processes all queued changes.
#
# @param change the event to queue, or nil to only set up the timers for
#   changes that are already queued
# @return nil
def handle_schema_change(change)
  delay_timer = nil
  window_timer = nil

  synchronize do
    @schema_changes << change if change

    # No timers when nothing is queued or while a full refresh is running.
    next if @schema_changes.empty? || @refresh_schema_future

    @io_reactor.cancel_timer(@schema_refresh_timer) if @schema_refresh_timer
    @schema_refresh_timer =
      @io_reactor.schedule_timer(@connection_options.schema_refresh_delay)
    delay_timer = @schema_refresh_timer

    unless @schema_refresh_window
      @schema_refresh_window =
        @io_reactor.schedule_timer(@connection_options.schema_refresh_timeout)
      window_timer = @schema_refresh_window
    end
  end

  if window_timer
    window_timer.on_value do
      batch = nil

      synchronize do
        @io_reactor.cancel_timer(@schema_refresh_timer)

        @schema_refresh_window = nil
        @schema_refresh_timer = nil

        batch = @schema_changes
        @schema_changes = ::Array.new
      end

      process_schema_changes(batch)
    end
  end

  if delay_timer
    delay_timer.on_value do
      batch = nil

      synchronize do
        @io_reactor.cancel_timer(@schema_refresh_window)

        @schema_refresh_window = nil
        @schema_refresh_timer = nil

        batch = @schema_changes
        @schema_changes = ::Array.new
      end

      process_schema_changes(batch)
    end
  end

  nil
end
peer_ip(data, host_address)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Works out the address to use for a system.peers row.
#
# @param data the peers row (read by column name)
# @param host_address the address of the host the row was read from
# @return the resolved address, or nil when the row must be ignored
#   (incomplete row, missing rpc_address, or a row describing the
#   connected host itself)
def peer_ip(data, host_address)
  peer = data['peer']
  complete = peer && data['host_id'] && data['data_center'] && data['rack'] && data['tokens']
  return nil unless complete

  rpc_address = data['rpc_address']

  if rpc_address.nil?
    @logger.info("The system.peers row for '#{data['peer']}' has no rpc_address. This is likely " \
                 'a gossip or snitch issue. This host will be ignored.')
    return nil
  end

  if peer == host_address || rpc_address == host_address
    # Some DSE versions were inserting a line for the local node in peers (with mostly null values).
    # This has been fixed, but if we detect that's the case, ignore it as it's not really a big deal.
    @logger.debug("System.peers on node #{host_address} has a line for itself. This is not normal but is a " \
                  'known problem of some DSE versions. Ignoring the entry.')
    return nil
  end

  # An rpc_address of 0.0.0.0 is replaced by the peer address.
  ip = rpc_address == '0.0.0.0' ? peer : rpc_address

  @address_resolver.resolve(ip)
end
process_schema_changes(schema_changes)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Turns a batch of schema change events into the smallest set of refreshes.
#
# A keyspace-level change supersedes every other change recorded for that
# keyspace. Table events refresh both the table and the materialized view of
# that name. Function and aggregate events are refreshed one by one.
#
# @param schema_changes the queued events (keyspace, target, name, arguments)
# @return a future combining all started refreshes
def process_schema_changes(schema_changes)
  keyspaces = {}
  tables_and_views = {}
  types = {}
  # keyspace => [change, ...] for function changes
  functions = {}
  # keyspace => [change, ...] for aggregate changes
  aggregates = {}

  schema_changes.each do |change|
    keyspace = change.keyspace

    # The whole keyspace is refreshed anyway.
    next if keyspaces.key?(keyspace)

    case change.target
    when Protocol::Constants::SCHEMA_CHANGE_TARGET_KEYSPACE
      [tables_and_views, types, functions, aggregates].each do |pending|
        pending.delete(keyspace)
      end
      keyspaces[keyspace] = true
    when Protocol::Constants::SCHEMA_CHANGE_TARGET_TABLE
      # We can't distinguish between table and view change events, so refresh both.
      (tables_and_views[keyspace] ||= {})[change.name] = true
    when Protocol::Constants::SCHEMA_CHANGE_TARGET_UDT
      (types[keyspace] ||= {})[change.name] = true
    when Protocol::Constants::SCHEMA_CHANGE_TARGET_FUNCTION
      (functions[keyspace] ||= []) << change
    when Protocol::Constants::SCHEMA_CHANGE_TARGET_AGGREGATE
      (aggregates[keyspace] ||= []) << change
    end
  end

  futures = []

  keyspaces.each_key do |keyspace|
    futures << refresh_maybe_retry(:keyspace, keyspace)
  end

  tables_and_views.each do |keyspace, names|
    names.each_key do |name|
      futures << refresh_maybe_retry(:table, keyspace, name)
      futures << refresh_maybe_retry(:materialized_view, keyspace, name)
    end
  end

  types.each do |keyspace, names|
    names.each_key do |name|
      futures << refresh_maybe_retry(:type, keyspace, name)
    end
  end

  functions.each do |keyspace, changes|
    changes.each do |change|
      futures << refresh_maybe_retry(:function, keyspace, change.name, change.arguments)
    end
  end

  aggregates.each do |keyspace, changes|
    changes.each do |change|
      futures << refresh_maybe_retry(:aggregate, keyspace, change.name, change.arguments)
    end
  end

  Ione::Future.all(*futures)
end
reconnect_async(schedule)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Schedules an attempt to re-establish the control connection.
#
# After the next delay of the schedule, a new connection attempt is made if
# the status is still :reconnecting. A failed attempt is logged and, as long
# as the status is still :reconnecting, followed by another scheduled attempt.
#
# @param schedule the reconnection schedule; `next` gives the delay in seconds
# @return a future for the outcome of the reconnection
def reconnect_async(schedule)
  timeout = schedule.next

  @logger.debug("Reestablishing control connection in #{timeout} seconds")

  f = @io_reactor.schedule_timer(timeout)
  f = f.flat_map do
    if synchronize { @status == :reconnecting }
      plan = @load_balancing_policy.plan(nil, VOID_STATEMENT, VOID_OPTIONS)
      connect_to_first_available(plan)
    else
      Ione::Future.resolved
    end
  end
  f.fallback do |e|
    @logger.error("Control connection failed (#{e.class.name}: #{e.message})")

    # This block runs after reconnect_async has returned, so it must yield its
    # value; a `return` here would raise LocalJumpError.
    if synchronize { @status == :reconnecting }
      # We're reconnecting...
      reconnect_async(schedule)
    else
      Ione::Future.resolved
    end
  end
end
refresh_aggregate_async(keyspace_name, aggregate_name, aggregate_args)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Re-reads one user-defined aggregate and updates or removes it in the schema.
#
# @param keyspace_name name of the keyspace of the aggregate
# @param aggregate_name name of the aggregate
# @param aggregate_args argument types as strings
# @return a future; failed with Errors::ClientError when not connected
def refresh_aggregate_async(keyspace_name, aggregate_name, aggregate_args)
  connection = @connection
  if connection.nil?
    return Ione::Future.failed(Errors::ClientError.new('Not connected'))
  end

  @logger.info("Refreshing user-defined aggregate \"#{keyspace_name}.#{aggregate_name}\"")

  # aggregate_args is an array of string, and we need an array of parsed types.
  parsed_args = @schema_fetcher.parse_argument_types(connection, keyspace_name, aggregate_args)
  fetched = @schema_fetcher.fetch_aggregate(connection, keyspace_name, aggregate_name, parsed_args)

  fetched.map do |aggregate|
    if aggregate
      @schema.replace_aggregate(aggregate)
    else
      # No longer present on the server.
      @schema.delete_aggregate(keyspace_name, aggregate_name, parsed_args)
    end

    @logger.info('Completed refreshing user-defined aggregate ' \
                 "\"#{keyspace_name}.#{aggregate_name}(#{aggregate_args.join(',')})\"")
  end
end
refresh_function_async(keyspace_name, function_name, function_args)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Re-reads one user-defined function and updates or removes it in the schema.
#
# @param keyspace_name name of the keyspace of the function
# @param function_name name of the function
# @param function_args argument types as strings
# @return a future; failed with Errors::ClientError when not connected
def refresh_function_async(keyspace_name, function_name, function_args)
  connection = @connection
  if connection.nil?
    return Ione::Future.failed(Errors::ClientError.new('Not connected'))
  end

  @logger.info("Refreshing user-defined function \"#{keyspace_name}.#{function_name}\"")

  # function_args is an array of string, and we need an array of parsed types.
  parsed_args = @schema_fetcher.parse_argument_types(connection, keyspace_name, function_args)
  fetched = @schema_fetcher.fetch_function(connection, keyspace_name, function_name, parsed_args)

  fetched.map do |function|
    if function
      @schema.replace_function(function)
    else
      # No longer present on the server.
      @schema.delete_function(keyspace_name, function_name, parsed_args)
    end

    @logger.info('Completed refreshing user-defined function ' \
                 "\"#{keyspace_name}.#{function_name}(#{function_args.join(',')})\"")
  end
end
refresh_host_async(address)
click to toggle source
@param address [IPAddr] the node's address as reported in a C* event. It is therefore not resolved
relative to the client; it is the address that other nodes would use to communicate with this node.
# File lib/cassandra/cluster/control_connection.rb
# Re-reads the metadata of a single host and reports it to the registry.
#
# The connected host is read with SELECT_LOCAL, any other host with
# SELECT_PEER_QUERY for its address.
#
# @param address the node address as reported by the server
# @return a future; it fails with Errors::ClientError when not connected,
#   with Errors::InternalError when no row is found, and with the raised
#   error when building or sending the request raises
def refresh_host_async(address)
  connection = @connection
  return Ione::Future.failed(Errors::ClientError.new('Not connected')) if connection.nil?

  ip = address.to_s

  @logger.info("Refreshing host metadata: #{ip}")

  request = if ip == connection.host
              SELECT_LOCAL
            else
              Protocol::QueryRequest.new(SELECT_PEER_QUERY % ip,
                                         EMPTY_LIST,
                                         EMPTY_LIST,
                                         :one)
            end

  send_select_request(connection, request).map do |rows|
    raise Errors::InternalError, "Unable to find host metadata: #{ip}" if rows.empty?

    @logger.info("Completed refreshing host metadata: #{ip}")

    row = rows.first
    resolved = if ip == connection.host
                 @address_resolver.resolve(address)
               else
                 peer_ip(row, connection.host)
               end

    # peer_ip returns nil for rows that must be ignored; such rows are not
    # reported to the registry, matching what refresh_peers_async does.
    @registry.host_found(resolved, row) if resolved

    self
  end
rescue => e
  @logger.error("Refreshing host metadata failed (#{e.class.name}: #{e.message})")

  # Callers chain .fallback/.map on the result, so hand back a failed future
  # rather than the logger's return value.
  Ione::Future.failed(e)
end
refresh_host_async_maybe_retry(address)
click to toggle source
@param address [IPAddr] the node's address as reported in a C* event. It is therefore not resolved
relative to the client; it is the address that other nodes would use to communicate with this node.
# File lib/cassandra/cluster/control_connection.rb
# Refreshes one host, retrying on host and timeout errors.
#
# Nothing is started while a peers refresh or a refresh of the same address is
# already in flight. Any other error closes the current connection and fails
# the returned future.
#
# @param address the node address as reported by the server
# @return a future for the refresh (resolved immediately when skipped)
def refresh_host_async_maybe_retry(address)
  synchronize do
    return Ione::Future.resolved if @refreshing_hosts || @refreshing_host[address]
    @refreshing_host[address] = true
  end

  f = refresh_host_async(address).fallback do |e|
    case e
    when Errors::HostError, Errors::TimeoutError
      refresh_host_async_retry(address, e, @reconnection_policy.schedule)
    else
      connection = @connection
      connection && connection.close(e)

      Ione::Future.failed(e)
    end
  end

  # Clear the in-flight marker on failure too; otherwise one terminal failure
  # would make every later refresh of this address a no-op.
  f.on_failure do |_e|
    synchronize do
      @refreshing_host.delete(address)
    end
  end

  f.map do
    synchronize do
      @refreshing_host.delete(address)
    end
  end
end
refresh_host_async_retry(address, error, schedule)
click to toggle source
@param address [IPAddr] the node's address as reported in a C* event. It is therefore not resolved
relative to the client; it is the address that other nodes would use to communicate with this node.
# File lib/cassandra/cluster/control_connection.rb
# Retries a failed single-host refresh after the next delay of the schedule.
#
# Host and timeout errors lead to another retry with the same schedule; any
# other error closes the current connection and fails the future.
#
# @param address the node address as reported by the server
# @param error the error that caused this retry (logged)
# @param schedule the schedule providing the delays
# @return a future for the retried refresh
def refresh_host_async_retry(address, error, schedule)
  timeout = schedule.next
  @logger.info("Failed to refresh host #{address} (#{error.class.name}: " \
               "#{error.message}), retrying in #{timeout}")

  @io_reactor.schedule_timer(timeout).flat_map do
    refresh_host_async(address).fallback do |e|
      if e.is_a?(Errors::HostError) || e.is_a?(Errors::TimeoutError)
        refresh_host_async_retry(address, e, schedule)
      else
        connection = @connection
        connection.close(e) if connection

        Ione::Future.failed(e)
      end
    end
  end
end
refresh_host_status(host)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Delegates the status check of a host to the connector.
#
# @return whatever @connector.refresh_status returns (used as a future)
def refresh_host_status(host)
  @connector.refresh_status(host)
end
refresh_host_status_with_retry(original_timer, host, schedule)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Schedules the next status check of a host after a failed one.
#
# The chain stops when the timer recorded for the host is no longer the one
# that triggered this call.
#
# @param original_timer the timer whose check just failed
# @param host the host being checked
# @param schedule the schedule providing the delays
def refresh_host_status_with_retry(original_timer, host, schedule)
  next_timer = nil

  synchronize do
    recorded = @refreshing_statuses[host]

    # host must have been lost/up or timer was rescheduled
    return Ione::Future.resolved if recorded.nil? || recorded != original_timer

    timeout = schedule.next

    @logger.debug("Checking host #{host.ip} in #{timeout} seconds")

    next_timer = @io_reactor.schedule_timer(timeout)
    @refreshing_statuses[host] = next_timer
  end

  next_timer.on_value do
    refresh_host_status(host).fallback do |_e|
      refresh_host_status_with_retry(next_timer, host, schedule)
    end
  end
end
refresh_keyspace_async(keyspace_name)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Re-reads one keyspace and updates or removes it in the schema.
#
# @param keyspace_name name of the keyspace
# @return a future; failed with Errors::ClientError when not connected
def refresh_keyspace_async(keyspace_name)
  connection = @connection
  if connection.nil?
    return Ione::Future.failed(Errors::ClientError.new('Not connected'))
  end

  @logger.info("Refreshing keyspace \"#{keyspace_name}\"")

  fetched = @schema_fetcher.fetch_keyspace(connection, keyspace_name)
  fetched.map do |keyspace|
    if keyspace
      @schema.replace_keyspace(keyspace)
    else
      # No longer present on the server.
      @schema.delete_keyspace(keyspace_name)
    end

    @logger.info("Completed refreshing keyspace \"#{keyspace_name}\"")
  end
end
refresh_materialized_view_async(keyspace_name, view_name)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Re-reads one materialized view and updates or removes it in the schema.
#
# @param keyspace_name name of the keyspace of the view
# @param view_name name of the view
# @return a future; failed with Errors::ClientError when not connected
def refresh_materialized_view_async(keyspace_name, view_name)
  connection = @connection
  if connection.nil?
    return Ione::Future.failed(Errors::ClientError.new('Not connected'))
  end

  @logger.info("Refreshing materialized view \"#{keyspace_name}.#{view_name}\"")

  fetched = @schema_fetcher.fetch_materialized_view(connection, keyspace_name, view_name)
  fetched.map do |view|
    if view
      @schema.replace_materialized_view(view)
    else
      # No longer present on the server.
      @schema.delete_materialized_view(keyspace_name, view_name)
    end

    @logger.info("Completed refreshing materialized view \"#{keyspace_name}.#{view_name}\"")
  end
end
refresh_maybe_retry(what, *args)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Runs refresh_<what>_async and retries it on host and timeout errors.
#
# Any other error closes the current connection and fails the future.
#
# @param what [Symbol] selects the refresh_<what>_async method to call
# @param args arguments passed on to that method
# @return a future for the refresh
def refresh_maybe_retry(what, *args)
  refresh = send(:"refresh_#{what}_async", *args)

  refresh.fallback do |e|
    if e.is_a?(Errors::HostError) || e.is_a?(Errors::TimeoutError)
      refresh_retry(what, e, @reconnection_policy.schedule, *args)
    else
      connection = @connection
      connection.close(e) if connection

      Ione::Future.failed(e)
    end
  end
end
refresh_metadata_async()
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Reads the connected host's own row (SELECT_LOCAL), reports the host to the
# registry and updates the cluster metadata from that row.
#
# @return a future resolving to nil; it fails with Errors::ClientError when
#   not connected and with Errors::InternalError when no row comes back
def refresh_metadata_async
  connection = @connection
  if connection.nil?
    return Ione::Future.failed(Errors::ClientError.new('Not connected'))
  end

  @logger.info("Refreshing connected host's metadata")

  send_select_request(connection, SELECT_LOCAL).map do |rows|
    raise Errors::InternalError, "Unable to fetch connected host's metadata" if rows.empty?

    row = rows.first
    local_address = @address_resolver.resolve(IPAddr.new(connection.host))
    @registry.host_found(local_address, row)
    @metadata.update(row)

    @logger.info("Completed refreshing connected host's metadata")

    nil
  end
end
refresh_peers_async()
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Reads all peers rows (SELECT_PEERS) and synchronizes the registry with them.
#
# Every usable row is reported as a found host. Every registered host other
# than the connected one that no longer appears is reported as lost.
#
# @return a future resolving to nil; failed with Errors::ClientError when
#   not connected
def refresh_peers_async
  connection = @connection
  if connection.nil?
    return Ione::Future.failed(Errors::ClientError.new('Not connected'))
  end

  @logger.info('Refreshing peers metadata')

  send_select_request(connection, SELECT_PEERS).map do |peers|
    @logger.debug("#{peers.size} peer(s) found")

    seen = ::Set.new

    peers.shuffle!
    peers.each do |row|
      ip = peer_ip(row, connection.host)
      # peer_ip returns nil for rows that must be ignored.
      if ip
        seen << ip
        @registry.host_found(ip, row)
      end
    end

    @registry.each_host do |host|
      next if host.ip == connection.host

      @registry.host_lost(host.ip) unless seen.include?(host.ip)
    end

    @logger.info('Completed refreshing peers metadata')

    nil
  end
end
refresh_peers_async_maybe_retry()
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Refreshes all peers, retrying on host and timeout errors.
#
# Only one peers refresh runs at a time; a call made while one is in flight
# returns a resolved future. Any other error closes the current connection
# and fails the returned future.
#
# @return a future for the refresh (resolved immediately when skipped)
def refresh_peers_async_maybe_retry
  synchronize do
    return Ione::Future.resolved if @refreshing_hosts
    @refreshing_hosts = true
  end

  f = refresh_peers_async.fallback do |e|
    case e
    when Errors::HostError, Errors::TimeoutError
      refresh_peers_async_retry(e, @reconnection_policy.schedule)
    else
      connection = @connection
      connection && connection.close(e)

      Ione::Future.failed(e)
    end
  end

  # Reset the flag on failure too; otherwise one terminal failure would turn
  # every later peers refresh (and every single-host refresh, which also
  # checks this flag) into a no-op.
  f.on_failure do |_e|
    synchronize do
      @refreshing_hosts = false
    end
  end

  f.map do
    synchronize do
      @refreshing_hosts = false
    end
  end
end
refresh_peers_async_retry(error, schedule)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Retries a failed peers refresh after the next delay of the schedule.
#
# Host and timeout errors lead to another retry with the same schedule; any
# other error closes the current connection and fails the future.
#
# @param error the error that caused this retry (logged)
# @param schedule the schedule providing the delays
# @return a future for the retried refresh
def refresh_peers_async_retry(error, schedule)
  timeout = schedule.next
  @logger.info("Failed to refresh hosts (#{error.class.name}: " \
               "#{error.message}), retrying in #{timeout}")

  @io_reactor.schedule_timer(timeout).flat_map do
    refresh_peers_async.fallback do |e|
      if e.is_a?(Errors::HostError) || e.is_a?(Errors::TimeoutError)
        refresh_peers_async_retry(e, schedule)
      else
        connection = @connection
        connection.close(e) if connection

        Ione::Future.failed(e)
      end
    end
  end
end
refresh_retry(what, error, schedule, *args)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Retries refresh_<what>_async after the next delay of the schedule.
#
# Host and timeout errors lead to another retry with the same schedule; any
# other error closes the current connection and fails the future.
#
# @param what [Symbol] selects the refresh_<what>_async method to call
# @param error the error that caused this retry (logged)
# @param schedule the schedule providing the delays
# @param args arguments passed on to the refresh method
# @return a future for the retried refresh
def refresh_retry(what, error, schedule, *args)
  timeout = schedule.next
  @logger.info("Failed to refresh #{what} #{args.inspect} " \
               "(#{error.class.name}: #{error.message}), retrying in #{timeout}")

  @io_reactor.schedule_timer(timeout).flat_map do
    send(:"refresh_#{what}_async", *args).fallback do |e|
      if e.is_a?(Errors::HostError) || e.is_a?(Errors::TimeoutError)
        refresh_retry(what, e, schedule, *args)
      else
        connection = @connection
        connection.close(e) if connection

        Ione::Future.failed(e)
      end
    end
  end
end
refresh_schema_async()
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Fetches the complete schema, replaces the cached one and rebuilds the token
# map of the cluster metadata.
#
# @return a future; failed with Errors::ClientError when not connected
def refresh_schema_async
  connection = @connection

  # Logged before the connection check, so it is emitted even when the
  # refresh cannot start.
  @logger.info('Refreshing schema')

  if connection.nil?
    return Ione::Future.failed(Errors::ClientError.new('Not connected'))
  end

  fetched = @schema_fetcher.fetch(connection)
  fetched.map do |keyspaces|
    @schema.replace(keyspaces)
    @metadata.rebuild_token_map
    @logger.info('Schema refreshed')
  end
end
refresh_schema_async_wrapper()
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Starts a full schema refresh unless one is already running.
#
# @return the future of the refresh that is running or was just started
def refresh_schema_async_wrapper
  # This is kinda tricky. We want to start refreshing the schema asynchronously.
  # However, if we're already in the process of doing so, return the future
  # representing that result rather than starting another schema refresh.
  #
  # A few other nuances while a refresh is in progress:
  # * if a new attempt is made to refresh, keep track of that and schedule another
  #   refresh after the current one completes.
  # * we don't want schema_change events to be processed since the full refresh
  #   may overwrite the results of handling the schema_change events with older
  #   data. That said, we don't want to lose track of schema_change events; just
  #   delay processing them until after the full refresh is done.
  #
  # Finally, when a full refresh begins, clear out any pending changes in
  # @schema_changes because the full refresh subsumes them. This has two benefits:
  # 1. avoid round trips to Cassandra to get details related to those schema
  #    changes.
  # 2. avoid race conditions where those updates may return older data than our
  #    full refresh and might win as last writer with that potentially older data.
  synchronize do
    if @refresh_schema_future
      @pending_schema_refresh = true
      return @refresh_schema_future
    end

    # Fresh refresh; prep this connection!

    # Since we're starting a new refresh, there can be no pending refresh request.
    @pending_schema_refresh = false

    # Clear outstanding schema changes and timers.
    @schema_changes = []
    @io_reactor.cancel_timer(@schema_refresh_timer) if @schema_refresh_timer
    @schema_refresh_timer = nil
    @io_reactor.cancel_timer(@schema_refresh_window) if @schema_refresh_window
    @schema_refresh_window = nil

    # Start refreshing. Keep the future in a local as well: if it is already
    # complete, the callback below runs immediately and clears
    # @refresh_schema_future, and the method must still return the future.
    refresh = @refresh_schema_future = refresh_maybe_retry(:schema)
    refresh.on_complete do
      pending = false
      synchronize do
        # We're done refreshing. If we have a pending refresh, launch it now.
        @refresh_schema_future = nil
        pending = @pending_schema_refresh
        @pending_schema_refresh = false
        unless pending
          # Restore timers if there are pending schema changes.
          handle_schema_change(nil)
        end
      end

      refresh_schema_async_wrapper if pending
    end

    refresh
  end
end
refresh_table_async(keyspace_name, table_name)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Re-reads one table and updates or removes it in the schema.
#
# @param keyspace_name name of the keyspace of the table
# @param table_name name of the table
# @return a future; failed with Errors::ClientError when not connected
def refresh_table_async(keyspace_name, table_name)
  connection = @connection
  if connection.nil?
    return Ione::Future.failed(Errors::ClientError.new('Not connected'))
  end

  @logger.info("Refreshing table \"#{keyspace_name}.#{table_name}\"")

  fetched = @schema_fetcher.fetch_table(connection, keyspace_name, table_name)
  fetched.map do |table|
    if table
      @schema.replace_table(table)
    else
      # No longer present on the server.
      @schema.delete_table(keyspace_name, table_name)
    end

    @logger.info("Completed refreshing table \"#{keyspace_name}.#{table_name}\"")
  end
end
refresh_type_async(keyspace_name, type_name)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Re-reads one user-defined type and updates or removes it in the schema.
#
# @param keyspace_name name of the keyspace of the type
# @param type_name name of the type
# @return a future; failed with Errors::ClientError when not connected
def refresh_type_async(keyspace_name, type_name)
  connection = @connection
  if connection.nil?
    return Ione::Future.failed(Errors::ClientError.new('Not connected'))
  end

  @logger.info("Refreshing user-defined type \"#{keyspace_name}.#{type_name}\"")

  fetched = @schema_fetcher.fetch_type(connection, keyspace_name, type_name)
  fetched.map do |type|
    if type
      @schema.replace_type(type)
    else
      # No longer present on the server.
      @schema.delete_type(keyspace_name, type_name)
    end

    @logger.info("Completed refreshing user-defined type \"#{keyspace_name}.#{type_name}\"")
  end
end
register_async()
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Registers the control connection for server events and installs the handler
# that reacts to them.
#
# Topology and status events are always requested; schema change events only
# when schema synchronization is enabled.
#
# @return a future resolving to self; failed with Errors::ClientError when
#   not connected, or with the server's error when registration is refused
def register_async
  connection = @connection
  if connection.nil?
    return Ione::Future.failed(Errors::ClientError.new('Not connected'))
  end

  request = Protocol::RegisterRequest.new(
    Protocol::TopologyChangeEventResponse::TYPE,
    Protocol::StatusChangeEventResponse::TYPE
  )

  if @connection_options.synchronize_schema?
    request.events << Protocol::SchemaChangeEventResponse::TYPE
  end

  registered = connection.send_request(request).map do |response|
    case response
    when Protocol::ReadyResponse
      nil
    when Protocol::ErrorResponse
      raise response.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :one, 0)
    else
      raise Errors::InternalError, "Unexpected response #{response.inspect}"
    end
  end

  registered.map do
    connection.on_event do |event|
      @logger.debug("Event received #{event}")

      if event.type == 'SCHEMA_CHANGE'
        handle_schema_change(event)
      else
        case event.change
        when 'UP'
          address = @address_resolver.resolve(event.address)
          if @registry.has_host?(address)
            @registry.host_up(address)
          else
            # Unknown host: read its row and refresh the schema.
            refresh_host_async_maybe_retry(event.address)
            refresh_schema_async_wrapper
          end
        when 'DOWN'
          # RUBY-164: Don't mark host down if there are active connections. We have
          # logic in connector.rb to call host_down when all connections to a node are lost,
          # so that covers the requirement.
        when 'NEW_NODE'
          address = @address_resolver.resolve(event.address)

          unless @registry.has_host?(address)
            refresh_host_async_maybe_retry(event.address)
            refresh_schema_async_wrapper
          end
        when 'REMOVED_NODE'
          @registry.host_lost(@address_resolver.resolve(event.address))
          refresh_schema_async_wrapper
        end
      end
    end

    self
  end
end
send_select_request(connection, request)
click to toggle source
# File lib/cassandra/cluster/control_connection.rb
# Sends a request that is expected to return rows.
#
# The backtrace of the caller is captured up front and attached to the errors
# raised from the response callback, so that they point at the code that
# issued the request rather than at the callback's own call stack.
#
# @param connection the connection to send the request on
# @param request the request to send
# @return a future resolving to the rows of the response; it fails with the
#   server's error for an error response and with Errors::InternalError for
#   any other response
def send_select_request(connection, request)
  backtrace = caller
  connection.send_request(request).map do |r|
    case r
    when Protocol::RowsResultResponse
      r.rows
    when Protocol::ErrorResponse
      e = r.to_error(nil, VOID_STATEMENT, VOID_OPTIONS, EMPTY_LIST, :one, 0)
      e.set_backtrace(backtrace)
      raise e
    else
      # Use the captured backtrace here too; `caller` evaluated inside this
      # block would describe the callback invocation instead.
      raise Errors::InternalError, "Unexpected response #{r.inspect}", backtrace
    end
  end
end