class Sidekiq::LimitFetch::Global::Semaphore

Constants

PREFIX

Attributes

local_busy[R]

Public Class Methods

new(name) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 11
# Sets up a semaphore identified by +name+.
#
# @param name [String] identifier interpolated into this semaphore's Redis keys
def initialize(name)
  # Guards the per-process counter below against concurrent updates.
  @lock = Mutex.new
  @name = name
  @local_busy = 0
end

Public Instance Methods

acquire() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 49
# Asks Selector.acquire for this semaphore's name within the current namespace.
#
# @return [Boolean] true when Selector.acquire returned a non-empty collection
def acquire
  acquired = Selector.acquire([@name], namespace)
  acquired.size.positive?
end
block() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 99
# Stores the flag '1' under this semaphore's block key.
#
# @return [Object] whatever the Redis SET call returns
def block
  block_key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.set(block_key, '1') }
end
block_except(*queues) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 103
# Stores a comma-separated list of +queues+ under this semaphore's block key.
# NOTE(review): presumably the listed queues are the ones exempt from the
# block — confirm against the code that reads this key.
#
# @param queues [Array<String>] one or more queue names
# @raise [ArgumentError] when called without any queue
# @return [Object] whatever the Redis SET call returns
def block_except(*queues)
  # An empty list would write an empty string to the block key, so reject it
  # with a message that tells the caller what is missing.
  raise ArgumentError, 'block_except requires at least one queue' if queues.empty?

  block_key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.set(block_key, queues.join(',')) }
end
blocking?() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 113
# Tells whether the block key currently holds the flag '1'.
# NOTE(review): block_except writes a queue list to the same key, which this
# predicate reports as false — confirm that is intended.
#
# @return [Boolean] true only when the stored value equals '1'
def blocking?
  stored = redis { |conn| conn.get("#{PREFIX}:block:#{@name}") }
  stored == '1'
end
busy() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 57
# Reads the length of the Redis list stored under this semaphore's busy key.
#
# @return [Integer] result of LLEN on the busy list
def busy
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.llen(busy_key) }
end
busy_processes() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 61
# Fetches every entry of the Redis list stored under the busy key.
#
# @return [Array] result of LRANGE over the whole list (0 to -1)
def busy_processes
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.lrange(busy_key, 0, -1) }
end
clear_limits() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 117
# Deletes every Redis key this semaphore writes: block, busy, limit, pause,
# probed and process_limit. One DEL is issued per key, in that order.
#
# @return [Array<String>] the list of key suffixes that were iterated
def clear_limits
  suffixes = %w[block busy limit pause probed process_limit]
  redis do |conn|
    suffixes.each { |suffix| conn.del "#{PREFIX}:#{suffix}:#{@name}" }
  end
end
decrease_busy() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 70
# Decrements the in-process busy counter, then removes one occurrence of the
# current Selector.uuid from the Redis busy list.
#
# @return [Object] whatever the Redis LREM call returns
def decrease_busy
  decrease_local_busy
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.lrem(busy_key, 1, Selector.uuid) }
end
decrease_local_busy() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 129
# Subtracts one from the in-process busy counter while holding the lock.
#
# @return [Integer] the counter value after the decrement
def decrease_local_busy
  @lock.synchronize do
    @local_busy -= 1
  end
end
explain() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 137
        # Builds a multi-line, human-readable report about this semaphore.
        #
        # The report interpolates, in order: Selector.uuid, Monitor.all_processes,
        # Monitor.old_processes, the sorted probed and busy process lists, the
        # inspected limit and process limit, and the result of blocking?.
        # After interpolation, gsub drops eight leading spaces from every line
        # that starts with them, so re-indenting the heredoc body would change
        # the returned text.
        #
        # @return [String] the formatted report
        def explain
          <<-INFO.gsub(/^ {8}/, '')
        Current sidekiq process: #{Selector.uuid}

          All processes:
        #{Monitor.all_processes.join "\n"}

          Stale processes:
        #{Monitor.old_processes.join "\n"}

          Locked queue processes:
        #{probed_processes.sort.join "\n"}

          Busy queue processes:
        #{busy_processes.sort.join "\n"}

          Limit:
        #{limit.inspect}

          Process limit:
        #{process_limit.inspect}

          Blocking:
        #{blocking?}
          INFO
        end
increase_busy() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 65
# Increments the in-process busy counter, then appends the current
# Selector.uuid to the Redis busy list.
#
# @return [Object] whatever the Redis RPUSH call returns
def increase_busy
  increase_local_busy
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.rpush(busy_key, Selector.uuid) }
end
increase_local_busy() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 125
# Adds one to the in-process busy counter while holding the lock.
#
# @return [Integer] the counter value after the increment
def increase_local_busy
  @lock.synchronize do
    @local_busy += 1
  end
end
limit() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 17
# Reads the value stored under this semaphore's limit key.
#
# @return [Integer, nil] the stored value converted with to_i, or nil when
#   the key holds nothing
def limit
  raw = redis { |conn| conn.get("#{PREFIX}:limit:#{@name}") }
  raw.to_i unless raw.nil?
