class Strongman::Batch

Attributes

fulfilled[RW]
fulfilling[RW]
lock[RW]
name[RW]
parent[RW]

Public Class Methods

new(loader_block, name: nil, parent: nil, max_batch_size: Float::INFINITY) click to toggle source
# File lib/strongman.rb, line 17
def initialize(loader_block, name: nil, parent: nil, max_batch_size: Float::INFINITY)
  @name = name
  @queue = Concurrent::Array.new
  @promise = Concurrent::Promises.resolvable_future
  @loader_block = loader_block
  @lock = Concurrent::ReadWriteLock.new
  @parent = parent
  @children = Concurrent::Array.new
  @fulfilling = Concurrent::AtomicBoolean.new(false)
  @fulfilled = Concurrent::AtomicBoolean.new(false)
  @max_batch_size = max_batch_size

  @parent.children << self if @parent

  @root = nil
  @batch_chain = nil
end

Public Instance Methods

batch_chain() click to toggle source
# File lib/strongman.rb, line 123
def batch_chain
  if @batch_chain
    @batch_chain
  else
    @batch_chain = Concurrent::Array.new

    add_children = -> (batch) {
      @batch_chain << batch
      if batch.children.size > 0
        batch.children.flat_map(&add_children)
      end
    }
    add_children.(root)

    @batch_chain
  end
end
children() click to toggle source
# File lib/strongman.rb, line 171
def children
  @children ||= Concurrent::Array.new
end
fulfill!() click to toggle source
# File lib/strongman.rb, line 155
def fulfill!
  results = @loader_block.call(@queue)

  if results.is_a?(Concurrent::Promises::Future)
    # if the strongman loader block returns a promise (e.g. if the block uses another loader),
    # make sure to touch it to kick off any delayed effects before chaining
    results.touch.then do |inner_results|
      @promise.fulfill(normalize_results(inner_results))
    end.flat
  else
    @promise.fulfill(normalize_results(results))
  end

  self
end
fulfill_hierarchy() click to toggle source
# File lib/strongman.rb, line 141
def fulfill_hierarchy
  raise Error.new("Only run #fulfill_hierarchy on root batches") if @parent

  with_lock do
    return if fulfilled?

    mark_fulfilling!
    batch_chain.reverse.each(&:fulfill!)
  ensure
    mark_fulfilled!
    mark_not_fulfilling!
  end
end
fulfilled?() click to toggle source
# File lib/strongman.rb, line 35
def fulfilled?
  root.fulfilled.true?
end
fulfilling?() click to toggle source
# File lib/strongman.rb, line 39
def fulfilling?
  root.fulfilling.true?
end
mark_fulfilled!() click to toggle source
# File lib/strongman.rb, line 86
def mark_fulfilled!
  root.fulfilled.make_true
  self
end
mark_fulfilling!() click to toggle source
# File lib/strongman.rb, line 91
def mark_fulfilling!
  root.fulfilling.make_true
  self
end
mark_not_fulfilling!() click to toggle source
# File lib/strongman.rb, line 96
def mark_not_fulfilling!
  root.fulfilling.make_false
  self
end
needs_fulfilling?() click to toggle source
# File lib/strongman.rb, line 43
def needs_fulfilling?
  !fulfilled? && !fulfilling?
end
queue(key) click to toggle source
# File lib/strongman.rb, line 47
def queue(key)
  @queue << key

  future = @promise.then do |results|
    unless results.key?(key)
      raise StandardError, "Batch loader didn't resolve a key: #{key}. Resolved keys: #{results.keys}"
    end

    result = results[key]

    if result.is_a?(Concurrent::Promises::Future)
      result
    else
      Concurrent::Promises.resolvable_future.fulfill(result)
    end
  end.flat

  #
  # If our queue is full, fulfill immediately and return the bare future
  #
  if @queue.size >= @max_batch_size
    root.fulfill_hierarchy

    future
  else
    #
    # If the queue is not full, create a delayed future that fulfills when the value is requested and chains
    # to the inner future
    #
    Concurrent::Promises.delay do
      # with_lock do
      root.fulfill_hierarchy if root.needs_fulfilling?
      # end

      future
    end.flat
  end
end
root() click to toggle source
# File lib/strongman.rb, line 107
def root
  if @root
    @root
  else
    find_top = -> (batch) {
      if batch.parent
        find_top.(batch.parent)
      else
        batch
      end
    }

    @root = find_top.(self)
  end
end
with_lock() { || ... } click to toggle source
# File lib/strongman.rb, line 101
def with_lock
  root.lock.with_write_lock do
    yield
  end
end

Private Instance Methods

normalize_results(results) click to toggle source
# File lib/strongman.rb, line 177
def normalize_results(results)
  unless results.is_a?(Array) || results.is_a?(Hash)
    raise TypeError, "Batch loader must return an Array or Hash, but returned: #{results.class.name}"
  end

  if @queue.size != results.size
    raise StandardError, "Batch loader must be instantiated with function that returns Array or Hash " \
      "of the same size as provided to it Array of keys" \
      "\n\nProvided keys:\n#{@queue}" \
      "\n\nReturned values:\n#{results}"
  end

  if results.is_a?(Array)
    Hash[@queue.zip(results)]
  else
    results.is_a?(Hash)
    results
  end
end