class Cassandra::Cluster::Schema::ReplicationStrategies::NetworkTopology

@private

Public Instance Methods

replication_map(token_hosts, token_ring, replication_options) click to toggle source
   # File lib/cassandra/cluster/schema/replication_strategies/network_topology.rb
# Builds a map from every token in the ring to the hosts that replicate it,
# choosing replicas per datacenter and spreading them across racks.
#
# For each token, and for each datacenter named in +replication_options+,
# the ring is walked clockwise starting at that token. A host is taken as a
# replica when its rack has not been used yet (or its rack is unknown, or
# every rack of the datacenter has already been used). Hosts passed over
# because their rack was already used are remembered and back-filled, in the
# order they were seen, as soon as every rack has contributed a replica.
#
# @param token_hosts [Hash] token => host; each host must respond to
#   +datacenter+ and +rack+
# @param token_ring [Array] all tokens, in ring order
# @param replication_options [Hash] datacenter name => replication factor
#   (anything accepted by +Integer()+); datacenters absent from the ring and
#   factors that cannot be converted to an integer are ignored
# @return [Hash] token => frozen Array of replica hosts, in selection order
def replication_map(token_hosts, token_ring, replication_options)
  racks                  = ::Hash.new
  datacenter_token_rings = ::Hash.new
  size                   = token_ring.size

  # Index the ring per datacenter. Each per-datacenter ring is keyed by the
  # token's position in the *global* ring, so it can be probed with global
  # indices below.
  token_ring.each_with_index do |token, i|
    host = token_hosts[token]

    racks[host.datacenter] ||= ::Set.new
    racks[host.datacenter].add(host.rack)

    datacenter_token_rings[host.datacenter]  ||= {}
    datacenter_token_rings[host.datacenter][i] = token
  end

  result = ::Hash.new

  token_ring.each_with_index do |token, i|
    replicas = ::Set.new
    visited  = ::Hash.new
    skipped  = ::Hash.new

    replication_options.each do |datacenter, factor|
      ring = datacenter_token_rings[datacenter]
      next unless ring

      # Never ask for more replicas than the datacenter has tokens; skip the
      # datacenter entirely when the factor is not a valid integer.
      factor = begin
                 [Integer(factor), ring.size].min
               rescue
                 next
               end

      total_racks    = racks[datacenter].size
      visited_racks  = visited[datacenter] ||= ::Set.new
      skipped_hosts  = skipped[datacenter] ||= ::Set.new
      added_replicas = ::Set.new

      size.times do |j|
        break if added_replicas.size >= factor

        # Positions owned by other datacenters are absent from this ring.
        tk = ring[(i + j) % size]
        next unless tk

        host = token_hosts[tk]
        rack = host.rack

        if rack.nil? || visited_racks.size == total_racks
          # Unknown rack, or every rack already used: take the host as is.
          replicas << host
          added_replicas << host
        elsif visited_racks.include?(rack)
          # Rack already represented: keep the host as a fallback.
          skipped_hosts << host
        else
          replicas << host
          visited_racks << rack
          added_replicas << host

          # Once every rack has a replica, back-fill from the hosts that were
          # skipped earlier, up to the replication factor.
          if visited_racks.size == total_racks
            skipped_hosts.each do |skipped_host|
              break if added_replicas.size >= factor

              replicas << skipped_host
              # Count the skipped host itself; counting `host` again would
              # leave the size unchanged and overshoot the factor.
              added_replicas << skipped_host
            end
          end
        end
      end
    end

    result[token] = replicas.to_a.freeze
  end

  result
end