module Sidekiq::LimitFetch::Global::Selector
Constants
- MUTEX_FOR_UUID
Public Instance Methods
acquire(queues, namespace)
click to toggle source
# File lib/sidekiq/limit_fetch/global/selector.rb, line 11
# Runs the :acquire Redis script through redis_eval. The script arguments
# are, in order: the namespace, this object's uuid, and the queue list.
# Returns whatever redis_eval returns.
def acquire(queues, namespace)
  script_args = [namespace, uuid, queues]
  redis_eval(:acquire, script_args)
end
release(queues, namespace)
click to toggle source
# File lib/sidekiq/limit_fetch/global/selector.rb, line 15
# Runs the :release Redis script through redis_eval. The script arguments
# are, in order: the namespace, this object's uuid, and the queue list.
# Returns whatever redis_eval returns.
def release(queues, namespace)
  script_args = [namespace, uuid, queues]
  redis_eval(:release, script_args)
end
uuid()
click to toggle source
# File lib/sidekiq/limit_fetch/global/selector.rb, line 19
# Returns the memoized @uuid, generating it with SecureRandom.uuid the first
# time it is needed.
def uuid
  # Unlocked check: once @uuid is set, reads never touch the mutex.
  return @uuid if @uuid

  # Second check while holding MUTEX_FOR_UUID: a caller that had to wait for
  # the mutex reuses the value stored in the meantime instead of replacing it.
  @uuid = MUTEX_FOR_UUID.synchronize { @uuid || SecureRandom.uuid }
end
Private Instance Methods
redis_acquire_script()
click to toggle source
# Returns the Lua source of the "acquire" script as a heredoc string.
#
# Script arguments (ARGV): the first is a namespace prefix (the script appends
# 'limit_fetch:' to it), the second is the worker name, and the remaining
# entries are queue names. When invoked via acquire these are the namespace,
# the uuid and the queues.
#
# For each queue, in order, the script:
#   * skips the queue when blocking_mode is set and the queue is not in the
#     `unblocked` table;
#   * reads the queue's pause, limit, process_limit and block keys with a
#     single MGET, and does nothing further for the queue if the pause key
#     yields a value;
#   * sets `locks` to the length (LLEN) of the queue's 'probed:' list when a
#     block or limit value is present;
#   * sets `process_locks` to the number of entries in the 'probed:' list
#     that equal the worker name when a process_limit value is present;
#   * turns blocking_mode on when the block value is present and locks > 0
#     (it is never turned off again within one run);
#   * when blocking_mode is on and the block value is not the string 'true',
#     splits the block value on commas and marks each name as unblocked;
#   * if (no limit or limit > locks) and (no process_limit or
#     process_limit > process_locks), RPUSHes the worker name onto the
#     'probed:' list and appends the queue to the result.
#
# The script returns the table of queues that were appended this way.
#
# `locks` and `process_locks` are declared outside the loop, so they keep the
# value from a previous queue until reassigned; every read of them is guarded
# by the presence of the corresponding limit/block value.
#
# NOTE(review): if blocking_mode was enabled by an earlier queue and a later
# (unblocked) queue has no block key, can_block looks like it would be the
# Lua value false (Redis nil reply), yet it is still passed to
# string.gmatch because `can_block ~= 'true'` holds — confirm whether this
# can raise and whether an extra `can_block and` guard is needed.
# File lib/sidekiq/limit_fetch/global/selector.rb, line 51 def redis_acquire_script <<-LUA local namespace = table.remove(ARGV, 1)..'limit_fetch:' local worker_name = table.remove(ARGV, 1) local queues = ARGV local available = {} local unblocked = {} local locks local process_locks local blocking_mode for _, queue in ipairs(queues) do if not blocking_mode or unblocked[queue] then local probed_key = namespace..'probed:'..queue local pause_key = namespace..'pause:'..queue local limit_key = namespace..'limit:'..queue local process_limit_key = namespace..'process_limit:'..queue local block_key = namespace..'block:'..queue local paused, limit, process_limit, can_block = unpack(redis.call('mget', pause_key, limit_key, process_limit_key, block_key )) if not paused then limit = tonumber(limit) process_limit = tonumber(process_limit) if can_block or limit then locks = redis.call('llen', probed_key) end if process_limit then local all_locks = redis.call('lrange', probed_key, 0, -1) process_locks = 0 for _, process in ipairs(all_locks) do if process == worker_name then process_locks = process_locks + 1 end end end if not blocking_mode then blocking_mode = can_block and locks > 0 end if blocking_mode and can_block ~= 'true' then for unblocked_queue in string.gmatch(can_block, "[^,]+") do unblocked[unblocked_queue] = true end end if (not limit or limit > locks) and (not process_limit or process_limit > process_locks) then redis.call('rpush', probed_key, worker_name) table.insert(available, queue) end end end end return available LUA end
redis_acquire_sha()
click to toggle source
# File lib/sidekiq/limit_fetch/global/selector.rb, line 43
# Returns the SHA1 hex digest of the acquire script text, computed on first
# use and cached in @redis_acquire_sha. redis_eval passes it to evalsha.
def redis_acquire_sha
  return @redis_acquire_sha if @redis_acquire_sha

  @redis_acquire_sha = OpenSSL::Digest::SHA1.hexdigest(redis_acquire_script)
