class Sidekiq::Superworker::SubjobProcessor

Public Class Methods

complete(subjob) click to toggle source
# File lib/sidekiq/superworker/subjob_processor.rb, line 72
def complete(subjob)
  Superworker.debug "#{subjob.to_info}: Complete"
  subjob.update_attribute(:status, 'complete')

  # If children are present, enqueue the first one
  children = subjob.children
  if children.present?
    Superworker.debug "#{subjob.to_info}: Enqueueing children"
    enqueue(children.first)
    return
  # Otherwise, set this as having its descendants complete
  else
    descendants_are_complete(subjob)
  end
end
enqueue(subjob) click to toggle source
# File lib/sidekiq/superworker/subjob_processor.rb, line 5
def enqueue(subjob)
  Superworker.debug "#{subjob.to_info}: Trying to enqueue"
  # Only enqueue subjobs that aren't running, complete, etc
  return unless subjob.status == 'initialized'
  
  Superworker.debug "#{subjob.to_info}: Enqueueing"
  # If this is a parallel subjob, enqueue all of its children
  if subjob.subworker_class == 'parallel'
    subjob.update_attribute(:status, 'running')

    Superworker.debug "#{subjob.to_info}: Enqueueing parallel children"
    jids = subjob.children.collect do |child|
      enqueue(child)
    end
    jid = jids.first
  elsif subjob.subworker_class == 'batch'
    subjob.update_attribute(:status, 'running')

    Superworker.debug "#{subjob.to_info}: Enqueueing batch children"
    jids = subjob.children.collect do |child|
      child.update_attribute(:status, 'running')
      enqueue(child.children.first)
    end
    jid = jids.first
  else
    klass = "::#{subjob.subworker_class}".constantize

    # If this is a superworker, mark it as complete, which will queue its children or its next subjob
    if klass.respond_to?(:is_a_superworker?) && klass.is_a_superworker?
      complete(subjob)
    # Otherwise, enqueue it in Sidekiq
    else
      # We need to explicitly set the job's JID, so that the ActiveRecord record can be updated before
      # the job fires off. If the job started first, it could finish before the ActiveRecord update
      # transaction completes, causing a race condition when finding the ActiveRecord record in
      # Processor#complete.
      jid = subjob.jid
      subjob.update_attributes(
        status: 'queued'
      )
      enqueue_in_sidekiq(subjob, klass, jid)
    end
  end
  jid
end
enqueue_in_sidekiq(subjob, klass, jid) click to toggle source
# File lib/sidekiq/superworker/subjob_processor.rb, line 51
def enqueue_in_sidekiq(subjob, klass, jid)
  Superworker.debug "#{subjob.to_info}: Enqueueing in Sidekiq"

  # If sidekiq-unique-jobs is being used for this worker, a number of issues arise if the subjob isn't
  # queued, so we'll bypass the unique functionality of the worker while running the subjob.
  is_unique = klass.respond_to?(:sidekiq_options_hash) && !!(klass.sidekiq_options_hash || {})['unique']
  if is_unique
    unique_value = klass.sidekiq_options_hash.delete('unique')
    unique_job_expiration_value = klass.sidekiq_options_hash.delete('unique_job_expiration')
  end

  sidekiq_push(subjob, klass, jid)

  if is_unique
    klass.sidekiq_options_hash['unique'] = unique_value
    klass.sidekiq_options_hash['unique_job_expiration'] = unique_job_expiration_value
  end

  jid
end
error(subjob, worker, item, exception) click to toggle source
# File lib/sidekiq/superworker/subjob_processor.rb, line 88
def error(subjob, worker, item, exception)
  Superworker.debug "#{subjob.to_info}: Error"
  subjob.update_attribute(:status, 'failed')
  SuperjobProcessor.error(subjob.superjob_id, worker, item, exception)
end

Protected Class Methods

descendants_are_complete(subjob) click to toggle source
# File lib/sidekiq/superworker/subjob_processor.rb, line 96
def descendants_are_complete(subjob)
  Superworker.debug "#{subjob.to_info}: Descendants are complete"
  subjob.update_attribute(:descendants_are_complete, true)

  if subjob.subworker_class == 'batch_child' || subjob.subworker_class == 'batch'
    complete(subjob)
  end

  parent = subjob.parent
  is_child_of_parallel = parent && parent.subworker_class == 'parallel'

  # If a parent exists, check whether this subjob's siblings are all complete
  if parent
    siblings_descendants_are_complete = parent.children.all? { |child| child.descendants_are_complete }
    if siblings_descendants_are_complete
      Superworker.debug "#{subjob.to_info}: Parent (#{parent.to_info}) is complete"
      descendants_are_complete(parent)
      parent.update_attribute(:status, 'complete') if is_child_of_parallel
    end
  end

  unless is_child_of_parallel
    # If a next subjob is present, enqueue it
    next_subjob = subjob.next
    if next_subjob
      enqueue(next_subjob)
      return
    end

    # If there isn't a parent, then, this is the final subjob of the superjob
    unless parent
      Superworker.debug "#{subjob.to_info}: Superjob is complete"
      SuperjobProcessor.complete(subjob.superjob_id)
    end
  end
end
sidekiq_item(subjob, klass, jid) click to toggle source
# File lib/sidekiq/superworker/subjob_processor.rb, line 139
def sidekiq_item(subjob, klass, jid)
  item = { 'class' => klass, 'args' => subjob.arg_values, 'jid' => jid }
  if subjob.meta && subjob.meta[:sidekiq]
    item.merge!(subjob.meta[:sidekiq].stringify_keys)
  end
  item
end
sidekiq_push(subjob, klass, jid) click to toggle source
# File lib/sidekiq/superworker/subjob_processor.rb, line 133
def sidekiq_push(subjob, klass, jid)
  # This is akin to perform_async, but it allows us to explicitly set the JID
  item = sidekiq_item(subjob, klass, jid)
  Sidekiq::Client.push(item)
end