class RedisCluster

RedisCluster is a client for redis-cluster huh

Constants

NOOP
VERSION

Attributes

cluster[R]
cluster_opts[R]
middlewares[R]
redis_opts[R]

Public Class Methods

new(seeds, redis_opts: nil, cluster_opts: nil) click to toggle source
Calls superclass method
# File lib/redis-cluster.rb, line 18
def initialize(seeds, redis_opts: nil, cluster_opts: nil)
  @cluster_opts = cluster_opts || {}
  @redis_opts = redis_opts || {}
  @middlewares = Middlewares.new

  client_creater = method(:create_client)
  @cluster = Cluster.new(seeds, cluster_opts, &client_creater)

  super()
end

Public Instance Methods

call(*args, &block) click to toggle source
# File lib/redis-cluster.rb, line 49
def call(*args, &block)
  middlewares.invoke(:call, self, *args) do
    _call(*args, &block)
  end
end
close() click to toggle source
# File lib/redis-cluster.rb, line 41
def close
  safely{ cluster.close }
end
connected?() click to toggle source
# File lib/redis-cluster.rb, line 37
def connected?
  cluster.connected?
end
logger() click to toggle source
# File lib/redis-cluster.rb, line 29
def logger
  cluster_opts[:logger]
end
pipeline?() click to toggle source
# File lib/redis-cluster.rb, line 45
def pipeline?
  !@pipeline.nil?
end
pipelined(*args, &block) click to toggle source
# File lib/redis-cluster.rb, line 55
def pipelined(*args, &block)
  middlewares.invoke(:pipelined, self, *args) do
    _pipelined(*args, &block)
  end
end
silent?() click to toggle source
# File lib/redis-cluster.rb, line 33
def silent?
  cluster_opts[:silent]
end

Private Instance Methods

_call(keys, command, opts = {}) click to toggle source
# File lib/redis-cluster.rb, line 65
def _call(keys, command, opts = {})
  opts[:transform] ||= NOOP
  slot = cluster.slot_for(keys)

  safely do
    if pipeline?
      call_pipeline(slot, command, opts)
    else
      call_immediately(slot, command, opts)
    end
  end
end
_pipelined() { || ... } click to toggle source
# File lib/redis-cluster.rb, line 78
def _pipelined
  safely do
    return yield if pipeline?

    begin
      @pipeline = []
      yield

      try = 3
      while !@pipeline.empty? && try.positive?
        try -= 1
        moved = false
        mapped_future = map_pipeline(@pipeline)

        @pipeline = []
        mapped_future.each do |url, futures|
          leftover, error = do_pipelined(url, futures)
          moved ||= (error == :moved || error == :down)

          @pipeline.concat(leftover)
        end

        cluster.reset if moved
      end

      @pipeline.first.value unless @pipeline.empty?
    ensure
      @pipeline = nil
    end
  end
end
call_immediately(slot, command, transform:, read: false) click to toggle source
# File lib/redis-cluster.rb, line 117
def call_immediately(slot, command, transform:, read: false)
  try = 3
  asking = false
  reply = nil
  mode = read ? :read : :write
  client = cluster.client_for(mode, slot)

  while try.positive?
    begin
      try -= 1

      client.push([:asking]) if asking
      reply = client.call(command)

      err, url = scan_reply(reply)
      return transform.call(reply) unless err

      cluster.reset if err == :moved
      asking = err == :ask
      client = cluster[url]
    rescue LoadingStateError, Redis::CannotConnectError => e
      if e.is_a?(Redis::CannotConnectError)
        asking = false
        cluster.reset
      end
      client = cluster.client_for(mode, slot)
      reply = e
    end
  end

  raise reply
end
call_pipeline(slot, command, opts) click to toggle source
# File lib/redis-cluster.rb, line 150
def call_pipeline(slot, command, opts)
  Future.new(slot, command, opts[:transform]).tap do |future|
    @pipeline << future
  end
end
create_client(url) click to toggle source
# File lib/redis-cluster.rb, line 219
def create_client(url)
  host, port = url.split(':', 2)
  Client.new(redis_opts.merge(host: host, port: port)).tap do |c|
    c.middlewares = middlewares
  end
end
do_pipelined(url, futures) click to toggle source
# File lib/redis-cluster.rb, line 166
def do_pipelined(url, futures)
  error = nil
  leftover = []

  rev_index = {}
  idx = 0
  client = cluster[url]

  futures.each_with_index do |future, i|
    if future.asking
      client.push([:asking])
      idx += 1
    end

    rev_index[idx] = i
    client.push(future.command)
    idx += 1
  end

  client.commit.each_with_index do |reply, i|
    next unless rev_index[i]

    future = futures[rev_index[i]]
    future.value = reply

    err, url = scan_reply(reply)
    next unless err

    error ||= :moved if err == :moved
    future.asking = err == :ask
    future.url = url
    leftover << future
  end

  [leftover, error]
rescue LoadingStateError, Redis::CannotConnectError => e
  # reset url and asking when connection refused
  futures.each{ |f| f.url = nil; f.asking = false }

  [futures, e.is_a?(LoadingStateError) ? :loading : :down]
end
map_pipeline(pipe) click to toggle source
# File lib/redis-cluster.rb, line 156
def map_pipeline(pipe)
  futures = ::Hash.new{ |h, k| h[k] = [] }
  pipe.each do |future|
    url = future.url || cluster.client_for(:write, future.slot).url
    futures[url] << future
  end

  futures
end
safely() { || ... } click to toggle source
# File lib/redis-cluster.rb, line 110
def safely
  synchronize{ yield } if block_given?
rescue StandardError => e
  logger&.error(e)
  raise e unless silent?
end
scan_reply(reply) click to toggle source
# File lib/redis-cluster.rb, line 208
def scan_reply(reply)
  if reply.is_a?(Redis::CommandError)
    err, _slot, url = reply.to_s.split
    raise reply if err != 'MOVED' && err != 'ASK'

    [err.downcase.to_sym, url]
  elsif reply.is_a?(::RuntimeError)
    raise reply
  end
end