class Sidekiq::Grouping::Batch
Attributes
name[R]
queue[R]
worker_class[R]
Public Class Methods
new(worker_class, queue, _redis_pool = nil)
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 6
#
# Sets up a batch for one worker class / queue pair.
#
# worker_class - worker class name; `underscore` is called on it to build
#                the first half of the batch name.
# queue        - queue name; becomes the second half of the batch name.
# _redis_pool  - accepted but never read by this method.
def initialize(worker_class, queue, _redis_pool = nil)
  @worker_class = worker_class
  @queue = queue

  # The batch name has the form "<underscored worker class>:<queue>".
  name_prefix = worker_class.underscore
  @name = "#{name_prefix}:#{queue}"

  @redis = Sidekiq::Grouping::Redis.new
end
Private Class Methods
all()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 126
#
# Builds one batch object for every batch name reported by
# Sidekiq::Grouping::Redis#batches.
#
# Returns the result of mapping those names to new instances.
def all
  batch_names = Sidekiq::Grouping::Redis.new.batches

  batch_names.map do |batch_name|
    worker_class, queue = extract_worker_klass_and_queue(batch_name)
    new(worker_class, queue)
  end
end
extract_worker_klass_and_queue(name)
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 134
#
# Splits a batch name of the form "<underscored worker class>:<queue>"
# back into its two components.
#
# name - batch name String.
#
# Returns a two-element Array: [camelized worker class name, queue].
# The queue element is nil when the name contains no ":".
def extract_worker_klass_and_queue(name)
  # Split on the FIRST colon only. An unlimited split would silently
  # truncate a queue name that itself contains ":" (for example
  # "my_worker:tenant:high" would yield the queue "tenant").
  klass, queue = name.split(":", 2)
  [klass.camelize, queue]
end
Public Instance Methods
add(msg)
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 15
#
# Serializes +msg+ to JSON and pushes it onto this batch, unless
# should_add? rejects the serialized payload.
#
# msg - object responding to `to_json`.
#
# Returns nil when the message is rejected, otherwise whatever
# @redis.push_msg returns.
def add(msg)
  payload = msg.to_json
  return unless should_add?(payload)

  @redis.push_msg(@name, payload, remember_unique: enqueue_similar_once?)
end
chunk_size()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 36
#
# Number of messages per enqueued job: the worker's "batch_size" option
# when it is set (truthy), otherwise Sidekiq::Grouping::Config.max_batch_size.
def chunk_size
  configured = worker_class_options["batch_size"]
  return configured if configured

  Sidekiq::Grouping::Config.max_batch_size
end
could_flush?()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 76
#
# Reports whether the batch may be flushed, either because of its size
# or because of elapsed time. The time check is only evaluated when the
# overflow check is falsy.
def could_flush?
  overflow = could_flush_on_overflow?
  return overflow if overflow

  could_flush_on_time?
end
delete()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 93
#
# Asks the Redis wrapper to delete this batch, identified by its name.
#
# Returns whatever @redis.delete returns.
def delete
  batch_name = @name
  @redis.delete(batch_name)
end
flush()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 52
#
# Plucks pending messages and enqueues them through Sidekiq::Client,
# one job per slice of at most chunk_size messages. Each job's args are
# [true, <slice>]. Afterwards the current time is stored as the last
# execution time.
#
# Returns nil without enqueuing anything when pluck yields a falsy value.
def flush
  messages = pluck

  if messages
    messages.each_slice(chunk_size) do |slice|
      job = {
        "class" => @worker_class,
        "queue" => @queue,
        "args" => [true, slice]
      }
      Sidekiq::Client.push(job)
    end

    set_current_time_as_last
  end
end
last_execution_time()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 80
#
# Reads the stored last execution time for this batch and parses it.
#
# Returns a Time, or nil when nothing is stored.
def last_execution_time
  stored = @redis.get_last_execution_time(@name)
  return unless stored

  Time.parse(stored)
