module Sidekiq

Sidekiq's Data API provides a Ruby object model on top of Sidekiq's runtime data in Redis. This API should never be used within application code for business logic.

The Sidekiq server process never uses this API: all data manipulation is done directly for performance reasons to ensure we are using Redis as efficiently as possible at every callsite.

This file is designed to be required within the user's deployment script; it should need a bare minimum of dependencies.

require "sidekiq/metrics/deploy"
gitdesc = `git log -1 --format="%h %s"`.strip
d = Sidekiq::Metrics::Deploy.new
d.mark(label: gitdesc)

Note that you cannot mark more than once per minute. This is a feature, not a bug.

This file contains the components which track execution metrics within Sidekiq.

SdNotify is a pure-Ruby implementation of sd_notify(3). It can be used to notify systemd about state changes. Methods of this package are no-op on non-systemd systems (eg. Darwin).

The API maps closely to the original implementation of sd_notify(3), therefore be sure to check the official man pages prior to using SdNotify.

@see www.freedesktop.org/software/systemd/man/sd_notify.html

Sidekiq's systemd integration allows Sidekiq to inform systemd:

1. when it has successfully started
2. when it is starting shutdown
3. periodically for a liveness check with a watchdog thread

Use `Sidekiq.transactional_push!` in your sidekiq.rb initializer

Constants

ClientMiddleware

Server-side middleware must import this Module in order to get access to server resources during `call`.

DEFAULTS
DEFAULT_ERROR_HANDLER

DEFAULT_ERROR_HANDLER is a constant that allows the default error handler to be referenced. It must be defined here, after the default_error_handler method is defined.

FAKE_INFO
Job

Sidekiq::Job is a new alias for Sidekiq::Worker as of Sidekiq 6.3.0. Use `include Sidekiq::Job` rather than `include Sidekiq::Worker`.

The term “worker” is too generic and overly confusing, used in several different contexts meaning different things. Many people call a Sidekiq process a “worker”. Some people call the thread that executes jobs a “worker”. This change brings Sidekiq closer to ActiveJob where your job classes extend ApplicationJob.

LICENSE
NAME
VERSION
Workers

The WorkSet stores the work being done by this Sidekiq cluster. It tracks the process and thread working on each job.

WARNING WARNING WARNING

This is live data that can change every millisecond. If you call size => 5 and then expect each to be called 5 times, you're going to have a bad time.

works = Sidekiq::WorkSet.new
works.size => 2
works.each do |process_id, thread_id, work|
  # process_id is a unique identifier per Sidekiq process
  # thread_id is a unique identifier per thread
  # work is a Hash which looks like:
  # { 'queue' => name, 'run_at' => timestamp, 'payload' => job_hash }
  # run_at is an epoch Integer.
end

Public Class Methods

[](key) click to toggle source
# File lib/sidekiq.rb, line 106
def self.[](key)
  @config[key]
end
[]=(key, val) click to toggle source
# File lib/sidekiq.rb, line 110
def self.[]=(key, val)
  @config[key] = val
end
average_scheduled_poll_interval=(interval) click to toggle source

How frequently Redis should be checked by a random Sidekiq process for scheduled and retriable jobs. Each individual process will take turns by waiting some multiple of this value.

See sidekiq/scheduled.rb for an in-depth explanation of this value

# File lib/sidekiq.rb, line 303
def self.average_scheduled_poll_interval=(interval)
  self[:average_scheduled_poll_interval] = interval
end
client_middleware() { |client_chain| ... } click to toggle source
# File lib/sidekiq.rb, line 209
def self.client_middleware
  @client_chain ||= Middleware::Chain.new(self)
  yield @client_chain if block_given?
  @client_chain
end
concurrency=(val) click to toggle source

config.concurrency = 5

# File lib/sidekiq.rb, line 60
def self.concurrency=(val)
  self[:concurrency] = Integer(val)
end
configure_client() { |self| ... } click to toggle source

Configuration for Sidekiq client, use like:

Sidekiq.configure_client do |config|
  config.redis = { size: 1, url: 'redis://myhost:8877/0' }
end
# File lib/sidekiq.rb, line 151
def self.configure_client
  yield self unless server?
end
configure_server() { |self| ... } click to toggle source

Configuration for Sidekiq server, use like:

Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add MyServerHook
  end
end
# File lib/sidekiq.rb, line 141
def self.configure_server
  yield self if server?
end
death_handlers() click to toggle source

Death handlers are called when all retries for a job have been exhausted and the job dies. It's the notification to your application that this job will not succeed without manual intervention.

Sidekiq.configure_server do |config|

config.death_handlers << ->(job, ex) do
end

end

# File lib/sidekiq.rb, line 250
def self.death_handlers
  self[:death_handlers]
