class Dalli::Ring
An implementation of a consistent hash ring, designed to minimize the cache miss impact of adding or removing servers from the ring. That is, adding or removing a server from the ring should impact the key -> server mapping of ~ 1/N of the stored keys where N is the number of servers in the ring. This is done by creating a large number of “points” per server, distributed over the space 0x00000000 - 0xFFFFFFFF. For a given key, we calculate the CRC32 hash, and find the nearest “point” that is less than or equal to the key's hash. In this implementation, each “point” is represented by a Dalli::Ring::Entry.
Constants
- POINTS_PER_SERVER
The number of entries on the continuum created per server in an equally weighted scenario.
Attributes
continuum[RW]
servers[RW]
Public Class Methods
new(servers, options)
click to toggle source
# File lib/dalli/ring.rb, line 25
# Sets up a ring over the given servers.
#
# A continuum is only built when more than one server is supplied; otherwise
# @continuum is left as nil.
#
# servers - collection of server objects; stored as-is in @servers.
# options - object indexed with :threadsafe and :failover. Each feature is
#           treated as enabled unless its value is explicitly false.
def initialize(servers, options)
  @servers = servers
  @continuum = servers.size > 1 ? build_continuum(servers) : nil
  threadsafe! if options[:threadsafe] != false
  @failover = options[:failover] != false
end
Public Instance Methods
close()
click to toggle source
# File lib/dalli/ring.rb, line 94
# Calls +close+ on every server in @servers, in order.
#
# Returns @servers (the result of the +each+ call).
def close
  @servers.each { |server| server.close }
end
keys_grouped_by_server(key_arr)
click to toggle source
# File lib/dalli/ring.rb, line 63
# Buckets the given keys by the server that server_for_key picks for each.
#
# When server_for_key raises Dalli::RingError for a key, a debug message is
# logged and that key is filed under the +nil+ bucket instead.
#
# key_arr - enumerable of keys.
#
# Returns a Hash of server (or nil) => Array of keys, in first-seen order.
def keys_grouped_by_server(key_arr)
  key_arr.each_with_object({}) do |key, grouped|
    server = begin
      server_for_key(key)
    rescue Dalli::RingError
      Dalli.logger.debug { "unable to get key #{key}" }
      nil
    end
    (grouped[server] ||= []) << key
  end
end
lock() { || ... }
click to toggle source
# File lib/dalli/ring.rb, line 72
# Locks every server, yields to the caller's block, then unlocks.
#
# Only servers whose +lock!+ call returned are unlocked afterwards. If a
# +lock!+ call raises part-way through the list, the servers locked before it
# are released and the exception propagates; the block is not run and
# +unlock!+ is never sent to a server that was not locked.
#
# Returns the value of the block.
def lock
  acquired = []
  begin
    @servers.each do |server|
      server.lock!
      # Record only after lock! returned, so a failed lock is never unlocked.
      acquired << server
    end
    yield
  ensure
    acquired.each(&:unlock!)
  end
end
pipeline_consume_and_ignore_responses()
click to toggle source
# File lib/dalli/ring.rb, line 81
# Sends a :noop request to every server in @servers.
#
# A Dalli::NetworkError raised by a server is swallowed and iteration moves
# on to the next server; any other exception propagates.
#
# Returns @servers (the result of the +each+ call).
def pipeline_consume_and_ignore_responses
  @servers.each do |server|
    begin
      server.request(:noop)
    rescue Dalli::NetworkError
      # Ignore this error, as it indicates the socket is unavailable
      # and there's no need to flush
    end
  end
end
server_for_key(key)
click to toggle source
# File lib/dalli/ring.rb, line 34
# Picks the server responsible for +key+.
#
# With a continuum present the choice is delegated to server_from_continuum;
# without one, the first entry of @servers is used.
#
# Returns the chosen server when it reports alive?.
# Raises Dalli::RingError when no server was chosen or it is not alive.
def server_for_key(key)
  candidate = @continuum ? server_from_continuum(key) : @servers.first

  # Note that the call to alive? has the side effect of initializing
  # the socket
  raise Dalli::RingError, 'No server available' unless candidate&.alive?

  candidate