end
redis_eval(script_name, args)
click to toggle source
# File lib/sidekiq/limit_fetch/global/selector.rb, line 29
# Executes one of this module's Redis scripts inside a Sidekiq.redis block.
#
# script_name - :acquire or :release; selects the redis_<name>_sha and
#               redis_<name>_script methods via send.
# args        - array handed to the script as its arguments.
#
# The script is first invoked by digest (evalsha). If that raises a
# Sidekiq::LimitFetch::RedisCommandError whose message contains 'NOSCRIPT',
# the full script text is sent with eval instead; any other such error is
# re-raised unchanged.
def redis_eval(script_name, args)
  sha_reader = "redis_#{script_name}_sha"
  script_reader = "redis_#{script_name}_script"

  Sidekiq.redis do |conn|
    begin
      conn.evalsha(send(sha_reader), [], args)
    rescue Sidekiq::LimitFetch::RedisCommandError => e
      raise unless e.message.include?('NOSCRIPT')

      # The two branches differ only in how eval receives the arguments.
      if Sidekiq::LimitFetch.post_7?
        conn.eval(send(script_reader), 0, *args)
      else
        conn.eval(send(script_reader), argv: args)
      end
    end
  end
end
redis_release_script()
click to toggle source
# Returns the Lua source of the "release" script as a heredoc string.
#
# Script arguments (ARGV): the first is a namespace prefix (the script appends
# 'limit_fetch:' to it), the second is the worker name, and the remaining
# entries are queue names. When invoked via release these are the namespace,
# the uuid and the queues.
#
# For each queue the script issues LREM with count 1 on the queue's
# 'probed:' list, i.e. it removes a single occurrence of the worker name.
# The script has no return statement.
# File lib/sidekiq/limit_fetch/global/selector.rb, line 119 def redis_release_script <<-LUA local namespace = table.remove(ARGV, 1)..'limit_fetch:' local worker_name = table.remove(ARGV, 1) local queues = ARGV for _, queue in ipairs(queues) do local probed_key = namespace..'probed:'..queue redis.call('lrem', probed_key, 1, worker_name) end LUA end
redis_release_sha()
click to toggle source
# File lib/sidekiq/limit_fetch/global/selector.rb, line 47
# Returns the SHA1 hex digest of the release script text, computed on first
# use and cached in @redis_release_sha. redis_eval passes it to evalsha.
def redis_release_sha
  return @redis_release_sha if @redis_release_sha

  @redis_release_sha = OpenSSL::Digest::SHA1.hexdigest(redis_release_script)
end