class Sidekiq::LimitFetch::Global::Semaphore
Constants
- PREFIX
Attributes
local_busy[R]
Public Class Methods
new(name)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 11
# Sets up the per-queue state held by this object.
#
# @param name the queue name, kept in @name for building Redis keys
def initialize(name)
  @name = name
  # Mutex used by the local busy counter updates.
  @lock = Mutex.new
  # Local busy counter starts at zero.
  @local_busy = 0
end
Public Instance Methods
acquire()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 49
# Passes this queue's name and the current namespace to Selector.acquire.
#
# @return [Boolean] true when the result of Selector.acquire has a positive size
def acquire
  acquired = Selector.acquire([@name], namespace)
  acquired.size.positive?
end
block()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 99
# Writes '1' to this queue's block key in Redis.
def block
  block_key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.set(block_key, '1') }
end
block_except(*queues)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 103
# Writes the given queue names, joined with commas, to this queue's block key.
#
# @param queues one or more queue names to store in the block key
# @raise [ArgumentError] when no queue names are given
def block_except(*queues)
  # An empty list would store an empty string; reject it with an explanatory message.
  raise ArgumentError, 'block_except requires at least one queue name' if queues.empty?

  redis { |conn| conn.set("#{PREFIX}:block:#{@name}", queues.join(',')) }
end
blocking?()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 113
# Reports whether a block marker is stored for this queue.
#
# Both #block (which stores '1') and #block_except (which stores a
# comma-joined list of queue names) write to the same key, so any stored
# value is treated as blocking rather than only the literal '1'.
#
# @return [Boolean] true when the block key holds a value, false otherwise
def blocking?
  marker = redis { |conn| conn.get("#{PREFIX}:block:#{@name}") }
  !marker.nil?
end
busy()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 57
# Returns the length (LLEN) of this queue's busy list in Redis.
def busy
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.llen(busy_key) }
end
busy_processes()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 61
# Returns every entry (LRANGE 0..-1) of this queue's busy list in Redis.
def busy_processes
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.lrange(busy_key, 0, -1) }
end
clear_limits()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 117
# Deletes every Redis key this class keeps for the queue: block, busy,
# limit, pause, probed and process_limit.
def clear_limits
  kinds = %w[block busy limit pause probed process_limit]
  redis do |conn|
    # One DEL per key, issued on the same connection.
    kinds.each { |kind| conn.del("#{PREFIX}:#{kind}:#{@name}") }
  end
end
decrease_busy()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 70
# Decrements the local busy counter, then removes one occurrence of
# Selector.uuid from this queue's busy list in Redis.
def decrease_busy
  decrease_local_busy
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.lrem(busy_key, 1, Selector.uuid) }
end
decrease_local_busy()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 129
# Subtracts one from @local_busy while holding @lock.
#
# @return [Integer] the counter value after the decrement
def decrease_local_busy
  @lock.synchronize do
    @local_busy -= 1
  end
end
explain()
click to toggle source
# Builds a human-readable report string for this queue containing, in order:
# the current process uuid (Selector.uuid), Monitor.all_processes,
# Monitor.old_processes (labelled "Stale"), the sorted probed processes
# (labelled "Locked"), the sorted busy processes, the limit, the process
# limit, and the result of blocking?.
# NOTE(review): this listing is collapsed onto one line, so the heredoc's
# real line breaks and the leading indentation that `gsub(/^ {8}/, '')`
# strips are not visible here - confirm against the source file before editing.
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 137 def explain <<-INFO.gsub(/^ {8}/, '') Current sidekiq process: #{Selector.uuid} All processes: #{Monitor.all_processes.join "\n"} Stale processes: #{Monitor.old_processes.join "\n"} Locked queue processes: #{probed_processes.sort.join "\n"} Busy queue processes: #{busy_processes.sort.join "\n"} Limit: #{limit.inspect} Process limit: #{process_limit.inspect} Blocking: #{blocking?} INFO end
increase_busy()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 65
# Increments the local busy counter, then appends Selector.uuid to this
# queue's busy list in Redis.
def increase_busy
  increase_local_busy
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.rpush(busy_key, Selector.uuid) }
end
increase_local_busy()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 125
# Adds one to @local_busy while holding @lock.
#
# @return [Integer] the counter value after the increment
def increase_local_busy
  @lock.synchronize do
    @local_busy += 1
  end
