class Sidekiq::Batching::Batch
Attributes
name[R]
queue[R]
worker_class[R]
Public Class Methods
new(worker_class, queue, redis_pool = nil)
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 5
#
# Sets up a batch for one worker class / queue pair.
#
# @param worker_class [#underscore] worker class name; `underscore` is called
#   on it to build the batch name (presumably a String - confirm with callers)
# @param queue [Object] queue identifier, interpolated into the batch name
# @param redis_pool [Object, nil] accepted for callers but not read here
def initialize(worker_class, queue, redis_pool = nil)
  @worker_class = worker_class
  @queue = queue

  # Batch name layout: "<underscored worker>:<queue>".
  underscored_worker = worker_class.underscore
  @name = "#{underscored_worker}:#{queue}"

  @redis = Sidekiq::Batching::Redis.new
end
Private Class Methods
all()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 112
#
# Builds one batch object for every batch name reported by the Redis wrapper.
#
# @return [Array] result of mapping each stored batch name to a new instance
def all
  store = Sidekiq::Batching::Redis.new
  batch_names = store.batches

  batch_names.map do |batch_name|
    constructor_args = extract_worker_klass_and_queue(batch_name)
    new(*constructor_args)
  end
end
extract_worker_klass_and_queue(name)
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 120
#
# Splits a batch name of the form "<underscored_worker>:<queue>" back into
# the worker class name and the queue.
#
# Only the first colon is treated as the separator, so a queue whose name
# itself contains ':' is returned whole instead of being truncated.
#
# The worker part is camelized rather than classified: `classify` also
# singularizes, which would turn "send_emails" into "SendEmail" and no longer
# name the class whose underscored form was stored.
#
# @param name [String] batch name
# @return [Array] two elements: camelized worker class name, then the queue
#   (nil when the name holds no colon)
def extract_worker_klass_and_queue(name)
  klass, queue = name.split(':', 2)
  [klass.camelize, queue]
end
Public Instance Methods
add(msg)
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 14
#
# Serializes the message to JSON and pushes it onto this batch, unless
# should_add? rejects it.
#
# @param msg [#to_json] message to enqueue
# @return whatever the Redis wrapper's push_msg returns, or nil when the
#   message was not added
def add(msg)
  serialized = msg.to_json
  return unless should_add?(serialized)

  @redis.push_msg(@name, serialized, enqueue_similar_once?)
end
chunk_size()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 28
#
# Number of messages to take per chunk: the worker's 'batch_size' option when
# it is set, otherwise the configured maximum batch size.
def chunk_size
  per_worker_size = worker_class_options['batch_size']
  return per_worker_size if per_worker_size

  Sidekiq::Batching::Config.max_batch_size
end
could_flush?()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 61
#
# Tells whether the batch is due for flushing, either because it overflowed
# or because of timing. The time-based check only runs when the overflow
# check comes back falsy.
def could_flush?
  overflowed = could_flush_on_overflow?
  return overflowed if overflowed

  could_flush_on_time?
end
delete()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 77
#
# Asks the Redis wrapper to delete this batch, identified by its name.
#
# @return whatever the wrapper's delete call returns
def delete
  batch_name = @name
  @redis.delete(batch_name)
end
flush()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 39
#
# Plucks a chunk of messages and, when pluck returned something truthy,
# records the current time as the last execution and pushes one Sidekiq job
# whose args are [true, chunk].
#
# @return the result of Sidekiq::Client.push, or nil when nothing was plucked
def flush
  messages = pluck
  return unless messages

  # Timestamp is written before the job is pushed, as in the original order.
  set_current_time_as_last

  job_args = [true, messages]
  Sidekiq::Client.push(
    'class' => @worker_class,
    'queue' => @queue,
    'args' => job_args
  )
end
last_execution_time()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 65
#
# Reads the stored last-execution timestamp for this batch and parses it.
#
# @return [Time, nil] parsed time, or nil when nothing is stored
def last_execution_time
  stored_value = @redis.get_last_execution_time(@name)
  return unless stored_value

  Time.parse(stored_value)
