class Sidekiq::Hierarchy::Job
Constants
- COMPLETED_AT_FIELD
- ENQUEUED_AT_FIELD
- FINISHED_SUBTREE_SIZE_FIELD
- INFO_FIELD
Job hash keys
- INFO_KEYS
- ONE_MONTH
- PARENT_FIELD
- RUN_AT_FIELD
- STATUS_COMPLETE
- STATUS_ENQUEUED
Values for STATUS_FIELD
- STATUS_FAILED
- STATUS_FIELD
- STATUS_REQUEUED
- STATUS_RUNNING
- SUBTREE_SIZE_FIELD
- WORKFLOW_FINISHED_AT_FIELD
- WORKFLOW_STATUS_FIELD
Attributes
Class definition
Public Class Methods
# File lib/sidekiq/hierarchy/job.rb, line 42
# Builds a job for +jid+, stores the filtered, JSON-dumped +job_hash+ under
# INFO_FIELD, and bumps its subtree size once (a fresh job has no children,
# so its subtree is just itself).
#
# jid      - the job id handed to +new+
# job_hash - the hash passed through +filtered_job_hash+ before dumping
#
# Returns the newly built job.
def create(jid, job_hash)
  job = new(jid)
  job[INFO_FIELD] = Sidekiq.dump_json(filtered_job_hash(job_hash))
  job.increment_subtree_size
  job
end
# File lib/sidekiq/hierarchy/job.rb, line 35
# Remembers the job id; nothing else happens at construction time.
#
# jid - the job id this object stands for
def initialize(jid)
  @jid = jid
end
Private Class Methods
Saves INFO_KEYS as well as whatever keys are specified in the worker’s sidekiq options under :workflow_keys.
# File lib/sidekiq/hierarchy/job.rb, line 51
# Trims +job_hash+ down to the entries worth persisting: every key listed in
# INFO_KEYS plus any keys named under the hash's own 'workflow_keys' entry.
#
# job_hash - hash to filter; 'workflow_keys' may be absent, a scalar or a list
#
# Returns a new hash containing only the retained key/value pairs.
def filtered_job_hash(job_hash)
  extra_keys = Array(job_hash['workflow_keys'])
  allowed = (INFO_KEYS + extra_keys).uniq
  job_hash.select { |key, _value| allowed.include?(key) }
end
Public Instance Methods
# File lib/sidekiq/hierarchy/job.rb, line 67
# Two jobs are equal when the other object is an instance of exactly this
# class (subclasses do not count) and both carry the same jid.
def ==(other_job)
  return false unless other_job.instance_of?(self.class)

  jid == other_job.jid
end
Magic getter backed by redis hash
# File lib/sidekiq/hierarchy/job.rb, line 73
# Reads one field of this job's redis hash.
#
# key - the hash field to fetch
#
# Returns whatever HGET returns for that field.
def [](key)
  redis do |conn|
    conn.hget(redis_job_hkey, key)
  end
end
Magic setter backed by redis hash
# File lib/sidekiq/hierarchy/job.rb, line 78
# Writes one field of this job's redis hash and refreshes the hash's expiry
# to ONE_MONTH, both inside a single MULTI transaction.
#
# key   - the hash field to write
# value - the value to store
#
# Returns +value+.
def []=(key, value)
  redis do |conn|
    # Queue the commands on the object yielded by #multi rather than on the
    # outer connection. Older redis-rb yields the connection itself, so this
    # is equivalent there; newer releases require it for the commands to be
    # part of the transaction.
    conn.multi do |tx|
      tx.hset(redis_job_hkey, key, value)
      tx.expire(redis_job_hkey, ONE_MONTH)
    end
  end
  value
end
Draws a new doubly-linked parent-child relationship in redis
# File lib/sidekiq/hierarchy/job.rb, line 169
# Links +child_job+ under this job in both directions and then folds the
# child's cached counters into this job's (and, via the increment helpers,
# its ancestors') subtree counters.
#
# child_job - the job to attach; must respond to jid, redis_job_hkey,
#             subtree_size and finished_subtree_size
def add_child(child_job)
  redis do |conn|
    # Queue on the yielded transaction object so all four commands are part
    # of the MULTI on every redis-rb version (older versions yield the
    # connection itself, so behaviour there is unchanged).
    conn.multi do |tx|
      # child -> parent link
      tx.hset(child_job.redis_job_hkey, PARENT_FIELD, jid)
      tx.expire(child_job.redis_job_hkey, ONE_MONTH)

      # parent -> child link
      tx.rpush(redis_children_lkey, child_job.jid)
      tx.expire(redis_children_lkey, ONE_MONTH)
    end
  end

  # The counters are updated after the transaction, not within it.
  increment_subtree_size(child_job.subtree_size)
  increment_finished_subtree_size(child_job.finished_subtree_size)
