class Roundhouse::Monitor

This class implements two things:

1. A turntable semaphore - the fetcher can pop the next available
   exclusive right to something (such as an API request with a given
   auth token).
2. Tracking of which access rights are temporarily suspended.

Constants

ACTIVE
BUCKETS
EMPTY
IN_ROTATION
LUA_MAYBE_TURNTABLE_PUSH
LUA_POP_JOB
LUA_RESUME_QUEUE
LUA_TURNTABLE_PUSH
MAYBE_TURNTABLE_PUSH
POP_JOB
QUEUE
RESUME_QUEUE
SCHEDULE
STATUS
SUSPENDED
TURNTABLE

This helps catch problems with key names at runtime

TURNTABLE_PUSH
TURNTABLE_TIMEOUT

Number of seconds to block on turntable

WOLVERINE_CONFIG

Public Class Methods

activate(conn, q_id) click to toggle source
# File lib/roundhouse/monitor.rb, line 82
# Marks the queue identified by q_id as ACTIVE.
#
# conn - Redis connection handed through to set_queue_status.
# q_id - identifier of the queue whose status is updated.
#
# Returns whatever set_queue_status returns.
def activate(conn, q_id)
  new_status = ACTIVE
  set_queue_status(conn, q_id, new_status)
end
bucket_num(q_id) click to toggle source
# File lib/roundhouse/monitor.rb, line 111
# Computes the status-bucket number for a queue id.
#
# The id is coerced with to_i and integer-divided by 1000, so ids
# 0..999 share bucket 0, 1000..1999 share bucket 1, and so on.
def bucket_num(q_id)
  q_id.to_i.div(1000)
end
maybe_add_to_rotation(conn, q_id) click to toggle source

Only push onto the turntable if the status is empty. If empty, push into the turntable and set the status to active. Subsequent job pushes will see the active queue and not push into the turntable. Used after pushing a job, to conditionally put its queue into rotation.

# File lib/roundhouse/monitor.rb, line 103
# Runs the MAYBE_TURNTABLE_PUSH script for the given queue.
#
# Keys passed to the script: the queue's status bucket, TURNTABLE,
# BUCKETS and IN_ROTATION. Arguments: the queue id and its bucket number.
# Per the method's documented intent, the script only adds the queue to
# the turntable when its status is empty (the script source is not
# visible here).
#
# Returns the script's result.
def maybe_add_to_rotation(conn, q_id)
  script_keys = [status_bucket(q_id), TURNTABLE, BUCKETS, IN_ROTATION]
  script_args = [q_id, bucket_num(q_id)]
  MAYBE_TURNTABLE_PUSH.call(conn, script_keys, script_args)
end
maybe_next_job(conn) click to toggle source
# File lib/roundhouse/monitor.rb, line 49
# Tries to fetch one job from the next queue taken off the turntable.
#
# Returns a two-element array [queue_id, job] when both a queue and a
# job were obtained, otherwise nil.
def maybe_next_job(conn)
  queue_id = pop(conn)
  return nil unless queue_id

  job = pop_job(conn, queue_id)
  job ? [queue_id, job] : nil
end
pop(conn) click to toggle source

Find the first active queue. Return nil if nothing is there. The fetcher is responsible for polling.

# File lib/roundhouse/monitor.rb, line 29
# Takes the next queue id off the turntable.
#
# Blocks on TURNTABLE for up to TURNTABLE_TIMEOUT seconds. When a queue
# id is obtained it is removed from the IN_ROTATION set, and it is
# returned only if its status is ACTIVE.
#
# Returns the queue id, or nil when the wait timed out or the queue is
# not active. The fetcher is responsible for polling again.
def pop(conn)
  _, q_id = conn.brpop(TURNTABLE, TURNTABLE_TIMEOUT)
  # brpop yields nil on timeout; bail out instead of issuing SREM and a
  # status lookup for a nil queue id.
  return nil unless q_id

  conn.srem(IN_ROTATION, q_id)
  queue_status(conn, q_id) == ACTIVE ? q_id : nil
end
pop_job(conn, q_id) click to toggle source

Atomically pop a job. If nothing is in the queue, set the queue status to empty.

# File lib/roundhouse/monitor.rb, line 60
# Runs the POP_JOB script against the queue's job list.
#
# Keys passed to the script: the queue's list key ("QUEUE:q_id") and its
# status bucket. Argument: the queue id. Per the method's documented
# intent, the script pops a job atomically and marks the queue empty
# when nothing is left (the script source is not visible here).
#
# Returns the script's result.
def pop_job(conn, q_id)
  queue_key = "#{QUEUE}:#{q_id}"
  script_keys = [queue_key, status_bucket(q_id)]
  POP_JOB.call(conn, script_keys, [q_id])
