class Roundhouse::Fetcher

The Fetcher blocks on Redis, waiting for a message to process from the queues. It gets the message and hands it to the Manager to assign to a ready Processor.

Constants

TIMEOUT

Attributes

down[R]

Public Class Methods

new(mgr, options) click to toggle source
# File lib/roundhouse/fetch.rb, line 19
def initialize(mgr, options)
  @down = nil
  @mgr = mgr
  @strategy = Fetcher.strategy.new(options)
end

Private Class Methods

done!() click to toggle source

Ugh. Say hello to a bloody hack. Can’t find a clean way to get the fetcher to just stop processing its mailbox when shutdown starts.

# File lib/roundhouse/fetch.rb, line 78
def self.done!
  @done = true
  Roundhouse.logger.debug 'Fetcher: setting done'
end
done?() click to toggle source
# File lib/roundhouse/fetch.rb, line 87
def self.done?
  defined?(@done) && @done
end
reset() click to toggle source
# File lib/roundhouse/fetch.rb, line 83
def self.reset # testing only
  @done = nil
end
strategy() click to toggle source
# File lib/roundhouse/fetch.rb, line 91
def self.strategy
  Roundhouse.options[:fetch] || RoundRobinFetch
end

Public Instance Methods

fetch() click to toggle source

Fetching is straightforward: the Manager makes a fetch request for each idle processor when Roundhouse starts and then issues a new fetch request every time a Processor finishes a message.

Because we have to shut down cleanly, we can’t block forever and we can’t loop forever. Instead we reschedule a new fetch if the current fetch turned up nothing.

# File lib/roundhouse/fetch.rb, line 33
def fetch
  watchdog('Fetcher#fetch died') do
    logger.debug 'Fetcher terminating in #fetch' and return if Roundhouse::Fetcher.done?

    begin
      work = @strategy.retrieve_work
      ::Roundhouse.logger.info("Redis is online, #{Time.now - @down} sec downtime") if @down
      @down = nil

      if work
        @mgr.async.assign(work)
      else
        after(0) { fetch }
      end
    rescue => ex
      handle_fetch_exception(ex)
    end

  end
end

Private Instance Methods

handle_fetch_exception(ex) click to toggle source
# File lib/roundhouse/fetch.rb, line 60
def handle_fetch_exception(ex)
  if !@down
    logger.error("Error fetching message: #{ex}")
    ex.backtrace.each do |bt|
      logger.error(bt)
    end
  end
  @down ||= Time.now
  pause
  after(0) { fetch }
rescue Celluloid::TaskTerminated
  # If redis is down when we try to shut down, all the fetch backlog
  # raises these errors.  Haven't been able to figure out what I'm doing wrong.
end
pause() click to toggle source
# File lib/roundhouse/fetch.rb, line 56
def pause
  sleep(TIMEOUT)
end