class Sidekiq::Batching::Batch

Attributes

name[R]
queue[R]
worker_class[R]

Public Class Methods

new(worker_class, queue, redis_pool = nil) click to toggle source
# File lib/sidekiq/batching/batch.rb, line 5
def initialize(worker_class, queue, redis_pool = nil)
  @worker_class = worker_class
  @queue = queue
  @name = "#{worker_class.underscore}:#{queue}"
  @redis = Sidekiq::Batching::Redis.new
end

Private Class Methods

all() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 112
def all
  redis = Sidekiq::Batching::Redis.new

  redis.batches.map do |name|
    new(*extract_worker_klass_and_queue(name))
  end
end
extract_worker_klass_and_queue(name) click to toggle source
# File lib/sidekiq/batching/batch.rb, line 120
def extract_worker_klass_and_queue(name)
  klass, queue = name.split(':')
  [klass.classify, queue]
end

Public Instance Methods

add(msg) click to toggle source
# File lib/sidekiq/batching/batch.rb, line 14
def add(msg)
  msg = msg.to_json
  @redis.push_msg(@name, msg, enqueue_similar_once?) if should_add? msg
end
chunk_size() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 28
def chunk_size
  worker_class_options['batch_size'] ||
  Sidekiq::Batching::Config.max_batch_size
end
could_flush?() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 61
def could_flush?
  could_flush_on_overflow? || could_flush_on_time?
end
delete() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 77
def delete
  @redis.delete(@name)
end
flush() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 39
def flush
  chunk = pluck
  if chunk
    set_current_time_as_last
    Sidekiq::Client.push(
      'class' => @worker_class,
      'queue' => @queue,
      'args' => [true, chunk]
    )
  end
end
last_execution_time() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 65
def last_execution_time
  last_time = @redis.get_last_execution_time(@name)
  Time.parse(last_time) if last_time
end
next_execution_time() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 70
def next_execution_time
  if interval = worker_class_options['batch_flush_interval']
    last_time = last_execution_time
    last_time + interval.seconds if last_time
  end
end
pluck() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 33
def pluck
  if @redis.lock(@name)
    @redis.pluck(@name, chunk_size).map { |value| JSON.parse(value) }
  end
end
should_add?(msg) click to toggle source
# File lib/sidekiq/batching/batch.rb, line 19
def should_add? msg
  return true unless enqueue_similar_once?
  !@redis.enqueued?(@name, msg)
end
size() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 24
def size
  @redis.batch_size(@name)
end
worker_class_constant() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 51
def worker_class_constant
  @worker_class.constantize
end
worker_class_options() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 55
def worker_class_options
  worker_class_constant.get_sidekiq_options
rescue NameError
  {}
end

Private Instance Methods

could_flush_on_overflow?() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 82
def could_flush_on_overflow?
  worker_class_options['batch_size'] &&
  size >= worker_class_options['batch_size']
end
could_flush_on_time?() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 87
def could_flush_on_time?
  return false if size.zero?

  last_time = last_execution_time
  next_time = next_execution_time

  if last_time.blank?
    set_current_time_as_last
    false
  else
    if next_time
      next_time < Time.now
    end
  end
end
enqueue_similar_once?() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 103
def enqueue_similar_once?
  worker_class_options['batch_unique'] == true
end
set_current_time_as_last() click to toggle source
# File lib/sidekiq/batching/batch.rb, line 107
def set_current_time_as_last
  @redis.set_last_execution_time(@name, Time.now)
end