class Roundhouse::Fetcher
The Fetcher blocks on Redis, waiting for a message to process from the queues. It gets the message and hands it to the Manager to assign to a ready Processor.
Constants
- TIMEOUT
Attributes
down[R]
Public Class Methods
new(mgr, options)
# File lib/roundhouse/fetch.rb, line 19
def initialize(mgr, options)
  @down = nil
  @mgr = mgr
  @strategy = Fetcher.strategy.new(options)
end
Private Class Methods
done!()
Ugh. Say hello to a bloody hack. Can’t find a clean way to get the fetcher to just stop processing its mailbox when shutdown starts.
# File lib/roundhouse/fetch.rb, line 78
def self.done!
  @done = true
  Roundhouse.logger.debug 'Fetcher: setting done'
end
done?()
# File lib/roundhouse/fetch.rb, line 87
def self.done?
  defined?(@done) && @done
end
reset()
# File lib/roundhouse/fetch.rb, line 83
def self.reset # testing only
  @done = nil
end
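The three class methods above share one class-level instance variable. The following is a minimal sketch of that flag pattern in isolation; DemoFetcher is a hypothetical stand-in, not part of Roundhouse, and the logger call is omitted.

```ruby
# Hypothetical stand-in that mirrors the done!/reset/done? trio.
class DemoFetcher
  def self.done!
    @done = true
  end

  def self.reset
    @done = nil
  end

  # Safe to call before @done has ever been assigned.
  def self.done?
    defined?(@done) && @done
  end
end

before = DemoFetcher.done?       # flag never set
DemoFetcher.done!
during = DemoFetcher.done?       # flag set
DemoFetcher.reset
after_reset = DemoFetcher.done?  # flag cleared again
```

The defined? guard means done? is falsy both before the flag is first set and after reset, so callers only need a truthiness check.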
strategy()
# File lib/roundhouse/fetch.rb, line 91
def self.strategy
  Roundhouse.options[:fetch] || RoundRobinFetch
end
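Judging from the source shown on this page, a strategy class only needs to accept options in its constructor and respond to retrieve_work. The sketch below illustrates that contract with a hypothetical in-memory strategy. ArrayFetch, the :jobs option, and strategy_class are all invented for illustration, and a plain hash stands in for Roundhouse.options so the example runs without the gem.

```ruby
# Hypothetical in-memory strategy; the default in Roundhouse is RoundRobinFetch.
class ArrayFetch
  def initialize(options)
    # :jobs is an illustrative option, not a real Roundhouse setting.
    @queue = Array(options[:jobs]).dup
  end

  # Returns the next unit of work, or nil when nothing is available.
  def retrieve_work
    @queue.shift
  end
end

# Mirrors the lookup in Fetcher.strategy: a configured class wins,
# otherwise the default is used.
def strategy_class(options, default)
  options[:fetch] || default
end

demo_options = { fetch: ArrayFetch } # stand-in for Roundhouse.options
strategy = strategy_class(demo_options, nil).new(jobs: %w[a b])

first  = strategy.retrieve_work
second = strategy.retrieve_work
third  = strategy.retrieve_work # queue is empty by now
```

Returning nil for an empty poll matters because fetch treats a falsy result as "nothing found" and reschedules itself.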
Public Instance Methods
fetch()
Fetching is straightforward: the Manager makes a fetch request for each idle processor when Roundhouse starts and then issues a new fetch request every time a Processor finishes a message.
Because we have to shut down cleanly, we can’t block forever and we can’t loop forever. Instead we reschedule a new fetch if the current fetch turned up nothing.
# File lib/roundhouse/fetch.rb, line 33
def fetch
  watchdog('Fetcher#fetch died') do
    logger.debug 'Fetcher terminating in #fetch' and return if Roundhouse::Fetcher.done?

    begin
      work = @strategy.retrieve_work
      ::Roundhouse.logger.info("Redis is online, #{Time.now - @down} sec downtime") if @down
      @down = nil

      if work
        @mgr.async.assign(work)
      else
        after(0) { fetch }
      end
    rescue => ex
      handle_fetch_exception(ex)
    end
  end
end
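The reschedule-instead-of-loop idea can be modelled without Celluloid. The following is a rough single-threaded sketch, not the real implementation: LoopDemo is hypothetical, an array of lambdas stands in for the actor mailbox that after(0) { fetch } feeds, and a plain array of results stands in for the strategy.

```ruby
# Hypothetical single-threaded model of Fetcher#fetch.
class LoopDemo
  attr_reader :assigned, :attempts

  def initialize(results)
    @results  = results # values a strategy would return; nil = empty poll
    @assigned = []
    @attempts = 0
    @pending  = []      # stand-in for the actor mailbox
    @done     = false
  end

  def done!
    @done = true
  end

  def fetch
    return if @done # shutdown: do not poll, do not reschedule

    @attempts += 1
    work = @results.shift
    if work
      @assigned << work        # real code: @mgr.async.assign(work)
    else
      @pending << -> { fetch } # real code: after(0) { fetch }
    end
  end

  # Drain rescheduled fetches one at a time, as a mailbox would.
  def run
    @pending.shift.call until @pending.empty?
  end
end

demo = LoopDemo.new([nil, nil, "job-1"])
demo.fetch
demo.run

stopped = LoopDemo.new([nil])
stopped.done!
stopped.fetch
```

Each fetch call does a single poll and returns, so a shutdown flag is checked between polls rather than from inside a loop that never yields.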
Private Instance Methods
handle_fetch_exception(ex)
# File lib/roundhouse/fetch.rb, line 60
def handle_fetch_exception(ex)
  if !@down
    logger.error("Error fetching message: #{ex}")
    ex.backtrace.each do |bt|
      logger.error(bt)
    end
  end
  @down ||= Time.now
  pause
  after(0) { fetch }
rescue Celluloid::TaskTerminated
  # If redis is down when we try to shut down, all the fetch backlog
  # raises these errors. Haven't been able to figure out what I'm doing wrong.
end
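The @down bookkeeping spans this method and fetch: the first failure is logged and timestamped, repeated failures stay quiet, and the next successful fetch reports the outage length. Here is a small sketch of that behaviour under simplifying assumptions. DowntimeTracker is hypothetical, timestamps are passed in as numbers instead of read from Time.now, and messages are collected in an array rather than sent to a logger.

```ruby
# Hypothetical model of the @down bookkeeping shared by fetch and
# handle_fetch_exception.
class DowntimeTracker
  attr_reader :logged

  def initialize
    @down = nil
    @logged = []
  end

  # Log only the first failure of an outage and remember when it began.
  def failure(ex, now)
    @logged << "Error fetching message: #{ex}" unless @down
    @down ||= now
  end

  # On recovery, report how long the outage lasted and clear the marker.
  def success(now)
    @logged << "Redis is online, #{now - @down} sec downtime" if @down
    @down = nil
  end
end

tracker = DowntimeTracker.new
tracker.failure("connection refused", 100)
tracker.failure("connection refused", 101) # same outage: not logged again
tracker.success(105)
tracker.success(106)                       # already up: nothing to report
```

Using ||= keeps the timestamp of the first failure, so the reported downtime covers the whole outage rather than the gap since the latest retry.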
pause()
# File lib/roundhouse/fetch.rb, line 56
def pause
  sleep(TIMEOUT)
end