class Sidekiq::Hierarchy::Job

Constants

COMPLETED_AT_FIELD
ENQUEUED_AT_FIELD
FINISHED_SUBTREE_SIZE_FIELD
INFO_FIELD

Job hash keys

INFO_KEYS
ONE_MONTH
PARENT_FIELD
RUN_AT_FIELD
STATUS_COMPLETE
STATUS_ENQUEUED

Values for STATUS_FIELD

STATUS_FAILED
STATUS_FIELD
STATUS_REQUEUED
STATUS_RUNNING
SUBTREE_SIZE_FIELD
WORKFLOW_FINISHED_AT_FIELD
WORKFLOW_STATUS_FIELD

Attributes

jid[R]

Class definition

Public Class Methods

create(jid, job_hash) click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 42
def create(jid, job_hash)
  new(jid).tap do |job|
    job[INFO_FIELD] = Sidekiq.dump_json(filtered_job_hash(job_hash))
    job.increment_subtree_size  # start at subtree size 1 -- no children
  end
end
find(jid)
Alias for: new
new(jid) click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 35
def initialize(jid)
  @jid = jid
end
Also aliased as: find

Private Class Methods

filtered_job_hash(job_hash) click to toggle source

saves INFO_KEYS as well as whatever keys are specified in the worker’s sidekiq options under :workflow_keys

# File lib/sidekiq/hierarchy/job.rb, line 51
def filtered_job_hash(job_hash)
  keys_to_keep = (INFO_KEYS + Array(job_hash['workflow_keys'])).uniq
  job_hash.select { |k, _| keys_to_keep.include?(k) }
end

Public Instance Methods

==(other_job) click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 67
def ==(other_job)
  other_job.instance_of?(self.class) &&
    self.jid == other_job.jid
end
[](key) click to toggle source

Magic getter backed by redis hash

# File lib/sidekiq/hierarchy/job.rb, line 73
def [](key)
  redis { |conn| conn.hget(redis_job_hkey, key) }
end
[]=(key, value) click to toggle source

Magic setter backed by redis hash

# File lib/sidekiq/hierarchy/job.rb, line 78
def []=(key, value)
  redis do |conn|
    conn.multi do
      conn.hset(redis_job_hkey, key, value)
      conn.expire(redis_job_hkey, ONE_MONTH)
    end
  end
  value
end
add_child(child_job) click to toggle source

Draws a new doubly-linked parent-child relationship in redis

# File lib/sidekiq/hierarchy/job.rb, line 169
def add_child(child_job)
  redis do |conn|
    conn.multi do
      # draw child->parent relationship
      conn.hset(child_job.redis_job_hkey, PARENT_FIELD, self.jid)
      conn.expire(child_job.redis_job_hkey, ONE_MONTH)
      # draw parent->child relationship
      conn.rpush(redis_children_lkey, child_job.jid)
      conn.expire(redis_children_lkey, ONE_MONTH)
    end
  end

  # updates subtree counts to reflect new child
  increment_subtree_size(child_job.subtree_size)
  increment_finished_subtree_size(child_job.finished_subtree_size)
end
as_json(options={}) click to toggle source

Serialisation

# File lib/sidekiq/hierarchy/job.rb, line 314
def as_json(options={})
  {k: info['class'], c: children.sort_by {|c| c.info['class']}.map(&:as_json)}
end
children() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 101
def children
  redis do |conn|
    conn.lrange(redis_children_lkey, 0, -1).map { |jid| self.class.find(jid) }
  end
end
complete!() click to toggle source

Status update: mark as complete (step 3)

# File lib/sidekiq/hierarchy/job.rb, line 265
def complete!
  update_status :complete
end
complete?() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 269
def complete?
  status == :complete
end
complete_at() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 273
def complete_at
  if complete? && t = self[COMPLETED_AT_FIELD]
    Time.at(t.to_f)
  end
end
delete() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 58
def delete
  children.each(&:delete)
  redis { |conn| conn.del(redis_children_lkey, redis_job_hkey) }
end
enqueue!() click to toggle source

Status update: mark as enqueued (step 1)

# File lib/sidekiq/hierarchy/job.rb, line 235
def enqueue!
  update_status :enqueued
end
enqueued?() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 239
def enqueued?
  status == :enqueued
end
enqueued_at() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 243
def enqueued_at
  if t = self[ENQUEUED_AT_FIELD]
    Time.at(t.to_f)
  end
end
exists?() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 63
def exists?
  redis { |conn| conn.exists(redis_job_hkey) }
end
fail!() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 287
def fail!
  update_status :failed
end
failed?() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 291
def failed?
  status == :failed
end
failed_at() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 295
def failed_at
  if failed? && t = self[COMPLETED_AT_FIELD]
    Time.at(t.to_f)
  end
end
finished?() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 301
def finished?
  [:failed, :complete].include?(status)  # two terminal states
end
finished_at() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 305
def finished_at
  if t = self[COMPLETED_AT_FIELD]
    Time.at(t.to_f)
  end
