module Attr::Gather::Workflow::Callable
@api private
Public Instance Methods
call(input)
Execute a workflow
When executing the workflow, tasks are processed in dependency order, with the outputs of each batch being fed as inputs to the next batch. This means you can enhance the data as it moves through the workflow, so later tasks can use the enhanced input data.
@example
  enhancer = MyEnhancingWorkflow.new
  enhancer.call(user_id: 1).value! # => {user_id: 1, email: 't@t.co'}
@param input [Hash]
@return [Concurrent::Promise<Hash>]
@note For more information, check out {dry-rb.org/gems/dry-monads/1.0/result}
@api public
# File lib/attr/gather/workflow/callable.rb, line 28
def call(input)
  task_promises = {}

  final_results = self.class.tasks.to_a.map do |task|
    task_promises[task] = execute_task(input, task, task_promises)
  end

  Concurrent::Promise.zip(*final_results).then do |results|
    aggregator.call(input, results)
  end
end
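The dependency-ordered flow described for call can be illustrated with a small synchronous sketch. It is not the library's implementation: it swaps the Concurrent::Promise chain for a plain Hash of already-computed results, and Task, TASKS, CONTAINER, AGGREGATE and run_workflow are hypothetical names invented for this example. The shallow-merge aggregation is likewise an assumption; the real aggregator may combine results differently.

```ruby
# Hypothetical stand-ins: Task, TASKS, CONTAINER and AGGREGATE are
# illustrative assumptions, not part of the attr-gather API.
Task = Struct.new(:name, :depends_on)

fetch_email   = Task.new(:fetch_email, [])
fetch_profile = Task.new(:fetch_profile, [fetch_email])

# Tasks are listed in dependency order: each task comes after the
# tasks it depends on, so its dependencies are always computed first.
TASKS = [fetch_email, fetch_profile].freeze

# Maps task names to callables, playing the role of the container.
CONTAINER = {
  fetch_email: ->(input) { { email: "user#{input[:user_id]}@example.com" } },
  fetch_profile: ->(input) { { domain: input[:email].split('@').last } }
}.freeze

# Assumed aggregation strategy: shallow-merge every result into the input.
AGGREGATE = lambda do |input, results|
  results.reduce(input) { |acc, result| acc.merge(result) }
end

def run_workflow(input)
  results = {}

  TASKS.each do |task|
    # Collect the outputs of the tasks this one depends on.
    dep_results = task.depends_on.map { |dep| results.fetch(dep) }
    # Enhance the initial input with those outputs before running the task.
    task_input = AGGREGATE.call(input, dep_results)
    results[task] = CONTAINER.fetch(task.name).call(task_input)
  end

  # Combine the initial input with every task's output.
  AGGREGATE.call(input, results.values)
end

output = run_workflow({ user_id: 1 })
# output => {user_id: 1, email: "user1@example.com", domain: "example.com"}
```

In this sketch fetch_profile can read :email only because fetch_email ran first and its output was merged into the input, which is the "later tasks can use the enhanced input data" behaviour described above.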
Private Instance Methods
aggregator()
@api private
# File lib/attr/gather/workflow/callable.rb, line 64
def aggregator
  return @aggregator if defined?(@aggregator) && !@aggregator.nil?

  @aggregator = self.class.aggregator
  @aggregator.filter ||= filter if @aggregator.respond_to?(:filter=)

  @aggregator
end
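The memoization and filter-defaulting pattern in aggregator can be tried in isolation with the sketch below. FilterableAggregator and ExampleWorkflow are hypothetical classes written for this example, the nil-rejecting filter is an assumed behaviour, and aggregator is left public here only so it can be called directly.

```ruby
# Hypothetical classes for illustration only; not the attr-gather API.
FilterableAggregator = Struct.new(:filter) do
  def call(input, results)
    merged = results.reduce(input) { |acc, result| acc.merge(result) }
    return merged unless filter

    filter.call(merged)
  end
end

class ExampleWorkflow
  def self.aggregator
    @aggregator ||= FilterableAggregator.new
  end

  # Assumed filter: drop keys whose value is nil.
  def self.filter
    ->(hash) { hash.reject { |_key, value| value.nil? } }
  end

  def aggregator
    # Reuse the aggregator once it has been resolved for this instance.
    return @aggregator if defined?(@aggregator) && !@aggregator.nil?

    @aggregator = self.class.aggregator
    # Assign the default filter only when the aggregator accepts one
    # and does not already have a filter set.
    @aggregator.filter ||= filter if @aggregator.respond_to?(:filter=)

    @aggregator
  end

  private

  def filter
    self.class.filter
  end
end

workflow = ExampleWorkflow.new
first    = workflow.aggregator
second   = workflow.aggregator
result   = first.call({ id: 1 }, [{ email: nil }, { name: 'Ada' }])
```

Both calls return the same object, and the respond_to?(:filter=) guard lets aggregators without a filter writer pass through untouched.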
container()
@api private
# File lib/attr/gather/workflow/callable.rb, line 59
def container
  self.class.container
end
execute_task(initial_input, task, task_promises)
Executes a single task once the promises of the tasks it depends on have resolved
@return [Array<TaskExecutionResult>]
@api private
# File lib/attr/gather/workflow/callable.rb, line 47
def execute_task(initial_input, task, task_promises)
  task_proc = container.resolve(task.name)
  dep_promises = task.depends_on.map { |t| task_promises[t] }
  input_promise = Concurrent::Promise.zip(*dep_promises)

  input_promise.then do |results|
    dep_input = aggregator.call(initial_input, results)
    task_proc.call(dep_input)
  end
end
filter()
@api private
# File lib/attr/gather/workflow/callable.rb, line 74
def filter
  self.class.filter
end