end
next_execution_time()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 70
#
# Computes when the batch should next be flushed on a timer: the last
# execution time plus the worker's 'batch_flush_interval' (in seconds).
#
# @return the computed time, or nil when the worker has no flush interval or
#   no last execution time is stored
def next_execution_time
  flush_interval = worker_class_options['batch_flush_interval']
  return unless flush_interval

  previous_run = last_execution_time
  return unless previous_run

  previous_run + flush_interval.seconds
end
pluck()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 33
#
# Takes up to chunk_size raw messages from the batch and JSON-decodes each
# one. Nothing is taken unless the Redis wrapper grants the lock for this
# batch name.
#
# @return [Array, nil] decoded messages, or nil when the lock call was falsy
def pluck
  return unless @redis.lock(@name)

  raw_messages = @redis.pluck(@name, chunk_size)
  raw_messages.map { |raw| JSON.parse(raw) }
end
should_add?(msg)
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 19
#
# Decides whether a message may be pushed. Always true when uniqueness is not
# requested; otherwise true only if the Redis wrapper does not report the
# message as already enqueued for this batch.
#
# @param msg [Object] message as it would be stored (add passes the JSON string)
# @return [Boolean]
def should_add?(msg)
  if enqueue_similar_once?
    !@redis.enqueued?(@name, msg)
  else
    true
  end
end
size()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 24
#
# Current size of this batch as reported by the Redis wrapper.
def size
  batch_name = @name
  @redis.batch_size(batch_name)
end
worker_class_constant()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 51
#
# Resolves the stored worker class name by calling `constantize` on it.
#
# @return the result of `constantize` (presumably the worker class itself)
def worker_class_constant
  class_name = @worker_class
  class_name.constantize
end
worker_class_options()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 55
#
# Sidekiq options declared on the worker class.
#
# Falls back to an empty hash when resolving the class or reading its options
# raises NameError (NoMethodError is a NameError subclass, so it is covered).
def worker_class_options
  begin
    worker_class_constant.get_sidekiq_options
  rescue NameError
    {}
  end
end
Private Instance Methods
could_flush_on_overflow?()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 82
#
# Tells whether the batch has reached the worker's 'batch_size' option.
#
# The option is looked up once and reused, so the presence check and the
# comparison work on the same value and the worker options are not resolved
# twice per call.
#
# @return [Boolean, nil] true/false from the size comparison, or the falsy
#   option value itself (nil when 'batch_size' is not set)
def could_flush_on_overflow?
  batch_limit = worker_class_options['batch_size']
  batch_limit && size >= batch_limit
end
could_flush_on_time?()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 87
#
# Tells whether the batch is due for a timer-based flush.
#
# - An empty batch is never due.
# - When no last execution time is stored, the current time is recorded as
#   the starting point and the batch is not due yet.
# - Otherwise the batch is due once the next execution time has passed.
#
# @return [Boolean, nil] nil when there is a last execution time but no next
#   execution time could be computed
def could_flush_on_time?
  return false if size.zero?

  # Both timestamps are read up front, before any state is written.
  previous_run = last_execution_time
  upcoming_run = next_execution_time

  if previous_run.blank?
    set_current_time_as_last
    return false
  end

  upcoming_run < Time.now if upcoming_run
end
enqueue_similar_once?()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 103
#
# True only when the worker's 'batch_unique' option compares equal to true;
# any other value (including other truthy ones) yields false.
def enqueue_similar_once?
  unique_option = worker_class_options['batch_unique']
  unique_option == true
end
set_current_time_as_last()
click to toggle source
# File lib/sidekiq/batching/batch.rb, line 107
#
# Stores the current time as this batch's last execution time.
def set_current_time_as_last
  current_time = Time.now
  @redis.set_last_execution_time(@name, current_time)
end