module Sidekiq::TrackableBatch::Persistance

@api private Interface for Redis persistance

Private Instance Methods

connection() { |connection| ... } click to toggle source
# File lib/sidekiq/trackable_batch/persistance.rb, line 105
def connection
  Sidekiq.redis do |connection|
    yield connection
  end
end
get_jobs(bid) click to toggle source
# File lib/sidekiq/trackable_batch/persistance.rb, line 17
def get_jobs(bid)
  connection { |c| c.smembers keys(bid)[:jobs] }
end
get_status(bid) click to toggle source
# File lib/sidekiq/trackable_batch/persistance.rb, line 13
def get_status(bid)
  connection { |c| c.hgetall keys(bid)[:status] }
end
keys(bid) click to toggle source
# File lib/sidekiq/trackable_batch/persistance.rb, line 96
def keys(bid)
  {
    jobs: "TB:#{bid}:JOBS",
    status: "TB:#{bid}:STATUS",
    update_listeners: "TB:#{bid}:UPDATE_LISTENERS",
    update_queue: "TB:#{bid}:UPDATE_QUEUE"
  }
end
persist_batch(batch) click to toggle source
# File lib/sidekiq/trackable_batch/persistance.rb, line 21
def persist_batch(batch)
  status = get_status(batch.bid)
  new_max = batch.messages.max_sum + status['max'].to_i

  connection do |c|
    keys = keys(batch.bid)
    c.multi do
      c.hset keys[:status], :max, new_max
      c.expire keys[:status], TTL

      unless batch.messages.empty?
        c.sadd keys[:jobs], batch.messages.to_json
        c.expire keys[:jobs], TTL
      end

      if batch.update_listeners
        c.set keys[:update_listeners], batch.update_listeners.to_json
        c.expire keys[:update_listeners], TTL
        if batch.update_queue
          c.set keys[:update_queue], batch.update_queue
          c.expire keys[:update_queue], TTL
        end
      end

      c.hgetall keys[:status]
    end
  end
end
update_status(batch, updates = {}) click to toggle source

Updates the status of a {TrackableBatch} with provided updates.

  • If the key `value` is provided the value will be incremented by that amount.

  • All other keys will create or replace a string value equal to the key's value.

Also enqueues any update callbacks that have been registered.

# File lib/sidekiq/trackable_batch/persistance.rb, line 54
def update_status(batch, updates = {})
  parent_bid = batch.parent_bid
  keys = if parent_bid
           [
             keys(parent_bid)[:status],
             keys(batch.bid)[:status],
             keys(parent_bid)[:update_listeners],
             keys(parent_bid)[:update_queue]
           ]
         else
           [
             nil,
             keys(batch.bid)[:status],
             keys(batch.bid)[:update_listeners],
             keys(batch.bid)[:update_queue]
           ]
         end

  update_queue, update_listeners = Sidekiq.load_json(
    connection do |c|
      c.evalsha(
        @@sha,
        keys: keys,
        argv: [updates.delete(:value), Sidekiq.dump_json(updates), TTL]
      )
    end
  )

  return unless update_listeners

  Thread.new do # clean thread
    Sidekiq::Client.push_bulk(
      'queue' => update_queue,
      'class' => Sidekiq::TrackableBatch::UpdateNotifier,
      'args' => update_listeners.map do |update_listener|
        target, args = update_listener.first
        [parent_bid || batch.bid, target, args]
      end
    )
  end.join
end