class Sidekiq::LimitFetch::Global::Semaphore
Constants
- PREFIX
Attributes
local_busy[R]
Public Class Methods
new(name)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 7
# Sets up the in-process state of a semaphore.
#
# @param name [Object] identifier interpolated into every Redis key this
#   object builds (presumably the queue name -- confirm against callers)
def initialize(name)
  @name = name
  # Serialises the read-modify-write updates of @local_busy.
  @lock = Mutex.new
  @local_busy = 0
  # Explicit starting value so limit_changed? never reads an unset
  # instance variable; limit= flips it to true.
  @limit_changed = false
end
Public Instance Methods
acquire()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 45
# Asks Selector.acquire for this semaphore's name only.
#
# @return [Boolean] true when Selector.acquire returned at least one element
def acquire
  acquired = Selector.acquire([@name], namespace)
  acquired.size.positive?
end
block()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 95
# Writes the block flag for this semaphore's name to Redis.
#
# The value is sent as the String "true" instead of the boolean +true+.
# A client that coerces arguments with #to_s stores the same bytes either
# way, while stricter clients (e.g. redis-client) refuse boolean arguments.
#
# @return [Object] whatever the Redis SET call returns
def block
  redis { |conn| conn.set "#{PREFIX}:block:#{@name}", 'true' }
end
block_except(*queues)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 99
# Writes the block flag for this semaphore's name, storing the given queues
# as a comma-separated list instead of a plain flag.
#
# @param queues [Array<#to_s>] one or more queue names to store in the flag
# @raise [ArgumentError] when no queue is given
# @return [Object] whatever the Redis SET call returns
def block_except(*queues)
  raise ArgumentError, 'block_except requires at least one queue name' if queues.empty?

  redis { |conn| conn.set "#{PREFIX}:block:#{@name}", queues.join(',') }
end
blocking?()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 108
# Reads the block flag for this semaphore's name.
#
# @return [Object] the raw result of the Redis GET; it is not coerced to a
#   boolean here
def blocking?
  key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.get(key) }
end
busy()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 53
# Length of the Redis list that increase_busy pushes process uuids onto.
#
# @return [Object] the result of the Redis LLEN call
def busy
  key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.llen(key) }
end
busy_processes()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 57
# Full contents (LRANGE 0..-1) of the busy list for this semaphore's name.
#
# @return [Object] the result of the Redis LRANGE call
def busy_processes
  key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.lrange(key, 0, -1) }
end
decrease_busy()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 66
# Decrements the in-process counter, then issues LREM with count 1 for
# Selector.uuid on the busy list of this semaphore's name.
#
# @return [Object] the result of the Redis LREM call
def decrease_busy
  decrease_local_busy
  key = "#{PREFIX}:busy:#{@name}"
  redis do |conn|
    conn.lrem(key, 1, Selector.uuid)
  end
end
decrease_local_busy()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 116
# Subtracts one from the in-process busy counter while holding @lock.
#
# @return [Integer] the counter value after the decrement
def decrease_local_busy
  @lock.synchronize do
    @local_busy = @local_busy.pred
  end
end
explain()
click to toggle source
# Builds a human-readable report about this semaphore from a heredoc. It
# interpolates, in order: Selector.uuid, Monitor.all_processes and
# Monitor.old_processes (each joined with newlines), the sorted
# probed_processes and busy_processes lists, limit.inspect,
# process_limit.inspect and the raw blocking? value. The gsub strips eight
# leading spaces from every line of the heredoc result.
# NOTE(review): this listing is flattened onto one line, so the heredoc's
# original line breaks and indentation are not visible here; consult the
# source file before changing the layout of the report.
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 124 def explain <<-END.gsub(/^ {8}/, '') Current sidekiq process: #{Selector.uuid} All processes: #{Monitor.all_processes.join "\n"} Stale processes: #{Monitor.old_processes.join "\n"} Locked queue processes: #{probed_processes.sort.join "\n"} Busy queue processes: #{busy_processes.sort.join "\n"} Limit: #{limit.inspect} Process limit: #{process_limit.inspect} Blocking: #{blocking?} END end
increase_busy()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 61
# Increments the in-process counter, then appends (RPUSH) Selector.uuid to
# the busy list of this semaphore's name.
#
# @return [Object] the result of the Redis RPUSH call
def increase_busy
  increase_local_busy
  key = "#{PREFIX}:busy:#{@name}"
  redis do |conn|
    conn.rpush(key, Selector.uuid)
  end
end
increase_local_busy()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 112
# Adds one to the in-process busy counter while holding @lock.
#
# @return [Integer] the counter value after the increment
def increase_local_busy
  @lock.synchronize do
    @local_busy = @local_busy.succ
  end
end
limit()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 13
# Reads the limit stored in Redis for this semaphore's name.
#
# @return [Integer, nil] the stored value converted with #to_i, or nil when
#   the GET result is falsy
def limit
  raw = redis { |conn| conn.get("#{PREFIX}:limit:#{@name}") }
  raw ? raw.to_i : nil