end
default_error_handler(ex, ctx) click to toggle source

Private APIs

# File lib/sidekiq.rb, line 84
def self.default_error_handler(ex, ctx)
  logger.warn(dump_json(ctx)) unless ctx.empty?
  logger.warn("#{ex.class.name}: #{ex.message}")
  logger.warn(ex.backtrace.join("\n")) unless ex.backtrace.nil?
end
default_job_options() click to toggle source
# File lib/sidekiq.rb, line 237
def self.default_job_options
  @default_job_options ||= {"retry" => true, "queue" => "default"}
end
default_job_options=(hash) click to toggle source
# File lib/sidekiq.rb, line 229
def self.default_job_options=(hash)
  @default_job_options = default_job_options.merge(hash.transform_keys(&:to_s))
end
default_server_middleware() click to toggle source
# File lib/sidekiq.rb, line 221
def self.default_server_middleware
  Middleware::Chain.new(self)
end
default_worker_options() click to toggle source
# File lib/sidekiq.rb, line 233
def self.default_worker_options # deprecated
  @default_job_options ||= {"retry" => true, "queue" => "default"}
end
default_worker_options=(hash) click to toggle source
# File lib/sidekiq.rb, line 225
def self.default_worker_options=(hash) # deprecated
  @default_job_options = default_job_options.merge(hash.transform_keys(&:to_s))
end
dump_json(object) click to toggle source
# File lib/sidekiq.rb, line 258
def self.dump_json(object)
  JSON.generate(object)
end
ent?() click to toggle source
# File lib/sidekiq.rb, line 294
def self.ent?
  defined?(Sidekiq::Enterprise)
end
error_handlers() click to toggle source

Register a proc to handle any error which occurs within the Sidekiq process.

Sidekiq.configure_server do |config|
  config.error_handlers << proc {|ex,ctx_hash| MyErrorService.notify(ex, ctx_hash) }
end

The default error handler logs errors to Sidekiq.logger.

# File lib/sidekiq.rb, line 314
def self.error_handlers
  self[:error_handlers]
end
fetch(*args, &block) click to toggle source
# File lib/sidekiq.rb, line 118
def self.fetch(*args, &block)
  @config.fetch(*args, &block)
end
handle_exception(ex, ctx = {}) click to toggle source
# File lib/sidekiq.rb, line 122
def self.handle_exception(ex, ctx = {})
  self[:error_handlers].each do |handler|
    handler.call(ex, ctx)
  rescue => ex
    logger.error "!!! ERROR HANDLER THREW AN ERROR !!!"
    logger.error ex
    logger.error ex.backtrace.join("\n") unless ex.backtrace.nil?
  end
end
load_json(string) click to toggle source
# File lib/sidekiq.rb, line 254
def self.load_json(string)
  JSON.parse(string)
end
log_formatter() click to toggle source
# File lib/sidekiq.rb, line 262
def self.log_formatter
  @log_formatter ||= if ENV["DYNO"]
    Sidekiq::Logger::Formatters::WithoutTimestamp.new
  else
    Sidekiq::Logger::Formatters::Pretty.new
  end
end
log_formatter=(log_formatter) click to toggle source
# File lib/sidekiq.rb, line 270
def self.log_formatter=(log_formatter)
  @log_formatter = log_formatter
  logger.formatter = log_formatter
end
logger() click to toggle source
# File lib/sidekiq.rb, line 275
def self.logger
  @logger ||= Sidekiq::Logger.new($stdout, level: :info)
end
logger=(logger) click to toggle source
# File lib/sidekiq.rb, line 279
def self.logger=(logger)
  if logger.nil?
    self.logger.level = Logger::FATAL
    return self.logger
  end

  logger.extend(Sidekiq::LoggingUtils)

  @logger = logger
end
merge!(hash) click to toggle source
# File lib/sidekiq.rb, line 114
def self.merge!(hash)
  @config.merge!(hash)
end
on(event, &block) click to toggle source

Register a block to run at a point in the Sidekiq lifecycle. :startup, :quiet or :shutdown are valid events.

Sidekiq.configure_server do |config|
  config.on(:shutdown) do
    puts "Goodbye cruel world!"
  end
end
# File lib/sidekiq.rb, line 326
def self.on(event, &block)
  raise ArgumentError, "Symbols only please: #{event}" unless event.is_a?(Symbol)
  raise ArgumentError, "Invalid event name: #{event}" unless self[:lifecycle_events].key?(event)
  self[:lifecycle_events][event] << block
end
options() click to toggle source
# File lib/sidekiq.rb, line 96
def self.options
  logger.warn "`config.options[:key] = value` is deprecated, use `config[:key] = value`: #{caller(1..2)}"
  @config