end
Serialisation
# File lib/sidekiq/hierarchy/job.rb, line 314
# Compact recursive representation of the tree rooted here.
#
# options - accepted but not used by this method
#
# Returns a hash with :k set to this job's info['class'] and :c set to the
# children's own as_json hashes, ordered by each child's info['class'].
def as_json(options={})
  own_class = info['class']
  ordered_children = children.sort_by { |child| child.info['class'] }
  { k: own_class, c: ordered_children.map(&:as_json) }
end
# File lib/sidekiq/hierarchy/job.rb, line 101
# Reads the complete children list from redis and turns each stored jid into
# a job via +self.class.find+.
#
# Returns an array of whatever +find+ returns, in list order.
def children
  redis do |conn|
    child_jids = conn.lrange(redis_children_lkey, 0, -1)
    child_jids.map { |child_jid| self.class.find(child_jid) }
  end
end
Status update: mark as complete (step 3)
# File lib/sidekiq/hierarchy/job.rb, line 265
# Moves this job to the :complete status via +update_status+.
def complete!
  update_status(:complete)
end
# File lib/sidekiq/hierarchy/job.rb, line 269
# True when the current status is :complete.
def complete?
  current = status
  current == :complete
end
# File lib/sidekiq/hierarchy/job.rb, line 273
# Time at which the job completed.
#
# Returns a Time built from the float stored under COMPLETED_AT_FIELD, or
# nil when the job is not complete or no timestamp is stored.
def complete_at
  return unless complete?

  stamp = self[COMPLETED_AT_FIELD]
  Time.at(stamp.to_f) if stamp
end
# File lib/sidekiq/hierarchy/job.rb, line 58
# Deletes every child first (each child does the same for its own children),
# then removes this job's children list and hash from redis in one DEL.
def delete
  children.each { |child| child.delete }
  redis do |conn|
    conn.del(redis_children_lkey, redis_job_hkey)
  end
end
Status update: mark as enqueued (step 1)
# File lib/sidekiq/hierarchy/job.rb, line 235
# Moves this job to the :enqueued status via +update_status+.
def enqueue!
  update_status(:enqueued)
end
# File lib/sidekiq/hierarchy/job.rb, line 239
# True when the current status is :enqueued.
def enqueued?
  current = status
  current == :enqueued
end
# File lib/sidekiq/hierarchy/job.rb, line 243
# Time at which the job was enqueued.
#
# Returns a Time built from the float stored under ENQUEUED_AT_FIELD, or nil
# when no timestamp is stored. Unlike complete_at, the current status is not
# consulted.
def enqueued_at
  stamp = self[ENQUEUED_AT_FIELD]
  Time.at(stamp.to_f) if stamp
end
# File lib/sidekiq/hierarchy/job.rb, line 63
# Whether this job's hash is present in redis.
#
# Depending on the redis client version, EXISTS may answer with a boolean
# or with an Integer count of matching keys. Because 0 is truthy in Ruby,
# the raw reply cannot be used as a predicate result, so it is normalised
# here.
#
# Returns true or false.
def exists?
  reply = redis { |conn| conn.exists(redis_job_hkey) }
  reply.is_a?(Integer) ? reply > 0 : !!reply
end
# File lib/sidekiq/hierarchy/job.rb, line 287
# Moves this job to the :failed status via +update_status+.
def fail!
  update_status(:failed)
end
# File lib/sidekiq/hierarchy/job.rb, line 291
# True when the current status is :failed.
def failed?
  current = status
  current == :failed
end
# File lib/sidekiq/hierarchy/job.rb, line 295
# Time at which the job failed. Failure and completion share the same
# timestamp field (COMPLETED_AT_FIELD).
#
# Returns a Time, or nil when the job is not failed or no timestamp is
# stored.
def failed_at
  return unless failed?

  stamp = self[COMPLETED_AT_FIELD]
  Time.at(stamp.to_f) if stamp
end
# File lib/sidekiq/hierarchy/job.rb, line 301
# True when the job has reached one of the two terminal states,
# :failed or :complete.
def finished?
  current = status
  %i[failed complete].include?(current)
