class Sidekiq::LimitFetch::Global::Semaphore

Constants

PREFIX

Attributes

local_busy[R]

Public Class Methods

new(name) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 7
def initialize(name)
  @name = name
  @lock = Mutex.new
  @local_busy = 0
end

Public Instance Methods

acquire() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 45
def acquire
  Selector.acquire([@name], namespace).size > 0
end
block() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 95
def block
  redis {|it| it.set "#{PREFIX}:block:#@name", true }
end
block_except(*queues) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 99
def block_except(*queues)
  raise ArgumentError if queues.empty?
  redis {|it| it.set "#{PREFIX}:block:#@name", queues.join(',') }
end
blocking?() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 108
def blocking?
  redis {|it| it.get "#{PREFIX}:block:#@name" }
end
busy() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 53
def busy
  redis {|it| it.llen "#{PREFIX}:busy:#@name" }
end
busy_processes() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 57
def busy_processes
  redis {|it| it.lrange "#{PREFIX}:busy:#@name", 0, -1 }
end
decrease_busy() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 66
def decrease_busy
  decrease_local_busy
  redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid }
end
decrease_local_busy() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 116
def decrease_local_busy
  @lock.synchronize { @local_busy -= 1 }
end
explain() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 124
    def explain
      <<-END.gsub(/^ {8}/, '')
        Current sidekiq process: #{Selector.uuid}

          All processes:
        #{Monitor.all_processes.join "\n"}

          Stale processes:
        #{Monitor.old_processes.join "\n"}

          Locked queue processes:
        #{probed_processes.sort.join "\n"}

          Busy queue processes:
        #{busy_processes.sort.join "\n"}

          Limit:
        #{limit.inspect}

          Process limit:
        #{process_limit.inspect}

          Blocking:
        #{blocking?}
      END
    end
increase_busy() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 61
def increase_busy
  increase_local_busy
  redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid }
end
increase_local_busy() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 112
def increase_local_busy
  @lock.synchronize { @local_busy += 1 }
end
limit() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 13
def limit
  value = redis {|it| it.get "#{PREFIX}:limit:#@name" }
  value.to_i if value
end
limit=(value) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 18
def limit=(value)
  @limit_changed = true

  if value
    redis {|it| it.set "#{PREFIX}:limit:#@name", value }
  else
    redis {|it| it.del "#{PREFIX}:limit:#@name" }
  end
end
limit_changed?() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 28
def limit_changed?
  @limit_changed
end
local_busy?() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 120
def local_busy?
  @local_busy > 0
end
pause() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 79
def pause
  redis {|it| it.set "#{PREFIX}:pause:#@name", true }
end
pause_for_ms(ms) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 83
def pause_for_ms ms
  redis {|it| it.psetex "#{PREFIX}:pause:#@name", ms, true }
end
paused?() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 91
def paused?
  redis {|it| it.get "#{PREFIX}:pause:#@name" }
end
probed() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 71
def probed
  redis {|it| it.llen "#{PREFIX}:probed:#@name" }
end
probed_processes() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 75
def probed_processes
  redis {|it| it.lrange "#{PREFIX}:probed:#@name", 0, -1 }
end
process_limit() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 32
def process_limit
  value = redis {|it| it.get "#{PREFIX}:process_limit:#@name" }
  value.to_i if value
end
process_limit=(value) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 37
def process_limit=(value)
  if value
    redis {|it| it.set "#{PREFIX}:process_limit:#@name", value }
  else
    redis {|it| it.del "#{PREFIX}:process_limit:#@name" }
  end
end
release() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 49
def release
  redis {|it| it.lrem "#{PREFIX}:probed:#@name", 1, Selector.uuid }
end
remove_lock!(process) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 158
def remove_lock!(process)
  redis do |it|
    it.lrem "#{PREFIX}:probed:#@name", 0, process
    it.lrem "#{PREFIX}:busy:#@name", 0, process
  end
end
remove_locks_except!(processes) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 151
def remove_locks_except!(processes)
  locked_processes = probed_processes.uniq
  (locked_processes - processes).each do |dead_process|
    remove_lock! dead_process
  end
end
unblock() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 104
def unblock
  redis {|it| it.del "#{PREFIX}:block:#@name" }
end
unpause() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 87
def unpause
  redis {|it| it.del "#{PREFIX}:pause:#@name" }
end

Private Instance Methods

namespace() click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 171
def namespace
  Sidekiq::LimitFetch::Queues.namespace
end
redis(&block) click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 167
def redis(&block)
  Sidekiq.redis(&block)
end