module Sqeduler::Worker::Synchronization
Module that provides common synchronization infrastructure for workers across multiple hosts (see `Sqeduler::BaseWorker.synchronize_jobs`).
Constants
- SCHEDULE_COLLISION_MARKER
Callback for when the job expiration is too short, i.e. less than the time it took to perform the actual work.
Public Class Methods
prepended(base)
click to toggle source
# File lib/sqeduler/worker/synchronization.rb, line 10
# Hook run when this module is prepended onto +base+.
#
# Refuses to continue if Sqeduler::Worker::Callbacks is already among the
# ancestors of +base+, then extends +base+ with ClassMethods and declares the
# three synchronization settings as class attributes.
# NOTE(review): +class_attribute+ is presumably the ActiveSupport macro - confirm.
#
# @param base [Class] the class this module is being prepended to
# @raise [RuntimeError] when Sqeduler::Worker::Callbacks was prepended first
def self.prepended(base)
  callbacks_already_present = base.ancestors.include?(Sqeduler::Worker::Callbacks)
  raise "Sqeduler::Worker::Callbacks must be the last module that you prepend." if callbacks_already_present

  base.extend(ClassMethods)
  [
    :synchronize_jobs_mode,
    :synchronize_jobs_expiration,
    :synchronize_jobs_timeout
  ].each do |setting|
    base.class_attribute setting
  end
end
Public Instance Methods
perform(*args)
click to toggle source
rubocop:enable Style/Documentation
Calls superclass method
# File lib/sqeduler/worker/synchronization.rb, line 41
# Runs the worker's own +perform+ (via +super+), optionally synchronized.
#
# When the class-level +synchronize_jobs_mode+ is +:one_at_a_time+, the
# superclass +perform+ is executed inside +perform_locked+ (keyed by
# +sync_lock_key(*args)+) and timed by +perform_timed+. In every other mode
# the call goes straight to +super+.
#
# @param args [Array] job arguments, forwarded unchanged to +super+
def perform(*args)
  # Unsynchronized modes skip the lock entirely.
  return super unless self.class.synchronize_jobs_mode == :one_at_a_time

  lock_key = self.class.sync_lock_key(*args)
  perform_locked(lock_key) do
    perform_timed { super }
  end
end
Private Instance Methods
on_lock_timeout(key)
click to toggle source
callback for when a lock cannot be obtained
Calls superclass method
# File lib/sqeduler/worker/synchronization.rb, line 56
# Callback for when the lock identified by +key+ cannot be obtained.
#
# Logs a warning through +Service.logger+ and then hands over to a +super+
# implementation when one is defined further up the ancestor chain.
#
# @param key [Object] the lock key that could not be acquired
def on_lock_timeout(key)
  logger = Service.logger
  message = "#{self.class.name} unable to acquire lock '#{key}'. Aborting."
  logger.warn(message)
  super if defined?(super)
end
on_schedule_collision(duration)
click to toggle source
Calls superclass method
# File lib/sqeduler/worker/synchronization.rb, line 66
# Callback invoked by +perform_timed+ when the work ran longer than the
# configured +synchronize_jobs_expiration+.
#
# Logs a warning built from SCHEDULE_COLLISION_MARKER (worker class name,
# human-readable duration, configured expiration) and then hands over to a
# +super+ implementation when one is defined.
#
# @param duration [Numeric] how long the work took, as measured by the caller
def on_schedule_collision(duration)
  logger = Service.logger
  worker_name = self.class.name
  elapsed_text = time_duration(duration)
  expiration = self.class.synchronize_jobs_expiration

  logger.warn(format(SCHEDULE_COLLISION_MARKER, worker_name, elapsed_text, expiration))
  super if defined?(super)
end
perform_locked(sync_lock_key, &work)
click to toggle source
# File lib/sqeduler/worker/synchronization.rb, line 83
# Runs +work+ through +RedisLock.with_lock+ for +sync_lock_key+.
#
# The lock's expiration and timeout come from the class-level
# +synchronize_jobs_expiration+ and +synchronize_jobs_timeout+ settings.
# A RedisLock::LockTimeoutError is not propagated; it is turned into a call
# to +on_lock_timeout+.
#
# @param sync_lock_key [Object] key identifying the lock
# @param work [Proc] the block handed to +RedisLock.with_lock+
def perform_locked(sync_lock_key, &work)
  begin
    worker_class = self.class
    RedisLock.with_lock(
      sync_lock_key,
      :expiration => worker_class.synchronize_jobs_expiration,
      :timeout => worker_class.synchronize_jobs_timeout,
      &work
    )
  rescue RedisLock::LockTimeoutError
    on_lock_timeout(sync_lock_key)
  end
end
perform_timed(&block)
click to toggle source
# File lib/sqeduler/worker/synchronization.rb, line 78
# Executes +block+ while measuring its run time with +Benchmark.realtime+.
#
# If the elapsed time exceeds the class-level +synchronize_jobs_expiration+,
# +on_schedule_collision+ is called with the elapsed time. The block's own
# return value is not passed back to the caller.
#
# @param block [Proc] the work to time
def perform_timed(&block)
  elapsed = Benchmark.realtime(&block)
  return unless elapsed > self.class.synchronize_jobs_expiration

  on_schedule_collision(elapsed)
end
time_duration(timespan)
click to toggle source
rubocop:disable Metrics/AbcSize
# File lib/sqeduler/worker/synchronization.rb, line 95
# Renders a duration given in seconds as a human-readable string such as
# "1 Hours 1 Minutes 1.5 Seconds".
#
# Components whose value is zero are left out. When every component is zero
# (for example a timespan of 0), "0 Seconds" is returned instead of an empty
# string, so a caller never interpolates a blank duration into a message.
#
# @param timespan [Numeric] duration in seconds
# @return [String] space-separated "<n> Days/Hours/Minutes/Seconds" parts
def time_duration(timespan)
  # Peel off seconds, then minutes, then hours; what remains is whole days.
  rest, secs = timespan.divmod(60)
  rest, mins = rest.divmod(60)
  days, hours = rest.divmod(24)

  result = []
  result << "#{days} Days" if days > 0
  result << "#{hours} Hours" if hours > 0
  result << "#{mins} Minutes" if mins > 0
  # Fractional seconds are shown with at most two decimal places.
  result << "#{secs.round(2)} Seconds" if secs > 0
  # A zero-length timespan would otherwise yield an empty string.
  result << "0 Seconds" if result.empty?
  result.join(" ")
end