end
server_from_continuum(key)
click to toggle source
# File lib/dalli/ring.rb, line 48
# Walks the continuum looking for a live server for +key+.
#
# Up to 20 candidates are tried. The first is looked up with the hash of
# +key+; after a candidate that is not alive, and only when @failover is
# truthy, the next lookup uses the hash of the attempt number prepended to
# the key.
#
# Returns the first candidate that reports alive?, or nil when failover is
# off and the first candidate is down, or when all 20 attempts fail.
def server_from_continuum(key)
  point = hash_for(key)
  (0...20).each do |attempt|
    candidate = server_for_hash_key(point)
    # Note that the call to alive? has the side effect of initializing
    # the socket
    return candidate if candidate.alive?
    return nil unless @failover

    point = hash_for("#{attempt}#{key}")
  end
  nil
end
socket_timeout()
click to toggle source
# File lib/dalli/ring.rb, line 90
# Returns the socket_timeout reported by the first server in @servers.
def socket_timeout
  first_server = @servers.first
  first_server.socket_timeout
end
Private Instance Methods
build_continuum(servers)
click to toggle source
# File lib/dalli/ring.rb, line 125
# Builds the sorted list of ring entries for the given servers.
#
# Each server contributes entry_count_for(...) entries. The value of entry
# +idx+ is the first 8 hex digits of SHA1("<server.name>:<idx>") read as an
# integer.
#
# servers - collection of servers responding to +name+ and +weight+.
#
# Returns an Array of Dalli::Ring::Entry ordered by ascending value.
def build_continuum(servers)
  weight_sum = servers.map(&:weight).reduce(0) { |sum, weight| sum + weight }

  points = servers.flat_map do |srv|
    entry_count_for(srv, servers.size, weight_sum).times.map do |idx|
      digest = Digest::SHA1.hexdigest("#{srv.name}:#{idx}")
      Dalli::Ring::Entry.new(digest[0, 8].to_i(16), srv)
    end
  end

  points.sort_by(&:value)
end
entry_count_for(server, total_servers, total_weight)
click to toggle source
# File lib/dalli/ring.rb, line 110
# Computes how many continuum entries a server gets:
# floor(total_servers * POINTS_PER_SERVER * server.weight / total_weight).
#
# server        - object responding to +weight+.
# total_servers - number of servers in the ring.
# total_weight  - sum of all server weights; converted with Float().
#
# Returns an Integer.
def entry_count_for(server, total_servers, total_weight)
  weighted_points = total_servers * POINTS_PER_SERVER * server.weight
  share = weighted_points / Float(total_weight)
  share.floor
end
hash_for(key)
click to toggle source
# File lib/dalli/ring.rb, line 106
# Maps a key onto the ring's hash space.
#
# key - String to hash.
#
# Returns the CRC32 checksum of +key+ as an Integer.
def hash_for(key)
  checksum = Zlib.crc32(key)
  checksum
end
server_for_hash_key(hash_key)
click to toggle source
# File lib/dalli/ring.rb, line 114
# Finds the server owning the closest continuum entry whose value is
# <= +hash_key+.
#
# The binary search locates the first entry with a value strictly greater
# than +hash_key+; the entry just before it is the owner. When no entry is
# greater, the last entry owns the hash. When the very first entry is already
# greater, the index becomes -1, which also selects the last entry.
#
# Returns that entry's server.
def server_for_hash_key(hash_key)
  upper = @continuum.bsearch_index { |entry| entry.value > hash_key }
  upper ||= @continuum.size
  @continuum[upper - 1].server
end
threadsafe!()
click to toggle source
# File lib/dalli/ring.rb, line 100
# Extends every server in @servers with the Dalli::Threadsafe module.
#
# Returns @servers (the result of the +each+ call).
def threadsafe!
  @servers.each { |server| server.extend(Dalli::Threadsafe) }
end