end
push(conn, q_id) click to toggle source

Atomic push. Push the queue into the turntable if and only if the queue status is active.

# File lib/roundhouse/monitor.rb, line 38
# Runs the TURNTABLE_PUSH script for the given queue.
#
# Keys passed to the script: the queue's status bucket, TURNTABLE and
# IN_ROTATION. Argument: the queue id. Per the method's documented
# intent, the queue is pushed onto the turntable only when its status is
# active (the script source is not visible here).
#
# Returns the script's result.
def push(conn, q_id)
  # NOTE: this version of redis-namespace has a bug when you do keys: argv: params
  script_keys = [status_bucket(q_id), TURNTABLE, IN_ROTATION]
  TURNTABLE_PUSH.call(conn, script_keys, [q_id])
end
push_job(conn, payloads) click to toggle source
# File lib/roundhouse/monitor.rb, line 64
# Enqueues a batch of job payloads.
#
# conn     - Redis connection.
# payloads - array of job hashes; the first entry decides the routing.
#
# When the first payload has an 'at' value the whole batch is handed to
# schedule. Otherwise every payload is stamped with the same
# 'enqueued_at' time, serialized with Roundhouse.dump_json, pushed onto
# the list for the first payload's 'queue_id', and the queue is offered
# to the rotation.
#
# Returns the result of schedule or of maybe_add_to_rotation.
def push_job(conn, payloads)
  head = payloads.first
  return schedule(conn, payloads) if head['at']

  q_id = head['queue_id']
  enqueued_at = Time.now.to_f
  serialized = payloads.map do |payload|
    # The payload hash itself is mutated, as callers may rely on it.
    payload['enqueued_at'.freeze] = enqueued_at
    Roundhouse.dump_json(payload)
  end

  queue_key = "#{QUEUE}:#{q_id}"
  conn.lpush(queue_key, serialized)
  maybe_add_to_rotation(conn, q_id)
end
queue_status(conn, q_id) click to toggle source
# File lib/roundhouse/monitor.rb, line 94
# Looks up the stored status of a queue.
#
# Reads the q_id field from the queue's status-bucket hash. A stored
# value is converted with to_i; a missing field yields EMPTY.
#
# The nil check happens before to_i: nil.to_i is 0 and never falsy, so a
# trailing `|| EMPTY` after to_i could never take effect.
def queue_status(conn, q_id)
  stored = conn.hget(status_bucket(q_id), q_id)
  stored.nil? ? EMPTY : stored.to_i
end
rebuild_turntable!() click to toggle source

TODO: This needs to be in a Lua script and atomic. Need to test how well this scales too. Should be tested with about 10,000 queues.

# File lib/roundhouse/monitor.rb, line 118
# Rebuilds the TURNTABLE list and the IN_ROTATION set from the stored
# queue statuses and current queue lengths.
#
# The reads (bucket list, status hashes, queue lengths) happen before
# the MULTI block, so the rebuild as a whole is not atomic.
# TODO: move this into a Lua script so it is atomic, and test how it
# scales (on the order of 10,000 queues).
def rebuild_turntable!
  Roundhouse.redis do |conn|
    # Every known bucket number, then one status hash per bucket.
    buckets = conn.smembers(BUCKETS)
    queues = conn.pipelined do
      buckets.each { |bucket| conn.hgetall("#{STATUS}:#{bucket}") }
    end

    # Queue ids are the field names of the status hashes.
    all_queue_ids = queues.map(&:keys).flatten
    queue_len_res = conn.pipelined do
      all_queue_ids.each { |queue| conn.llen("#{QUEUE}:#{queue}") }
    end

    # Pair each queue id with its length and merge the per-bucket
    # status hashes into a single id => status lookup.
    queue_len = all_queue_ids.zip(queue_len_res)
    status = queues.inject({}) { |a,x| a.merge(x) }

    conn.multi do
      # Start from a clean slate before re-adding eligible queues.
      conn.del(TURNTABLE)
      conn.del(IN_ROTATION)
      queue_len.each do |(q_id, len)|
        s = status[q_id].to_i

        # set_queue_status is called with add_bucket = false below, so it
        # does not re-add the bucket number to BUCKETS.
        case s
        when SUSPENDED then next
        when ACTIVE then
          if len > 0
            # Active with pending jobs: put it back into rotation.
            conn.lpush(TURNTABLE, q_id)
            conn.sadd(IN_ROTATION, q_id)
          else
            # Active but drained: downgrade to EMPTY.
            set_queue_status(conn, q_id, EMPTY, false)
          end
        when EMPTY then
          # Marked empty but jobs are present: put it into rotation and
          # mark it ACTIVE.
          next if len <= 0
          conn.lpush(TURNTABLE, q_id)
          conn.sadd(IN_ROTATION, q_id)
          set_queue_status(conn, q_id, ACTIVE, false)
        else
          # Unrecognized status value: mark the queue SUSPENDED.
          set_queue_status(conn, q_id, SUSPENDED, false)
        end
      end
    end
  end
