module Attr::Gather::Workflow::Callable

@api private

Public Instance Methods

call(input) click to toggle source

Execute a workflow

When executing the workflow, tasks are processed in dependant order, with the outputs of each batch being fed as inputs to the next batch. This means the you can enhance the data as the task moves through a workflow, so later tasks can use the enhanced input data.

@example

enhancer = MyEnhancingWorkflow.new
enhancer.call(user_id: 1).value! # => {user_id: 1, email: 't@t.co}

@param input [Hash]

@return [Concurrent::Promise<Hash>]

@note For more information, check out {dry-rb.org/gems/dry-monads/1.0/result}

@api public

# File lib/attr/gather/workflow/callable.rb, line 28
def call(input)
  task_promises = {}

  final_results = self.class.tasks.to_a.map do |task|
    task_promises[task] = execute_task(input, task, task_promises)
  end

  Concurrent::Promise.zip(*final_results).then do |results|
    aggregator.call(input, results)
  end
end

Private Instance Methods

aggregator() click to toggle source

@api private

# File lib/attr/gather/workflow/callable.rb, line 64
def aggregator
  return @aggregator if defined?(@aggregator) && !@aggregator.nil?

  @aggregator = self.class.aggregator
  @aggregator.filter ||= filter if @aggregator.respond_to?(:filter=)

  @aggregator
end
container() click to toggle source

@api private

# File lib/attr/gather/workflow/callable.rb, line 59
def container
  self.class.container
end
execute_task(initial_input, task, task_promises) click to toggle source

Executes a batch of tasks

@return [Array<TaskExecutionResult>]

@api private

# File lib/attr/gather/workflow/callable.rb, line 47
def execute_task(initial_input, task, task_promises)
  task_proc = container.resolve(task.name)
  dep_promises = task.depends_on.map { |t| task_promises[t] }
  input_promise = Concurrent::Promise.zip(*dep_promises)

  input_promise.then do |results|
    dep_input = aggregator.call(initial_input, results)
    task_proc.call(dep_input)
  end
end
filter() click to toggle source

@api private

# File lib/attr/gather/workflow/callable.rb, line 74
def filter
  self.class.filter
end