end
finished_subtree_size() click to toggle source

The cached count of the finished jobs in the tree rooted at this job

# File lib/sidekiq/hierarchy/job.rb, line 156
def finished_subtree_size
  self[FINISHED_SUBTREE_SIZE_FIELD].to_i
end
increment_finished_subtree_size(incr=1) click to toggle source

Recursively updates finished subtree size on this and all higher subtrees

# File lib/sidekiq/hierarchy/job.rb, line 161
def increment_finished_subtree_size(incr=1)
  redis { |conn| conn.hincrby(redis_job_hkey, FINISHED_SUBTREE_SIZE_FIELD, incr) }
  if p_job = parent
    p_job.increment_finished_subtree_size(incr)
  end
end
increment_subtree_size(incr=1) click to toggle source

Recursively updates subtree size on this and all higher subtrees

# File lib/sidekiq/hierarchy/job.rb, line 148
def increment_subtree_size(incr=1)
  redis { |conn| conn.hincrby(redis_job_hkey, SUBTREE_SIZE_FIELD, incr) }
  if p_job = parent
    p_job.increment_subtree_size(incr)
  end
end
info() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 88
def info
  Sidekiq.load_json(self[INFO_FIELD])
end
leaf?() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 111
def leaf?
  children.none?
end
leaves() click to toggle source

Walks down the workflow tree and returns all its leaf nodes If called on a leaf, returns an array containing only itself Warning: recursive!

# File lib/sidekiq/hierarchy/job.rb, line 125
def leaves
  # This could be done in a single Lua script server-side...
  self.leaf? ? [self] : children.flat_map(&:leaves)
end
parent() click to toggle source

Tree exploration and manipulation

# File lib/sidekiq/hierarchy/job.rb, line 95
def parent
  if parent_jid = self[PARENT_FIELD]
    self.class.find(parent_jid)
  end
end
redis_children_lkey() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 325
def redis_children_lkey
  "#{redis_job_hkey}:children"
end
redis_job_hkey() click to toggle source

Redis backend

# File lib/sidekiq/hierarchy/job.rb, line 321
def redis_job_hkey
  "hierarchy:job:#{jid}"
end
requeue!() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 279
def requeue!
  update_status :requeued
end
requeued?() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 283
def requeued?
  status == :requeued
end
root() click to toggle source

Walks up the workflow tree and returns its root job node Warning: recursive!

# File lib/sidekiq/hierarchy/job.rb, line 117
def root
  # This could be done in a single Lua script server-side...
  self.root? ? self : self.parent.root
end
root?() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 107
def root?
  parent.nil?
end
run!() click to toggle source

Status update: mark as running (step 2)

# File lib/sidekiq/hierarchy/job.rb, line 250
def run!
  update_status :running
end
run_at() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 258
def run_at
  if t = self[RUN_AT_FIELD]
    Time.at(t.to_f)
  end
end
running?() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 254
def running?
  status == :running
end
status() click to toggle source

Status get/set

# File lib/sidekiq/hierarchy/job.rb, line 193
def status
  case self[STATUS_FIELD]
  when STATUS_ENQUEUED
    :enqueued
  when STATUS_RUNNING
    :running
  when STATUS_COMPLETE
    :complete
  when STATUS_REQUEUED
    :requeued
  when STATUS_FAILED
    :failed
  else
    :unknown
  end
end
subtree_jobs() click to toggle source

Walks the subtree rooted here in DFS order Returns an Enumerator; use to_a to get an array instead

# File lib/sidekiq/hierarchy/job.rb, line 132
def subtree_jobs
  to_visit = [self]
  Enumerator.new do |y|
    while node = to_visit.pop
      y << node  # sugar for yielding a value
      to_visit += node.children
    end
  end
end
subtree_size() click to toggle source

The cached cardinality of the tree rooted at this job

# File lib/sidekiq/hierarchy/job.rb, line 143
def subtree_size
  self[SUBTREE_SIZE_FIELD].to_i
end
update_status(new_status) click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 210
def update_status(new_status)
  old_status = status
  return if new_status == old_status

  case new_status
  when :enqueued
    s_val, t_field = STATUS_ENQUEUED, ENQUEUED_AT_FIELD
  when :running
    s_val, t_field = STATUS_RUNNING, RUN_AT_FIELD
  when :complete
    s_val, t_field = STATUS_COMPLETE, COMPLETED_AT_FIELD
  when :requeued
    s_val, t_field = STATUS_REQUEUED, nil
  when :failed
    s_val, t_field = STATUS_FAILED, COMPLETED_AT_FIELD
  end

  self[STATUS_FIELD] = s_val
  self[t_field] = Time.now.to_f.to_s if t_field
  increment_finished_subtree_size if [:failed, :complete].include?(new_status)

  Sidekiq::Hierarchy.publish(Notifications::JOB_UPDATE, self, new_status, old_status)
end
workflow() click to toggle source
# File lib/sidekiq/hierarchy/job.rb, line 186
def workflow
  Workflow.find(root)
end