end
next_execution_time()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 85
#
# Computes when the batch is next due: last execution time plus the
# worker's "batch_flush_interval" option (converted with `seconds`).
#
# Returns nil when the worker has no flush interval or when no last
# execution time is available.
def next_execution_time
  flush_interval = worker_class_options["batch_flush_interval"]

  if flush_interval
    previous_run = last_execution_time
    previous_run + flush_interval.seconds if previous_run
  end
end
pluck()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 46
#
# Takes up to pluck_size raw messages from the batch and JSON-decodes
# each of them. Nothing is read unless @redis.lock(@name) is truthy.
#
# Returns the decoded messages, or nil when the lock call is falsy.
def pluck
  if @redis.lock(@name)
    raw_messages = @redis.pluck(@name, pluck_size)
    raw_messages.map { |raw| JSON.parse(raw) }
  end
end
pluck_size()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 41
#
# Number of messages taken per pluck: the worker's "batch_flush_size"
# option when it is set (truthy), otherwise chunk_size.
def pluck_size
  flush_size = worker_class_options["batch_flush_size"]

  if flush_size
    flush_size
  else
    chunk_size
  end
end
should_add?(msg)
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 26
#
# Decides whether +msg+ may be pushed onto the batch.
#
# msg - message in the form it is handed to @redis.enqueued? (add passes
#       the JSON-serialized payload).
#
# Returns true when uniqueness is not enforced; otherwise the negation
# of @redis.enqueued?(@name, msg).
def should_add?(msg)
  if enqueue_similar_once?
    !@redis.enqueued?(@name, msg)
  else
    true
  end
end
size()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 32
#
# Returns whatever @redis.batch_size reports for this batch's name.
def size
  batch_name = @name
  @redis.batch_size(batch_name)
end
worker_class_constant()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 66
#
# Resolves the stored worker class name by calling `constantize` on it.
def worker_class_constant
  class_name = @worker_class
  class_name.constantize
end
worker_class_options()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 70
#
# Fetches the worker's Sidekiq options via get_sidekiq_options.
#
# Returns an empty Hash when resolving the worker or reading its options
# raises NameError (which also covers NoMethodError, a NameError
# subclass).
def worker_class_options
  begin
    worker = worker_class_constant
    worker.get_sidekiq_options
  rescue NameError
    {}
  end
end
Private Instance Methods
could_flush_on_overflow?()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 99
#
# True when the batch currently holds at least pluck_size messages.
def could_flush_on_overflow?
  queued = size
  threshold = pluck_size
  queued >= threshold
end
could_flush_on_time?()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 103
#
# Reports whether the batch is due for a flush based on elapsed time.
#
# Always returns a strict boolean:
# * false when the batch is empty;
# * false when no last execution time is recorded yet. In that case the
#   current time is stored as the last execution time, so the interval
#   starts counting from now;
# * false when next_execution_time is nil;
# * otherwise whether next_execution_time lies in the past.
def could_flush_on_time?
  return false if size.zero?

  # last_execution_time yields a Time or nil, so a nil check is sufficient.
  if last_execution_time.nil?
    set_current_time_as_last
    return false
  end

  # Looked up only after the guards above: the result is unused on the
  # branches that return early.
  next_time = next_execution_time
  next_time ? next_time < Time.now : false
end
enqueue_similar_once?()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 117
#
# True only when the worker's "batch_unique" option compares equal to
# the literal true; other truthy values do not enable uniqueness.
def enqueue_similar_once?
  unique_flag = worker_class_options["batch_unique"]
  unique_flag == true
end
set_current_time_as_last()
click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 121
#
# Stores the current wall-clock time as this batch's last execution time.
#
# Returns whatever @redis.set_last_execution_time returns.
def set_current_time_as_last
  now = Time.now
  @redis.set_last_execution_time(@name, now)
end