class Sidekiq::Hierarchy::WorkflowSet

A sorted set of Workflows that permits enumeration

Constants

PAGE_SIZE

Public Class Methods

for_status(status) click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 12
def self.for_status(status)
  case status
  when :running
    RunningSet.new
  when :complete
    CompleteSet.new
  when :failed
    FailedSet.new
  end
end
new(status) click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 23
def initialize(status)
  raise ArgumentError, 'status cannot be nil' if status.nil?
  @status = status
end

Public Instance Methods

==(other_workflow_set) click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 28
def ==(other_workflow_set)
  other_workflow_set.instance_of?(self.class)
end
add(workflow) click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 36
def add(workflow)
  redis { |conn| conn.zadd(redis_zkey, Time.now.to_f, workflow.jid) }
end
contains?(workflow) click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 40
def contains?(workflow)
  !!redis { |conn| conn.zscore(redis_zkey, workflow.jid) }
end
each() { |find_by_jid| ... } click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 66
def each
  return enum_for(:each) unless block_given?

  elements = []
  last_max_score = Time.now.to_f
  loop do
    elements = redis do |conn|
      conn.zrevrangebyscore(redis_zkey, last_max_score, '-inf', limit: [0, PAGE_SIZE], with_scores: true)
        .drop_while { |elt| elements.include?(elt) }
    end
    break if elements.empty?
    elements.each { |jid, _| yield Workflow.find_by_jid(jid) }
    _, last_max_score = elements.last  # timestamp of last element
  end
end
move(workflow, from_set=nil) click to toggle source

Move a workflow to this set from its current one This should really be done in Lua, but unit testing support is just not there, so there is a potential race condition in which a workflow could end up in multiple sets. the effect of this is minimal, so we’ll fix it later.

# File lib/sidekiq/hierarchy/workflow_set.rb, line 57
def move(workflow, from_set=nil)
  redis do |conn|
    conn.multi do
      conn.zrem(from_set.redis_zkey, workflow.jid) if from_set
      conn.zadd(redis_zkey, Time.now.to_f, workflow.jid)
    end.last
  end
end
redis_zkey() click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 82
def redis_zkey
  "hierarchy:set:#{@status}"
end
remove(workflow) click to toggle source

Remove a workflow from the set if it is present. This operation can only be executed as cleanup (i.e., on a workflow that has been unpersisted/deleted); otherwise it will fail in order to avoid memory leaks.

# File lib/sidekiq/hierarchy/workflow_set.rb, line 48
def remove(workflow)
  raise 'Workflow still exists' if workflow.exists?
  redis { |conn| conn.zrem(redis_zkey, workflow.jid) }
end
size() click to toggle source
# File lib/sidekiq/hierarchy/workflow_set.rb, line 32
def size
  redis { |conn| conn.zcard(redis_zkey) }
end