class RedisCluster
RedisCluster
is a client for Redis Cluster.
Constants
- NOOP
- VERSION
Attributes
cluster[R]
cluster_opts[R]
middlewares[R]
redis_opts[R]
Public Class Methods
new(seeds, redis_opts: nil, cluster_opts: nil)
click to toggle source
Calls superclass method
# File lib/redis-cluster.rb, line 18
#
# Sets up the client state and builds the Cluster from the seed nodes.
#
# seeds        - handed unchanged to Cluster.new.
# redis_opts   - options merged into every client built by create_client;
#                nil is treated as an empty hash.
# cluster_opts - options read by logger and silent? and handed to
#                Cluster.new; nil is treated as an empty hash.
def initialize(seeds, redis_opts: nil, cluster_opts: nil)
  @cluster_opts = cluster_opts || {}
  @redis_opts = redis_opts || {}
  @middlewares = Middlewares.new

  # Cluster gets the normalised hash rather than the raw argument, so it never
  # receives nil when the caller leaves cluster_opts out.
  client_creater = method(:create_client)
  @cluster = Cluster.new(seeds, @cluster_opts, &client_creater)

  super()
end
Public Instance Methods
call(*args, &block)
click to toggle source
# File lib/redis-cluster.rb, line 49
#
# Public entry point for a single command. Hands :call, this client and the
# arguments to middlewares.invoke; the block given to invoke performs the
# actual work through _call with the same arguments.
def call(*args, &block)
  middlewares.invoke(:call, self, *args) { _call(*args, &block) }
end
close()
click to toggle source
# File lib/redis-cluster.rb, line 41
#
# Closes the cluster. The call runs inside safely, so errors follow the
# logging / silent? handling implemented there.
def close
  safely do
    cluster.close
  end
end
connected?()
click to toggle source
# File lib/redis-cluster.rb, line 37
#
# Reports the connection state by delegating to cluster.connected?.
def connected?
  cluster.connected?
end
logger()
click to toggle source
# File lib/redis-cluster.rb, line 29
#
# Returns whatever is stored under :logger in cluster_opts.
def logger
  cluster_opts[:logger]
end
pipeline?()
click to toggle source
# File lib/redis-cluster.rb, line 45
#
# True while a pipeline buffer exists, i.e. @pipeline is anything but nil.
def pipeline?
  return false if @pipeline.nil?

  true
end
pipelined(*args, &block)
click to toggle source
# File lib/redis-cluster.rb, line 55
#
# Public entry point for a pipelined section. Hands :pipelined, this client
# and the arguments to middlewares.invoke; the block given to invoke runs
# _pipelined with the same arguments and the caller's block.
def pipelined(*args, &block)
  middlewares.invoke(:pipelined, self, *args) { _pipelined(*args, &block) }
end
silent?()
click to toggle source
# File lib/redis-cluster.rb, line 33
#
# Returns whatever is stored under :silent in cluster_opts. safely uses it to
# decide whether a rescued error is re-raised.
def silent?
  cluster_opts[:silent]
end
Private Instance Methods
_call(keys, command, opts = {})
click to toggle source
# File lib/redis-cluster.rb, line 65
#
# Executes one command, either queued on the open pipeline or immediately.
#
# keys    - passed to cluster.slot_for to pick the slot.
# command - the command to send.
# opts    - option hash; :transform defaults to NOOP. When the command runs
#           immediately the hash is expanded into call_immediately's
#           keywords (transform:, read:).
#
# Returns the value produced by call_pipeline or call_immediately, subject to
# the error handling in safely.
def _call(keys, command, opts = {})
  # Build a new hash instead of writing the default into the caller's hash,
  # which may be frozen or reused.
  opts = opts.merge(transform: opts[:transform] || NOOP)
  slot = cluster.slot_for(keys)

  safely do
    if pipeline?
      call_pipeline(slot, command, opts)
    else
      # call_immediately declares keyword parameters; since Ruby 3.0 a
      # positional hash is no longer converted implicitly, so splat it.
      call_immediately(slot, command, **opts)
    end
  end
end
_pipelined() { || ... }
click to toggle source
# File lib/redis-cluster.rb, line 78
#
# Runs the given block with a pipeline buffer open, then flushes the buffer.
#
# When a pipeline is already open the block is simply yielded to and its
# value returned. Otherwise the futures collected in @pipeline are grouped
# with map_pipeline and sent with do_pipelined; futures that come back as
# leftover are retried, for at most three rounds. The cluster is reset after
# a round in which do_pipelined reported :moved or :down. If futures are
# still pending afterwards, the first one's value is read. @pipeline is
# always cleared on the way out.
def _pipelined
  safely do
    return yield if pipeline?

    begin
      @pipeline = []
      yield

      rounds_left = 3
      until @pipeline.empty? || !rounds_left.positive?
        rounds_left -= 1
        needs_reset = false
        by_url = map_pipeline(@pipeline)

        # Start a fresh buffer; only futures that must be retried re-enter it.
        @pipeline = []
        by_url.each do |url, futures|
          leftover, error = do_pipelined(url, futures)
          needs_reset ||= %i[moved down].include?(error)
          @pipeline.concat(leftover)
        end

        cluster.reset if needs_reset
      end

      @pipeline.first.value unless @pipeline.empty?
    ensure
      @pipeline = nil
    end
  end
