class RedisCluster::Cluster

Cluster implements the Redis Cluster logic for the client.

Constants

CROSSSLOT_ERROR
HASH_SLOTS
XMODEM_CRC16_LOOKUP

Attributes

client_creater[R]
clients[R]
options[R]
replicas[R]
slots[R]

Public Class Methods

new(seeds, cluster_opts = {}, &block) click to toggle source
# File lib/redis_cluster/cluster.rb, line 14
# Set up empty cluster state, then bootstrap it from the seed nodes.
#
# seeds        - seed node list, handed to init_client.
# cluster_opts - options hash, stored as-is; read_mode and force_cluster?
#                look their settings up in it.
# block        - optional client factory; when present, create_client calls
#                it with the node URL instead of building a Client itself.
def initialize(seeds, cluster_opts = {}, &block)
  @client_creater = block
  @options = cluster_opts
  @clients = {}
  @slots = []
  @replicas = nil

  # Scratch array for pick_client. It must exist before init_client runs,
  # because slots_and_clients reads its length.
  @buffer = []
  init_client(seeds)
end

Public Instance Methods

[](url) click to toggle source
# File lib/redis_cluster/cluster.rb, line 75
# Fetch the client registered under +url+, creating and caching one through
# create_client when no (truthy) entry exists yet.
#
# Returns the cached or newly created client.
def [](url)
  existing = clients[url]
  return existing if existing

  clients[url] = create_client(url)
end
client_for(operation, slot) click to toggle source
# File lib/redis_cluster/cluster.rb, line 38
# Choose the client that should serve +operation+ on +slot+.
#
# Anything other than :read is always routed as :master. For :read the
# configured read_mode decides:
#   :master       - the first client stored for the slot
#   :slave        - a healthy client picked from the entries after the first,
#                   falling back to the first one when pick_client finds none
#   :master_slave - a healthy client picked from all entries for the slot
#
# Returns nil when the mode is none of the above, or when :master_slave finds
# no healthy client.
def client_for(operation, slot)
  mode = :master
  mode = read_mode if operation == :read
  nodes = slots[slot]

  if mode == :master
    nodes.first
  elsif mode == :slave
    pick_client(nodes, skip: 1) || nodes.first
  elsif mode == :master_slave
    pick_client(nodes)
  end
end
close() click to toggle source
# File lib/redis_cluster/cluster.rb, line 51
# Call +close+ on every known client.
#
# Returns the array of clients that were closed.
def close
  nodes = clients.values
  nodes.each { |node| node.close }
end
connected?() click to toggle source
# File lib/redis_cluster/cluster.rb, line 55
# True only when every known client reports connected? (and therefore also
# true when there are no clients at all).
def connected?
  clients.each_value.all?(&:connected?)
end
force_cluster?() click to toggle source
# File lib/redis_cluster/cluster.rb, line 25
# The :force_cluster option, or false when it is unset or falsy.
# slots_and_clients consults this before falling back to single-node mode.
def force_cluster?
  forced = options[:force_cluster]
  return false unless forced

  forced
end
inspect() click to toggle source
# File lib/redis_cluster/cluster.rb, line 79
# Short, fixed-format description that includes the library version.
def inspect
  version = RedisCluster::VERSION
  "#<RedisCluster cluster v#{version}>"
end
random() click to toggle source
# File lib/redis_cluster/cluster.rb, line 59
# Pick one of the known clients at random.
#
# Returns nil when no clients are registered.
def random
  pool = clients.values
  pool.sample
end
read_mode() click to toggle source
# File lib/redis_cluster/cluster.rb, line 29
# The :read_mode option, defaulting to :master when unset or falsy.
# client_for understands :master, :slave and :master_slave.
def read_mode
  mode = options[:read_mode]
  return :master unless mode

  mode
