class Sidekiq::Merger::Merge

Attributes

merge_key[R]
queue[R]
worker_class[R]

Public Class Methods

all() click to toggle source
# File lib/sidekiq/merger/merge.rb, line 6
def all
  redis = Sidekiq::Merger::Redis.new

  redis.all_merges.map { |full_merge_key| initialize_with_full_merge_key(full_merge_key, redis: redis) }
end
get_options(worker_class) click to toggle source
# File lib/sidekiq/merger/merge.rb, line 36
def get_options(worker_class)
  (worker_class.get_sidekiq_options["merger"] || {}).with_indifferent_access
end
initialize_with_args(worker_class, queue, args, options = {}) click to toggle source
# File lib/sidekiq/merger/merge.rb, line 21
def initialize_with_args(worker_class, queue, args, options = {})
  new(worker_class, queue, merge_key(worker_class, args), options)
end
initialize_with_full_merge_key(full_merge_key, options = {}) click to toggle source
# File lib/sidekiq/merger/merge.rb, line 12
def initialize_with_full_merge_key(full_merge_key, options = {})
  keys = full_merge_key.split(":")
  raise "Invalid merge key" if keys.size < 3
  worker_class = keys[0].camelize.constantize
  queue = keys[1]
  merge_key = keys[2]
  new(worker_class, queue, merge_key, options)
end
merge_key(worker_class, args) click to toggle source
# File lib/sidekiq/merger/merge.rb, line 25
def merge_key(worker_class, args)
  options = get_options(worker_class)
  merge_key = options["key"]
  if merge_key.respond_to?(:call)
    merge_key = merge_key.call(args)
  end
  merge_key = "" if merge_key.nil?
  merge_key = merge_key.to_json unless merge_key.is_a?(String)
  merge_key
end
new(worker_class, queue, merge_key, redis: Sidekiq::Merger::Redis.new) click to toggle source
# File lib/sidekiq/merger/merge.rb, line 43
def initialize(worker_class, queue, merge_key, redis: Sidekiq::Merger::Redis.new)
  @worker_class = worker_class
  @queue = queue
  @merge_key = merge_key
  @redis = redis
end

Public Instance Methods

==(other) click to toggle source
# File lib/sidekiq/merger/merge.rb, line 105
def ==(other)
  self.worker_class == other.worker_class &&
  self.queue == other.queue &&
  self.merge_key == other.merge_key
end
add(args, execution_time) click to toggle source
# File lib/sidekiq/merger/merge.rb, line 50
def add(args, execution_time)
  if !options[:unique] || !@redis.merge_exists?(full_merge_key, args)
    @redis.push_message(full_merge_key, args, execution_time)
  end
end
all_args() click to toggle source
# File lib/sidekiq/merger/merge.rb, line 97
def all_args
  @redis.get_merge(full_merge_key)
end
can_flush?() click to toggle source
# File lib/sidekiq/merger/merge.rb, line 89
def can_flush?
  !execution_time.nil? && execution_time < Time.now
end
delete(args) click to toggle source
# File lib/sidekiq/merger/merge.rb, line 56
def delete(args)
  @redis.delete_message(full_merge_key, args)
end
delete_all() click to toggle source
# File lib/sidekiq/merger/merge.rb, line 60
def delete_all
  @redis.delete_merge(full_merge_key)
end
execution_time() click to toggle source
# File lib/sidekiq/merger/merge.rb, line 101
def execution_time
  @execution_time ||= @redis.merge_execution_time(full_merge_key)
end
flush() click to toggle source
# File lib/sidekiq/merger/merge.rb, line 68
def flush
  msgs = []

  if @redis.lock_merge(full_merge_key, Sidekiq::Merger::Config.lock_ttl)
    msgs = @redis.pluck_merge(full_merge_key)
  end

  unless msgs.empty?
    batches = options[:batch_size].nil? ? [msgs] : msgs.each_slice(options[:batch_size].to_i).to_a
    batches.each do |batch_msgs|
      # preserve FIFO when enqueuing batches
      Sidekiq::Client.push(
        "class" => worker_class,
        "queue" => queue,
        "args" => batch_msgs,
        "merged" => true
      )
    end
  end
end
full_merge_key() click to toggle source
# File lib/sidekiq/merger/merge.rb, line 93
def full_merge_key
  @full_merge_key ||= [worker_class.name.to_s.underscore, queue, merge_key].join(":")
end
size() click to toggle source
# File lib/sidekiq/merger/merge.rb, line 64
def size
  @redis.merge_size(full_merge_key)
end

Private Instance Methods

options() click to toggle source
# File lib/sidekiq/merger/merge.rb, line 113
def options
  @options ||= self.class.get_options(worker_class)
rescue NameError
  {}
end