end
# File lib/sidekiq/hierarchy/job.rb, line 305
# Time stored under COMPLETED_AT_FIELD, regardless of the current status.
#
# Returns a Time, or nil when no timestamp is stored.
def finished_at
  stamp = self[COMPLETED_AT_FIELD]
  Time.at(stamp.to_f) if stamp
end
The cached count of the finished jobs in the tree rooted at this job
# File lib/sidekiq/hierarchy/job.rb, line 156
# Reads the cached finished-job counter for this subtree.
#
# Returns an Integer; a missing field reads as 0 because nil.to_i is 0.
def finished_subtree_size
  cached = self[FINISHED_SUBTREE_SIZE_FIELD]
  cached.to_i
end
Recursively updates finished subtree size on this and all higher subtrees
# File lib/sidekiq/hierarchy/job.rb, line 161
# Adds +incr+ to this job's finished-subtree counter, then asks the parent
# (if any) to do the same, so the change propagates up to the root.
#
# incr - amount to add (default 1)
def increment_finished_subtree_size(incr=1)
  redis do |conn|
    conn.hincrby(redis_job_hkey, FINISHED_SUBTREE_SIZE_FIELD, incr)
  end

  ancestor = parent
  ancestor.increment_finished_subtree_size(incr) if ancestor
end
Recursively updates subtree size on this and all higher subtrees
# File lib/sidekiq/hierarchy/job.rb, line 148
# Adds +incr+ to this job's subtree-size counter, then asks the parent
# (if any) to do the same, so the change propagates up to the root.
#
# incr - amount to add (default 1)
def increment_subtree_size(incr=1)
  redis do |conn|
    conn.hincrby(redis_job_hkey, SUBTREE_SIZE_FIELD, incr)
  end

  ancestor = parent
  ancestor.increment_subtree_size(incr) if ancestor
end
# File lib/sidekiq/hierarchy/job.rb, line 88
# Loads the value stored under INFO_FIELD through Sidekiq.load_json.
#
# Returns whatever Sidekiq.load_json produces for the stored string.
def info
  raw_info = self[INFO_FIELD]
  Sidekiq.load_json(raw_info)
end
# File lib/sidekiq/hierarchy/job.rb, line 111
# True when +children+ contains no truthy element.
def leaf?
  kids = children
  kids.none?
end
Walks down the workflow tree and returns all its leaf nodes. If called on a leaf, returns an array containing only itself. Warning: recursive!
# File lib/sidekiq/hierarchy/job.rb, line 125
# Collects the leaf jobs of the tree rooted here by recursing through
# +children+. A leaf answers with an array holding only itself.
#
# Returns an array of jobs.
def leaves
  # This could be done in a single Lua script server-side...
  return [self] if leaf?

  children.flat_map { |child| child.leaves }
end
Tree exploration and manipulation
# File lib/sidekiq/hierarchy/job.rb, line 95
# Looks up the parent job through the jid stored under PARENT_FIELD.
#
# Returns the result of +self.class.find+, or nil when no parent jid is
# stored.
def parent
  parent_jid = self[PARENT_FIELD]
  self.class.find(parent_jid) if parent_jid
end
# File lib/sidekiq/hierarchy/job.rb, line 325
# Redis key of this job's children list: the job hash key with ":children"
# appended.
def redis_children_lkey
  base_key = redis_job_hkey
  "#{base_key}:children"
end
Redis backend
# File lib/sidekiq/hierarchy/job.rb, line 321
# Redis key of this job's hash: "hierarchy:job:" followed by the jid.
def redis_job_hkey
  "hierarchy:job:#{jid}"
end
# File lib/sidekiq/hierarchy/job.rb, line 279
# Moves this job to the :requeued status via +update_status+.
def requeue!
  update_status(:requeued)
end
# File lib/sidekiq/hierarchy/job.rb, line 283
# True when the current status is :requeued.
def requeued?
  current = status
  current == :requeued
end
Walks up the workflow tree and returns its root job node. Warning: recursive!
# File lib/sidekiq/hierarchy/job.rb, line 117
# Walks up the parent links and returns the topmost job. Recursive: each
# level delegates to its parent's +root+.
#
# The parent is fetched exactly once per level. Checking +root?+ and then
# calling +parent+ again would look the parent up twice, and would raise on
# nil if the link disappeared between the two reads.
#
# Returns the job that has no parent (self when this job is the root).
def root
  # This could be done in a single Lua script server-side...
  up = parent
  up ? up.root : self