end
options=(opts) click to toggle source
# File lib/sidekiq.rb, line 101
def self.options=(opts)
  logger.warn "config.options = hash` is deprecated, use `config.merge!(hash)`: #{caller(1..2)}"
  @config = opts
end
pro?() click to toggle source
# File lib/sidekiq.rb, line 290
def self.pro?
  defined?(Sidekiq::Pro)
end
queues=(val) click to toggle source

config.queues = %w( high default low ) # strict config.queues = %w( high,3 default,2 low,1 ) # weighted config.queues = %w( feature1,1 feature2,1 feature3,1 ) # random

With weighted priority, queue will be checked first (weight / total) of the time. high will be checked first (3/6) or 50% of the time. I'd recommend setting weights between 1-10. Weights in the hundreds or thousands are ridiculous and unnecessarily expensive. You can get random queue ordering by explicitly setting all weights to 1.

# File lib/sidekiq.rb, line 73
def self.queues=(val)
  self[:queues] = Array(val).each_with_object([]) do |qstr, memo|
    name, weight = qstr.split(",")
    self[:strict] = false if weight.to_i > 0
    [weight.to_i, 1].max.times do
      memo << name
    end
  end
end
redis() { |conn| ... } click to toggle source
# File lib/sidekiq.rb, line 159
def self.redis
  raise ArgumentError, "requires a block" unless block_given?
  redis_pool.with do |conn|
    retryable = true
    begin
      yield conn
    rescue RedisConnection.adapter::BaseError => ex
      # 2550 Failover can cause the server to become a replica, need
      # to disconnect and reopen the socket to get back to the primary.
      # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
      # 4985 Use the same logic when a blocking command is force-unblocked
      # The same retry logic is also used in client.rb
      if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
        conn.disconnect!
        retryable = false
        retry
      end
      raise
    end
  end
end
redis=(hash) click to toggle source
# File lib/sidekiq.rb, line 201
def self.redis=(hash)
  @redis = if hash.is_a?(ConnectionPool)
    hash
  else
    RedisConnection.create(hash)
  end
end
redis_info() click to toggle source
# File lib/sidekiq.rb, line 181
def self.redis_info
  redis do |conn|
    # admin commands can't go through redis-namespace starting
    # in redis-namespace 2.0
    if conn.respond_to?(:namespace)
      conn.redis.info
    else
      conn.info
    end
  rescue RedisConnection.adapter::CommandError => ex
    # 2850 return fake version when INFO command has (probably) been renamed
    raise unless /unknown command/.match?(ex.message)
    FAKE_INFO
  end
end
redis_pool() click to toggle source
# File lib/sidekiq.rb, line 197
def self.redis_pool
  @redis ||= RedisConnection.create
end
server?() click to toggle source
# File lib/sidekiq.rb, line 155
def self.server?
  defined?(Sidekiq::CLI)
end
server_middleware() { |server_chain| ... } click to toggle source
# File lib/sidekiq.rb, line 215
def self.server_middleware
  @server_chain ||= default_server_middleware
  yield @server_chain if block_given?
  @server_chain
end
start_watchdog() click to toggle source
# File lib/sidekiq/systemd.rb, line 8
def self.start_watchdog
  usec = Integer(ENV["WATCHDOG_USEC"])
  return Sidekiq.logger.error("systemd Watchdog too fast: " + usec) if usec < 1_000_000

  sec_f = usec / 1_000_000.0
  # "It is recommended that a daemon sends a keep-alive notification message
  # to the service manager every half of the time returned here."
  ping_f = sec_f / 2
  Sidekiq.logger.info "Pinging systemd watchdog every #{ping_f.round(1)} sec"
  Thread.new do
    loop do
      sleep ping_f
      Sidekiq::SdNotify.watchdog
    end
  end
end
strict_args!(mode = :raise) click to toggle source
# File lib/sidekiq.rb, line 332
def self.strict_args!(mode = :raise)
  self[:on_complex_arguments] = mode
end
transactional_push!() click to toggle source
# File lib/sidekiq/transaction_aware_client.rb, line 33
def self.transactional_push!
  begin
    require "after_commit_everywhere"
  rescue LoadError
    Sidekiq.logger.error("You need to add after_commit_everywhere to your Gemfile to use Sidekiq's transactional client")
    raise
  end

  default_job_options["client_class"] = Sidekiq::TransactionAwareClient
  Sidekiq::JobUtil::TRANSIENT_ATTRIBUTES << "client_class"
  true
end
❨╯°□°❩╯︵┻━┻() click to toggle source
# File lib/sidekiq.rb, line 55
def self.❨╯°□°❩╯︵┻━┻
  puts "Calm down, yo."
end