class RailsFailover::Redis::Handler

Constants

PRIMARY_LOADED_STATUS
PRIMARY_ROLE_STATUS
SOFT_DISCONNECT_POLL_SECONDS
SOFT_DISCONNECT_TIMEOUT_SECONDS
VERIFY_FREQUENCY_BUFFER_PRECENT

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/rails_failover/redis/handler.rb, line 19
def initialize
  @primaries_down = Concurrent::Map.new
  @clients = Concurrent::Map.new

  super() # Monitor#initialize
end

Public Instance Methods

deregister_client(client) click to toggle source
# File lib/rails_failover/redis/handler.rb, line 41
def deregister_client(client)
  id = client.options[:id]
  clients_for_id(id).delete(client)
end
primaries_down_count() click to toggle source
# File lib/rails_failover/redis/handler.rb, line 50
def primaries_down_count
  primaries_down.size
end
primary_down?(options) click to toggle source
# File lib/rails_failover/redis/handler.rb, line 46
def primary_down?(options)
  primaries_down[options[:id]]
end
register_client(client) click to toggle source
# File lib/rails_failover/redis/handler.rb, line 36
def register_client(client)
  id = client.options[:id]
  clients_for_id(id).put_if_absent(client, true)
end
verify_primary(options) click to toggle source
# File lib/rails_failover/redis/handler.rb, line 26
def verify_primary(options)
  primary_down(options)

  mon_synchronize do
    return if @thread&.alive?
    logger&.warn "Failover for Redis has been initiated"
    @thread = Thread.new { loop_until_all_up }
  end
end

Private Instance Methods

all_primaries_up() click to toggle source
# File lib/rails_failover/redis/handler.rb, line 101
def all_primaries_up
  primaries_down.empty?
end
clients() click to toggle source
# File lib/rails_failover/redis/handler.rb, line 133
def clients
  ancestor_pids = nil
  clients_for_pid = @clients.compute_if_absent(Process.pid) do
    ancestor_pids = @clients.keys
    Concurrent::Map.new
  end
  ancestor_pids&.each { |k| @clients.delete(k) }
  clients_for_pid
end
clients_for_id(id) click to toggle source
# File lib/rails_failover/redis/handler.rb, line 129
def clients_for_id(id)
  clients.compute_if_absent(id) { Concurrent::Map.new }
end
disconnect_clients(options, role) click to toggle source
# File lib/rails_failover/redis/handler.rb, line 149
def disconnect_clients(options, role)
  id = options[:id]

  matched_clients = clients_for_id(id)&.keys
    &.filter { |c| c.connection.rails_failover_role == role }
    &.to_set

  return if matched_clients.nil? || matched_clients.empty?

  # This is not ideal, but the mutex we need is contained
  # in the ::Redis instance, not the Redis::Client
  ObjectSpace.each_object(::Redis) do |redis|
    # When subscribed, Redis#_client is not a Redis::Client
    # Instance variable is the only reliable way
    client = redis.instance_variable_get(:@original_client)
    next if !matched_clients.include?(client)
    soft_disconnect(redis, client, role)
  end
end
ensure_primary_clients_disconnected() click to toggle source
# File lib/rails_failover/redis/handler.rb, line 143
def ensure_primary_clients_disconnected
  primaries_down.each do |key, options|
    disconnect_clients(options, RailsFailover::Redis::PRIMARY)
  end
end
logger() click to toggle source
# File lib/rails_failover/redis/handler.rb, line 197
def logger
  RailsFailover::Redis.logger
end
loop_until_all_up() click to toggle source
# File lib/rails_failover/redis/handler.rb, line 56
def loop_until_all_up
  loop do
    ensure_primary_clients_disconnected
    try_fallback_to_primary

    if all_primaries_up
      logger&.warn "Fallback to primary for Redis has been completed."
      break
    end
  end
end
primaries_down() click to toggle source
# File lib/rails_failover/redis/handler.rb, line 115
def primaries_down
  ancestor_pids = nil
  value = @primaries_down.compute_if_absent(Process.pid) do
    ancestor_pids = @primaries_down.keys
    @primaries_down.values.first || Concurrent::Map.new
  end

  ancestor_pids&.each do |pid|
    @primaries_down.delete(pid)&.each { |id, options| verify_primary(options) }
  end

  value
end
primary_down(options) click to toggle source
# File lib/rails_failover/redis/handler.rb, line 110
def primary_down(options)
  already_down = primaries_down.put_if_absent(options[:id], options.dup)
  RailsFailover::Redis.on_failover_callback!(options[:id]) if !already_down
end
primary_up(options) click to toggle source
# File lib/rails_failover/redis/handler.rb, line 105
def primary_up(options)
  already_up = !primaries_down.delete(options[:id])
  RailsFailover::Redis.on_fallback_callback!(options[:id]) if !already_up
end
soft_disconnect(redis, client, role) click to toggle source

Calling .disconnect can cause a running subscribe() to block forever Therefore try to acquire the lock

# File lib/rails_failover/redis/handler.rb, line 171
def soft_disconnect(redis, client, role)
  has_lock = redis.mon_try_enter

  if !has_lock
    begin
      client.connection.shutdown_socket
    rescue => e
      logger&.warn "Redis shutdown_socket for (#{role}) failed with #{e.class} '#{e.message}'"
    end

    waiting_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    loop do # Keep trying
      break if has_lock = redis.mon_try_enter
      break if !client.connection.connected? # Disconnected by other thread
      break if client.connection.rails_failover_role != role # Reconnected by other thread
      time_now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if time_now > waiting_since + SOFT_DISCONNECT_TIMEOUT_SECONDS
      sleep SOFT_DISCONNECT_POLL_SECONDS
    end
  end

  client.disconnect if client.connection&.rails_failover_role == role
ensure
  redis.mon_exit if has_lock
end
try_fallback_to_primary() click to toggle source
# File lib/rails_failover/redis/handler.rb, line 68
def try_fallback_to_primary
  frequency = RailsFailover::Redis.verify_primary_frequency_seconds
  sleep(frequency * ((rand(VERIFY_FREQUENCY_BUFFER_PRECENT) + 100) / 100.0))

  active_primaries_keys = {}

  primaries_down.each do |key, options|
    info = nil
    options = options.dup

    begin
      options[:driver] = options[:original_driver]
      primary_client = ::Redis::Client.new(options)
      logger&.debug "Checking connection to primary server (#{key})"
      info = primary_client.call([:info])
    rescue => e
      logger&.debug "Connection to primary server (#{key}) failed with '#{e.message}'"
    ensure
      primary_client&.disconnect
    end

    if info && info.include?(PRIMARY_LOADED_STATUS) && info.include?(PRIMARY_ROLE_STATUS)
      active_primaries_keys[key] = options
      logger&.debug "Primary server (#{key}) is active, disconnecting clients from replica"
    end
  end

  active_primaries_keys.each do |key, options|
    primary_up(options)
    disconnect_clients(options, RailsFailover::Redis::REPLICA)
  end
end