class Chewy::Strategy::DelayedSidekiq::Scheduler
Constants
- DEFAULT_LATENCY
- DEFAULT_MARGIN
- DEFAULT_QUEUE
- DEFAULT_TTL
- FALLBACK_FIELDS
- FIELDS_IDS_SEPARATOR
- IDS_SEPARATOR
- KEY_PREFIX
Attributes
ids[R]
options[R]
type[R]
Public Class Methods
new(type, ids, options = {})
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 56 def initialize(type, ids, options = {}) @type = type @ids = ids @options = options end
Public Instance Methods
postpone()
click to toggle source
the diagram:
inputs: latency == 2 reindex_time = Time.current Parallel OR Sequential triggers of reindex: | What is going on in reindex store (Redis): -------------------------------------------------------------------------------------------------- | process 1 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1] Schedule.new(CitiesIndex, [1]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] | & schedule a DelayedSidekiq::Worker at 1679347869 (at + 3) | it will zpop chewy:delayed_sidekiq:timechunks up to 1679347866 score and reindex all ids with zpoped keys | chewy:delayed_sidekiq:CitiesIndex:1679347866 | | process 2 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2] Schedule.new(CitiesIndex, [2]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] | & do not schedule a new worker | | process 1 (reindex_time + (latency - 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3] Schedule.new(CitiesIndex, [3]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] | & do not schedule a new worker | | process 2 (reindex_time + (latency + 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3] Schedule.new(CitiesIndex, [4]).postpone | chewy:delayed_sidekiq:CitiesIndex:1679347868 = [4] | chewy:delayed_sidekiq:timechunks = [ | { score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"} | { score: 1679347868, "chewy:delayed_sidekiq:CitiesIndex:1679347868"} | ] | & schedule a DelayedSidekiq::Worker at 1679347871 (at + 3) | it will zpop chewy:delayed_sidekiq:timechunks up to 1679347868 score and reindex all ids with zpoped keys | chewy:delayed_sidekiq:CitiesIndex:1679347866 (in case of failed previous reindex), | chewy:delayed_sidekiq:CitiesIndex:1679347868
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 98 def postpone ::Sidekiq.redis do |redis| # do the redis stuff in a single command to avoid concurrency issues if redis.eval(LUA_SCRIPT, keys: [timechunk_key, timechunks_key], argv: [serialize_data, at, ttl]) ::Sidekiq::Client.push( 'queue' => sidekiq_queue, 'at' => at + margin, 'class' => Chewy::Strategy::DelayedSidekiq::Worker, 'args' => [type_name, at] ) end end end
Private Instance Methods
at()
click to toggle source
this method returns predictable value that jumps by latency value another words each latency seconds it return the same value
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 118 def at @at ||= begin schedule_at = latency.seconds.from_now.to_f (schedule_at - (schedule_at % latency)).to_i end end
fields()
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 126 def fields options[:update_fields].presence || [FALLBACK_FIELDS] end
latency()
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 146 def latency strategy_config.latency || DEFAULT_LATENCY end
margin()
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 150 def margin strategy_config.margin || DEFAULT_MARGIN end
serialize_data()
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 138 def serialize_data [ids.join(IDS_SEPARATOR), fields.join(IDS_SEPARATOR)].join(FIELDS_IDS_SEPARATOR) end
sidekiq_queue()
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 158 def sidekiq_queue Chewy.settings.dig(:sidekiq, :queue) || DEFAULT_QUEUE end
strategy_config()
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 162 def strategy_config type.strategy_config.delayed_sidekiq end
timechunk_key()
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 134 def timechunk_key "#{KEY_PREFIX}:#{type_name}:#{at}" end
timechunks_key()
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 130 def timechunks_key "#{KEY_PREFIX}:#{type_name}:timechunks" end
ttl()
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 154 def ttl strategy_config.ttl || DEFAULT_TTL end
type_name()
click to toggle source
# File lib/chewy/strategy/delayed_sidekiq/scheduler.rb, line 142 def type_name type.name end