class Dalli::Ring

An implementation of a consistent hash ring, designed to minimize the cache miss impact of adding or removing servers from the ring. That is, adding or removing a server from the ring should impact the key -> server mapping of ~ 1/N of the stored keys where N is the number of servers in the ring. This is done by creating a large number of “points” per server, distributed over the space 0x00000000 - 0xFFFFFFFF. For a given key, we calculate the CRC32 hash, and find the nearest “point” that is less than or equal to the the key's hash. In this implemetation, each “point” is represented by a Dalli::Ring::Entry.

Constants

POINTS_PER_SERVER

The number of entries on the continuum created per server in an equally weighted scenario.

Attributes

continuum[RW]
servers[RW]

Public Class Methods

new(servers, options) click to toggle source
# File lib/dalli/ring.rb, line 25
def initialize(servers, options)
  @servers = servers
  @continuum = nil
  @continuum = build_continuum(servers) if servers.size > 1

  threadsafe! unless options[:threadsafe] == false
  @failover = options[:failover] != false
end

Public Instance Methods

close() click to toggle source
# File lib/dalli/ring.rb, line 94
def close
  @servers.each(&:close)
end
keys_grouped_by_server(key_arr) click to toggle source
# File lib/dalli/ring.rb, line 63
def keys_grouped_by_server(key_arr)
  key_arr.group_by do |key|
    server_for_key(key)
  rescue Dalli::RingError
    Dalli.logger.debug { "unable to get key #{key}" }
    nil
  end
end
lock() { || ... } click to toggle source
# File lib/dalli/ring.rb, line 72
def lock
  @servers.each(&:lock!)
  begin
    yield
  ensure
    @servers.each(&:unlock!)
  end
end
pipeline_consume_and_ignore_responses() click to toggle source
# File lib/dalli/ring.rb, line 81
def pipeline_consume_and_ignore_responses
  @servers.each do |s|
    s.request(:noop)
  rescue Dalli::NetworkError
    # Ignore this error, as it indicates the socket is unavailable
    # and there's no need to flush
  end
end
server_for_key(key) click to toggle source
# File lib/dalli/ring.rb, line 34
def server_for_key(key)
  server = if @continuum
             server_from_continuum(key)
           else
             @servers.first
           end

  # Note that the call to alive? has the side effect of initializing
  # the socket
  return server if server&.alive?

  raise Dalli::RingError, 'No server available'
end
server_from_continuum(key) click to toggle source
# File lib/dalli/ring.rb, line 48
def server_from_continuum(key)
  hkey = hash_for(key)
  20.times do |try|
    server = server_for_hash_key(hkey)

    # Note that the call to alive? has the side effect of initializing
    # the socket
    return server if server.alive?
    break unless @failover

    hkey = hash_for("#{try}#{key}")
  end
  nil
end
socket_timeout() click to toggle source
# File lib/dalli/ring.rb, line 90
def socket_timeout
  @servers.first.socket_timeout
end

Private Instance Methods

build_continuum(servers) click to toggle source
# File lib/dalli/ring.rb, line 125
def build_continuum(servers)
  continuum = []
  total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
  servers.each do |server|
    entry_count_for(server, servers.size, total_weight).times do |idx|
      hash = Digest::SHA1.hexdigest("#{server.name}:#{idx}")
      value = Integer("0x#{hash[0..7]}")
      continuum << Dalli::Ring::Entry.new(value, server)
    end
  end
  continuum.sort_by(&:value)
end
entry_count_for(server, total_servers, total_weight) click to toggle source
# File lib/dalli/ring.rb, line 110
def entry_count_for(server, total_servers, total_weight)
  ((total_servers * POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor
end
hash_for(key) click to toggle source
# File lib/dalli/ring.rb, line 106
def hash_for(key)
  Zlib.crc32(key)
end
server_for_hash_key(hash_key) click to toggle source
# File lib/dalli/ring.rb, line 114
def server_for_hash_key(hash_key)
  # Find the closest index in the Ring with value <= the given value
  entryidx = @continuum.bsearch_index { |entry| entry.value > hash_key }
  if entryidx.nil?
    entryidx = @continuum.size - 1
  else
    entryidx -= 1
  end
  @continuum[entryidx].server
end
threadsafe!() click to toggle source
# File lib/dalli/ring.rb, line 100
def threadsafe!
  @servers.each do |s|
    s.extend(Dalli::Threadsafe)
  end
end