class Roundhouse::RoundRobinFetch
Constants
- UnitOfWork
Public Class Methods
bulk_requeue(inprogress, options)
Keeping this as a class method makes it pluggable and lets the Manager
actor call it directly. As an instance method, it would run asynchronously
on the Fetcher actor.
# File lib/roundhouse/fetch.rb, line 108
def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Roundhouse.logger.debug { "Re-queueing terminated jobs" }
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue_id] ||= []
    jobs_to_requeue[unit_of_work.queue_id] << unit_of_work.message
  end

  Roundhouse.redis do |conn|
    conn.pipelined do
      jobs_to_requeue.each do |queue_id, jobs|
        Roundhouse::Monitor.requeue(conn, queue_id, jobs)
      end
    end
    # REFACTOR NOTE: This has to happen outside the pipelining since
    # we need to read. We can refactor to put this back
    # after converting the Monitor operations as EVAL scripts
    jobs_to_requeue.keys.each do |queue_id|
      Roundhouse::Monitor.push(conn, queue_id)
    end
  end
  Roundhouse.logger.info("Pushed #{inprogress.size} messages back to Redis")
rescue => ex
  Roundhouse.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
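The grouping step at the start of bulk_requeue can be tried in isolation, without Redis. The following is a minimal sketch, not part of Roundhouse: FakeUnitOfWork and group_by_queue are hypothetical names, and the struct models only the two fields (queue_id and message) that bulk_requeue reads.

```ruby
# Hypothetical stand-in for UnitOfWork; only the two fields that
# bulk_requeue reads are modeled here.
FakeUnitOfWork = Struct.new(:queue_id, :message)

# Group in-progress messages by queue id, mirroring the loop that
# bulk_requeue runs before it talks to Redis.
def group_by_queue(inprogress)
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue_id] ||= []
    jobs_to_requeue[unit_of_work.queue_id] << unit_of_work.message
  end
  jobs_to_requeue
end

inprogress = [
  FakeUnitOfWork.new(1, '{"jid":"a"}'),
  FakeUnitOfWork.new(2, '{"jid":"b"}'),
  FakeUnitOfWork.new(1, '{"jid":"c"}')
]
grouped = group_by_queue(inprogress)
```

Each key in the resulting hash corresponds to one Roundhouse::Monitor.requeue call inside the pipelined block, followed by one Roundhouse::Monitor.push call per queue id once the pipeline has finished.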
new(options = nil)
# File lib/roundhouse/fetch.rb, line 97
def initialize(options = nil)
  # ignore options
end
Public Instance Methods
retrieve_work()
# File lib/roundhouse/fetch.rb, line 101
def retrieve_work
  work = Roundhouse.redis { |conn| Roundhouse::Monitor.maybe_next_job(conn) }
  UnitOfWork.new(*work) if work
end
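retrieve_work returns nil when no job is available, so callers need to handle both outcomes. The sketch below illustrates that pattern without Redis. It is not Roundhouse code: SketchUnit and retrieve_work_sketch are hypothetical, and it assumes the lookup yields either nil or a two-element array of queue id and message, which the source above does not confirm.

```ruby
# Hypothetical stand-in for UnitOfWork (field order is an assumption).
SketchUnit = Struct.new(:queue_id, :message)

# fetcher is any callable standing in for the Redis lookup. It is
# assumed to return [queue_id, message] or nil when nothing is queued.
def retrieve_work_sketch(fetcher)
  work = fetcher.call
  # Splat the array into the struct only when a job was found;
  # otherwise the method falls through and returns nil.
  SketchUnit.new(*work) if work
end

found = retrieve_work_sketch(-> { [7, '{"jid":"x"}'] })
empty = retrieve_work_sketch(-> { nil })
```

Here found is a SketchUnit wrapping the fetched pair, while empty is nil, which a polling loop would treat as "nothing to do yet".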