class Gru::Adapters::RedisAdapter

Attributes

client[R]

Public Class Methods

new(settings) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 8
def initialize(settings)
  @settings = settings
  @client = initialize_client(settings.client_settings)
end

Public Instance Methods

expire_workers() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 39
def expire_workers
  removable = {}
  workers = max_host_workers
  workers.each do |worker, count|
    removable[worker] = with_worker_counts(worker,count) do |total|
      if expire_worker?(worker)
        total -= 1 if expire_worker(worker)
      end
      total
    end
  end
  removable
end
provision_workers() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 25
def provision_workers
  available = {}
  workers = max_host_workers
  workers.each do |worker, count|
    available[worker] = with_worker_counts(worker,count) do |total|
      if reserve_worker?(worker)
        total += 1 if reserve_worker(worker)
      end
      total
    end
  end
  available
end
release_presumed_dead_worker_hosts() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 68
def release_presumed_dead_worker_hosts
  return false unless manage_heartbeat?
  update_heartbeat
  presumed_dead_worker_hosts.each_pair do |hostname,timestamp|
    lock_key = "#{gru_key}:removing_dead_host:#{hostname}"
    if send_message(:setnx,lock_key,Time.now.to_i)
      remove_worker_host(hostname)
      send_message(:hdel,heartbeat_key,hostname)
      send_message(:del,lock_key)
      return true
    end
  end
  false
end
release_workers() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 53
def release_workers
  workers = max_host_workers
  workers.keys.each do |worker|
    host_count = local_running_count(worker)
    global_count = host_count
    host_count.times do
      global_count = send_message(:hincrby, global_workers_running_key, worker, -1) if global_count > 0
      host_count = send_message(:hincrby, host_workers_running_key,worker,-1) if host_count > 0
    end
  end
  send_message(:del, host_workers_running_key)
  send_message(:del, host_max_worker_key)
  send_message(:hdel, heartbeat_key, hostname)
end
remove_stale_worker_entries() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 83
def remove_stale_worker_entries
  stale_keys = max_global_workers.keys - @settings.cluster_maximums.keys
  stale_keys.each do |key|
    send_message(:hdel, global_max_worker_key, key)
  end
end
set_worker_counts() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 13
def set_worker_counts
  set_rebalance_flag(@settings.rebalance_flag)
  set_presume_host_dead_after(@settings.presume_host_dead_after)
  release_workers
  register_workers(@settings.host_maximums)
  set_max_worker_counts(@settings.host_maximums)
  register_global_workers(@settings.cluster_maximums)
  set_max_global_worker_counts(@settings.cluster_maximums)
  remove_stale_worker_entries
  update_heartbeat if manage_heartbeat?
end

Private Instance Methods

adjust_workers(worker,amount) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 174
def adjust_workers(worker,amount)
  lock_key = "#{gru_key}:locks:#{worker}"
  if send_message(:set,lock_key,Time.now.to_i, { nx: true, px: 10000 })
    send_message(:hincrby,host_workers_running_key,worker,amount)
    send_message(:hincrby,global_workers_running_key,worker,amount)
    send_message(:del,lock_key)
    return true
  end
  false
end
expire_worker(worker) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 170
def expire_worker(worker)
  adjust_workers(worker,-1)
end
expire_worker?(worker) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 209
def expire_worker?(worker)
  host_running,global_running,host_max,global_max = worker_counts(worker)
  result = false
  if rebalance_cluster?
    result = host_running.to_i > max_workers_per_host(global_max,host_max)
  else
    result = host_running.to_i > host_max.to_i
  end
   (result || global_running.to_i > global_max.to_i) && host_running.to_i >= 0 ||
     total_host_running > @settings.max_worker_processes_per_host
end
global_key() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 286
def global_key
  "#{gru_key}:global"
end
global_max_worker_key() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 278
def global_max_worker_key
  "#{global_key}:max_workers"
end
global_workers_running_key() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 282
def global_workers_running_key
  "#{global_key}:workers_running"
end
gru_host_count() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 247
def gru_host_count
  send_message(:keys,"#{gru_key}:*:workers_running").count - 1
end
gru_key() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 298
def gru_key
  "GRU:#{@settings.environment_name}:#{@settings.cluster_name}"
end
heartbeat_key() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 294
def heartbeat_key
  "#{gru_key}:heartbeats"
end
host_key() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 290
def host_key
  "#{gru_key}:#{hostname}"
end
host_max_worker_key() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 270
def host_max_worker_key
  "#{host_key}:max_workers"
end
host_workers_running_key() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 274
def host_workers_running_key
  "#{host_key}:workers_running"
end
hostname() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 302
def hostname
  @hostname ||= Socket.gethostname
end
initialize_client(config=nil) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 310
def initialize_client(config=nil)
  Redis.new(config || {})
end
local_running_count(worker) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 239
def local_running_count(worker)
  send_message(:hget,host_workers_running_key,worker).to_i
end
make_global_workers_count_non_negative(worker) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 261
def make_global_workers_count_non_negative(worker)
  send_message(:hset, global_workers_running_key, worker, 0)
end
manage_heartbeat?() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 133
def manage_heartbeat?
  @settings.manage_worker_heartbeats
end
max_global_workers() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 189
def max_global_workers
  send_message(:hgetall,global_max_worker_key)