end
limit=(value)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 18
# Stores or clears the limit for this semaphore's name and records that the
# limit was touched (see limit_changed?).
#
# @param value [Object, nil] value to SET; a falsy value deletes the key
def limit=(value)
  # Flagged before talking to Redis, regardless of which branch runs.
  @limit_changed = true
  key = "#{PREFIX}:limit:#{@name}"
  redis do |conn|
    value ? conn.set(key, value) : conn.del(key)
  end
end
limit_changed?()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 28
# Reports the flag that limit= sets on this instance.
#
# @return [Object] the current value of @limit_changed, returned as-is
def limit_changed?
  @limit_changed
end
local_busy?()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 120
# Tells whether the in-process busy counter is above zero.
#
# @return [Boolean]
def local_busy?
  @local_busy.positive?
end
pause()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 79
# Writes the pause flag for this semaphore's name to Redis.
#
# The value is sent as the String "true" instead of the boolean +true+.
# A client that coerces arguments with #to_s stores the same bytes either
# way, while stricter clients (e.g. redis-client) refuse boolean arguments.
#
# @return [Object] whatever the Redis SET call returns
def pause
  redis { |conn| conn.set "#{PREFIX}:pause:#{@name}", 'true' }
end
pause_for_ms(ms)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 83
# Writes the pause flag for this semaphore's name with PSETEX, passing +ms+
# as the expiry argument.
#
# The value is sent as the String "true" instead of the boolean +true+.
# A client that coerces arguments with #to_s stores the same bytes either
# way, while stricter clients (e.g. redis-client) refuse boolean arguments.
#
# @param ms [Integer] expiry handed to PSETEX (milliseconds)
# @return [Object] whatever the Redis PSETEX call returns
def pause_for_ms(ms)
  redis { |conn| conn.psetex "#{PREFIX}:pause:#{@name}", ms, 'true' }
end
paused?()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 91
# Reads the pause flag for this semaphore's name.
#
# @return [Object] the raw result of the Redis GET; it is not coerced to a
#   boolean here
def paused?
  key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.get(key) }
end
probed()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 71
# Length of the Redis "probed" list for this semaphore's name.
#
# @return [Object] the result of the Redis LLEN call
def probed
  key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.llen(key) }
end
probed_processes()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 75
# Full contents (LRANGE 0..-1) of the "probed" list for this semaphore's name.
#
# @return [Object] the result of the Redis LRANGE call
def probed_processes
  key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.lrange(key, 0, -1) }
end
process_limit()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 32
# Reads the process limit stored in Redis for this semaphore's name.
#
# @return [Integer, nil] the stored value converted with #to_i, or nil when
#   the GET result is falsy
def process_limit
  raw = redis { |conn| conn.get("#{PREFIX}:process_limit:#{@name}") }
  raw ? raw.to_i : nil
end
process_limit=(value)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 37
# Stores or clears the process limit for this semaphore's name.
#
# @param value [Object, nil] value to SET; a falsy value deletes the key
def process_limit=(value)
  key = "#{PREFIX}:process_limit:#{@name}"
  redis do |conn|
    value ? conn.set(key, value) : conn.del(key)
  end
end
release()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 49
# Issues LREM with count 1 for Selector.uuid on the "probed" list of this
# semaphore's name.
#
# @return [Object] the result of the Redis LREM call
def release
  key = "#{PREFIX}:probed:#{@name}"
  redis do |conn|
    conn.lrem(key, 1, Selector.uuid)
  end
end
remove_lock!(process)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 158
# Issues LREM with count 0 for +process+ on the "probed" list and then on the
# "busy" list of this semaphore's name, inside a single redis block.
#
# @param process [Object] list entry to remove (the lists hold process uuids)
# @return [Object] the result of the second (busy) LREM call
def remove_lock!(process)
  redis do |conn|
    # Order matters for the return value: probed first, busy last.
    replies = %w[probed busy].map do |list|
      conn.lrem("#{PREFIX}:#{list}:#{@name}", 0, process)
    end
    replies.last
  end
end
remove_locks_except!(processes)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 151
# Calls remove_lock! for every distinct entry of probed_processes that is not
# listed in +processes+.
#
# @param processes [Array] entries to keep
# @return [Array] the entries that were handed to remove_lock!
def remove_locks_except!(processes)
  unlisted = probed_processes.uniq - processes
  unlisted.each { |entry| remove_lock!(entry) }
end
unblock()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 104
# Deletes the block flag for this semaphore's name.
#
# @return [Object] the result of the Redis DEL call
def unblock
  key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.del(key) }
end
unpause()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 87
# Deletes the pause flag for this semaphore's name.
#
# @return [Object] the result of the Redis DEL call
def unpause
  key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.del(key) }
end
Private Instance Methods
namespace()
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 171
# Namespace value passed to Selector.acquire; delegated to
# Sidekiq::LimitFetch::Queues.
#
# @return [Object] whatever Queues.namespace returns
def namespace
  Sidekiq::LimitFetch::Queues.namespace
end
redis(&block)
click to toggle source
# File lib/sidekiq/limit_fetch/global/semaphore.rb, line 167
# Single entry point for Redis access in this class: forwards the given
# block unchanged to Sidekiq.redis.
#
# @return [Object] whatever Sidekiq.redis returns for the block
def redis(&block)
  Sidekiq.redis(&block)
end