class Shoryuken::Batch

Constants

BID_EXPIRE_TTL
VERSION

Attributes

bid[R]
callback_queue[R]
created_at[R]
description[R]

Public Class Methods

new(existing_bid = nil) click to toggle source
# File lib/shoryuken/batch.rb, line 46
# Builds a handle for a new batch or for one identified by +existing_bid+.
# No Redis call is made here.
#
# @param existing_bid [String, nil] id of a batch to attach to; when nil or
#   empty a random URL-safe id is generated instead
def initialize(existing_bid = nil)
  # Basically existing_bid.present?
  @existing = existing_bid ? !existing_bid.empty? : false
  # An empty id must not be adopted as-is: every such batch would end up
  # sharing the single key "BID-".
  @bid = @existing ? existing_bid : SecureRandom.urlsafe_base64(10)
  @initialized = false
  @created_at = Time.now.utc.to_f
  @bidkey = "BID-#{@bid}"
  @ready_to_queue = []
end

Private Class Methods

enqueue_callbacks(event, bid) click to toggle source
# File lib/shoryuken/batch.rb, line 203
# Sends one callback-worker message for every callback registered for
# +event+ on batch +bid+.
#
# Reads the callback set "BID-<bid>-callbacks-<event>" plus the batch's
# 'callback_queue' and 'parent_bid' fields in a single MULTI, then publishes
# each callback to that queue.
#
# @param event [Symbol, String] event name used in the callback set key
# @param bid [String] batch id
# @return [nil] when no callbacks are registered
def enqueue_callbacks(event, bid)
  batch_key = "BID-#{bid}"
  callback_key = "#{batch_key}-callbacks-#{event}"

  # Commands are queued on the object yielded by #multi. Issuing them on the
  # outer connection inside the block is deprecated in redis-rb 4.6 and no
  # longer supported in 5.0 (assumes Shoryuken.redis yields a redis-rb client).
  callbacks, queue, parent_bid = Shoryuken.redis do |r|
    r.multi do |tx|
      tx.smembers(callback_key)
      tx.hget(batch_key, 'callback_queue')
      tx.hget(batch_key, 'parent_bid')
    end
  end
  return if callbacks.empty?

  # Basically parent_bid.blank? => nil
  parent_bid = nil if !parent_bid || parent_bid.empty?

  options = {
    message_attributes: {
      'shoryuken_class' => {
        string_value: 'Shoryuken::Batch::Callback::Worker',
        data_type: 'String'
      }
    }
  }

  callbacks.each do |jcb|
    cb = JSON.parse(jcb)

    options[:message_body] = {
      'event' => event,
      'bid' => bid,
      'parent_bid' => parent_bid,
      'job_class' => cb['callback'],
      'arguments' => cb['opts']
    }

    Shoryuken::Client.queues(queue).send_message(options)
  end
end
process_failed_job(bid, jid) click to toggle source
# File lib/shoryuken/batch.rb, line 163
# Records +jid+ as failed for batch +bid+ and enqueues the :complete
# callbacks when the number of pending jobs equals the number of failed jobs
# and the 'children' counter equals the size of the "-complete" set.
#
# @param bid [String] batch id
# @param jid [String] id of the job that failed
def process_failed_job(bid, jid)
  # Commands are queued on the object yielded by #multi. Issuing them on the
  # outer connection inside the block is deprecated in redis-rb 4.6 and no
  # longer supported in 5.0 (assumes Shoryuken.redis yields a redis-rb client).
  _, pending, failed, children, complete = Shoryuken.redis do |r|
    r.multi do |tx|
      tx.sadd("BID-#{bid}-failed", jid)

      # HINCRBY by 0 reads the counter as an Integer without changing it.
      tx.hincrby("BID-#{bid}", 'pending', 0)
      tx.scard("BID-#{bid}-failed")
      tx.hincrby("BID-#{bid}", 'children', 0)
      tx.scard("BID-#{bid}-complete")

      tx.expire("BID-#{bid}-failed", BID_EXPIRE_TTL)
    end
  end

  enqueue_callbacks(:complete, bid) if pending.to_i == failed.to_i && children == complete