end
limit=(value) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 22
# Writes or clears the limit key and records that the limit was touched.
#
# @param value [Object, nil] a truthy value is stored with SET; nil or false
#   deletes the key
# @return [Object] whatever the Redis SET or DEL call returns
def limit=(value)
  @limit_changed = true
  limit_key = "#{PREFIX}:limit:#{@name}"

  redis do |conn|
    value ? conn.set(limit_key, value) : conn.del(limit_key)
  end
end
limit_changed?() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 32
# Reports whether the limit setter has been invoked on this object.
#
# The flag is only assigned inside limit=, and the constructor shown in this
# file never initializes it, so it is coerced here to keep the predicate from
# leaking nil.
#
# @return [Boolean] true after limit= has run, false otherwise
def limit_changed?
  @limit_changed ? true : false
end
local_busy?() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 133
# Tells whether the in-process busy counter is above zero.
#
# @return [Boolean]
def local_busy?
  @local_busy > 0
end
pause() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 83
# Stores the flag '1' under this semaphore's pause key.
#
# @return [Object] whatever the Redis SET call returns
def pause
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.set(pause_key, '1') }
end
pause_for_ms(milliseconds) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 87
# Stores 1 under the pause key through PSETEX, passing +milliseconds+ as the
# expiry argument.
#
# @param milliseconds [Integer] expiry handed to PSETEX
# @return [Object] whatever the Redis PSETEX call returns
def pause_for_ms(milliseconds)
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.psetex(pause_key, milliseconds, 1) }
end
paused?() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 95
# Tells whether the pause key currently holds the flag '1'.
#
# @return [Boolean] true only when the stored value equals '1'
def paused?
  stored = redis { |conn| conn.get("#{PREFIX}:pause:#{@name}") }
  stored == '1'
end
probed() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 75
# Reads the length of the Redis list stored under this semaphore's probed key.
#
# @return [Integer] result of LLEN on the probed list
def probed
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.llen(probed_key) }
end
probed_processes() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 79
# Fetches every entry of the Redis list stored under the probed key.
#
# @return [Array] result of LRANGE over the whole list (0 to -1)
def probed_processes
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.lrange(probed_key, 0, -1) }
end
process_limit() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 36
# Reads the value stored under this semaphore's process_limit key.
#
# @return [Integer, nil] the stored value converted with to_i, or nil when
#   the key holds nothing
def process_limit
  raw = redis { |conn| conn.get("#{PREFIX}:process_limit:#{@name}") }
  raw.to_i unless raw.nil?
end
process_limit=(value) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 41
# Writes or clears the process_limit key.
#
# @param value [Object, nil] a truthy value is stored with SET; nil or false
#   deletes the key
# @return [Object] whatever the Redis SET or DEL call returns
def process_limit=(value)
  limit_key = "#{PREFIX}:process_limit:#{@name}"

  redis do |conn|
    value ? conn.set(limit_key, value) : conn.del(limit_key)
  end
end
release() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 53
# Removes one occurrence of the current Selector.uuid from the probed list.
#
# @return [Object] whatever the Redis LREM call returns
def release
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.lrem(probed_key, 1, Selector.uuid) }
end
remove_lock!(process) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 171
# Removes every occurrence of +process+ from the probed list and then from
# the busy list (LREM with count 0).
#
# @param process [String] the list entry to purge
# @return [Object] whatever the second LREM call (busy list) returns
def remove_lock!(process)
  probed_key = "#{PREFIX}:probed:#{@name}"
  busy_key = "#{PREFIX}:busy:#{@name}"

  redis do |conn|
    conn.lrem(probed_key, 0, process)
    conn.lrem(busy_key, 0, process)
  end
end
remove_locks_except!(processes) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 164
# Calls remove_lock! for every distinct entry of the probed list that is not
# included in +processes+.
#
# @param processes [Array<String>] entries whose locks must be kept
# @return [Array<String>] the entries that were passed to remove_lock!
def remove_locks_except!(processes)
  stale = probed_processes.uniq - processes
  stale.each { |stale_process| remove_lock!(stale_process) }
end
unblock() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 109
# Deletes this semaphore's block key.
#
# @return [Object] whatever the Redis DEL call returns
def unblock
  block_key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.del(block_key) }
end
unpause() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 91
# Deletes this semaphore's pause key.
#
# @return [Object] whatever the Redis DEL call returns
def unpause
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.del(pause_key) }
end

Private Instance Methods

namespace() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 184
# Looks up the namespace exposed by Sidekiq::LimitFetch::Queues.
#
# @return [Object] whatever Queues.namespace returns
def namespace
  queues = Sidekiq::LimitFetch::Queues
  queues.namespace
end
redis(&block) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 180
# Forwards +block+ to Sidekiq.redis and returns whatever that call returns.
# Callers in this file invoke Redis commands (set, get, del, llen, lrange,
# lrem, rpush, psetex) on the object yielded to the block.
def redis(&block)
  Sidekiq.redis(&block)
end