class Sidekiq::Hierarchy::Server::Middleware

Public Class Methods

new(options={}) click to toggle source
# File lib/sidekiq/hierarchy/server/middleware.rb, line 5
def initialize(options={})
end

Public Instance Methods

call(worker, msg, queue) { || ... } click to toggle source

Wraps around the actual execution of a job. Takes params:

worker - the instance of the worker to be used for execution
msg - the hash of job info, something like {'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true}
queue - the named queue to use

Must propagate return value upwards. Since jobs raise errors for signalling, those must be propagated as well.

# File lib/sidekiq/hierarchy/server/middleware.rb, line 14
def call(worker, msg, queue)
  if msg['workflow'] == true  # root job -- start of a new workflow
    Sidekiq::Hierarchy.current_workflow = Workflow.find_by_jid(worker.jid)
    Sidekiq::Hierarchy.current_job = Sidekiq::Hierarchy.current_workflow.root
  elsif msg['workflow'].is_a?(String)  # child job -- inherit parent's workflow
    Sidekiq::Hierarchy.current_workflow = Workflow.find_by_jid(msg['workflow'])
    Sidekiq::Hierarchy.current_job = Job.find(worker.jid)
  end

  Sidekiq::Hierarchy.record_job_running
  ret = yield
  Sidekiq::Hierarchy.record_job_complete

  ret
rescue Exception => e
  if exception_caused_by_shutdown?(e) || retries_remaining?(msg)
    # job will be pushed back onto queue during hard_shutdown or if retries are permitted
    Sidekiq::Hierarchy.record_job_requeued
  else
    Sidekiq::Hierarchy.record_job_failed
  end

  raise
end

Private Instance Methods

exception_caused_by_shutdown?(e) click to toggle source
# File lib/sidekiq/hierarchy/server/middleware.rb, line 54
def exception_caused_by_shutdown?(e)
  e.instance_of?(Sidekiq::Shutdown) ||
    # In Ruby 2.1+, check if original exception was Shutdown
    (defined?(e.cause) && exception_caused_by_shutdown?(e.cause))
end
retries_remaining?(msg) click to toggle source
# File lib/sidekiq/hierarchy/server/middleware.rb, line 39
def retries_remaining?(msg)
  return false unless msg['retry']

  retry_count = msg['retry_count'] || 0
  max_retries = if msg['retry'].is_a?(Fixnum)
                  msg['retry']
                else
                  Sidekiq::Middleware::Server::RetryJobs::DEFAULT_MAX_RETRY_ATTEMPTS
                end

  # this check requires prepending the middleware before sidekiq's builtin retry
  retry_count < max_retries
end