class Sidekiq::Grouping::Batch

Attributes

name[R]
queue[R]
worker_class[R]

Public Class Methods

new(worker_class, queue, _redis_pool = nil) click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 6
def initialize(worker_class, queue, _redis_pool = nil)
  @worker_class = worker_class
  @queue = queue
  @name = "#{worker_class.underscore}:#{queue}"
  @redis = Sidekiq::Grouping::Redis.new
end

Private Class Methods

all() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 126
def all
  redis = Sidekiq::Grouping::Redis.new

  redis.batches.map do |name|
    new(*extract_worker_klass_and_queue(name))
  end
end
extract_worker_klass_and_queue(name) click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 134
def extract_worker_klass_and_queue(name)
  klass, queue = name.split(":")
  [klass.camelize, queue]
end

Public Instance Methods

add(msg) click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 15
def add(msg)
  msg = msg.to_json
  return unless should_add? msg

  @redis.push_msg(
    @name,
    msg,
    remember_unique: enqueue_similar_once?
  )
end
chunk_size() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 36
def chunk_size
  worker_class_options["batch_size"] ||
    Sidekiq::Grouping::Config.max_batch_size
end
could_flush?() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 76
def could_flush?
  could_flush_on_overflow? || could_flush_on_time?
end
delete() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 93
def delete
  @redis.delete(@name)
end
flush() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 52
def flush
  chunk = pluck
  return unless chunk

  chunk.each_slice(chunk_size) do |subchunk|
    Sidekiq::Client.push(
      "class" => @worker_class,
      "queue" => @queue,
      "args" => [true, subchunk]
    )
  end
  set_current_time_as_last
end
last_execution_time() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 80
def last_execution_time
  last_time = @redis.get_last_execution_time(@name)
  Time.parse(last_time) if last_time
end
next_execution_time() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 85
def next_execution_time
  interval = worker_class_options["batch_flush_interval"]
  return unless interval

  last_time = last_execution_time
  last_time + interval.seconds if last_time
end
pluck() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 46
def pluck
  return unless @redis.lock(@name)

  @redis.pluck(@name, pluck_size).map { |value| JSON.parse(value) }
end
pluck_size() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 41
def pluck_size
  worker_class_options["batch_flush_size"] ||
    chunk_size
end
should_add?(msg) click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 26
def should_add?(msg)
  return true unless enqueue_similar_once?

  !@redis.enqueued?(@name, msg)
end
size() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 32
def size
  @redis.batch_size(@name)
end
worker_class_constant() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 66
def worker_class_constant
  @worker_class.constantize
end
worker_class_options() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 70
def worker_class_options
  worker_class_constant.get_sidekiq_options
rescue NameError
  {}
end

Private Instance Methods

could_flush_on_overflow?() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 99
def could_flush_on_overflow?
  size >= pluck_size
end
could_flush_on_time?() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 103
def could_flush_on_time?
  return false if size.zero?

  last_time = last_execution_time
  next_time = next_execution_time

  if last_time.blank?
    set_current_time_as_last
    false
  elsif next_time
    next_time < Time.now
  end
end
enqueue_similar_once?() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 117
def enqueue_similar_once?
  worker_class_options["batch_unique"] == true
end
set_current_time_as_last() click to toggle source
# File lib/sidekiq/grouping/batch.rb, line 121
def set_current_time_as_last
  @redis.set_last_execution_time(@name, Time.now)
end