class Sidekiq::ActiveRecord::ManagerWorker
Constants
- DEFAULT_BATCH_SIZE
- DEFAULT_IDENTIFIER_KEY
Public Class Methods
additional_keys()
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 138
# Keys read from each model in addition to the identifier key.
#
# Returns the :additional_keys manager option when it is present;
# otherwise falls back to `selected_attributes`.
def additional_keys
  configured_keys = manager_options[:additional_keys]
  return configured_keys if configured_keys.present?

  selected_attributes
end
batch_size()
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 143
# Returns the :batch_size entry of the merged manager options.
def batch_size
  options = manager_options
  options[:batch_size]
end
default_models_query(query)
click to toggle source
The default query to run when the worker runs `perform`. Example:
class UserManagerWorker < Sidekiq::ActiveRecord::ManagerWorker sidekiq_delegate_task_to UserTaskWorker default_models_query -> { User.active } end UserManagerWorker.perform_async(:batch_size => 300)
# File lib/sidekiq/active_record/manager_worker.rb, line 95
# Registers the default models query.
#
# @param query a callable (e.g. a lambda); it is stored in @query and
#   later invoked with #call by `get_default_models_query`
# @return the query object that was passed in
def default_models_query(query)
  @query = query
end
default_worker_manager_options()
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 103
# Builds the baseline manager options.
#
# A fresh Hash (with fresh empty Arrays) is returned on every call, so
# callers may mutate the result without affecting later calls.
def default_worker_manager_options
  {
    identifier_key: DEFAULT_IDENTIFIER_KEY,
    selected_attributes: [],
    additional_keys: [],
    batch_size: DEFAULT_BATCH_SIZE
  }
end
get_default_models_query()
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 99
# Invokes the query registered through `default_models_query`.
#
# @return the result of calling @query, or nil when no query is present
def get_default_models_query
  return unless @query.present?

  @query.call
end
get_sidekiq_manager_options()
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 151
# Returns the memoized manager options hash, seeding it from
# `default_worker_manager_options` on first access.
def get_sidekiq_manager_options
  return @sidekiq_manager_options_hash if @sidekiq_manager_options_hash

  @sidekiq_manager_options_hash = default_worker_manager_options
end
identifier_key()
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 130
# Returns the :identifier_key entry of the merged manager options.
def identifier_key
  options = manager_options
  options[:identifier_key]
end
manager_options()
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 147
# Effective options for the current run: the stored manager options with
# the runtime options merged on top (runtime values win on key clashes).
# Neither source hash is modified; a new Hash is returned.
def manager_options
  stored_options = get_sidekiq_manager_options
  stored_options.merge(runtime_options)
end
model_attributes(model)
click to toggle source
returns the model attributes array:
- model_id, attr1, attr2, …
# File lib/sidekiq/active_record/manager_worker.rb, line 114
# Builds the argument list for one model.
#
# @param model an object responding to `identifier_key` and to every
#   entry of `additional_keys`
# @return [Array] the identifier value followed by the additional values,
#   in the order of `additional_keys`
def model_attributes(model)
  # Additional values are read first, then the identifier, as before.
  extra_values = additional_keys.map { |attribute| model.send(attribute) }
  [model.send(identifier_key)] + extra_values
end
perform_query_async(models_query, options = {})
click to toggle source
For a given model collection, delegates each model to a sub-worker (e.g. TaskWorker). Specify the TaskWorker with the `sidekiq_delegate_task_to` method.
@param models_query ActiveRecord::Relation @param options Hash
:worker_class - the worker class to delegate the task to. Alternative to the default `sidekiq_delegate_task_to` :identifier_key - the model identifier column. Default 'id' :selected_attributes - the attributes to SELECT for models query :additional_keys - additional model keys - defaults to :selected_attributes :batch_size - Specifies the size of each batch to push in bulk. This is also the number of models to fetch in each find_in_batches query. Default is 1000.
@example:
class UserTaskWorker def perform(user_id) # user task logic end end class UserSyncer < Sidekiq::ActiveRecord::ManagerWorker sidekiq_delegate_task_to :user_task_worker # or UserTaskWorker sidekiq_manager_options :batch_size => 500, :identifier_key => :user_token, :selected_attributes => [:first_name, :last_name, :email], :additional_keys => [:full_name, :email] end UserSyncer.perform_query_async(User.active, :batch_size => 300)
is equivalent to doing:
User.active.each {|user| UserTaskWorker.perform(user.id) }
# File lib/sidekiq/active_record/manager_worker.rb, line 53
# Pushes one job per model of +models_query+ to the delegated worker,
# one bulk push per batch.
#
# @param models_query the relation to iterate; it must respond to
#   #primary_key, #select and #find_in_batches
# @param options [Hash] runtime overrides, stored via `set_runtime_options`
def perform_query_async(models_query, options = {})
  set_runtime_options(options)
  scoped_query = prepare_models_query(models_query)

  scoped_query.find_in_batches(batch_size: batch_size) do |batch|
    job_args = batch.map { |record| model_attributes(record) }
    # NOTE(review): Sidekiq's documented push_bulk payload uses string keys
    # ('class' / 'args') - confirm symbol keys are accepted by the Sidekiq
    # version in use.
    Sidekiq::Client.push_bulk(class: worker_class, args: job_args)
  end