end
process_successful_job(bid, jid) click to toggle source
# File lib/shoryuken/batch.rb, line 180
# Records a successful run of +jid+ in batch +bid+: decrements the 'pending'
# counter, removes the jid from the "-failed" and "-jids" sets, refreshes the
# batch TTL, logs the completion and enqueues the :complete and/or :success
# callbacks when their conditions hold.
#
# @param bid [String] batch id
# @param jid [String] id of the job that succeeded
def process_successful_job(bid, jid)
  # Commands are queued on the object yielded by #multi. Issuing them on the
  # outer connection inside the block is deprecated in redis-rb 4.6 and no
  # longer supported in 5.0 (assumes Shoryuken.redis yields a redis-rb client).
  failed, pending, children, complete, success, _total, _parent_bid = Shoryuken.redis do |r|
    r.multi do |tx|
      # NOTE(review): the failed-set size is read before this jid is removed
      # from that set below, so a job that failed earlier and succeeds now is
      # still counted in `failed` - confirm this ordering is intended.
      tx.scard("BID-#{bid}-failed")
      tx.hincrby("BID-#{bid}", 'pending', -1)
      # HINCRBY by 0 reads the counter as an Integer without changing it.
      tx.hincrby("BID-#{bid}", 'children', 0)
      tx.scard("BID-#{bid}-complete")
      tx.scard("BID-#{bid}-success")
      tx.hget("BID-#{bid}", 'total')
      tx.hget("BID-#{bid}", 'parent_bid')

      tx.srem("BID-#{bid}-failed", jid)
      tx.srem("BID-#{bid}-jids", jid)
      tx.expire("BID-#{bid}", BID_EXPIRE_TTL)
    end
  end

  Shoryuken.logger.info "done: #{jid} in batch #{bid}"

  enqueue_callbacks(:complete, bid) if pending.to_i == failed.to_i && children == complete
  enqueue_callbacks(:success, bid) if pending.to_i.zero? && children == success
end

Public Instance Methods

callback_queue=(callback_queue) click to toggle source
# File lib/shoryuken/batch.rb, line 60
# Remembers the callback queue on this instance and stores it in the batch
# hash under the 'callback_queue' field via persist_bid_attr.
#
# @param callback_queue [String] value kept in @callback_queue and persisted
def callback_queue=(callback_queue)
  @callback_queue = callback_queue
  persist_bid_attr('callback_queue', @callback_queue)
end
description=(description) click to toggle source
# File lib/shoryuken/batch.rb, line 55
# Remembers the description on this instance and stores it in the batch hash
# under the 'description' field via persist_bid_attr.
#
# @param description [String] value kept in @description and persisted
def description=(description)
  @description = description
  persist_bid_attr('description', @description)
end
increment_job_queue(jid) click to toggle source
# File lib/shoryuken/batch.rb, line 132
# Appends +jid+ to the list of job ids collected in @ready_to_queue.
#
# @param jid [String] job id to remember
# @return [Array] the @ready_to_queue list itself
def increment_job_queue(jid)
  @ready_to_queue.push(jid)
end
jobs() { || ... } click to toggle source
# File lib/shoryuken/batch.rb, line 78
# Runs the given block with this batch installed as Thread.current[:bid],
# then registers every job id collected in @ready_to_queue during the block.
#
# On the first call for a non-existing batch the batch hash is created
# ('created_at', optional 'parent_bid', TTL). After the block, 'pending' and
# 'total' are incremented by the number of collected jids and the jids are
# added to the "<bidkey>-jids" set.
#
# @yield the block during which jobs are collected
# @return [Array] the collected job ids ([] when none were collected)
# @raise [NoBlockGivenError] when called without a block
def jobs
  raise NoBlockGivenError unless block_given?

  # Swap in a fresh :bid_data for the duration of the call; restored in ensure.
  bid_data, Thread.current[:bid_data] = Thread.current[:bid_data], []

  begin
    if !@existing && !@initialized
      # This local shadows the #parent_bid instance method for the rest of
      # the method and stays nil whenever this branch is skipped.
      parent_bid = Thread.current[:bid].bid if Thread.current[:bid]

      # Commands are queued on the object yielded by #multi. Issuing them on
      # the outer connection inside the block is deprecated in redis-rb 4.6
      # and no longer supported in 5.0 (assumes Shoryuken.redis yields a
      # redis-rb client).
      Shoryuken.redis do |r|
        r.multi do |tx|
          tx.hset(@bidkey, 'created_at', @created_at)
          tx.hset(@bidkey, 'parent_bid', parent_bid.to_s) if parent_bid
          tx.expire(@bidkey, BID_EXPIRE_TTL)
        end
      end

      @initialized = true
    end

    @ready_to_queue = []

    begin
      parent = Thread.current[:bid]
      Thread.current[:bid] = self
      yield
    ensure
      # Always hand the thread-local back to the enclosing batch (or nil).
      Thread.current[:bid] = parent
    end

    return [] if @ready_to_queue.empty?

    Shoryuken.redis do |r|
      r.multi do |tx|
        if parent_bid
          tx.hincrby("BID-#{parent_bid}", 'children', 1)
          tx.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
        end

        tx.hincrby(@bidkey, 'pending', @ready_to_queue.size)
        tx.hincrby(@bidkey, 'total', @ready_to_queue.size)
        tx.expire(@bidkey, BID_EXPIRE_TTL)

        tx.sadd("#{@bidkey}-jids", @ready_to_queue)
        tx.expire("#{@bidkey}-jids", BID_EXPIRE_TTL)
      end
    end

    @ready_to_queue
  ensure
    Thread.current[:bid_data] = bid_data
  end