end
call_immediately(slot, command, transform:, read: false)
click to toggle source
# File lib/redis-cluster.rb, line 117
#
# Sends one command right away, making at most three attempts.
#
# slot      - slot used to pick a client via cluster.client_for.
# command   - passed to client.call.
# transform - callable applied to a reply that scan_reply does not flag.
# read      - selects the :read mode for client_for when truthy, else :write.
#
# After a flagged reply the next attempt goes to cluster[url]; :moved also
# resets the cluster and :ask makes the next attempt push [:asking] first.
# LoadingStateError and Redis::CannotConnectError are rescued and the client
# is looked up again; a connection failure additionally clears the asking
# flag and resets the cluster. Once the attempts are used up, the last reply
# (or rescued exception) is raised.
def call_immediately(slot, command, transform:, read: false)
  mode = read ? :read : :write
  client = cluster.client_for(mode, slot)
  asking = false
  reply = nil

  3.times do
    begin
      client.push([:asking]) if asking
      reply = client.call(command)

      redirect, url = scan_reply(reply)
      return transform.call(reply) unless redirect

      cluster.reset if redirect == :moved
      asking = (redirect == :ask)
      client = cluster[url]
    rescue LoadingStateError, Redis::CannotConnectError => e
      if e.is_a?(Redis::CannotConnectError)
        asking = false
        cluster.reset
      end

      client = cluster.client_for(mode, slot)
      reply = e
    end
  end

  raise reply
end
call_pipeline(slot, command, opts)
click to toggle source
# File lib/redis-cluster.rb, line 150
#
# Queues a command on the open pipeline: wraps slot, command and
# opts[:transform] in a Future, appends it to @pipeline and returns it.
def call_pipeline(slot, command, opts)
  future = Future.new(slot, command, opts[:transform])
  @pipeline << future
  future
end
create_client(url)
click to toggle source
# File lib/redis-cluster.rb, line 219
#
# Builds a Client for one node. The url is split at its first ':' into host
# and port, which are merged over redis_opts; the new client is given this
# instance's middlewares before being returned.
def create_client(url)
  host, port = url.split(':', 2)
  client = Client.new(redis_opts.merge(host: host, port: port))
  client.middlewares = middlewares
  client
end
do_pipelined(url, futures)
click to toggle source
# File lib/redis-cluster.rb, line 166
#
# Sends a batch of futures to the node at url and stores each reply on its
# future.
#
# Returns [leftover, error]. leftover holds the futures whose reply was
# flagged by scan_reply (updated with the new url and asking flag); error is
# :moved if any reply was a MOVED, otherwise nil. When LoadingStateError or
# Redis::CannotConnectError is raised, every future is returned with url and
# asking cleared, and error is :loading or :down respectively.
def do_pipelined(url, futures)
  client = cluster[url]

  # An [:asking] push occupies a reply position of its own, so remember which
  # reply position belongs to which future.
  owner_of = {}
  position = 0
  futures.each_with_index do |future, future_idx|
    if future.asking
      client.push([:asking])
      position += 1
    end

    owner_of[position] = future_idx
    client.push(future.command)
    position += 1
  end

  error = nil
  leftover = []
  client.commit.each_with_index do |reply, reply_idx|
    future_idx = owner_of[reply_idx]
    next unless future_idx

    future = futures[future_idx]
    future.value = reply

    redirect, redirect_url = scan_reply(reply)
    next unless redirect

    error ||= :moved if redirect == :moved
    future.asking = (redirect == :ask)
    future.url = redirect_url
    leftover << future
  end

  [leftover, error]
rescue LoadingStateError, Redis::CannotConnectError => e
  # reset url and asking when connection refused
  futures.each do |future|
    future.url = nil
    future.asking = false
  end

  failure = e.is_a?(LoadingStateError) ? :loading : :down
  [futures, failure]
end
map_pipeline(pipe)
click to toggle source
# File lib/redis-cluster.rb, line 156
#
# Groups futures by target url. A future's own url is used when set;
# otherwise the url of the :write client for the future's slot is used.
#
# Returns a Hash of url => Array of futures, in insertion order.
def map_pipeline(pipe)
  grouped = ::Hash.new { |hash, key| hash[key] = [] }

  pipe.each_with_object(grouped) do |future, by_url|
    target = future.url || cluster.client_for(:write, future.slot).url
    by_url[target] << future
  end
end
safely() { || ... }
click to toggle source
# File lib/redis-cluster.rb, line 110
#
# Runs the given block inside synchronize and returns its value; returns nil
# when no block is given. A StandardError is logged through logger (when one
# is configured) and re-raised unless silent? is truthy.
def safely
  return unless block_given?

  synchronize { yield }
rescue StandardError => error
  logger&.error(error)
  raise error unless silent?
end
scan_reply(reply)
click to toggle source
# File lib/redis-cluster.rb, line 208
#
# Inspects a reply for a redirection.
#
# For a Redis::CommandError whose message starts with MOVED or ASK, returns
# [:moved, url] or [:ask, url], url being the third whitespace-separated
# word of the message. Any other Redis::CommandError, and any other
# RuntimeError, is raised. Every other reply yields nil.
def scan_reply(reply)
  case reply
  when Redis::CommandError
    kind, _slot, url = reply.to_s.split
    raise reply unless %w[MOVED ASK].include?(kind)

    [kind.downcase.to_sym, url]
  when ::RuntimeError
    raise reply
  end
end