end
# File lib/sidekiq/hierarchy/job.rb, line 107
# True when this job has no parent.
def root?
  up = parent
  up.nil?
end
Status update: mark as running (step 2)
# File lib/sidekiq/hierarchy/job.rb, line 250
# Moves this job to the :running status via +update_status+.
def run!
  update_status(:running)
end
# File lib/sidekiq/hierarchy/job.rb, line 258
# Time stored under RUN_AT_FIELD, regardless of the current status.
#
# Returns a Time, or nil when no timestamp is stored.
def run_at
  stamp = self[RUN_AT_FIELD]
  Time.at(stamp.to_f) if stamp
end
# File lib/sidekiq/hierarchy/job.rb, line 254
# True when the current status is :running.
def running?
  current = status
  current == :running
end
Status get/set
# File lib/sidekiq/hierarchy/job.rb, line 193
# Translates the raw value stored under STATUS_FIELD into a status symbol.
#
# Returns :enqueued, :running, :complete, :requeued or :failed, or :unknown
# when the stored value matches none of the STATUS_* constants (including
# when nothing is stored).
def status
  raw = self[STATUS_FIELD]

  # Checked in order with ===, the same comparison a case/when performs.
  known = [
    [STATUS_ENQUEUED, :enqueued],
    [STATUS_RUNNING,  :running],
    [STATUS_COMPLETE, :complete],
    [STATUS_REQUEUED, :requeued],
    [STATUS_FAILED,   :failed]
  ]
  hit = known.find { |stored, _name| stored === raw }
  hit ? hit.last : :unknown
end
Walks the subtree rooted here in DFS order. Returns an Enumerator; use to_a to get an array instead.
# File lib/sidekiq/hierarchy/job.rb, line 132
# Lazily walks the subtree rooted at this job, depth first, starting with
# this job itself. Nodes come off the end of a stack, so among siblings the
# last child is visited first.
#
# The stack is created inside the Enumerator block so that every
# enumeration starts afresh from this job. If it lived outside the block, a
# second to_a/each (or any call after a partial first/take) would see an
# already-drained stack.
#
# Returns an Enumerator of jobs; call to_a for an array.
def subtree_jobs
  Enumerator.new do |y|
    to_visit = [self]
    while (node = to_visit.pop)
      y << node # sugar for yielding a value
      to_visit.concat(node.children)
    end
  end
end
The cached cardinality of the tree rooted at this job
# File lib/sidekiq/hierarchy/job.rb, line 143
# Reads the cached job count for the subtree rooted here.
#
# Returns an Integer; a missing field reads as 0 because nil.to_i is 0.
def subtree_size
  cached = self[SUBTREE_SIZE_FIELD]
  cached.to_i
end
# File lib/sidekiq/hierarchy/job.rb, line 210
# Moves the job to +new_status+: stores the matching STATUS_* value, stamps
# the associated timestamp field with the current time (where the status has
# one), bumps the finished-subtree counter for terminal states, and
# publishes a JOB_UPDATE notification carrying the new and old status.
#
# Does nothing at all when +new_status+ equals the current status.
#
# new_status - one of :enqueued, :running, :complete, :requeued, :failed
def update_status(new_status)
  old_status = status
  return if new_status == old_status

  # [stored status value, timestamp field or nil]; :requeued records no time.
  s_val, t_field =
    case new_status
    when :enqueued then [STATUS_ENQUEUED, ENQUEUED_AT_FIELD]
    when :running  then [STATUS_RUNNING, RUN_AT_FIELD]
    when :complete then [STATUS_COMPLETE, COMPLETED_AT_FIELD]
    when :requeued then [STATUS_REQUEUED, nil]
    when :failed   then [STATUS_FAILED, COMPLETED_AT_FIELD]
    end

  self[STATUS_FIELD] = s_val
  self[t_field] = Time.now.to_f.to_s if t_field

  # :failed and :complete are the two terminal states.
  terminal = %i[failed complete].include?(new_status)
  increment_finished_subtree_size if terminal

  Sidekiq::Hierarchy.publish(Notifications::JOB_UPDATE, self, new_status, old_status)
end
# File lib/sidekiq/hierarchy/job.rb, line 186
# Finds the workflow for this job by handing the tree's root job to
# Workflow.find.
def workflow
  top_job = root
  Workflow.find(top_job)
end