class AtomicSidekiq::AtomicFetch

Constants

DEFAULT_COLLECTION_INTERVAL
DEFAULT_EXPIRATION_TIME
DEFAULT_POLL_INTERVAL
IN_FLIGHT_KEY_PREFIX

Attributes

collection_interval[R]
expiration_time[R]
ignored_queues[R]
keymaker[R]
poll_interval[R]
queues[R]
retrieve_op[R]
strictly_ordered_queues[R]

Public Class Methods

new(options, in_flight_keymaker: nil) click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 9
def initialize(options, in_flight_keymaker: nil)
  @keymaker = in_flight_keymaker ||
              InFlightKeymaker.new(IN_FLIGHT_KEY_PREFIX)

  @retrieve_op = AtomicOperation::Retrieve.new(
    in_flight_keymaker: keymaker
  )

  @queues ||= options[:queues].map { |q| "queue:#{q}" }
  @strictly_ordered_queues = !!options[:strict]
  @@next_collection ||= Time.now

  configure_atomic_fetch(options.fetch(:atomic_fetch, {}))
end

Public Instance Methods

retrieve_work() click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 24
def retrieve_work
  collect_dead_jobs!
  work = retrieve_op.perform(ordered_queues, expire_at)
  return UnitOfWork.new(*work, in_flight_keymaker: keymaker) if work

  sleep(poll_interval)
  nil
end

Private Instance Methods

collect_dead_jobs!() click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 57
def collect_dead_jobs!
  return if @@next_collection > Time.now

  @@next_collection = Time.now + collection_interval
  DeadJobCollector.collect!(
    ordered_queues,
    in_flight_keymaker: keymaker,
    skip_recovery_queues: ignored_queues
  )
end
configure_atomic_fetch(options) click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 40
def configure_atomic_fetch(options)
  @expiration_time = options[:expiration_time] || DEFAULT_EXPIRATION_TIME
  @collection_interval = options[:collection_wait_time] ||
                         DEFAULT_COLLECTION_INTERVAL
  @poll_interval = options[:poll_interval] || DEFAULT_POLL_INTERVAL
  @ignored_queues = (options[:ignored_queues] || [])
                    .map { |q| "queue:#{q}" }
end
expire_at() click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 68
def expire_at
  Time.now.utc.to_i + expiration_time
end
ordered_queues() click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 49
def ordered_queues
  if strictly_ordered_queues
    queues
  else
    queues.shuffle.uniq
  end
end