class SideJob::ServerMiddleware

This middleware is primarily responsible for changing job status depending on events {SideJob::Job} sets status to terminating or queued when a job is queued All other job status changes happen here For simplicity, a job is allowed to be queued multiple times in the Sidekiq queue Only when it gets pulled out to be run, i.e. here, we decide if we want to actually run it

Attributes

raise_errors[RW]

If true, we do not rescue or log errors and instead propagate errors (useful for testing)

Public Instance Methods

call(worker, msg, queue) { || ... } click to toggle source

Called by sidekiq as a server middleware to handle running a worker @param worker [SideJob::Worker] @param msg [Hash] Sidekiq message format @param queue [String] Queue the job was pulled from

# File lib/sidejob/server_middleware.rb, line 17
def call(worker, msg, queue)
  # handle non SideJob workers (i.e. normal Sidekiq jobs)
  unless worker.is_a?(SideJob::Worker)
    yield
    return
  end

  @worker = worker
  return unless @worker.exists? # make sure the job has not been deleted

  # only run if status is queued or terminating
  case @worker.status
    when 'queued', 'terminating'
    else
      return
  end

  # We use the presence of this lock:worker key to indicate that a worker is trying to the get the job lock.
  # No other worker needs to also wait and no calls to {SideJob::Job#run} need to queue a new run.
  return unless SideJob.redis.set("#{@worker.redis_key}:lock:worker", 1, {nx: true, ex: 2})

  # Obtain a lock to allow only one worker to run at a time to simplify workers from having to deal with concurrency
  token = @worker.lock(CONFIGURATION[:lock_expiration])
  if token
    begin
      SideJob.redis.del "#{@worker.redis_key}:lock:worker"
      SideJob.context(job: @worker.id) do
        case @worker.status
          when 'queued'
            run_worker { yield }
          when 'terminating'
            terminate_worker
          # else no longer need running
        end
      end
    ensure
      @worker.unlock(token)
      @worker.run(parent: true) # run the parent every time worker runs
    end
  else
    SideJob.redis.del "#{@worker.redis_key}:lock:worker"
    # Unable to obtain job lock which may indicate another worker thread is running
    # Schedule another run
    # Note that the actual time before requeue depends on sidekiq poll_interval (default 15 seconds)
    case @worker.status
      when 'queued', 'terminating'
        @worker.run(wait: 1)
      # else no longer need running
    end
  end
end

Private Instance Methods

add_exception(exception) click to toggle source
# File lib/sidejob/server_middleware.rb, line 118
def add_exception(exception)
  if SideJob::ServerMiddleware.raise_errors
    raise exception
  else
    SideJob.log exception
  end
end
run_worker() { || ... } click to toggle source
# File lib/sidejob/server_middleware.rb, line 81
def run_worker(&block)
  # limit each job to being called too many times per minute
  # this is to help prevent bad coding that leads to infinite looping
  # Uses Rate limiter 1 pattern from http://redis.io/commands/INCR
  rate_key = "#{@worker.redis_key}:rate:#{Time.now.to_i / 60}"
  rate = SideJob.redis.multi do |multi|
    multi.incr rate_key
    multi.expire rate_key, 60
  end[0]

  if rate.to_i > CONFIGURATION[:max_runs_per_minute]
    SideJob.log({ error: 'Job was terminated due to being called too rapidly' })
    @worker.terminate
  else
    # normal run

    # if ran_at is not set, then this is the first run of the job, so call the startup method if it exists
    @worker.startup if @worker.respond_to?(:startup) && ! SideJob.redis.exists("#{@worker.redis_key}:ran_at")

    SideJob.redis.set "#{@worker.redis_key}:ran_at", SideJob.timestamp
    @worker.status = 'running'
    yield
    @worker.status = 'completed' if @worker.status == 'running'
  end
rescue SideJob::Worker::Suspended
  @worker.status = 'suspended' if @worker.status == 'running'
rescue => e
  # only set failed if not terminating/terminated
  case @worker.status
    when 'terminating', 'terminated'
    else
      @worker.status = 'failed'
  end

  add_exception e
end
terminate_worker() click to toggle source
# File lib/sidejob/server_middleware.rb, line 71
def terminate_worker
  # We let workers perform cleanup before terminating jobs
  # To prevent workers from preventing termination, errors are ignored
  @worker.shutdown if @worker.respond_to?(:shutdown)
rescue => e
  add_exception e
ensure
  @worker.status = 'terminated'
end