class SidekiqUniqueJobs::TimerTask

@see [Concurrent::TimerTask] www.rubydoc.info/gems/concurrent-ruby/Concurrent/TimerTask

Private Instance Methods

execute_task(completion) click to toggle source

@!visibility private

# File lib/sidekiq_unique_jobs/timer_task.rb, line 30
def execute_task(completion) # rubocop:disable Metrics/MethodLength
  return nil unless @running.true?

  timeout_task = -> { timeout_task(completion) }

  Concurrent::ScheduledTask.execute(
    timeout_interval,
    args: [completion],
    &timeout_task
  )
  @thread_completed = Concurrent::Event.new

  @value = @reason  = nil
  @executor.post do
    @value = @task.call(self)
  rescue Exception => ex # rubocop:disable Lint/RescueException
    @reason = ex
  ensure
    @thread_completed.set
  end

  @thread_completed.wait

  if completion.try?
    schedule_next_task
    time = Time.now
    observers.notify_observers do
      [time, value, @reason]
    end
  end
  nil
end
ns_initialize(opts, &task) click to toggle source
# File lib/sidekiq_unique_jobs/timer_task.rb, line 9
def ns_initialize(opts, &task)
  set_deref_options(opts)

  self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
  self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
  @run_now  = opts[:now] || opts[:run_now]
  @executor = Concurrent::RubySingleThreadExecutor.new
  @running  = Concurrent::AtomicBoolean.new(false)
  @task     = task
  @value    = nil

  self.observers = Concurrent::Collection::CopyOnNotifyObserverSet.new
end
schedule_next_task(interval = execution_interval) click to toggle source
# File lib/sidekiq_unique_jobs/timer_task.rb, line 23
def schedule_next_task(interval = execution_interval)
  exec_task = ->(completion) { execute_task(completion) }
  Concurrent::ScheduledTask.execute(interval, args: [Concurrent::Event.new], &exec_task)
  nil
end
timeout_task(completion) click to toggle source

@!visibility private

# File lib/sidekiq_unique_jobs/timer_task.rb, line 64
def timeout_task(completion)
  return unless @running.true?
  return unless completion.try?

  @executor.kill
  @executor.wait_for_termination
  @executor = Concurrent::RubySingleThreadExecutor.new

  @thread_completed.set

  schedule_next_task
  observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
end