end
max_host_workers() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 185
def max_host_workers
  send_message(:hgetall,host_max_worker_key)
end
max_workers_per_host(global_worker_max_count,host_max) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 251
def max_workers_per_host(global_worker_max_count,host_max)
  host_count = gru_host_count
  rebalance_count = host_count > 0 ? (global_worker_max_count.to_i/host_count.to_f).ceil : host_max.to_i
  rebalance_count <= host_max.to_i && host_count > 1 ? rebalance_count : host_max.to_i
end
presume_host_dead_after() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 265
def presume_host_dead_after
  dead_after_number_of_seconds = send_message(:get,"#{gru_key}:presume_host_dead_after").to_i
  dead_after_number_of_seconds > 0 ? dead_after_number_of_seconds : 120
end
presumed_dead_worker_hosts() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 235
def presumed_dead_worker_hosts
  workers_with_heartbeats.select{ |hostname, timestamp| timestamp.to_i + presume_host_dead_after < Time.now.to_i}
end
rebalance_cluster?() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 257
def rebalance_cluster?
  send_message(:get,"#{gru_key}:rebalance") == "true"
end
register_global_worker(worker,count) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 121
def register_global_worker(worker,count)
  send_message(:hsetnx,global_workers_running_key,worker,count)
end
register_global_workers(workers) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 96
def register_global_workers(workers)
  workers.each {|worker, count| register_global_worker(worker,0) }
end
register_worker(worker,count) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 117
def register_worker(worker,count)
  send_message(:hsetnx,host_workers_running_key,worker,count)
end
register_workers(workers) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 92
def register_workers(workers)
  workers.each {|worker, count| register_worker(worker,0) }
end
remove_worker_host(hostname) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 141
def remove_worker_host(hostname)
  workers = send_message(:hgetall, "#{gru_key}:#{hostname}:workers_running")
  workers.each_pair do |worker_name, count|
    local_count, global_count = Integer(count), Integer(count)
    Integer(count).times do
      local_count = send_message(:hincrby,"#{gru_key}:#{hostname}:workers_running",worker_name,-1) if local_count > 0
      global_count = send_message(:hincrby,global_workers_running_key,worker_name,-1) if global_count > 0
    end
  end
  send_message(:del,"#{gru_key}:#{hostname}:workers_running")
end
reserve_worker(worker) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 166
def reserve_worker(worker)
  adjust_workers(worker,1)
end
reserve_worker?(worker) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 197
def reserve_worker?(worker)
  host_running,global_running,host_max,global_max = worker_counts(worker)
  result = false
  if rebalance_cluster?
    result = host_running.to_i < max_workers_per_host(global_max,host_max)
  else
    result = host_running.to_i < host_max.to_i
  end
  result && global_running.to_i < global_max.to_i &&
    total_host_running < @settings.max_worker_processes_per_host
end
reset_removed_global_worker_counts(workers) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 153
def reset_removed_global_worker_counts(workers)
  global_max = max_host_workers
  global_max.each_pair do |worker, count|
    set_max_global_worker_count(worker,0) unless workers[worker]
  end
end
send_message(action,*args) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 306
def send_message(action,*args)
  @client.send(action,*args)
end
set_max_global_worker_count(worker,count) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 129
def set_max_global_worker_count(worker,count)
  send_message(:hset,global_max_worker_key,worker,count)
end
set_max_global_worker_counts(workers) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 104
def set_max_global_worker_counts(workers)
  reset_removed_global_worker_counts(workers)
  workers.each_pair{|worker,count| set_max_global_worker_count(worker,count) }
end
set_max_worker_count(worker,count) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 125
def set_max_worker_count(worker,count)
  send_message(:hset,host_max_worker_key,worker,count)
end
set_max_worker_counts(workers) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 100
def set_max_worker_counts(workers)
  workers.each_pair {|worker,count| set_max_worker_count(worker,count) }
end
set_presume_host_dead_after(seconds) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 113
def set_presume_host_dead_after(seconds)
  send_message(:set,"#{gru_key}:presume_host_dead_after",seconds)
end
set_rebalance_flag(rebalance) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 109
def set_rebalance_flag(rebalance)
  send_message(:set,"#{gru_key}:rebalance",rebalance)
end
total_host_running() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 243
def total_host_running
  send_message(:hgetall,host_workers_running_key).values.reduce(0) {|sum, value| sum + value.to_i }
end
update_heartbeat() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 137
def update_heartbeat
  send_message(:hset,heartbeat_key,hostname,Time.now.to_i)
end
with_worker_counts(worker,count,&block) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 160
def with_worker_counts(worker,count,&block)
  Integer(count).times.reduce(0) do |total|
    block.call(total)
  end
end
worker_counts(worker) click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 221
def worker_counts(worker)
  counts = @client.multi do |multi|
    multi.hget(host_workers_running_key,worker)
    multi.hget(global_workers_running_key,worker)
    multi.hget(host_max_worker_key,worker)
    multi.hget(global_max_worker_key,worker)
  end
  if counts[1].to_i <0
    make_global_workers_count_non_negative(worker)
    counts[1] = send_message(:hget, global_workers_running_key, worker)
  end
  counts
end
workers_with_heartbeats() click to toggle source
# File lib/gru/adapters/redis_adapter.rb, line 193
def workers_with_heartbeats
  send_message(:hgetall, heartbeat_key)
end