end
limit()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 17
# Reads this queue's limit key from Redis.
#
# @return [Integer, nil] the stored value converted with to_i, or nil when
#   Redis returns nil for the key
def limit
  raw = redis { |conn| conn.get("#{PREFIX}:limit:#{@name}") }
  return nil if raw.nil?

  raw.to_i
end
limit=(value)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 22
# Stores or clears this queue's limit key and records that the limit was
# assigned through this object.
#
# @param value the new limit; a falsy value deletes the key instead
def limit=(value)
  @limit_changed = true
  limit_key = "#{PREFIX}:limit:#{@name}"
  redis { |conn| value ? conn.set(limit_key, value) : conn.del(limit_key) }
end
limit_changed?()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 32
# Reports whether #limit= has been called on this object.
#
# @limit_changed is only ever assigned by #limit=, so it is nil until then;
# the value is coerced so the predicate always answers true or false.
#
# @return [Boolean]
def limit_changed?
  @limit_changed ? true : false
end
local_busy?()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 133
# Reports whether the local busy counter is above zero.
#
# @return [Boolean]
def local_busy?
  counter = @local_busy
  counter.positive?
end
pause()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 83
# Writes '1' to this queue's pause key in Redis.
def pause
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.set(pause_key, '1') }
end
pause_for_ms(milliseconds)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 87
# Writes 1 to this queue's pause key with PSETEX, passing the given
# duration as the expiry argument.
#
# @param milliseconds expiry passed to PSETEX
def pause_for_ms(milliseconds)
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.psetex(pause_key, milliseconds, 1) }
end
paused?()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 95
# Reports whether this queue's pause key currently holds '1'.
#
# @return [Boolean]
def paused?
  flag = redis { |conn| conn.get("#{PREFIX}:pause:#{@name}") }
  flag == '1'
end
probed()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 75
# Returns the length (LLEN) of this queue's probed list in Redis.
def probed
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.llen(probed_key) }
end
probed_processes()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 79
# Returns every entry (LRANGE 0..-1) of this queue's probed list in Redis.
def probed_processes
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.lrange(probed_key, 0, -1) }
end
process_limit()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 36
# Reads this queue's process_limit key from Redis.
#
# @return [Integer, nil] the stored value converted with to_i, or nil when
#   Redis returns nil for the key
def process_limit
  raw = redis { |conn| conn.get("#{PREFIX}:process_limit:#{@name}") }
  return nil if raw.nil?

  raw.to_i
end
process_limit=(value)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 41
# Stores or clears this queue's process_limit key.
#
# @param value the new per-process limit; a falsy value deletes the key instead
def process_limit=(value)
  limit_key = "#{PREFIX}:process_limit:#{@name}"
  redis { |conn| value ? conn.set(limit_key, value) : conn.del(limit_key) }
end
release()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 53
# Removes one occurrence of Selector.uuid from this queue's probed list.
def release
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.lrem(probed_key, 1, Selector.uuid) }
end
remove_lock!(process)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 171
# Removes every occurrence (LREM count 0) of the given process from this
# queue's probed list and then from its busy list.
#
# @param process the list entry to remove
def remove_lock!(process)
  probed_key = "#{PREFIX}:probed:#{@name}"
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis do |conn|
    conn.lrem(probed_key, 0, process)
    conn.lrem(busy_key, 0, process)
  end
end
remove_locks_except!(processes)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 164
# Calls remove_lock! for each distinct entry of the probed list that is not
# in the given collection.
#
# @param processes entries to keep
def remove_locks_except!(processes)
  stale = probed_processes.uniq - processes
  stale.each { |dead_process| remove_lock!(dead_process) }
end
unblock()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 109
# Deletes this queue's block key from Redis.
def unblock
  block_key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.del(block_key) }
end
unpause()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 91
# Deletes this queue's pause key from Redis.
def unpause
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.del(pause_key) }
end
Private Instance Methods
namespace()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 184
# Returns the namespace reported by Sidekiq::LimitFetch::Queues.
def namespace
  queues = Sidekiq::LimitFetch::Queues
  queues.namespace
end
redis(&block)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 180
# Forwards the given block to Sidekiq.redis and returns its result.
def redis(&operation)
  Sidekiq.redis(&operation)
end