class Chewy::Strategy::DelayedSidekiq::Scheduler

Constants

DEFAULT_LATENCY
DEFAULT_MARGIN
DEFAULT_QUEUE
DEFAULT_TTL
FALLBACK_FIELDS
FIELDS_IDS_SEPARATOR
IDS_SEPARATOR
KEY_PREFIX

Attributes

ids[R]
options[R]
type[R]

Public Class Methods

new(type, ids, options = {}) click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 56
# Builds a scheduler for a single reindex request.
#
# @param type the index class; other methods of this class call +name+
#   and +strategy_config+ on it
# @param ids the ids to reindex; they are joined into a string when serialized
# @param options [Hash] extra options; only +:update_fields+ is read by this class
def initialize(type, ids, options = {})
  @type, @ids, @options = type, ids, options
end

Public Instance Methods

postpone() click to toggle source

the diagram:

inputs:
latency == 2
reindex_time = Time.current

Parallel OR Sequential triggers of reindex:          |  What is going on in reindex store (Redis):
--------------------------------------------------------------------------------------------------
                                                     |
process 1 (reindex_time):                            |  chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1]
  Scheduler.new(CitiesIndex, [1]).postpone           |  chewy:delayed_sidekiq:CitiesIndex:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
                                                     |  & schedule a DelayedSidekiq::Worker at 1679347869 (at + 3)
                                                     |    it will zpop chewy:delayed_sidekiq:CitiesIndex:timechunks up to 1679347866 score and reindex all ids with zpopped keys
                                                     |      chewy:delayed_sidekiq:CitiesIndex:1679347866
                                                     |
                                                     |
process 2 (reindex_time):                            |  chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2]
  Scheduler.new(CitiesIndex, [2]).postpone           |  chewy:delayed_sidekiq:CitiesIndex:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
                                                     |  & do not schedule a new worker
                                                     |
                                                     |
process 1 (reindex_time + (latency - 1).seconds):    |  chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3]
  Scheduler.new(CitiesIndex, [3]).postpone           |  chewy:delayed_sidekiq:CitiesIndex:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
                                                     |  & do not schedule a new worker
                                                     |
                                                     |
process 2 (reindex_time + (latency + 1).seconds):    |  chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3]
  Scheduler.new(CitiesIndex, [4]).postpone           |  chewy:delayed_sidekiq:CitiesIndex:1679347868 = [4]
                                                     |  chewy:delayed_sidekiq:CitiesIndex:timechunks = [
                                                     |    { score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}
                                                     |    { score: 1679347868, "chewy:delayed_sidekiq:CitiesIndex:1679347868"}
                                                     |  ]
                                                     |  & schedule a DelayedSidekiq::Worker at 1679347871 (at + 3)
                                                     |    it will zpop chewy:delayed_sidekiq:CitiesIndex:timechunks up to 1679347868 score and reindex all ids with zpopped keys
                                                     |      chewy:delayed_sidekiq:CitiesIndex:1679347866 (in case of failed previous reindex),
                                                     |      chewy:delayed_sidekiq:CitiesIndex:1679347868
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 98
# Records this request's ids/fields in Redis and, when the Lua script
# returns a truthy value, enqueues a delayed worker for the time chunk.
#
# @return the result of +Sidekiq::Client.push+ when a worker was enqueued,
#   otherwise +nil+
def postpone
  ::Sidekiq.redis do |redis|
    # All Redis work happens in one command to avoid concurrency issues.
    needs_worker = redis.eval(
      LUA_SCRIPT,
      keys: [timechunk_key, timechunks_key],
      argv: [serialize_data, at, ttl]
    )
    next unless needs_worker

    # The worker runs +margin+ after the chunk timestamp and receives the
    # index name plus that timestamp as its arguments.
    job = {
      'queue' => sidekiq_queue,
      'at' => at + margin,
      'class' => Chewy::Strategy::DelayedSidekiq::Worker,
      'args' => [type_name, at]
    }
    ::Sidekiq::Client.push(job)
  end
end

Private Instance Methods

at() click to toggle source

This method returns a predictable value that jumps by the latency value; in other words, within each window of latency seconds it returns the same value.

# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 118
# Timestamp of the time chunk this request falls into.
#
# Takes the moment +latency+ seconds from now and rounds it down to a
# multiple of +latency+, so every call made within the same window yields
# the same value. The result is memoized in +@at+.
#
# @return [Integer] epoch seconds
def at
  return @at if @at

  upcoming = latency.seconds.from_now.to_f
  @at = (upcoming - (upcoming % latency)).to_i
end
fields() click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 126
# Fields to update for this request.
#
# @return the +:update_fields+ option when it is present, otherwise a
#   one-element array holding +FALLBACK_FIELDS+
def fields
  requested = options[:update_fields].presence
  requested || [FALLBACK_FIELDS]
end
latency() click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 146
# Width of a time chunk, in seconds (it is used with +.seconds+ in +at+).
#
# @return the configured +latency+, or +DEFAULT_LATENCY+ when it is unset
def latency
  configured = strategy_config.latency
  configured || DEFAULT_LATENCY
end
margin() click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 150
# Offset added to the chunk timestamp to get the worker's run time.
#
# @return the configured +margin+, or +DEFAULT_MARGIN+ when it is unset
def margin
  configured = strategy_config.margin
  configured || DEFAULT_MARGIN
end
serialize_data() click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 138
# Encodes the ids and fields of this request as one string.
#
# Ids and fields are each joined with +IDS_SEPARATOR+; the two resulting
# strings are then joined with +FIELDS_IDS_SEPARATOR+ (ids first).
#
# @return [String]
def serialize_data
  ids_part = ids.join(IDS_SEPARATOR)
  fields_part = fields.join(IDS_SEPARATOR)
  [ids_part, fields_part].join(FIELDS_IDS_SEPARATOR)
end
sidekiq_queue() click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 158
# Name of the Sidekiq queue the worker is pushed to.
#
# @return the +:queue+ entry under +:sidekiq+ in +Chewy.settings+, or
#   +DEFAULT_QUEUE+ when it is missing
def sidekiq_queue
  configured = Chewy.settings.dig(:sidekiq, :queue)
  configured || DEFAULT_QUEUE
end
strategy_config() click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 162
# The +delayed_sidekiq+ section of the index's strategy configuration.
def strategy_config
  index_config = type.strategy_config
  index_config.delayed_sidekiq
end
timechunk_key() click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 134
# Redis key for the current time chunk of this index:
# +KEY_PREFIX+, the index name and the chunk timestamp, colon-separated.
#
# @return [String]
def timechunk_key
  [KEY_PREFIX, type_name, at].join(':')
end
timechunks_key() click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 130
# Redis key for this index's collection of time chunks:
# +KEY_PREFIX+, the index name and the literal "timechunks", colon-separated.
#
# @return [String]
def timechunks_key
  [KEY_PREFIX, type_name, 'timechunks'].join(':')
end
ttl() click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 154
# TTL value handed to the Lua script as its last argument.
# NOTE(review): presumably the expiry applied to the stored keys — confirm
# against LUA_SCRIPT.
#
# @return the configured +ttl+, or +DEFAULT_TTL+ when it is unset
def ttl
  configured = strategy_config.ttl
  configured || DEFAULT_TTL
end
type_name() click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 142
# Name of the index class; used in the Redis keys and as a worker argument.
def type_name
  index_class = type
  index_class.name
end