class Soda::Fetcher
Constants
- SLEEP
If there's an error fetching from a queue, we should sleep.
Attributes
mutex[R]
paused[R]
queues[R]
Public Class Methods
new()
click to toggle source
Uses a weighted round robin approach to selecting which queue to use. See: en.wikipedia.org/wiki/Weighted_round_robin
# File lib/soda/fetcher.rb, line 10 def initialize @mutex = Mutex.new @queues = weigh_queues @paused = [] end
Public Instance Methods
fetch()
click to toggle source
# File lib/soda/fetcher.rb, line 16 def fetch unpause queue = next! if queue msgs = pop(queue) msgs.tap do pause(queue, queue.options[:sleep]) if msgs.count.zero? end end rescue Aws::SQS::Errors::ServiceError pause(queue, SLEEP) unless queue.nil? raise end
Private Instance Methods
next!()
click to toggle source
# File lib/soda/fetcher.rb, line 45 def next! mutex.synchronize do queues.shift.tap do |q| queues.push(q) unless q.nil? end end end
pause(queue, sleep)
click to toggle source
# File lib/soda/fetcher.rb, line 66 def pause(queue, sleep) mutex.synchronize do if queues.delete(queue) paused << [Time.now + sleep, queue] logger.info(%(paused fetching from "%s" for %d second(s)) % [queue.name, sleep]) end end end
pop(queue)
click to toggle source
# File lib/soda/fetcher.rb, line 36 def pop(queue) start = now logger.debug(%(fetching from "%s") % queue.name) (queue.pop || []).tap do |msgs| logger.debug(%(fetched %d message(s) from "%s" (%fms)) % [msgs.count, queue.name, (now - start)]) end end
unpause()
click to toggle source
# File lib/soda/fetcher.rb, line 53 def unpause mutex.synchronize do paused.each do |wakeup, q| if wakeup <= Time.now paused.delete([wakeup, q]) queues.concat(weigh_queues([q])) logger.info(%(un-paused fetching from "%s") % q.name) end end end end
weigh_queues(queues = Soda.queues)
click to toggle source
# File lib/soda/fetcher.rb, line 75 def weigh_queues(queues = Soda.queues) [].tap do |weighted| queues.each do |queue| weighted.concat([queue] * queue.weight) end end end