class Sidekiq::ActiveRecord::ManagerWorker

Constants

DEFAULT_BATCH_SIZE
DEFAULT_IDENTIFIER_KEY

Public Class Methods

additional_keys() click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 138
# Keys whose values accompany the identifier when building a model's
# attributes. Uses the :additional_keys manager option when it is present,
# otherwise falls back to the selected attributes.
def additional_keys
  configured_keys = manager_options[:additional_keys]
  if configured_keys.present?
    configured_keys
  else
    selected_attributes
  end
end
batch_size() click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 143
# Reads the :batch_size entry from the merged manager options.
def batch_size
  merged_options = manager_options
  merged_options[:batch_size]
end
default_models_query(query) click to toggle source

The default query to run when the worker runs `perform`. Example:

class UserManagerWorker < Sidekiq::ActiveRecord::ManagerWorker
  sidekiq_delegate_task_to UserTaskWorker
  default_models_query -> { User.active }
end

UserManagerWorker.perform_async(:batch_size => 300)
# File lib/sidekiq/active_record/manager_worker.rb, line 95
# Remembers +query+ in @query so get_default_models_query can invoke it later.
# Returns the object that was passed in.
def default_models_query(query)
  query.tap { |callable| @query = callable }
end
default_worker_manager_options() click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 103
# Builds a fresh hash of the baseline manager options: the default identifier
# key and batch size constants plus empty attribute/key lists.
def default_worker_manager_options
  defaults = {}
  defaults[:identifier_key] = DEFAULT_IDENTIFIER_KEY
  defaults[:selected_attributes] = []
  defaults[:additional_keys] = []
  defaults[:batch_size] = DEFAULT_BATCH_SIZE
  defaults
end
get_default_models_query() click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 99
# Invokes the stored default query and returns its result.
# Returns nil when no default query is present.
def get_default_models_query
  return unless @query.present?

  @query.call
end
get_sidekiq_manager_options() click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 151
# Returns the memoized manager options hash, initializing it from
# default_worker_manager_options on first use.
def get_sidekiq_manager_options
  return @sidekiq_manager_options_hash if @sidekiq_manager_options_hash

  @sidekiq_manager_options_hash = default_worker_manager_options
end
identifier_key() click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 130
# Reads the :identifier_key entry from the merged manager options.
def identifier_key
  merged_options = manager_options
  merged_options[:identifier_key]
end
manager_options() click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 147
# Effective options: the stored manager options with the runtime options
# merged on top (runtime values win on key collisions). Returns a new hash.
def manager_options
  stored_options = get_sidekiq_manager_options
  stored_options.merge(runtime_options)
end
model_attributes(model) click to toggle source

returns the model attributes array:

model_id, attr1, attr2, …
# File lib/sidekiq/active_record/manager_worker.rb, line 114
# Builds the attribute array for one model: the identifier value first,
# followed by the value of each additional key, all read via +send+.
def model_attributes(model)
  extra_values = additional_keys.map { |key| model.send(key) }
  [model.send(identifier_key), *extra_values]
end
perform_query_async(models_query, options = {}) click to toggle source

For a given model collection, it delegates each model to a sub-worker (e.g. TaskWorker). Specify the TaskWorker with the `sidekiq_delegate_task_to` method.

@param models_query ActiveRecord::Relation @param options Hash

:worker_class - the worker class to delegate the task to. Alternative to the default `sidekiq_delegate_task_to`
:identifier_key - the model identifier column. Default 'id'
:selected_attributes - the attributes to SELECT for models query
:additional_keys - additional model keys - defaults to :selected_attributes
:batch_size - Specifies the size of each batch to push in bulk.
              This is also the number of models to fetch in each find_in_batches query.
              Default is 1000.

@example:

class UserTaskWorker
  def perform(user_id)
    # user task logic
  end
end

class UserSyncer < Sidekiq::ActiveRecord::ManagerWorker

  sidekiq_delegate_task_to :user_task_worker # or UserTaskWorker
  sidekiq_manager_options :batch_size => 500,
                          :identifier_key => :user_token,
                          :selected_attributes => [:first_name, :last_name, :email],
                          :additional_keys => [:full_name, :email]
end

UserSyncer.perform_query_async(User.active, :batch_size => 300)

is equivalent to doing:

User.active.each {|user| UserTaskWorker.perform(user.id) }
# File lib/sidekiq/active_record/manager_worker.rb, line 53
# Records the runtime options, narrows the query via prepare_models_query,
# then walks it with find_in_batches. For every batch, the attribute array of
# each model is collected and pushed in one Sidekiq::Client.push_bulk call
# targeting worker_class.
def perform_query_async(models_query, options = {})
  set_runtime_options(options)
  prepare_models_query(models_query).find_in_batches(batch_size: batch_size) do |batch|
    # worker_class is resolved per batch, so it is only consulted when there is
    # at least one batch to push.
    job_args = batch.map { |record| model_attributes(record) }
    Sidekiq::Client.push_bulk(class: worker_class, args: job_args)
  end
