class Roundhouse::RoundRobinFetch

Constants

UnitOfWork

Public Class Methods

bulk_requeue(inprogress, options) click to toggle source

Keeping this as a class method makes it pluggable and lets the Manager actor use it. Making it an instance method would make the call asynchronous to the Fetcher actor.

# File lib/roundhouse/fetch.rb, line 108
# Hands the messages of unfinished units of work back to Redis.
#
# Kept as a class method so that it stays pluggable and can be used by the
# Manager actor; as an instance method it would become async to the Fetcher
# actor.
#
# inprogress - collection of units of work; each element must respond to
#              +queue_id+ and +message+.
# options    - not read anywhere in this method body.
#
# Returns nil immediately when +inprogress+ is empty. Any StandardError raised
# while re-queueing is logged as a warning instead of being propagated.
def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Roundhouse.logger.debug { "Re-queueing terminated jobs" }

  # Bucket the messages by the queue each unit of work belongs to.
  messages_by_queue = inprogress.each_with_object({}) do |unit, buckets|
    (buckets[unit.queue_id] ||= []) << unit.message
  end

  Roundhouse.redis do |conn|
    conn.pipelined do
      messages_by_queue.each_pair do |queue_id, messages|
        Roundhouse::Monitor.requeue(conn, queue_id, messages)
      end
    end

    # REFACTOR NOTE: the push step stays outside the pipelined block because
    # it needs to read. It can move back in once the Monitor operations are
    # converted to EVAL scripts.
    messages_by_queue.each_key do |queue_id|
      Roundhouse::Monitor.push(conn, queue_id)
    end
  end

  Roundhouse.logger.info("Pushed #{inprogress.size} messages back to Redis")
rescue => error
  Roundhouse.logger.warn("Failed to requeue #{inprogress.size} jobs: #{error.message}")
end
new(options = nil) click to toggle source
# File lib/roundhouse/fetch.rb, line 97
# Builds a fetcher. The +options+ argument is accepted but deliberately
# ignored: nothing is read from it and no state is set up here.
def initialize(options = nil); end

Public Instance Methods

retrieve_work() click to toggle source
# File lib/roundhouse/fetch.rb, line 101
# Asks Roundhouse::Monitor.maybe_next_job for a job over a Redis connection
# and, when one comes back, splats it into a new UnitOfWork.
#
# Returns the UnitOfWork, or nil when no job was returned.
def retrieve_work
  job = Roundhouse.redis do |conn|
    Roundhouse::Monitor.maybe_next_job(conn)
  end
  return unless job

  UnitOfWork.new(*job)
end