module Cassandra

This file monkey-patches Module to have an attr_boolean method to make it easy for classes to define boolean instance variables with “foo?” reader methods. Inspired by stackoverflow.com/questions/4013591/attr-reader-with-question-mark-in-a-name

Use this module to mark domain object classes as custom type implementations for custom-type columns in C*. This module has no logic of its own, but indicates that the marked class has certain methods. @private

Constants

CLUSTER_OPTIONS
CONSISTENCIES

A list of all supported request consistencies.

@see www.datastax.com/documentation/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html Consistency levels in Apache Cassandra 2.0

@see www.datastax.com/documentation/cassandra/1.2/cassandra/dml/dml_config_consistency_c.html Consistency levels in Apache Cassandra 1.2

@see Cassandra::Session#execute_async

DATE_OFFSET

@private

Ensures that:

::Date.jd(DATE_OFFSET, ::Date::GREGORIAN)

> -5877641-06-23

::Date.jd(DATE_OFFSET + 2 ** 31, ::Date::GREGORIAN)

> 1970-01-01

::Date.jd(DATE_OFFSET + 2 ** 32, ::Date::GREGORIAN)

> 5881580-07-12
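The invariant above can be checked with the standard library alone. The following is a minimal sketch, assuming the offset is the Julian day number of the Unix epoch minus 2**31 (which is what the documented invariant implies). The module `DateCodecSketch` and its methods are hypothetical names used only to illustrate how a date might map onto an unsigned 32-bit day count centered on 1970-01-01.

```ruby
require 'date'

# Hypothetical module, not part of the driver.
module DateCodecSketch
  # Assumption: Julian day number of 1970-01-01 minus 2**31.
  OFFSET = Date.new(1970, 1, 1).jd - 2**31

  # Converts a Date into a day count where 2**31 means 1970-01-01.
  def self.encode(date)
    date.jd - OFFSET
  end

  # Converts a day count back into a proleptic Gregorian Date.
  def self.decode(days)
    Date.jd(OFFSET + days, Date::GREGORIAN)
  end
end

puts DateCodecSketch.decode(2**31)                   # 1970-01-01
puts DateCodecSketch.encode(Date.new(1970, 1, 2))    # 2147483649
```

Centering the range on the epoch lets dates before 1970 be expressed without a sign bit.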

EMPTY_LIST

@private

Murmur3
NOT_SET

@private

NO_HOSTS

@private

NULL_BYTE

@private

SERIAL_CONSISTENCIES

A list of all supported serial consistencies.

@see Cassandra::Session#execute_async

SSL_CLASSES

@private

VERSION
VOID_OPTIONS

@private

VOID_STATEMENT

@private

WRITE_TYPES

A list of all possible write types that a {Cassandra::Errors::WriteTimeoutError} can have.

@see github.com/apache/cassandra/blob/cassandra-2.0.16/doc/native_protocol_v2.spec#L872-L887 Description of possible types of writes in Apache Cassandra native protocol spec v2

Public Class Methods

cluster(options = {})

Creates a {Cassandra::Cluster Cluster instance}.

@option options [Array<String, IPAddr>] :hosts (['127.0.0.1']) a list of

initial addresses. Note that the entire list of cluster members will be
discovered automatically once a connection to any hosts from the original
list is successful.

@option options [Integer] :port (9042) cassandra native protocol port.

@option options [Boolean] :nodelay (true) when set to `true`, disables

nagle algorithm.

@option options [String] :datacenter (nil) name of current datacenter.

First datacenter found will be assumed current by default. Note that you
can skip this option if you specify only hosts from the local datacenter
in `:hosts` option.

@option options [Boolean] :shuffle_replicas (true) whether replicas list

found by the default Token-Aware Load Balancing Policy should be
shuffled. See {Cassandra::LoadBalancing::Policies::TokenAware#initialize Token-Aware Load Balancing Policy}.

@option options [Hash<String|Symbol, ExecutionProfile>] :execution_profiles (nil)

Hash of {Cassandra::Execution::Profile}s that are available for client use (e.g.
{Session#execute}, {Session#execute_async}, {Session#prepare}, and {Session#prepare_async}).

@option options [Numeric] :connect_timeout (10) connection timeout in

seconds. Setting value to `nil` will reset it to 5 seconds.

@option options [Numeric] :timeout (12) request execution timeout in

seconds. Setting value to `nil` will remove request timeout.

@option options [Numeric] :heartbeat_interval (30) how often a

heartbeat should be sent to determine if a connection is alive. Several
things to note about this option: only one heartbeat request will ever be
outstanding on a given connection, and each heartbeat will be sent at least
`:heartbeat_interval` seconds after the last request has been sent on a
given connection. Setting value to `nil` will remove connection timeout.

@option options [Numeric] :idle_timeout (60) period of inactivity after

which a connection is considered dead. Note that this value should be at
least a few times larger than `:heartbeat_interval`. Setting value to
`nil` will remove automatic connection termination.

@option options [String] :username (none) username to use for

authentication to cassandra. Note that you must also specify `:password`.

@option options [String] :password (none) password to use for

authentication to cassandra. Note that you must also specify `:username`.

@option options [Boolean, OpenSSL::SSL::SSLContext] :ssl (false) enable

default ssl authentication if `true` (not recommended). Also accepts an
initialized {OpenSSL::SSL::SSLContext}. Note that this option is
overridden by an SSL context built from `:server_cert`, `:client_cert`,
`:private_key` and `:passphrase` when a server or client certificate is given.

@option options [String] :server_cert (none) path to server certificate or

certificate authority file.

@option options [String] :client_cert (none) path to client certificate

file. Note that this option is only required when encryption is
configured to require client authentication.

@option options [String] :private_key (none) path to client private key.

Note that this option is only required when encryption is configured to
require client authentication.

@option options [String] :passphrase (none) passphrase for private key.

@option options [Symbol] :compression (none) compression to use. Must be

either `:snappy` or `:lz4`. Also note that in order for compression to
work, you must install the 'snappy' or 'lz4-ruby' gem.

@option options [Cassandra::LoadBalancing::Policy] :load_balancing_policy

default: token aware data center aware round robin.

@option options [Symbol] :address_resolution (:none) a pre-configured

address resolver to use. Must be one of `:none` or
`:ec2_multi_region`.

@option options [Integer] :connections_per_local_node (nil) Number of connections to

open to each local node; the value of this option directly correlates to the number
of requests the client can make to the local node concurrently. When `nil`, the
setting is `1` for nodes that use the v3 or later protocol, and `2` for nodes that
use the v2 or earlier protocol.

@option options [Integer] :connections_per_remote_node (1) Number of connections to

open to each remote node; the value of this option directly correlates to the
number of requests the client can make to the remote node concurrently.

@option options [Integer] :requests_per_connection (nil) Number of outstanding

requests to support on one connection. Depending on the types of requests, some may
get processed in parallel in the Cassandra node. When `nil`, the setting is `1024`
for nodes that use the v3 or later protocol, and `128` for nodes that use the
v2 or earlier protocol.

@option options [Integer] :protocol_version (nil) Version of protocol to speak to

nodes. By default, this is auto-negotiated to the highest common protocol version
that all nodes in `:hosts` speak.

@option options [Boolean] :allow_beta_protocol (false) whether the driver should attempt to speak to nodes

with a beta version of the newest protocol (which is still under development). USE WITH CAUTION!

@option options [Boolean, Cassandra::TimestampGenerator] :client_timestamps (false) whether the driver

should send timestamps for each executed statement and possibly which timestamp generator to use. Enabling this
setting helps mitigate Cassandra cluster clock skew because the timestamp of the client machine will be used.
This does not help mitigate application cluster clock skew. Also accepts an initialized
{Cassandra::TimestampGenerator}, `:simple` (indicating an instance of {Cassandra::TimestampGenerator::Simple}),
or `:monotonic` (indicating an instance of {Cassandra::TimestampGenerator::TickingOnDuplicate}). If set to true,
it defaults to {Cassandra::TimestampGenerator::Simple} for all Ruby flavors except JRuby. On JRuby, it defaults to
{Cassandra::TimestampGenerator::TickingOnDuplicate}.

@option options [Boolean] :synchronize_schema (true) whether the driver

should automatically keep schema metadata synchronized. When enabled, the
driver updates schema metadata after receiving schema change
notifications from Cassandra. Setting this option to `false` disables
automatic schema updates. Schema metadata is used by the driver to
determine cluster partitioners as well as to find partition keys and
replicas of prepared statements; this information makes token aware load
balancing possible. One can still
{Cassandra::Cluster#refresh_schema refresh schema manually}.

@option options [Numeric] :schema_refresh_delay (1) the driver will wait

for `:schema_refresh_delay` before fetching metadata after receiving a
schema change event. This timer is restarted every time a new schema
change event is received. Finally, when the timer expires or a maximum
wait time of `:schema_refresh_timeout` has been reached, a schema refresh
attempt will be made and the timeout is reset.

@option options [Numeric] :schema_refresh_timeout (10) the maximum delay

before automatically refreshing schema. Such delay can occur whenever
multiple schema change events are continuously arriving within
`:schema_refresh_delay` interval.

@option options [Cassandra::Reconnection::Policy] :reconnection_policy

default: {Cassandra::Reconnection::Policies::Exponential Exponential}.
Note that the default policy is configured with `(0.5, 30, 2)`.

@option options [Cassandra::Retry::Policy] :retry_policy default:

{Cassandra::Retry::Policies::Default Default Retry Policy}.

@option options [Logger] :logger (none) logger. A {Logger} instance from the

standard library or any object responding to standard log methods
(`#debug`, `#info`, `#warn`, `#error` and `#fatal`).

@option options [Enumerable<Cassandra::Listener>] :listeners (none)

initial listeners. A list of initial cluster state listeners. Note that the
`:load_balancing_policy` is automatically registered with the cluster.

@option options [Symbol] :consistency (:local_one) default consistency

to use for all requests. Must be one of {Cassandra::CONSISTENCIES}.

@option options [Boolean] :trace (false) whether or not to trace all

requests by default.

@option options [Integer] :page_size (10000) default page size for all

select queries. Set this value to `nil` to disable paging.

@option options [Hash{String => String}] :credentials (none) a hash of credentials

to be used with [credentials authentication in cassandra 1.2](https://github.com/apache/cassandra/blob/cassandra-2.0.16/doc/native_protocol_v1.spec#L238-L250).
Note that if you specified `:username` and `:password` options, those credentials
are configured automatically.

@option options [Cassandra::Auth::Provider] :auth_provider (none) a custom auth

provider to be used with [SASL authentication in cassandra 2.0](https://github.com/apache/cassandra/blob/cassandra-2.0.16/doc/native_protocol_v2.spec#L257-L273).
Note that if you have specified `:username` and `:password`, then a
{Cassandra::Auth::Providers::Password Password Provider} will be used automatically.

@option options [Cassandra::Compression::Compressor] :compressor (none) a

custom compressor. Note that if you have specified `:compression`, an
appropriate compressor will be provided automatically.

@option options [Cassandra::AddressResolution::Policy]

:address_resolution_policy default:
{Cassandra::AddressResolution::Policies::None No Resolution Policy}. A custom address
resolution policy. Note that if you have specified `:address_resolution`, an
appropriate address resolution policy will be provided automatically.

@option options [Object<#all, error, value, promise>] :futures_factory

default: {Cassandra::Future}. A futures factory to assist with integration
into an existing futures library. Note that promises returned by this object
must conform to the {Cassandra::Promise} api, which is not yet public. Things
may change; use at your own risk.

@example Connecting to localhost

cluster = Cassandra.cluster

@example Configuring {Cassandra::Cluster}

cluster = Cassandra.cluster(
            username: username,
            password: password,
            hosts: ['10.0.1.1', '10.0.1.2', '10.0.1.3']
          )
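As a further sketch, the timeout-related options described above can be collected into a plain Hash before being passed to `Cassandra.cluster`. The constant name `CLUSTER_OPTIONS_SKETCH` and the host addresses are placeholders, and the actual call is left commented out because it needs the driver and a reachable cluster.

```ruby
# Placeholder addresses; replace with nodes from your own cluster.
CLUSTER_OPTIONS_SKETCH = {
  hosts: ['10.0.1.1', '10.0.1.2'],
  port: 9042,
  connect_timeout: 10,     # seconds
  timeout: 12,             # seconds per request
  heartbeat_interval: 30,  # seconds
  idle_timeout: 120,       # a few times larger than :heartbeat_interval
  compression: :lz4,       # requires the 'lz4-ruby' gem
  consistency: :local_one,
  page_size: 10_000
}.freeze

# require 'cassandra'
# cluster = Cassandra.cluster(CLUSTER_OPTIONS_SKETCH)
```

Note that `:timeout` and `:consistency` cannot be combined with `:execution_profiles`; the validation code raises an `ArgumentError` in that case.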

@return [Cassandra::Cluster] a cluster instance
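According to the validation code, `:client_timestamps` accepts any object that responds to `#next`. The class below is a hypothetical sketch of such a generator, not the driver's implementation. It assumes timestamps are integer microseconds since the Unix epoch, and it bumps the value by one whenever the clock has not advanced.

```ruby
# Hypothetical generator; the class name and the microsecond unit are assumptions.
class MonotonicMicrosSketch
  # The clock is injectable so that the behavior can be tested deterministically.
  def initialize(clock = -> { (Time.now.to_r * 1_000_000).to_i })
    @clock = clock
    @last = 0
    @lock = Mutex.new
  end

  # Returns a strictly increasing integer timestamp.
  def next
    @lock.synchronize do
      now = @clock.call
      @last = now > @last ? now : @last + 1
    end
  end
end

generator = MonotonicMicrosSketch.new
first = generator.next
second = generator.next
puts second > first  # true
```

An instance could then be passed as `client_timestamps: MonotonicMicrosSketch.new`.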

    # File lib/cassandra.rb
    def self.cluster(options = {})
      cluster_async(options).get
    end

cluster_async(options = {})

Creates a {Cassandra::Cluster Cluster instance}.

@see Cassandra.cluster

@return [Cassandra::Future<Cassandra::Cluster>] a future resolving to the

cluster instance.
    # File lib/cassandra.rb
    def self.cluster_async(options = {})
      options, hosts = validate_and_massage_options(options)
    rescue => e
      futures = options.fetch(:futures_factory) { return Future::Error.new(e) }
      futures.error(e)
    else
      driver = Driver.new(options)
      driver.connect(hosts)
    end
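
The control flow of `cluster_async` relies on two Ruby idioms: a method-level `rescue`/`else`, and a `return` inside the block given to `Hash#fetch`, which exits the enclosing method when the key is missing. The sketch below reproduces that shape with hypothetical stand-ins (`ErrorResultSketch`, `RecordingFactorySketch`, `validate_sketch`, `connect_sketch`); none of them are driver classes.

```ruby
# Hypothetical stand-in for an error future.
ErrorResultSketch = Struct.new(:error)

# Hypothetical stand-in for a custom futures factory.
class RecordingFactorySketch
  def error(exception)
    [:custom_error, exception.message]
  end
end

# Hypothetical validation step that may raise.
def validate_sketch(options)
  raise ArgumentError, ':port must be an Integer' unless options.fetch(:port, 9042).is_a?(Integer)
  options
end

def connect_sketch(options = {})
  validated = validate_sketch(options)
rescue => e
  # The fetch block runs only when :factory is absent; its `return`
  # leaves connect_sketch immediately with the default error object.
  factory = options.fetch(:factory) { return ErrorResultSketch.new(e) }
  factory.error(e)
else
  # Runs only when validation raised nothing.
  [:connected, validated]
end

puts connect_sketch(port: 9042).first          # connected
puts connect_sketch(port: 'x').error.message   # :port must be an Integer
```

Because validation errors are turned into an error result rather than raised, a caller of the asynchronous variant sees failures through the returned object.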

validate_and_massage_options(options)

@private

    # File lib/cassandra.rb
    def self.validate_and_massage_options(options)
      options = options.select do |key, _|
        CLUSTER_OPTIONS.include?(key)
      end

      if options.key?(:execution_profiles)
        [:load_balancing_policy, :retry_policy, :timeout, :consistency].each do |opt|
          raise ::ArgumentError, "#{opt} is not allowed when execution profiles are used" if options.key?(opt)
        end
      end

      has_username = options.key?(:username)
      has_password = options.key?(:password)
      if has_username || has_password
        if has_username && !has_password
          raise ::ArgumentError,
                'both :username and :password options must be specified, ' \
                    'but only :username given'
        end

        if !has_username && has_password
          raise ::ArgumentError,
                'both :username and :password options must be specified, ' \
                    'but only :password given'
        end

        username = options.delete(:username)
        password = options.delete(:password)

        Util.assert_instance_of(::String, username) do
          ":username must be a String, #{username.inspect} given"
        end
        Util.assert_instance_of(::String, password) do
          ":password must be a String, #{password.inspect} given"
        end
        Util.assert_not_empty(username) { ':username cannot be empty' }
        Util.assert_not_empty(password) { ':password cannot be empty' }

        options[:credentials]   = {username: username, password: password}
        options[:auth_provider] = Auth::Providers::Password.new(username, password)
      end

      if options.key?(:credentials)
        credentials = options[:credentials]

        Util.assert_instance_of(::Hash, credentials) do
          ":credentials must be a hash, #{credentials.inspect} given"
        end
      end

      if options.key?(:auth_provider)
        auth_provider = options[:auth_provider]

        Util.assert_responds_to(:create_authenticator, auth_provider) do
          ":auth_provider #{auth_provider.inspect} must respond to " \
              ":create_authenticator, but doesn't"
        end
      end

      has_client_cert = options.key?(:client_cert)
      has_private_key = options.key?(:private_key)

      if has_client_cert || has_private_key
        if has_client_cert && !has_private_key
          raise ::ArgumentError,
                'both :client_cert and :private_key options must be specified, ' \
                    'but only :client_cert given'
        end

        if !has_client_cert && has_private_key
          raise ::ArgumentError,
                'both :client_cert and :private_key options must be specified, ' \
                    'but only :private_key given'
        end

        Util.assert_instance_of(::String, options[:client_cert]) do
          ":client_cert must be a string, #{options[:client_cert].inspect} given"
        end
        Util.assert_instance_of(::String, options[:private_key]) do
          ":private_key must be a string, #{options[:private_key].inspect} given"
        end
        client_cert = ::File.expand_path(options[:client_cert])
        private_key = ::File.expand_path(options[:private_key])

        Util.assert_file_exists(client_cert) do
          ":client_cert #{client_cert.inspect} doesn't exist"
        end
        Util.assert_file_exists(private_key) do
          ":private_key #{private_key.inspect} doesn't exist"
        end
      end

      has_server_cert = options.key?(:server_cert)

      if has_server_cert
        Util.assert_instance_of(::String, options[:server_cert]) do
          ":server_cert must be a string, #{options[:server_cert].inspect} given"
        end
        server_cert = ::File.expand_path(options[:server_cert])

        Util.assert_file_exists(server_cert) do
          ":server_cert #{server_cert.inspect} doesn't exist"
        end
      end

      if has_client_cert || has_server_cert
        context = ::OpenSSL::SSL::SSLContext.new

        if has_server_cert
          context.ca_file     = server_cert
          context.verify_mode = ::OpenSSL::SSL::VERIFY_PEER
        end

        if has_client_cert
          context.cert = ::OpenSSL::X509::Certificate.new(File.read(client_cert))

          context.key = if options.key?(:passphrase)
                          ::OpenSSL::PKey::RSA.new(File.read(private_key),
                                                   options[:passphrase])
                        else
                          ::OpenSSL::PKey::RSA.new(File.read(private_key))
                        end
        end

        options[:ssl] = context
      end

      if options.key?(:ssl)
        ssl = options[:ssl]

        Util.assert_instance_of_one_of(SSL_CLASSES, ssl) do
          ":ssl must be a boolean or an OpenSSL::SSL::SSLContext, #{ssl.inspect} given"
        end
      end

      if options.key?(:compression)
        compression = options.delete(:compression)

        case compression
        when :snappy
          options[:compressor] = Compression::Compressors::Snappy.new
        when :lz4
          options[:compressor] = Compression::Compressors::Lz4.new
        else
          raise ::ArgumentError,
                ":compression must be either :snappy or :lz4, #{compression.inspect} given"
        end
      end

      if options.key?(:compressor)
        compressor = options[:compressor]
        methods    = [:algorithm, :compress?, :compress, :decompress]

        Util.assert_responds_to_all(methods, compressor) do
          ":compressor #{compressor.inspect} must respond to #{methods.inspect}, " \
              "but doesn't"
        end
      end

      if options.key?(:logger)
        if options[:logger].nil?
          # Delete the key because we want to fallback to the default logger in Driver.
          options.delete(:logger)
        else
          # Validate
          logger = options[:logger]
          methods = [:debug, :info, :warn, :error, :fatal]

          Util.assert_responds_to_all(methods, logger) do
            ":logger #{logger.inspect} must respond to #{methods.inspect}, but doesn't"
          end
        end
      end

      if options.key?(:port)
        unless options[:port].nil?
          port = options[:port]
          Util.assert_instance_of(::Integer, port)
          Util.assert_one_of(1...2**16, port) do
            ":port must be a valid ip port, #{port} given"
          end
        end
      end

      options[:datacenter] = String(options[:datacenter]) if options.key?(:datacenter)

      if options.key?(:connect_timeout)
        timeout = options[:connect_timeout]

        unless timeout.nil?
          Util.assert_instance_of(::Numeric, timeout) do
            ":connect_timeout must be a number of seconds, #{timeout.inspect} given"
          end
          Util.assert(timeout > 0) do
            ":connect_timeout must be greater than 0, #{timeout} given"
          end
        end
      end

      if options.key?(:execution_profiles)
        Util.assert_instance_of(::Hash, options[:execution_profiles],
                                ':execution_profiles must be a hash of <name,ExecutionProfile> entries.')
      end

      if options.key?(:heartbeat_interval)
        timeout = options[:heartbeat_interval]

        unless timeout.nil?
          Util.assert_instance_of(::Numeric, timeout) do
            ":heartbeat_interval must be a number of seconds, #{timeout.inspect} given"
          end
          Util.assert(timeout > 0) do
            ":heartbeat_interval must be greater than 0, #{timeout} given"
          end
        end
      end

      if options.key?(:idle_timeout)
        timeout = options[:idle_timeout]

        unless timeout.nil?
          Util.assert_instance_of(::Numeric, timeout) do
            ":idle_timeout must be a number of seconds, #{timeout.inspect} given"
          end
          Util.assert(timeout > 0) do
            ":idle_timeout must be greater than 0, #{timeout} given"
          end
        end
      end

      if options.key?(:schema_refresh_delay)
        timeout = options[:schema_refresh_delay]

        Util.assert_instance_of(::Numeric, timeout) do
          ":schema_refresh_delay must be a number of seconds, #{timeout.inspect} given"
        end
        Util.assert(timeout > 0) do
          ":schema_refresh_delay must be greater than 0, #{timeout} given"
        end
      end

      if options.key?(:schema_refresh_timeout)
        timeout = options[:schema_refresh_timeout]

        Util.assert_instance_of(::Numeric, timeout) do
          ":schema_refresh_timeout must be a number of seconds, #{timeout.inspect} given"
        end
        Util.assert(timeout > 0) do
          ":schema_refresh_timeout must be greater than 0, #{timeout} given"
        end
      end

      if options.key?(:reconnection_policy)
        reconnection_policy = options[:reconnection_policy]

        Util.assert_responds_to(:schedule, reconnection_policy) do
          ":reconnection_policy #{reconnection_policy.inspect} must respond to " \
              ":schedule, but doesn't"
        end
      end

      # Validate options that go in an execution profile. Instantiating one
      # causes validation automatically.
      Cassandra::Execution::Profile.new(options)

      options[:listeners] = Array(options[:listeners]) if options.key?(:listeners)
      options[:nodelay] = !!options[:nodelay] if options.key?(:nodelay)
      options[:trace] = !!options[:trace] if options.key?(:trace)
      options[:shuffle_replicas] = !!options[:shuffle_replicas] if options.key?(:shuffle_replicas)
      options[:allow_beta_protocol] = !!options[:allow_beta_protocol] if options.key?(:allow_beta_protocol)

      if options.key?(:page_size)
        page_size = options[:page_size]

        unless page_size.nil?
          page_size = options[:page_size]
          Util.assert_instance_of(::Integer, page_size)
          Util.assert_one_of(1...2**32, page_size) do
            ":page_size must be a positive integer, #{page_size.inspect} given"
          end
        end
      end

      if options.key?(:protocol_version)
        protocol_version = options[:protocol_version]
        unless protocol_version.nil?
          Util.assert_instance_of(::Integer, protocol_version)
          Util.assert_one_of(1..Cassandra::Protocol::Versions::MAX_SUPPORTED_VERSION, protocol_version,
                             ':protocol_version must be a positive integer between 1 and ' \
                             "#{Cassandra::Protocol::Versions::MAX_SUPPORTED_VERSION}, #{protocol_version.inspect} given")
        end
      end

      Util.assert(!(options[:allow_beta_protocol] && options[:protocol_version]),
                  'only one of :allow_beta_protocol and :protocol_version may be specified, both given')

      if options.key?(:futures_factory)
        futures_factory = options[:futures_factory]
        methods = [:error, :value, :promise, :all]

        Util.assert_responds_to_all(methods, futures_factory) do
          ":futures_factory #{futures_factory.inspect} must respond to " \
              "#{methods.inspect}, but doesn't"
        end
      end

      if options.key?(:address_resolution)
        address_resolution = options.delete(:address_resolution)

        case address_resolution
        when :none
        # do nothing
        when :ec2_multi_region
          options[:address_resolution_policy] =
            AddressResolution::Policies::EC2MultiRegion.new
        else
          raise ::ArgumentError,
                ':address_resolution must be either :none or :ec2_multi_region, ' \
                  "#{address_resolution.inspect} given"
        end
      end

      if options.key?(:address_resolution_policy)
        address_resolver = options[:address_resolution_policy]

        Util.assert_responds_to(:resolve, address_resolver) do
          ':address_resolution_policy must respond to :resolve, ' \
              "#{address_resolver.inspect} but doesn't"
        end
      end

      options[:synchronize_schema] = !!options[:synchronize_schema] if options.key?(:synchronize_schema)

      if options.key?(:client_timestamps)
        timestamp_generator = case options[:client_timestamps]
                              when true
                                if RUBY_ENGINE == 'jruby'
                                  Cassandra::TimestampGenerator::TickingOnDuplicate.new
                                else
                                  Cassandra::TimestampGenerator::Simple.new
                                end
                              when false
                                nil
                              when :simple
                                Cassandra::TimestampGenerator::Simple.new
                              when :monotonic
                                Cassandra::TimestampGenerator::TickingOnDuplicate.new
                              else
                                # The value must be a generator instance.
                                options[:client_timestamps]
                              end

        if timestamp_generator
          Util.assert_responds_to(:next, timestamp_generator) do
            ":client_timestamps #{options[:client_timestamps].inspect} must be a boolean, :simple, :monotonic, or " \
            'an object that responds to :next'
          end
        end
        options.delete(:client_timestamps)
        options[:timestamp_generator] = timestamp_generator
      end

      if options.key?(:connections_per_local_node)
        connections_per_node = options[:connections_per_local_node]

        unless connections_per_node.nil?
          connections_per_node = options[:connections_per_local_node]
          Util.assert_instance_of(::Integer, connections_per_node)
          Util.assert_one_of(1...2**16, connections_per_node) do
            ':connections_per_local_node must be a positive integer between ' \
                "1 and 65535, #{connections_per_node.inspect} given"
          end
        end
      end

      if options.key?(:connections_per_remote_node)
        connections_per_node = options[:connections_per_remote_node]

        unless connections_per_node.nil?
          connections_per_node = options[:connections_per_remote_node]
          Util.assert_instance_of(::Integer, connections_per_node)
          Util.assert_one_of(1...2**16, connections_per_node) do
            ':connections_per_remote_node must be a positive integer between ' \
                "1 and 65535, #{connections_per_node.inspect} given"
          end
        end
      end

      if options.key?(:requests_per_connection)
        requests_per_connection = options[:requests_per_connection]

        unless requests_per_connection.nil?
          requests_per_connection = options[:requests_per_connection]
          Util.assert_instance_of(::Integer, requests_per_connection)

          # v3 protocol says that max stream-id is 32767 (2^15-1). This setting might be
          # used to talk to a v2 (or less) node, but then we'll adjust it down.

          Util.assert_one_of(1...2**15, requests_per_connection) do
            ':requests_per_connection must be a positive integer, ' \
                "#{requests_per_connection.inspect} given"
          end
        end
      end

      # Get host addresses.
      hosts = []

      Array(options.fetch(:hosts, '127.0.0.1')).each do |host|
        case host
        when ::IPAddr
          hosts << host
        when ::String # ip address or hostname
          Resolv.each_address(host) do |ip|
            hosts << ::IPAddr.new(ip)
          end
        else
          raise ::ArgumentError, ":hosts must be String or IPAddr, #{host.inspect} given"
        end
      end

      if hosts.empty?
        raise ::ArgumentError,
              ":hosts #{options[:hosts].inspect} could not be resolved to any ip address"
      end

      hosts.shuffle!

      [options, hosts]
    end
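
The host handling at the end of the method can be tried in isolation with the standard library. The helper below is a hypothetical sketch (`resolve_hosts_sketch` is not a driver method) that mirrors the same `case` on `IPAddr` and `String`. It omits the final shuffle so that the result is deterministic, and the example uses only IP literals, which `Resolv.each_address` yields back without a DNS lookup.

```ruby
require 'ipaddr'
require 'resolv'

# Hypothetical helper mirroring the shape of the host handling above.
def resolve_hosts_sketch(hosts)
  resolved = []

  Array(hosts).each do |host|
    case host
    when IPAddr
      resolved << host
    when String
      # IP literals are yielded as-is; hostnames are looked up.
      Resolv.each_address(host) { |ip| resolved << IPAddr.new(ip) }
    else
      raise ArgumentError, ":hosts must be String or IPAddr, #{host.inspect} given"
    end
  end

  if resolved.empty?
    raise ArgumentError, ":hosts #{hosts.inspect} could not be resolved to any ip address"
  end

  resolved
end

addresses = resolve_hosts_sketch(['127.0.0.1', IPAddr.new('10.0.1.2')])
puts addresses.map(&:to_s).join(', ')  # 127.0.0.1, 10.0.1.2
```

Normalizing every entry to `IPAddr` up front means later code does not need to distinguish hostnames from addresses.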