end
reset() click to toggle source
# File lib/redis_cluster/cluster.rb, line 63
# Rebuild the slot map by asking a randomly chosen known client.
#
# Makes at most three attempts. A client whose attempt fails is dropped from
# +clients+ before the next try. Retrying stops early once no clients remain,
# so the error that actually caused the failure is the one re-raised.
#
# Returns whatever slots_and_clients returns on success.
# Raises the StandardError from the last failed attempt.
def reset
  attempts_left = 3
  begin
    attempts_left -= 1
    client = random
    slots_and_clients(client)
  rescue StandardError
    # `random` returns nil when the pool is empty; guard so this handler never
    # raises NoMethodError on nil.url and hides the real error.
    clients.delete(client.url) if client
    retry if attempts_left.positive? && !clients.empty?
    raise
  end
end
slot_for(keys) click to toggle source
# File lib/redis_cluster/cluster.rb, line 33
# Resolve the single hash slot shared by +keys+.
#
# keys - one key or an arbitrarily nested array of keys.
#
# Returns the slot number when every key maps to the same slot.
# Raises CROSSSLOT_ERROR otherwise, which includes an empty key list.
def slot_for(keys)
  distinct = [keys].flatten.map { |key| _slot_for(key) }.uniq
  raise CROSSSLOT_ERROR unless distinct.size == 1

  distinct.first
end

Private Instance Methods

_slot_for(key) click to toggle source

Return the hash slot for a given key. If the key contains a non-empty hash tag (`{...}`), only the tag is hashed. Modified from github.com/antirez/redis-rb-cluster/blob/master/cluster.rb#L104-L117

# File lib/redis_cluster/cluster.rb, line 188
# Map a key to its hash slot: crc16 of the key modulo HASH_SLOTS.
#
# When the key holds a hash tag, only the text between the first "{" and the
# next "}" after it is hashed. An empty tag ("{}") or a missing closing brace
# leaves the whole key in play.
#
# key - anything responding to to_s.
def _slot_for(key)
  hashed = key.to_s
  open_at = hashed.index('{')
  close_at = open_at && hashed.index('}', open_at + 1)

  # close_at is never before open_at + 1, so "greater than" here means the tag is non-empty.
  hashed = hashed[(open_at + 1)...close_at] if close_at && close_at > open_at + 1

  crc16(hashed) % HASH_SLOTS
end
crc16(bytes) click to toggle source

This is the CRC16 algorithm used by Redis Cluster to hash keys. Implementation according to CCITT standards. Copied from github.com/antirez/redis-rb-cluster/blob/master/crc16.rb

This is actually the XMODEM CRC 16 algorithm, using the following parameters:

Name: "XMODEM", also known as "ZMODEM", "CRC-16/ACORN"
Width: 16 bits
Polynomial: 0x1021 (that is, x^16 + x^12 + x^5 + 1)
Initialization: 0x0000
Reflect input byte: False
Reflect output CRC: False
XOR constant applied to output CRC: 0x0000
Output for "123456789": 0x31C3

# File lib/redis_cluster/cluster.rb, line 178
# Table-driven 16-bit CRC over the bytes of a string.
#
# Starts from 0 and, for each byte, shifts the running value left by eight
# bits (kept to 16 bits) and XORs in the XMODEM_CRC16_LOOKUP entry indexed by
# the old high byte XOR the input byte.
#
# bytes - object responding to each_byte (a String in practice).
#
# Returns the checksum as an Integer; 0 for empty input.
def crc16(bytes)
  bytes.each_byte.reduce(0) do |crc, byte|
    shifted = (crc << 8) & 0xffff
    shifted ^ XMODEM_CRC16_LOOKUP[((crc >> 8) ^ byte) & 0xff]
  end
end
create_client(url) click to toggle source
# File lib/redis_cluster/cluster.rb, line 151
# Build a client for the node at +url+.
#
# A factory block given to the constructor takes precedence and receives the
# URL unchanged. Otherwise the URL is split at its first ":" into host and
# port (the port stays a String) and passed to Client.new.
def create_client(url)
  return client_creater.call(url) if client_creater

  host, port = url.split(':', 2)
  Client.new(host: host, port: port)
