class Soda::Fetcher

Constants

SLEEP

How long (in seconds) fetching from a queue is paused after an SQS service error occurs while fetching from it.

Attributes

mutex[R]
paused[R]
queues[R]

Public Class Methods

new() click to toggle source

Uses a weighted round robin approach to selecting which queue to use. See: en.wikipedia.org/wiki/Weighted_round_robin

# File lib/soda/fetcher.rb, line 10
# Prepares the fetcher's state: an empty list of paused queues, the weighted
# queue rotation produced by weigh_queues (called with its default argument),
# and the mutex that the other methods synchronize on.
def initialize
  @paused = []
  @queues = weigh_queues
  @mutex  = Mutex.new
end

Public Instance Methods

fetch() click to toggle source
# File lib/soda/fetcher.rb, line 16
# Fetches a batch of messages from the next queue in the rotation.
#
# Queues whose pause has elapsed are re-activated first (unpause). When the
# selected queue yields no messages it is paused for its own
# options[:sleep]; when fetching raises Aws::SQS::Errors::ServiceError the
# queue is paused for SLEEP and the error is re-raised.
#
# Returns whatever pop returned for the selected queue, or nil when no queue
# is available.
def fetch
  unpause

  queue = next!
  return unless queue

  messages = pop(queue)
  # An empty batch means the queue is idle right now; back off from it.
  pause(queue, queue.options[:sleep]) if messages.count.zero?
  messages
rescue Aws::SQS::Errors::ServiceError
  # `queue` is still nil if the error was raised before a queue was selected.
  pause(queue, SLEEP) unless queue.nil?

  raise
end

Private Instance Methods

next!() click to toggle source
# File lib/soda/fetcher.rb, line 45
# Takes the queue at the front of the rotation, moves it to the back, and
# returns it. Returns nil (leaving the rotation untouched) when the rotation
# is empty. Runs under the mutex.
def next!
  mutex.synchronize do
    head = queues.shift
    queues.push(head) unless head.nil?
    head
  end
end
pause(queue, sleep) click to toggle source
# File lib/soda/fetcher.rb, line 66
# Removes +queue+ from the active rotation and records it as paused until
# +sleep+ seconds from now. Does nothing when the queue is not currently in
# the rotation. Runs under the mutex.
def pause(queue, sleep)
  mutex.synchronize do
    # Array#delete removes every occurrence and returns nil when none matched.
    next unless queues.delete(queue)

    wakeup = Time.now + sleep
    paused.push([wakeup, queue])
    logger.info(%(paused fetching from "%s" for %d second(s)) % [queue.name, sleep])
  end
end
pop(queue) click to toggle source
# File lib/soda/fetcher.rb, line 36
# Pops messages from +queue+, logging before and after at debug level
# (the second line includes the message count and the elapsed time reported
# by the `now` helper).
#
# Returns the result of queue.pop, or an empty array when that is nil/false.
def pop(queue)
  started_at = now
  logger.debug(%(fetching from "%s") % queue.name)

  messages = queue.pop || []
  elapsed = now - started_at
  logger.debug(%(fetched %d message(s) from "%s" (%fms)) % [messages.count, queue.name, elapsed])
  messages
end
unpause() click to toggle source
# File lib/soda/fetcher.rb, line 53
# Moves every paused queue whose wake-up time has passed back into the
# active rotation (re-expanded by weight via weigh_queues) and logs each one.
# Runs under the mutex. Returns the list of queues that are still paused.
def unpause
  mutex.synchronize do
    current_time = Time.now

    # Split first, then mutate: deleting from `paused` while iterating it
    # skips the entry that follows each removed one, which would leave
    # already-due queues paused until a later call.
    due, pending = paused.partition { |wakeup, _queue| wakeup <= current_time }

    # `paused` is only exposed through a reader, so update it in place.
    paused.replace(pending)

    due.each do |_wakeup, q|
      queues.concat(weigh_queues([q]))

      logger.info(%(un-paused fetching from "%s") % q.name)
    end

    paused
  end
end
weigh_queues(queues = Soda.queues) click to toggle source
# File lib/soda/fetcher.rb, line 75
# Expands +queues+ into a flat array in which each queue appears
# queue.weight times, in the original order. Defaults to Soda.queues.
def weigh_queues(queues = Soda.queues)
  queues.flat_map { |queue| [queue] * queue.weight }
end