end
requeue(conn, q_id, jobs) click to toggle source

Bulk requeue (push from right). Usually done via Client, when Roundhouse is terminating

# File lib/roundhouse/monitor.rb, line 45
# Pushes the given jobs back onto the right-hand end of the queue's list.
#
# conn - Redis connection.
# q_id - identifier of the queue the jobs belong to.
# jobs - serialized jobs handed to RPUSH as given.
#
# Returns the result of the rpush call.
def requeue(conn, q_id, jobs)
  queue_key = "#{QUEUE}:#{q_id}"
  conn.rpush(queue_key, jobs)
end
resume(conn, q_id) click to toggle source
# File lib/roundhouse/monitor.rb, line 90
# Runs the RESUME_QUEUE script for the given queue.
#
# Keys passed to the script: the queue's status bucket, TURNTABLE and
# IN_ROTATION. Argument: the queue id.
# NOTE(review): presumably the script reactivates a suspended queue and
# puts it back into rotation - confirm against the Lua source.
#
# Returns the script's result.
def resume(conn, q_id)
  script_keys = [status_bucket(q_id), TURNTABLE, IN_ROTATION]
  RESUME_QUEUE.call(conn, script_keys, [q_id])
end
set_queue_is_empty(conn, q_id) click to toggle source
# File lib/roundhouse/monitor.rb, line 78
# Marks the queue identified by q_id as EMPTY.
#
# conn - Redis connection handed through to set_queue_status.
# q_id - identifier of the queue whose status is updated.
#
# Returns whatever set_queue_status returns.
def set_queue_is_empty(conn, q_id)
  new_status = EMPTY
  set_queue_status(conn, q_id, new_status)
end
status_bucket(q_id) click to toggle source
# File lib/roundhouse/monitor.rb, line 107
# Builds the Redis key of the status hash that holds the given queue:
# the STATUS prefix, a colon, and the queue's bucket number.
def status_bucket(q_id)
  bucket = bucket_num(q_id)
  "#{STATUS}:#{bucket}"
end
suspend(conn, q_id) click to toggle source
# File lib/roundhouse/monitor.rb, line 86
# Marks the queue identified by q_id as SUSPENDED.
#
# conn - Redis connection handed through to set_queue_status.
# q_id - identifier of the queue whose status is updated.
#
# Returns whatever set_queue_status returns.
def suspend(conn, q_id)
  new_status = SUSPENDED
  set_queue_status(conn, q_id, new_status)
end

Private Class Methods

schedule(conn, payloads) click to toggle source
# File lib/roundhouse/monitor.rb, line 163
# Adds the payloads to the SCHEDULE sorted set.
#
# For each payload the 'at' entry is removed from the hash (the hash is
# mutated) and its string form becomes the score; the rest of the
# payload is serialized with Roundhouse.dump_json and becomes the member.
#
# Returns the result of the zadd call.
def schedule(conn, payloads)
  schedule_key = SCHEDULE.freeze
  scored_entries = payloads.map do |payload|
    run_at = payload.delete('at'.freeze).to_s
    [run_at, Roundhouse.dump_json(payload)]
  end
  conn.zadd(schedule_key, scored_entries)
end
set_queue_status(conn, q_id, status, add_bucket = true) click to toggle source
# File lib/roundhouse/monitor.rb, line 170
# Stores a status value for a queue.
#
# conn       - Redis connection.
# q_id       - identifier of the queue.
# status     - value written to the q_id field of the queue's status hash.
# add_bucket - when truthy (the default), the queue's bucket number is
#              first added to the BUCKETS set.
#
# Returns the result of the hset call.
def set_queue_status(conn, q_id, status, add_bucket = true)
  if add_bucket
    conn.sadd(BUCKETS, bucket_num(q_id))
  end

  bucket_key = status_bucket(q_id)
  conn.hset(bucket_key, q_id, status)
end