class Roundhouse::Monitor
This class implements two things:
1. A turntable semaphore - the fetcher can pop the next available exclusive right to something (such as API request with a given auth token) 2. Track which access right is temporarily suspended
Constants
- ACTIVE
- BUCKETS
- EMPTY
- IN_ROTATION
- LUA_MAYBE_TURNTABLE_PUSH
- LUA_POP_JOB
- LUA_RESUME_QUEUE
- LUA_TURNTABLE_PUSH
- MAYBE_TURNTABLE_PUSH
- POP_JOB
- QUEUE
- RESUME_QUEUE
- SCHEDULE
- STATUS
- SUSPENDED
- TURNTABLE
This helps catch problems with key names at runtime
- TURNTABLE_PUSH
- TURNTABLE_TIMEOUT
Number of seconds to block on turntable
- WOLVERINE_CONFIG
Public Class Methods
# File lib/roundhouse/monitor.rb, line 82 def activate(conn, q_id) set_queue_status(conn, q_id, ACTIVE) end
# File lib/roundhouse/monitor.rb, line 111 def bucket_num(q_id) q_id.to_i / 1000 end
Only push onto turntable is status is empty If empty, push into turntable and set to active Subsequent job pushes would see the active queue and not push into turntable Used after pushing a job, and conditionally putting it into rotation
# File lib/roundhouse/monitor.rb, line 103 def maybe_add_to_rotation(conn, q_id) MAYBE_TURNTABLE_PUSH.call conn, [status_bucket(q_id), TURNTABLE, BUCKETS, IN_ROTATION], [q_id, bucket_num(q_id)] end
# File lib/roundhouse/monitor.rb, line 49 def maybe_next_job(conn) queue_id = pop(conn) return nil unless queue_id job = pop_job(conn, queue_id) return queue_id, job if job return nil end
Find the first active queue Return nil if nothing is there. The fetcher is responsible for polling.
# File lib/roundhouse/monitor.rb, line 29 def pop(conn) _, q_id = conn.brpop(TURNTABLE, TURNTABLE_TIMEOUT) conn.srem(IN_ROTATION, q_id) return q_id if queue_status(conn, q_id) == ACTIVE return nil end
Atomically pop job If nothing is in the queue, set queue status to empty
# File lib/roundhouse/monitor.rb, line 60 def pop_job(conn, q_id) POP_JOB.call conn, ["#{QUEUE}:#{q_id}", status_bucket(q_id)], [q_id] end
Atomic push Push queue into turntable if and only if queue status is active
# File lib/roundhouse/monitor.rb, line 38 def push(conn, q_id) # NOTE: this version of redis-namespace has a bug when you do keys: argv: params TURNTABLE_PUSH.call conn, [status_bucket(q_id), TURNTABLE, IN_ROTATION], [q_id] end
# File lib/roundhouse/monitor.rb, line 64 def push_job(conn, payloads) return schedule(conn, payloads) if payloads.first['at'] q_id = payloads.first['queue_id'] now = Time.now.to_f to_push = payloads.map do |entry| entry['enqueued_at'.freeze] = now Roundhouse.dump_json(entry) end conn.lpush("#{QUEUE}:#{q_id}", to_push) maybe_add_to_rotation(conn, q_id) end
# File lib/roundhouse/monitor.rb, line 94 def queue_status(conn, q_id) conn.hget(status_bucket(q_id), q_id).to_i || EMPTY end
TODO: This needs to be in a lua script and atomic Need to test how well this scales too. Should be testing about 10,000 queues
# File lib/roundhouse/monitor.rb, line 118 def rebuild_turntable! Roundhouse.redis do |conn| buckets = conn.smembers(BUCKETS) queues = conn.pipelined do buckets.each { |bucket| conn.hgetall("#{STATUS}:#{bucket}") } end all_queue_ids = queues.map(&:keys).flatten queue_len_res = conn.pipelined do all_queue_ids.each { |queue| conn.llen("#{QUEUE}:#{queue}") } end queue_len = all_queue_ids.zip(queue_len_res) status = queues.inject({}) { |a,x| a.merge(x) } conn.multi do conn.del(TURNTABLE) conn.del(IN_ROTATION) queue_len.each do |(q_id, len)| s = status[q_id].to_i case s when SUSPENDED then next when ACTIVE then if len > 0 conn.lpush(TURNTABLE, q_id) conn.sadd(IN_ROTATION, q_id) else set_queue_status(conn, q_id, EMPTY, false) end when EMPTY then next if len <= 0 conn.lpush(TURNTABLE, q_id) conn.sadd(IN_ROTATION, q_id) set_queue_status(conn, q_id, ACTIVE, false) else set_queue_status(conn, q_id, SUSPENDED, false) end end end end end
Bulk requeue (push from right). Usually done via Client
, when Roundhouse
is terminating
# File lib/roundhouse/monitor.rb, line 45 def requeue(conn, q_id, jobs) conn.rpush("#{QUEUE}:#{q_id}", jobs) end
# File lib/roundhouse/monitor.rb, line 90 def resume(conn, q_id) RESUME_QUEUE.call conn, [status_bucket(q_id), TURNTABLE, IN_ROTATION], [q_id] end
# File lib/roundhouse/monitor.rb, line 78 def set_queue_is_empty(conn, q_id) set_queue_status(conn, q_id, EMPTY) end
# File lib/roundhouse/monitor.rb, line 107 def status_bucket(q_id) "#{STATUS}:#{bucket_num(q_id)}" end
# File lib/roundhouse/monitor.rb, line 86 def suspend(conn, q_id) set_queue_status(conn, q_id, SUSPENDED) end
Private Class Methods
# File lib/roundhouse/monitor.rb, line 163 def schedule(conn, payloads) conn.zadd(SCHEDULE.freeze, payloads.map do |hash| at = hash.delete('at'.freeze).to_s [at, Roundhouse.dump_json(hash)] end ) end
# File lib/roundhouse/monitor.rb, line 170 def set_queue_status(conn, q_id, status, add_bucket = true) conn.sadd(BUCKETS, bucket_num(q_id)) if add_bucket conn.hset(status_bucket(q_id), q_id, status) end