class Dalli::Ring
An implementation of a consistent hash ring, designed to minimize the cache miss impact of adding or removing servers from the ring. That is, adding or removing a server should change the key -> server mapping for only ~1/N of the stored keys, where N is the number of servers in the ring. This is done by creating a large number of “points” per server, distributed over the space 0x00000000 - 0xFFFFFFFF. For a given key, we calculate the CRC32 hash and find the nearest “point” that is less than or equal to the key’s hash; if the hash is smaller than every point, the lookup wraps around to the last point on the ring. In this implementation, each “point” is represented by a Dalli::Ring::Entry.
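The lookup described above can be sketched outside of Dalli as follows. This is a minimal illustration, not the library's code: `Point`, `build_points`, `owner_for`, the server names, and the default of 160 points per server are all assumptions made for the example.

```ruby
require 'digest/sha1'
require 'zlib'

# Hypothetical stand-in for Dalli::Ring::Entry: one point on the ring.
Point = Struct.new(:value, :server)

# Build a sorted list of points. 160 points per server is an assumed value.
def build_points(server_names, points_per_server = 160)
  points = server_names.flat_map do |name|
    Array.new(points_per_server) do |idx|
      # The first 8 hex digits of the SHA1 digest give a 32-bit position.
      Point.new(Digest::SHA1.hexdigest("#{name}:#{idx}")[0, 8].to_i(16), name)
    end
  end
  points.sort_by(&:value)
end

# Owner of a key: the last point whose value is <= CRC32(key). When the hash
# precedes every point, index -1 selects the final point (wraparound).
def owner_for(points, key)
  hash = Zlib.crc32(key)
  idx = points.bsearch_index { |point| point.value > hash }
  idx = idx.nil? ? points.size - 1 : idx - 1
  points[idx].server
end

keys   = Array.new(10_000) { |i| "key-#{i}" }
before = build_points(%w[cache1:11211 cache2:11211 cache3:11211])
after  = build_points(%w[cache1:11211 cache2:11211 cache3:11211 cache4:11211])

moved = keys.count { |key| owner_for(before, key) != owner_for(after, key) }
puts "#{moved} of #{keys.size} keys changed servers"
```

Every key that changes owner in this sketch moves to the newly added server; keys never shuffle between the servers that were already present.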
Constants
- POINTS_PER_SERVER: The number of entries on the continuum created per server in an equally weighted scenario.
Attributes
Public Class Methods
Source
# File lib/dalli/ring.rb, line 26
def initialize(servers, options)
  @servers = servers
  @continuum = nil
  @continuum = build_continuum(servers) if servers.size > 1
  threadsafe! unless options[:threadsafe] == false
  @failover = options[:failover] != false
end
Public Instance Methods
Source
# File lib/dalli/ring.rb, line 64
def keys_grouped_by_server(key_arr)
  key_arr.group_by do |key|
    server_for_key(key)
  rescue Dalli::RingError
    Dalli.logger.debug { "unable to get key #{key}" }
    nil
  end
end
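The grouping above places every key whose lookup fails under a `nil` bucket instead of aborting the whole batch. A small standalone sketch of that pattern follows; `NoServerError`, `bucket_for`, and `group_keys` are hypothetical names standing in for Dalli::RingError and the ring lookup, and the routing rule is invented purely for illustration.

```ruby
# Hypothetical error class standing in for Dalli::RingError.
class NoServerError < StandardError; end

# Hypothetical lookup: keys prefixed with "down:" simulate an unreachable server.
def bucket_for(key)
  raise NoServerError, 'No server available' if key.start_with?('down:')

  key.length.even? ? 'cache1' : 'cache2'
end

# Group keys by their bucket; keys whose lookup raises are collected under nil.
def group_keys(keys)
  keys.group_by do |key|
    bucket_for(key)
  rescue NoServerError
    nil
  end
end

p group_keys(%w[ab abc down:x])
# => {"cache1"=>["ab"], "cache2"=>["abc"], nil=>["down:x"]}
```

Callers of such a grouping need to handle the `nil` key explicitly, since it holds the keys that could not be routed.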
Source
# File lib/dalli/ring.rb, line 73
def lock
  @servers.each(&:lock!)
  begin
    yield
  ensure
    @servers.each(&:unlock!)
  end
end
Source
# File lib/dalli/ring.rb, line 82
def pipeline_consume_and_ignore_responses
  @servers.each do |s|
    s.request(:noop)
  rescue Dalli::NetworkError
    # Ignore this error, as it indicates the socket is unavailable
    # and there's no need to flush
  end
end
Source
# File lib/dalli/ring.rb, line 35
def server_for_key(key)
  server = if @continuum
             server_from_continuum(key)
           else
             @servers.first
           end
  # Note that the call to alive? has the side effect of initializing
  # the socket
  return server if server&.alive?
  raise Dalli::RingError, 'No server available'
end
Source
# File lib/dalli/ring.rb, line 49
def server_from_continuum(key)
  hkey = hash_for(key)
  20.times do |try|
    server = server_for_hash_key(hkey)
    # Note that the call to alive? has the side effect of initializing
    # the socket
    return server if server.alive?
    break unless @failover
    hkey = hash_for("#{try}#{key}")
  end
  nil
end
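The retry loop above can be exercised in isolation to see how the failover flag changes the number of lookups. The sketch below is an approximation under stated assumptions: `FakeServer` and `first_alive` are hypothetical, the `lookup` callable replaces the continuum search, and CRC32 is used for hashing because the class description mentions it (`hash_for` itself is not shown on this page).

```ruby
require 'zlib'

# Hypothetical stand-in for a server object; only alive? matters here.
FakeServer = Struct.new(:name, :up) do
  def alive?
    up
  end
end

# Try up to `attempts` candidate hashes, mirroring the retry loop above.
# `lookup` is any callable that maps a 32-bit hash to a server.
def first_alive(key, lookup, failover: true, attempts: 20)
  hkey = Zlib.crc32(key)
  attempts.times do |try|
    server = lookup.call(hkey)
    return server if server.alive?
    break unless failover

    # Prefix the attempt number so the retry hashes to a different position.
    hkey = Zlib.crc32("#{try}#{key}")
  end
  nil
end

dead  = FakeServer.new('cache1', false)
calls = 0
always_dead = lambda do |_hash|
  calls += 1
  dead
end

first_alive('user:42', always_dead)
puts "with failover: #{calls} lookups"      # 20 lookups, then nil

calls = 0
first_alive('user:42', always_dead, failover: false)
puts "without failover: #{calls} lookups"   # 1 lookup, then nil
```

With failover disabled, a dead server for the key's primary position means the lookup gives up immediately rather than probing other positions on the ring.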
Source
# File lib/dalli/ring.rb, line 91
def socket_timeout
  @servers.first.socket_timeout
end
Private Instance Methods
Source
# File lib/dalli/ring.rb, line 126
def build_continuum(servers)
  continuum = []
  total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
  servers.each do |server|
    entry_count_for(server, servers.size, total_weight).times do |idx|
      hash = Digest::SHA1.hexdigest("#{server.name}:#{idx}")
      value = Integer("0x#{hash[0..7]}")
      continuum << Dalli::Ring::Entry.new(value, server)
    end
  end
  continuum.sort_by(&:value)
end
Source
# File lib/dalli/ring.rb, line 111
def entry_count_for(server, total_servers, total_weight)
  ((total_servers * POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor
end
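To make the weighting arithmetic concrete, here is a worked example with three servers, one of which has double weight. The value 160 for POINTS_PER_SERVER is an assumption for this illustration (this page does not state the constant's value), and `entry_count` is a hypothetical helper that takes plain numbers instead of server objects.

```ruby
# Assumed value for illustration; the real constant is defined in Dalli::Ring.
POINTS_PER_SERVER = 160

# Same arithmetic as entry_count_for, applied to a bare weight.
def entry_count(weight, total_servers, total_weight)
  ((total_servers * POINTS_PER_SERVER * weight) / Float(total_weight)).floor
end

weights      = { 'cache1' => 1, 'cache2' => 1, 'cache3' => 2 }
total_weight = weights.values.sum # 4

counts = weights.transform_values do |weight|
  entry_count(weight, weights.size, total_weight)
end

p counts
# => {"cache1"=>120, "cache2"=>120, "cache3"=>240}
```

The total stays at 3 * 160 = 480 points; the weights only change how those points are divided, so the double-weight server owns roughly half of the ring.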
Source
# File lib/dalli/ring.rb, line 115
def server_for_hash_key(hash_key)
  # Find the closest index in the Ring with value <= the given value
  entryidx = @continuum.bsearch_index { |entry| entry.value > hash_key }
  if entryidx.nil?
    entryidx = @continuum.size - 1
  else
    entryidx -= 1
  end
  @continuum[entryidx].server
end
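The index arithmetic above has two edge cases worth seeing with concrete numbers: a hash beyond the last entry, and a hash before the first entry. The sketch below applies the same search to a plain sorted array of integers; `index_at_or_below` is a hypothetical helper, not part of Dalli.

```ruby
# Return the index of the last value <= hash_key in a sorted array.
# A result of -1 means "before the first entry", which Ruby's negative
# indexing resolves to the last element, giving the ring its wraparound.
def index_at_or_below(values, hash_key)
  idx = values.bsearch_index { |value| value > hash_key }
  idx.nil? ? values.size - 1 : idx - 1
end

values = [100, 200, 300]

index_at_or_below(values, 250) # => 1   (200 is the nearest value <= 250)
index_at_or_below(values, 200) # => 1   (an exact match is included)
index_at_or_below(values, 999) # => 2   (beyond the end: last entry)
index_at_or_below(values, 50)  # => -1  (values[-1] is 300: wraparound)
```

`bsearch_index` in find-minimum mode requires the block to be false for a prefix of the array and true for the rest, which holds here because the array is sorted.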
Source
# File lib/dalli/ring.rb, line 101
def threadsafe!
  @servers.each do |s|
    s.extend(Dalli::Threadsafe)
  end
end