end
on(event, callback, options = {}) click to toggle source
# File lib/shoryuken/batch.rb, line 65
# Registers +callback+ for +event+ on this batch by adding a JSON entry
# ({callback:, opts:}) to the set "<bidkey>-callbacks-<event>" and refreshing
# that set's TTL. Events other than success/complete are ignored.
#
# @param event [Symbol, String] 'success' or 'complete'
# @param callback [Object] callback identifier, serialised into the JSON entry
# @param options [Hash] serialised under 'opts' in the JSON entry
# @return [nil] when +event+ is not supported
def on(event, callback, options = {})
  return unless %w[success complete].include?(event.to_s)

  callback_key = "#{@bidkey}-callbacks-#{event}"
  # JSON.generate is the documented API; JSON.unparse is only a legacy alias.
  payload = JSON.generate(callback: callback, opts: options)

  # Commands are queued on the object yielded by #multi. Issuing them on the
  # outer connection inside the block is deprecated in redis-rb 4.6 and no
  # longer supported in 5.0 (assumes Shoryuken.redis yields a redis-rb client).
  Shoryuken.redis do |r|
    r.multi do |tx|
      tx.sadd(callback_key, payload)
      tx.expire(callback_key, BID_EXPIRE_TTL)
    end
  end
end
parent() click to toggle source
# File lib/shoryuken/batch.rb, line 142
# Returns a batch handle for this batch's parent.
#
# @return [Shoryuken::Batch, nil] nil when parent_bid returns nil
def parent
  # Look the id up once: parent_bid is a Redis read, and calling it twice
  # costs a second round trip and could see two different values.
  bid = parent_bid
  Shoryuken::Batch.new(bid) if bid
end
parent_bid() click to toggle source
# File lib/shoryuken/batch.rb, line 136
# Reads the 'parent_bid' field of this batch's hash (@bidkey) from Redis.
#
# @return [String, nil] whatever HGET returns for that field
def parent_bid
  Shoryuken.redis { |conn| conn.hget(@bidkey, 'parent_bid') }
end
valid?(batch = self) click to toggle source
# File lib/shoryuken/batch.rb, line 146
# Tells whether +batch+ and all of its ancestors are free of an
# "invalidated-bid-<bid>" marker key.
#
# @param batch [#bid, #parent] batch to check; defaults to this batch
# @return [Boolean] false as soon as the batch or any ancestor is invalidated
def valid?(batch = self)
  reply = Shoryuken.redis { |r| r.exists("invalidated-bid-#{batch.bid}") }
  # EXISTS comes back as a boolean from older redis-rb releases and as a key
  # count from newer ones. 0 is truthy in Ruby, so a plain negation would
  # report every batch as invalid.
  invalidated = reply.is_a?(Integer) ? reply > 0 : reply
  return false if invalidated

  # Fetch the parent once; each call to #parent performs Redis lookups.
  ancestor = batch.parent
  ancestor ? valid?(ancestor) : true
end

Private Instance Methods

persist_bid_attr(attribute, value) click to toggle source
# File lib/shoryuken/batch.rb, line 153
# Writes one field of this batch's hash (@bidkey) and refreshes the hash's
# TTL in a single MULTI.
#
# @param attribute [String] hash field name
# @param value [Object] value stored in the field
def persist_bid_attr(attribute, value)
  # Commands are queued on the object yielded by #multi. Issuing them on the
  # outer connection inside the block is deprecated in redis-rb 4.6 and no
  # longer supported in 5.0 (assumes Shoryuken.redis yields a redis-rb client).
  Shoryuken.redis do |r|
    r.multi do |tx|
      tx.hset(@bidkey, attribute, value)
      tx.expire(@bidkey, BID_EXPIRE_TTL)
    end
  end
end