class SidekiqUniqueJobs::TimerTask
@see [Concurrent::TimerTask] https://www.rubydoc.info/gems/concurrent-ruby/Concurrent/TimerTask
Private Instance Methods
execute_task(completion)
click to toggle source
@!visibility private
# File lib/sidekiq_unique_jobs/timer_task.rb, line 30
#
# Runs the wrapped task once on the executor while a timeout handler is
# scheduled in parallel. Whichever of this method and +timeout_task+ wins
# +completion.try?+ schedules the next run and notifies the observers.
#
# @param completion [Concurrent::Event] event shared with the timeout handler
#   for this run; both sides call +try?+ on it before rescheduling.
# @return [nil] always; results are published through +@value+, +@reason+
#   and the observers.
def execute_task(completion) # rubocop:disable Metrics/MethodLength
  return nil unless @running.true?

  # ScheduledTask hands the entries of `args` to the callable when it fires,
  # so the lambda has to accept the event. A zero-arity lambda would raise
  # ArgumentError at that point and the timeout handler would never run.
  # The local is deliberately not named `timeout_task` to avoid shadowing
  # the method it calls.
  on_timeout = ->(event) { timeout_task(event) }

  Concurrent::ScheduledTask.execute(
    timeout_interval,
    args: [completion],
    &on_timeout
  )
  @thread_completed = Concurrent::Event.new

  # Clear the previous run's outcome before posting the new run.
  @value = @reason = nil
  @executor.post do
    @value = @task.call(self)
  rescue Exception => ex # rubocop:disable Lint/RescueException
    # Every exception is captured so it can be handed to the observers.
    @reason = ex
  ensure
    # Always release the waiter below, whether the task succeeded or failed.
    @thread_completed.set
  end

  # Blocks until the posted task finishes or timeout_task sets the event.
  @thread_completed.wait

  if completion.try?
    schedule_next_task
    time = Time.now
    observers.notify_observers do
      [time, value, @reason]
    end
  end
  nil
end
ns_initialize(opts, &task)
click to toggle source
# File lib/sidekiq_unique_jobs/timer_task.rb, line 9
#
# Prepares the timer's state: dereference options, the two intervals, the
# executor, the running flag, the task block and the observer set.
#
# @param opts [Hash] configuration; reads +:execution+ / +:execution_interval+,
#   +:timeout+ / +:timeout_interval+ and +:now+ / +:run_now+ (the first key of
#   each pair wins when it is truthy).
# @param task [Proc] block stored in +@task+ as the work to run.
def ns_initialize(opts, &task)
  set_deref_options(opts)

  # Fall back to the class-level defaults when neither alias is supplied.
  requested_interval = opts[:execution] || opts[:execution_interval]
  self.execution_interval = requested_interval || EXECUTION_INTERVAL

  requested_timeout = opts[:timeout] || opts[:timeout_interval]
  self.timeout_interval = requested_timeout || TIMEOUT_INTERVAL

  @task = task
  @value = nil
  @run_now = opts[:now] || opts[:run_now]
  @running = Concurrent::AtomicBoolean.new(false)
  @executor = Concurrent::RubySingleThreadExecutor.new

  self.observers = Concurrent::Collection::CopyOnNotifyObserverSet.new
end
schedule_next_task(interval = execution_interval)
click to toggle source
# File lib/sidekiq_unique_jobs/timer_task.rb, line 23
#
# Queues the next run of +execute_task+ after +interval+, handing it a fresh
# Concurrent::Event as its completion marker.
#
# @param interval [Numeric] delay before the next run; defaults to
#   +execution_interval+.
# @return [nil]
def schedule_next_task(interval = execution_interval)
  completion_marker = Concurrent::Event.new

  Concurrent::ScheduledTask.execute(interval, args: [completion_marker]) do |completion|
    execute_task(completion)
  end

  nil
end
timeout_task(completion)
click to toggle source
@!visibility private
# File lib/sidekiq_unique_jobs/timer_task.rb, line 64
#
# Timeout handler for a single run. If the timer is still running and this
# handler wins +completion.try?+, it kills and replaces the executor,
# releases the thread waiting on +@thread_completed+, schedules the next run
# and notifies the observers with a Concurrent::TimeoutError.
#
# @param completion [Concurrent::Event] completion marker for the run being
#   timed out.
def timeout_task(completion)
  # Do nothing when the timer is stopped or the run already claimed the event.
  return unless @running.true? && completion.try?

  # Tear down the executor that ran over, then install a fresh one.
  stalled_executor = @executor
  stalled_executor.kill
  stalled_executor.wait_for_termination
  @executor = Concurrent::RubySingleThreadExecutor.new

  @thread_completed.set

  schedule_next_task
  observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
end