end
init_client(seeds) click to toggle source
# File lib/redis_cluster/cluster.rb, line 133
# Bootstrap the cluster topology from the first seed that answers.
#
# Seeds are tried from the last entry to the first. For each one a client is
# created and handed to slots_and_clients; the first success ends the search.
#
# seeds - list of node URLs (a single URL or nil is coerced with Array()).
#
# Returns nil on success.
# Raises ArgumentError when no seeds are given, since there is nothing to
# connect to and no underlying error to report.
# Raises the StandardError from the last seed tried when every seed fails.
def init_client(seeds)
  candidates = Array(seeds)
  raise ArgumentError, 'at least one seed node is required' if candidates.empty?

  last_error = nil
  candidates.reverse_each do |seed|
    begin
      client = create_client(seed)
      slots_and_clients(client)
      return
    rescue StandardError => e
      last_error = e
    end
  end

  raise last_error
end
pick_client(pool, skip: 0) click to toggle source
# File lib/redis_cluster/cluster.rb, line 85
# Pick a random healthy client from +pool+, ignoring the first +skip+ entries.
#
# Healthy candidates are packed contiguously at the front of the reusable
# @buffer scratch array, so the random index always lands on a candidate from
# this call. Indexing the buffer by pool position instead would leave gaps
# wherever an unhealthy entry was skipped, and the draw could return a stale
# client from an earlier call or nil.
#
# pool - array of clients responding to +healthy+.
# skip - number of leading entries to leave out (client_for passes 1 for
#        :slave reads so the first entry is excluded).
#
# Returns a healthy client, or nil when none qualifies.
def pick_client(pool, skip: 0)
  healthy_count = 0

  (skip...pool.length).each do |i|
    next unless pool[i].healthy

    @buffer[healthy_count] = pool[i]
    healthy_count += 1
  end

  return nil if healthy_count.zero?

  @buffer[rand(healthy_count)]
end
slots_and_clients(client) click to toggle source
# File lib/redis_cluster/cluster.rb, line 103
# Query +client+ for the slot layout and rebuild the slot -> clients table.
#
# Each reply entry is read as [first_slot, last_slot, node, node, ...] where a
# node starts with host and port. Every node is resolved through self[] (which
# creates and caches clients); all but the first node of an entry are sent
# READONLY. Each slot in the range then points at the entry's client list.
#
# If the call yields an error saying cluster support is disabled and
# force_cluster? is off, the queried node is treated as owning every slot
# (listed twice, so it fills both the first and the second position).
# Any other error value is raised.
#
# Also grows the @buffer scratch array to the number of known clients and
# stores the grouping (keyed by first slot) in @replicas.
#
# Returns the new replicas hash.
def slots_and_clients(client)
  nodes_by_start = ::Hash.new { |hash, start| hash[start] = [] }

  layout = client.call(%i[cluster slots])
  if layout.is_a?(StandardError)
    standalone = layout.message.eql?('ERR This instance has cluster support disabled')
    raise layout unless standalone && !force_cluster?

    host, port = client.url.split(':', 2)
    layout = [[0, HASH_SLOTS - 1, [host, port, nil], [host, port, nil]]]
  end

  layout.each do |first_slot, last_slot, *nodes|
    nodes.each_with_index do |(node_host, node_port), position|
      node = self["#{node_host}:#{node_port}"]
      nodes_by_start[first_slot] << node
      node.call([:readonly]) unless position.zero?
    end

    (first_slot..last_slot).each do |slot|
      slots[slot] = nodes_by_start[first_slot]
    end
  end

  @buffer = Array.new(clients.length) if clients.length > @buffer.length
  @replicas = nodes_by_start
end