end
prepare_models_query(models_query)
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 120
# Restricts +models_query+ to the columns the manager needs: the primary
# key, the identifier key and every selected attribute.
#
# @param models_query an object responding to #primary_key and #select
# @return whatever +models_query.select+ returns
def prepare_models_query(models_query)
  # `selected_attributes` is itself an Array, so the list must be flattened
  # before `uniq`; otherwise a column that appears both as a key and as a
  # selected attribute is never de-duplicated.
  columns = [models_query.primary_key.to_sym, identifier_key, selected_attributes].flatten.uniq
  models_query.select(columns)
end
runtime_options()
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 155
# Returns the options stored by `set_runtime_options`, or an empty Hash
# when none have been stored.
def runtime_options
  return {} unless @sidekiq_manager_runtime_options

  @sidekiq_manager_runtime_options
end
selected_attributes()
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 134
# Returns the :selected_attributes entry of the merged manager options.
def selected_attributes
  options = manager_options
  options[:selected_attributes]
end
set_runtime_options(options={})
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 159
# Stores the per-run option overrides later returned by `runtime_options`.
#
# Entries whose value is blank (nil, '' or whitespace-only once converted
# with #to_s) are dropped so they cannot override a configured option.
# Keys are symbolized: the defaults are symbol-keyed, and options passed to
# `perform` through a Sidekiq job arrive with string keys (job arguments
# are JSON-serialized), so string keys would otherwise never take effect.
#
# The caller's hash is left untouched; nil is treated as "no options".
#
# @param options [Hash, nil] runtime overrides
# @return [Hash] the cleaned, symbol-keyed copy that was stored
def set_runtime_options(options = {})
  @sidekiq_manager_runtime_options =
    (options || {}).each_with_object({}) do |(key, value), cleaned|
      next if value.to_s.strip == ''

      cleaned[key.respond_to?(:to_sym) ? key.to_sym : key] = value
    end
end
sidekiq_delegate_task_to(worker_klass)
click to toggle source
@required The task worker to delegate to. @param worker_klass (Sidekiq::Worker, Symbol) - UserTaskWorker or :user_task_worker
# File lib/sidekiq/active_record/manager_worker.rb, line 65
# Declares the task worker that jobs are delegated to.
#
# A String or Symbol (e.g. :user_task_worker or 'UserTaskWorker') is
# resolved to the constant it names; anything else is stored as given.
# Previously the resolved constant was discarded and the raw Symbol/String
# was stored instead.
#
# @param worker_klass [Class, String, Symbol] the worker or its name
# @return the value stored under :worker_class
def sidekiq_delegate_task_to(worker_klass)
  resolved_worker =
    case worker_klass
    when String, Symbol
      # camelize leaves an already camel-cased name intact, where
      # split/capitalize would turn 'UserTaskWorker' into 'Usertaskworker'.
      worker_klass.to_s.camelize.constantize
    else
      worker_klass
    end

  get_sidekiq_manager_options[:worker_class] = resolved_worker
end
sidekiq_manager_options(opts = {})
click to toggle source
Allows customization for this type of ManagerWorker. Legal options:
:worker_class - the worker class to delegate the task to. Alternative to `sidekiq_delegate_task_to` :identifier_key - the model identifier column. Default 'id' :selected_attributes - the attributes to SELECT for models query :additional_keys - additional model keys - defaults to :selected_attributes :batch_size - Specifies the size of the batch. Default to 1000.
# File lib/sidekiq/active_record/manager_worker.rb, line 83
# Merges +opts+ over the current manager options and stores the result as
# the new manager options hash.
#
# @param opts [Hash, nil] option overrides; nil is treated as no overrides
# @return [Hash] the merged options that were stored
def sidekiq_manager_options(opts = {})
  overrides = opts || {}
  @sidekiq_manager_options_hash = get_sidekiq_manager_options.merge(overrides)
end
worker_class()
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 125
# Returns the :worker_class entry of the merged manager options.
#
# @raise [NotImplementedError] when no worker class is present
def worker_class
  delegate = manager_options[:worker_class]
  return delegate if delegate.present?

  fail NotImplementedError.new('`worker_class` was not specified')
end
Public Instance Methods
perform(options = {})
click to toggle source
# File lib/sidekiq/active_record/manager_worker.rb, line 9
# Instance entry point: runs the class-level default models query through
# the class's `perform_query_async`, forwarding +options+ unchanged.
#
# @param options [Hash] runtime overrides
def perform(options = {})
  manager = self.class
  manager.perform_query_async(manager.get_default_models_query, options)
end