end
prepare_models_query(models_query) click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 120
# Restricts +models_query+ to the columns this worker needs: the query's
# primary key, the identifier key and every selected attribute.
#
# @param models_query an object responding to +primary_key+ and +select+
#   (the surrounding documentation describes it as an ActiveRecord::Relation)
# @return whatever +models_query.select+ returns
def prepare_models_query(models_query)
  # selected_attributes is itself a list; flatten before uniq so a column that
  # appears both as a key and as a selected attribute is listed only once.
  selected_attrs = [models_query.primary_key.to_sym, identifier_key, selected_attributes].flatten.uniq
  models_query.select(selected_attrs)
end
runtime_options() click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 155
# Returns the runtime options stored in @sidekiq_manager_runtime_options,
# or an empty hash when none have been stored.
def runtime_options
  stored = @sidekiq_manager_runtime_options
  stored ? stored : {}
end
selected_attributes() click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 134
# Reads the :selected_attributes entry from the merged manager options.
def selected_attributes
  merged_options = manager_options
  merged_options[:selected_attributes]
end
set_runtime_options(options={}) click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 159
# Stores the per-call option overrides used by runtime_options.
#
# Entries whose value is blank (its +to_s+ is empty or whitespace only) are
# dropped. Keys are converted to symbols so that a hash with String keys (for
# example one that went through Sidekiq's JSON serialization on its way to
# +perform+) still overrides the symbol-keyed manager options it is merged
# into. The caller's hash is left untouched.
#
# @param options [Hash, nil] runtime overrides; nil is treated as empty
# @return [Hash] the cleaned, symbol-keyed options that were stored
def set_runtime_options(options={})
  @sidekiq_manager_runtime_options = (options || {}).each_with_object({}) do |(key, value), cleaned|
    next if value.to_s.strip == ''

    cleaned[key.respond_to?(:to_sym) ? key.to_sym : key] = value
  end
end
sidekiq_delegate_task_to(worker_klass) click to toggle source

@required The task worker to delegate to. @param worker_klass (Sidekiq::Worker, Symbol) - UserTaskWorker or :user_task_worker

# File lib/sidekiq/active_record/manager_worker.rb, line 65
# Registers the task worker that models are delegated to.
#
# A String or Symbol is resolved to the constant it names: it is camelized
# (so both :user_task_worker and 'UserTaskWorker' work) and then constantized.
# Any other object is stored as given. The resolved value, not the raw
# argument, is saved under :worker_class in the manager options.
#
# @param worker_klass [Class, String, Symbol] e.g. UserTaskWorker or :user_task_worker
# @return the value stored as :worker_class
def sidekiq_delegate_task_to(worker_klass)
  resolved_worker =
    case worker_klass
    when String, Symbol
      worker_klass.to_s.camelize.constantize
    else
      worker_klass
    end
  get_sidekiq_manager_options[:worker_class] = resolved_worker
end
sidekiq_manager_options(opts = {}) click to toggle source

Allows customization for this type of ManagerWorker. Legal options:

:worker_class - the worker class to delegate the task to. Alternative to `sidekiq_delegate_task_to`
:identifier_key - the model identifier column. Default 'id'
:selected_attributes - the attributes to SELECT for models query
:additional_keys - additional model keys - defaults to :selected_attributes
:batch_size - Specifies the size of the batch. Defaults to 1000.
# File lib/sidekiq/active_record/manager_worker.rb, line 83
# Merges +opts+ over the current manager options and stores the result as the
# new manager options hash. A nil +opts+ is treated as an empty hash.
# Returns the stored hash.
def sidekiq_manager_options(opts = {})
  current_options = get_sidekiq_manager_options
  overrides = opts || {}
  @sidekiq_manager_options_hash = current_options.merge(overrides)
end
worker_class() click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 125
# Returns the :worker_class entry of the merged manager options.
# Raises NotImplementedError when that entry is not present.
def worker_class
  delegate = manager_options[:worker_class]
  raise NotImplementedError, '`worker_class` was not specified' unless delegate.present?

  delegate
end

Public Instance Methods

perform(options = {}) click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 9
# Job entry point: obtains the class-level default models query and hands it,
# together with +options+, to the class-level perform_query_async.
def perform(options = {})
  manager = self.class
  manager.perform_query_async(manager.get_default_models_query, options)
end