class Gru::Adapters::RedisAdapter
Attributes
client[R]
Public Class Methods
new(settings)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 8 def initialize(settings) @settings = settings @client = initialize_client(settings.client_settings) end
Public Instance Methods
expire_workers()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 39 def expire_workers removable = {} workers = max_host_workers workers.each do |worker, count| removable[worker] = with_worker_counts(worker,count) do |total| if expire_worker?(worker) total -= 1 if expire_worker(worker) end total end end removable end
provision_workers()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 25 def provision_workers available = {} workers = max_host_workers workers.each do |worker, count| available[worker] = with_worker_counts(worker,count) do |total| if reserve_worker?(worker) total += 1 if reserve_worker(worker) end total end end available end
release_presumed_dead_worker_hosts()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 68 def release_presumed_dead_worker_hosts return false unless manage_heartbeat? update_heartbeat presumed_dead_worker_hosts.each_pair do |hostname,timestamp| lock_key = "#{gru_key}:removing_dead_host:#{hostname}" if send_message(:setnx,lock_key,Time.now.to_i) remove_worker_host(hostname) send_message(:hdel,heartbeat_key,hostname) send_message(:del,lock_key) return true end end false end
release_workers()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 53 def release_workers workers = max_host_workers workers.keys.each do |worker| host_count = local_running_count(worker) global_count = host_count host_count.times do global_count = send_message(:hincrby, global_workers_running_key, worker, -1) if global_count > 0 host_count = send_message(:hincrby, host_workers_running_key,worker,-1) if host_count > 0 end end send_message(:del, host_workers_running_key) send_message(:del, host_max_worker_key) send_message(:hdel, heartbeat_key, hostname) end
remove_stale_worker_entries()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 83 def remove_stale_worker_entries stale_keys = max_global_workers.keys - @settings.cluster_maximums.keys stale_keys.each do |key| send_message(:hdel, global_max_worker_key, key) end end
set_worker_counts()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 13 def set_worker_counts set_rebalance_flag(@settings.rebalance_flag) set_presume_host_dead_after(@settings.presume_host_dead_after) release_workers register_workers(@settings.host_maximums) set_max_worker_counts(@settings.host_maximums) register_global_workers(@settings.cluster_maximums) set_max_global_worker_counts(@settings.cluster_maximums) remove_stale_worker_entries update_heartbeat if manage_heartbeat? end
Private Instance Methods
adjust_workers(worker,amount)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 174 def adjust_workers(worker,amount) lock_key = "#{gru_key}:locks:#{worker}" if send_message(:set,lock_key,Time.now.to_i, { nx: true, px: 10000 }) send_message(:hincrby,host_workers_running_key,worker,amount) send_message(:hincrby,global_workers_running_key,worker,amount) send_message(:del,lock_key) return true end false end
expire_worker(worker)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 170 def expire_worker(worker) adjust_workers(worker,-1) end
expire_worker?(worker)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 209 def expire_worker?(worker) host_running,global_running,host_max,global_max = worker_counts(worker) result = false if rebalance_cluster? result = host_running.to_i > max_workers_per_host(global_max,host_max) else result = host_running.to_i > host_max.to_i end (result || global_running.to_i > global_max.to_i) && host_running.to_i >= 0 || total_host_running > @settings.max_worker_processes_per_host end
global_key()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 286 def global_key "#{gru_key}:global" end
global_max_worker_key()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 278 def global_max_worker_key "#{global_key}:max_workers" end
global_workers_running_key()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 282 def global_workers_running_key "#{global_key}:workers_running" end
gru_host_count()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 247 def gru_host_count send_message(:keys,"#{gru_key}:*:workers_running").count - 1 end
gru_key()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 298 def gru_key "GRU:#{@settings.environment_name}:#{@settings.cluster_name}" end
heartbeat_key()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 294 def heartbeat_key "#{gru_key}:heartbeats" end
host_key()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 290 def host_key "#{gru_key}:#{hostname}" end
host_max_worker_key()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 270 def host_max_worker_key "#{host_key}:max_workers" end
host_workers_running_key()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 274 def host_workers_running_key "#{host_key}:workers_running" end
hostname()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 302 def hostname @hostname ||= Socket.gethostname end
initialize_client(config=nil)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 310 def initialize_client(config=nil) Redis.new(config || {}) end
local_running_count(worker)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 239 def local_running_count(worker) send_message(:hget,host_workers_running_key,worker).to_i end
make_global_workers_count_non_negative(worker)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 261 def make_global_workers_count_non_negative(worker) send_message(:hset, global_workers_running_key, worker, 0) end
manage_heartbeat?()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 133 def manage_heartbeat? @settings.manage_worker_heartbeats end
max_global_workers()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 189 def max_global_workers send_message(:hgetall,global_max_worker_key) end
max_host_workers()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 185 def max_host_workers send_message(:hgetall,host_max_worker_key) end
max_workers_per_host(global_worker_max_count,host_max)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 251 def max_workers_per_host(global_worker_max_count,host_max) host_count = gru_host_count rebalance_count = host_count > 0 ? (global_worker_max_count.to_i/host_count.to_f).ceil : host_max.to_i rebalance_count <= host_max.to_i && host_count > 1 ? rebalance_count : host_max.to_i end
presume_host_dead_after()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 265 def presume_host_dead_after dead_after_number_of_seconds = send_message(:get,"#{gru_key}:presume_host_dead_after").to_i dead_after_number_of_seconds > 0 ? dead_after_number_of_seconds : 120 end
presumed_dead_worker_hosts()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 235 def presumed_dead_worker_hosts workers_with_heartbeats.select{ |hostname, timestamp| timestamp.to_i + presume_host_dead_after < Time.now.to_i} end
rebalance_cluster?()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 257 def rebalance_cluster? send_message(:get,"#{gru_key}:rebalance") == "true" end
register_global_worker(worker,count)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 121 def register_global_worker(worker,count) send_message(:hsetnx,global_workers_running_key,worker,count) end
register_global_workers(workers)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 96 def register_global_workers(workers) workers.each {|worker, count| register_global_worker(worker,0) } end
register_worker(worker,count)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 117 def register_worker(worker,count) send_message(:hsetnx,host_workers_running_key,worker,count) end
register_workers(workers)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 92 def register_workers(workers) workers.each {|worker, count| register_worker(worker,0) } end
remove_worker_host(hostname)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 141 def remove_worker_host(hostname) workers = send_message(:hgetall, "#{gru_key}:#{hostname}:workers_running") workers.each_pair do |worker_name, count| local_count, global_count = Integer(count), Integer(count) Integer(count).times do local_count = send_message(:hincrby,"#{gru_key}:#{hostname}:workers_running",worker_name,-1) if local_count > 0 global_count = send_message(:hincrby,global_workers_running_key,worker_name,-1) if global_count > 0 end end send_message(:del,"#{gru_key}:#{hostname}:workers_running") end
reserve_worker(worker)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 166 def reserve_worker(worker) adjust_workers(worker,1) end
reserve_worker?(worker)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 197 def reserve_worker?(worker) host_running,global_running,host_max,global_max = worker_counts(worker) result = false if rebalance_cluster? result = host_running.to_i < max_workers_per_host(global_max,host_max) else result = host_running.to_i < host_max.to_i end result && global_running.to_i < global_max.to_i && total_host_running < @settings.max_worker_processes_per_host end
reset_removed_global_worker_counts(workers)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 153 def reset_removed_global_worker_counts(workers) global_max = max_host_workers global_max.each_pair do |worker, count| set_max_global_worker_count(worker,0) unless workers[worker] end end
send_message(action,*args)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 306 def send_message(action,*args) @client.send(action,*args) end
set_max_global_worker_count(worker,count)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 129 def set_max_global_worker_count(worker,count) send_message(:hset,global_max_worker_key,worker,count) end
set_max_global_worker_counts(workers)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 104 def set_max_global_worker_counts(workers) reset_removed_global_worker_counts(workers) workers.each_pair{|worker,count| set_max_global_worker_count(worker,count) } end
set_max_worker_count(worker,count)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 125 def set_max_worker_count(worker,count) send_message(:hset,host_max_worker_key,worker,count) end
set_max_worker_counts(workers)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 100 def set_max_worker_counts(workers) workers.each_pair {|worker,count| set_max_worker_count(worker,count) } end
set_presume_host_dead_after(seconds)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 113 def set_presume_host_dead_after(seconds) send_message(:set,"#{gru_key}:presume_host_dead_after",seconds) end
set_rebalance_flag(rebalance)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 109 def set_rebalance_flag(rebalance) send_message(:set,"#{gru_key}:rebalance",rebalance) end
total_host_running()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 243 def total_host_running send_message(:hgetall,host_workers_running_key).values.reduce(0) {|sum, value| sum + value.to_i } end
update_heartbeat()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 137 def update_heartbeat send_message(:hset,heartbeat_key,hostname,Time.now.to_i) end
with_worker_counts(worker,count,&block)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 160 def with_worker_counts(worker,count,&block) Integer(count).times.reduce(0) do |total| block.call(total) end end
worker_counts(worker)
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 221 def worker_counts(worker) counts = @client.multi do |multi| multi.hget(host_workers_running_key,worker) multi.hget(global_workers_running_key,worker) multi.hget(host_max_worker_key,worker) multi.hget(global_max_worker_key,worker) end if counts[1].to_i <0 make_global_workers_count_non_negative(worker) counts[1] = send_message(:hget, global_workers_running_key, worker) end counts end
workers_with_heartbeats()
click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 193 def workers_with_heartbeats send_message(:hgetall, heartbeat_key) end