class AtomicSidekiq::AtomicFetch
Constants
- DEFAULT_COLLECTION_INTERVAL
- DEFAULT_EXPIRATION_TIME
- DEFAULT_POLL_INTERVAL
- IN_FLIGHT_KEY_PREFIX
Attributes
collection_interval[R]
expiration_time[R]
ignored_queues[R]
keymaker[R]
poll_interval[R]
queues[R]
retrieve_op[R]
strictly_ordered_queues[R]
Public Class Methods
new(options, in_flight_keymaker: nil)
click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 9
# Builds a fetcher from the given options hash.
#
# Reads +options[:queues]+ (each entry is prefixed with "queue:"),
# +options[:strict]+ (coerced to a boolean) and the optional
# +options[:atomic_fetch]+ sub-hash, which is handed to
# +configure_atomic_fetch+.
#
# @param options [Hash] fetcher options; must contain +:queues+
# @param in_flight_keymaker [Object, nil] keymaker to use; when nil a new
#   InFlightKeymaker built from IN_FLIGHT_KEY_PREFIX is created
def initialize(options, in_flight_keymaker: nil)
  @keymaker = in_flight_keymaker || InFlightKeymaker.new(IN_FLIGHT_KEY_PREFIX)
  @retrieve_op = AtomicOperation::Retrieve.new(in_flight_keymaker: keymaker)

  @queues ||= options[:queues].map { |name| "queue:#{name}" }
  @strictly_ordered_queues = options[:strict] ? true : false

  # Class-level timestamp: set only once, the first time a fetcher is built.
  @@next_collection ||= Time.now

  atomic_fetch_options = options.fetch(:atomic_fetch, {})
  configure_atomic_fetch(atomic_fetch_options)
end
Public Instance Methods
retrieve_work()
click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 24
# Attempts to fetch one unit of work.
#
# First triggers dead-job collection (which is itself rate limited), then
# asks the retrieve operation for a job from the ordered queues, passing the
# freshly computed expiration timestamp.
#
# @return [UnitOfWork, nil] a UnitOfWork wrapping the retrieved job, or nil
#   after sleeping for +poll_interval+ when nothing was retrieved
def retrieve_work
  collect_dead_jobs!

  job = retrieve_op.perform(ordered_queues, expire_at)
  unless job
    # Nothing available: back off before the caller polls again.
    sleep(poll_interval)
    return nil
  end

  UnitOfWork.new(*job, in_flight_keymaker: keymaker)
end
Private Instance Methods
collect_dead_jobs!()
click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 57
# Runs the dead-job collector if the class-level next-collection time has
# been reached, and schedules the next run +collection_interval+ from now.
#
# Does nothing while the scheduled time is still in the future.
def collect_dead_jobs!
  return unless @@next_collection <= Time.now

  # Push the schedule forward before collecting.
  @@next_collection = Time.now + collection_interval

  queue_names = ordered_queues
  DeadJobCollector.collect!(
    queue_names,
    in_flight_keymaker: keymaker,
    skip_recovery_queues: ignored_queues
  )
end
configure_atomic_fetch(options)
click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 40
# Applies the atomic-fetch specific settings, falling back to the class
# defaults for any value that is missing (or nil/false).
#
# Recognised keys: +:expiration_time+, +:collection_wait_time+ (stored as
# +collection_interval+), +:poll_interval+ and +:ignored_queues+ (each entry
# is prefixed with "queue:").
#
# @param options [Hash] the +:atomic_fetch+ settings
def configure_atomic_fetch(options)
  @expiration_time = options[:expiration_time] || DEFAULT_EXPIRATION_TIME
  @collection_interval = options[:collection_wait_time] || DEFAULT_COLLECTION_INTERVAL
  @poll_interval = options[:poll_interval] || DEFAULT_POLL_INTERVAL

  skipped = options[:ignored_queues] || []
  @ignored_queues = skipped.map { |name| "queue:#{name}" }
end
expire_at()
click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 68
# Computes the expiration timestamp passed to the retrieve operation.
#
# @return [Numeric] the current time as integer epoch seconds plus
#   +expiration_time+
def expire_at
  current_epoch = Time.now.utc.to_i
  current_epoch + expiration_time
end
ordered_queues()
click to toggle source
# File lib/atomic_sidekiq/atomic_fetch.rb, line 49
# Returns the queue names in the order they should be polled.
#
# With strict ordering the configured list is returned as is; otherwise the
# list is shuffled and duplicates are removed.
#
# @return [Array<String>] queue names to poll
def ordered_queues
  return queues if strictly_ordered_queues

  queues.shuffle.uniq
end