class RedisCluster::Cluster
Cluster
implement redis cluster logic for client.
Constants
- CROSSSLOT_ERROR
- HASH_SLOTS
- XMODEM_CRC16_LOOKUP
Attributes
Public Class Methods
# File lib/redis_cluster/cluster.rb, line 14 def initialize(seeds, cluster_opts = {}, &block) @options = cluster_opts @slots = [] @clients = {} @replicas = nil @client_creater = block @buffer = [] init_client(seeds) end
Public Instance Methods
# File lib/redis_cluster/cluster.rb, line 75 def [](url) clients[url] ||= create_client(url) end
# File lib/redis_cluster/cluster.rb, line 38 def client_for(operation, slot) mode = operation == :read ? read_mode : :master case mode when :master slots[slot].first when :slave pick_client(slots[slot], skip: 1) || slots[slot].first when :master_slave pick_client(slots[slot]) end end
# File lib/redis_cluster/cluster.rb, line 51 def close clients.values.each(&:close) end
# File lib/redis_cluster/cluster.rb, line 55 def connected? clients.values.all?(&:connected?) end
# File lib/redis_cluster/cluster.rb, line 25 def force_cluster? options[:force_cluster] || false end
# File lib/redis_cluster/cluster.rb, line 79 def inspect "#<RedisCluster cluster v#{RedisCluster::VERSION}>" end
# File lib/redis_cluster/cluster.rb, line 59 def random clients.values.sample end
# File lib/redis_cluster/cluster.rb, line 29 def read_mode options[:read_mode] || :master end
# File lib/redis_cluster/cluster.rb, line 63 def reset try = 3 begin try -= 1 client = random slots_and_clients(client) rescue StandardError => e clients.delete(client.url) try.positive? ? retry : (raise e) end end
# File lib/redis_cluster/cluster.rb, line 33 def slot_for(keys) slot = [keys].flatten.map{ |k| _slot_for(k) }.uniq slot.size == 1 ? slot.first : (raise CROSSSLOT_ERROR) end
Private Instance Methods
Return Redis::Client for a given key. Modified from github.com/antirez/redis-rb-cluster/blob/master/cluster.rb#L104-L117
# File lib/redis_cluster/cluster.rb, line 188 def _slot_for(key) key = key.to_s if (s = key.index('{')) if (e = key.index('}', s + 1)) && e != s + 1 key = key[s + 1..e - 1] end end crc16(key) % HASH_SLOTS end
This is the CRC16 algorithm used by Redis Cluster
to hash keys. Implementation according to CCITT standards. Copied from github.com/antirez/redis-rb-cluster/blob/master/crc16.rb
This is actually the XMODEM CRC 16 algorithm, using the following parameters:
Name : “XMODEM”, also known as “ZMODEM”, “CRC-16/ACORN” Width : 16 bit Poly : 1021 (That is actually x^16 + x^12 + x^5 + 1) Initialization : 0000 Reflect Input byte : False Reflect Output CRC : False Xor constant to output CRC : 0000 Output for “123456789” : 31C3
# File lib/redis_cluster/cluster.rb, line 178 def crc16(bytes) crc = 0 bytes.each_byte do |b| crc = ((crc << 8) & 0xffff) ^ XMODEM_CRC16_LOOKUP[((crc >> 8) ^ b) & 0xff] end crc end
# File lib/redis_cluster/cluster.rb, line 151 def create_client(url) if client_creater client_creater.call(url) else host, port = url.split(':', 2) Client.new(host: host, port: port) end end
# File lib/redis_cluster/cluster.rb, line 133 def init_client(seeds) try = seeds.count err = nil while try.positive? try -= 1 begin client = create_client(seeds[try]) slots_and_clients(client) return rescue StandardError => e err = e end end raise err end
# File lib/redis_cluster/cluster.rb, line 85 def pick_client(pool, skip: 0) unhealthy_count = 0 (skip...pool.length).each do |i| if pool[i].healthy @buffer[i - skip] = pool[i] else unhealthy_count += 1 end end buffer_length = pool.length - skip - unhealthy_count return nil if buffer_length.zero? i = rand(buffer_length) @buffer[i] end
# File lib/redis_cluster/cluster.rb, line 103 def slots_and_clients(client) replicas = ::Hash.new{ |h, k| h[k] = [] } result = client.call(%i[cluster slots]) if result.is_a?(StandardError) if result.message.eql?('ERR This instance has cluster support disabled') && !force_cluster? host, port = client.url.split(':', 2) result = [[0, HASH_SLOTS - 1, [host, port, nil], [host, port, nil]]] else raise result end end result.each do |arr| arr[2..-1].each_with_index do |a, i| cli = self["#{a[0]}:#{a[1]}"] replicas[arr[0]] << cli cli.call([:readonly]) if i.nonzero? end (arr[0]..arr[1]).each do |slot| slots[slot] = replicas[arr[0]] end end @buffer = Array.new(clients.length) if clients.length > @buffer.length @replicas = replicas end