class Sidekiq::Hierarchy::WorkflowSet
A sorted set of Workflows that permits enumeration
Constants
- PAGE_SIZE
Public Class Methods
for_status(status)
click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 12
# Builds the set object that corresponds to a workflow status.
#
# status - :running, :complete or :failed.
#
# Returns a new RunningSet, CompleteSet or FailedSet respectively; any other
# status yields nil.
def self.for_status(status)
  set_class =
    case status
    when :running then RunningSet
    when :complete then CompleteSet
    when :failed then FailedSet
    end

  set_class.new if set_class
end
new(status)
click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 23
# Remembers the status that identifies this set.
#
# status - the set's status; must not be nil.
#
# Raises ArgumentError when status is nil.
def initialize(status)
  if status.nil?
    raise ArgumentError, 'status cannot be nil'
  end

  @status = status
end
Public Instance Methods
==(other_workflow_set)
click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 28
# Two workflow sets compare equal when the other object is an instance of
# exactly this object's class; no other state is consulted.
#
# other_workflow_set - the object to compare against.
#
# Returns true or false.
def ==(other_workflow_set)
  own_class = self.class
  other_workflow_set.instance_of?(own_class)
end
add(workflow)
click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 36
# Adds the workflow's jid to this set's sorted set, scored with the current
# wall-clock time as a float.
#
# workflow - object responding to #jid.
#
# Returns whatever the redis block returns (the reply to the ZADD call).
def add(workflow)
  redis do |conn|
    key = redis_zkey
    score = Time.now.to_f
    conn.zadd(key, score, workflow.jid)
  end
end
contains?(workflow)
click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 40
# Tells whether the workflow's jid is a member of this set, by asking for
# its score in the sorted set.
#
# workflow - object responding to #jid.
#
# Returns true when the score lookup yields a truthy value, false otherwise.
def contains?(workflow)
  score = redis { |conn| conn.zscore(redis_zkey, workflow.jid) }
  score ? true : false
end
each() { |find_by_jid| ... }
click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 66
# Yields every workflow in the set, highest score first, reading the members
# from redis in pages based on PAGE_SIZE. Only members scored at or below the
# time the iteration started are visited.
#
# Returns an Enumerator when no block is given.
def each
  return enum_for(:each) unless block_given?

  max_score = Time.now.to_f
  boundary_jids = [] # jids already yielded whose score equals max_score

  loop do
    # The score range is inclusive, so members sitting exactly at max_score
    # come back on every request. Ask for that many extra entries and filter
    # them out by jid. Simply dropping the previous page (as a plain
    # re-fetch would require) ends the iteration early whenever a whole page
    # shares a single score.
    page = redis do |conn|
      conn.zrevrangebyscore(redis_zkey, max_score, '-inf',
                            limit: [0, PAGE_SIZE + boundary_jids.size],
                            with_scores: true)
    end
    fresh = page.reject { |jid, score| score == max_score && boundary_jids.include?(jid) }
    break if fresh.empty?

    fresh.each { |jid, _| yield Workflow.find_by_jid(jid) }

    # Remember which members share the lowest score seen so far; they will be
    # returned again by the next (inclusive) range query.
    _, last_score = fresh.last
    tail_jids = fresh.select { |_, score| score == last_score }.map(&:first)
    boundary_jids = last_score == max_score ? boundary_jids + tail_jids : tail_jids
    max_score = last_score
  end
end
move(workflow, from_set=nil)
click to toggle source
Move a workflow to this set from its current one. This should really be done in Lua, but unit testing support is just not there, so there is a potential race condition in which a workflow could end up in multiple sets. The effect of this is minimal, so we’ll fix it later.
# File lib/sidekiq/hierarchy/workflow_set.rb, line 57
# Moves a workflow into this set, optionally removing it from another set
# inside the same MULTI block.
#
# workflow - object responding to #jid.
# from_set - set to remove the workflow from (must respond to #redis_zkey);
#            when nil, no removal is issued.
#
# Returns the last reply of the MULTI block, i.e. the reply to the ZADD.
def move(workflow, from_set=nil)
  redis do |conn|
    # Queue the commands on the object yielded by #multi instead of on the
    # outer connection. Newer redis-rb releases require this form, while
    # older ones yield the connection itself, so it works with both.
    conn.multi do |transaction|
      transaction.zrem(from_set.redis_zkey, workflow.jid) if from_set
      transaction.zadd(redis_zkey, Time.now.to_f, workflow.jid)
    end.last
  end
end
redis_zkey()
click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 82
# Name of the redis key for this set: the fixed prefix "hierarchy:set:"
# followed by the set's status.
#
# Returns a String.
def redis_zkey
  status_name = @status.to_s
  "hierarchy:set:#{status_name}"
end
remove(workflow)
click to toggle source
Remove a workflow from the set if it is present. This operation can only be executed as cleanup (i.e., on a workflow that has been unpersisted/deleted); otherwise it will fail in order to avoid memory leaks.
# File lib/sidekiq/hierarchy/workflow_set.rb, line 48
# Removes the workflow's jid from this set. Refuses to act on a workflow
# that still reports itself as existing.
#
# workflow - object responding to #exists? and #jid.
#
# Returns whatever the redis block returns (the reply to the ZREM call).
# Raises RuntimeError ('Workflow still exists') when workflow.exists? is
# truthy.
def remove(workflow)
  if workflow.exists?
    raise 'Workflow still exists'
  end

  redis do |conn|
    conn.zrem(redis_zkey, workflow.jid)
  end
end
size()
click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 32
# Number of members currently stored under this set's redis key, obtained
# with ZCARD.
#
# Returns whatever the redis block returns (the reply to the ZCARD call).
def size
  redis do |conn|
    conn.zcard(redis_zkey)
  end
end