class ArcFurnace::Pipeline::PipelineInstance

Attributes

dsl_class[R]
error_handler[R]
intermediates_map[R]
params[R]
sink_node[R]
sink_source[R]

Public Class Methods

new(dsl_class, error_handler: ErrorHandler.new, **params) click to toggle source
# File lib/arc-furnace/pipeline.rb, line 139
# Sets up a pipeline instance for the given DSL class.
#
# @param dsl_class the pipeline definition this instance is built from
# @param error_handler handler stored for later use; a fresh
#   ErrorHandler is created when none is supplied
# @param params any remaining keyword arguments, kept as a Hash
def initialize(dsl_class, error_handler: ErrorHandler.new, **params)
  # Values handed in by the caller.
  @dsl_class = dsl_class
  @error_handler = error_handler
  @params = params

  # Starts empty; entries are added per node key later on.
  @intermediates_map = {}
end

Public Instance Methods

execute() click to toggle source
# File lib/arc-furnace/pipeline.rb, line 146
# Drives the pipeline through its three phases, in order: build,
# prepare, run.
#
# @return the value returned by +run+
def execute
  # The setup phases are invoked only for their side effects.
  %i[build prepare].each { |phase| send(phase) }
  run
end

Private Instance Methods

build() click to toggle source
# File lib/arc-furnace/pipeline.rb, line 171
# Materialises the nodes described by +dsl_class+.
#
# Every non-nil entry of +dsl_class.intermediates_map+ is evaluated in the
# context of this instance (via +instance_exec+) and stored under the same
# key in +intermediates_map+. The sink node is evaluated the same way, and
# the sink's source is looked up among the intermediates that were built.
def build
  dsl_class.intermediates_map.each_pair do |node_id, factory|
    next unless factory

    intermediates_map[node_id] = instance_exec(&factory)
  end

  sink_factory = dsl_class.sink_node
  @sink_node = instance_exec(&sink_factory)

  source_id = dsl_class.sink_source
  @sink_source = intermediates_map[source_id]
end
create_instance_with_error_handling(type, resolved_params, key_parameters) click to toggle source
# File lib/arc-furnace/pipeline.rb, line 199
# Instantiates +type+ with the subset of +resolved_params+ named by
# +key_parameters+, turning CSV parse failures into a RuntimeError that
# names the offending file.
#
# The selected parameters are splatted as keyword arguments. Passing the
# Hash positionally only works on Ruby 2.x, where a trailing Hash was
# implicitly converted to keywords; on Ruby 3+ that conversion no longer
# happens and keyword-taking constructors raise ArgumentError.
#
# @param type [Class] class to instantiate
# @param resolved_params [Hash] resolved parameter values keyed by name
# @param key_parameters [Array<Symbol>] names of the parameters to pass on
# @return the new instance of +type+
# @raise [RuntimeError] when construction raises CSV::MalformedCSVError;
#   the original error stays available through +#cause+
def create_instance_with_error_handling(type, resolved_params, key_parameters)
  type.new(**resolved_params.slice(*key_parameters))
rescue CSV::MalformedCSVError
  raise "File #{resolved_params[:filename]} cannot be processed."
end
prepare() click to toggle source
# File lib/arc-furnace/pipeline.rb, line 162
# Readies every built node before rows are processed.
#
# Each intermediate receives this pipeline's error handler and its own
# node id, then has +prepare+ called on it. The sink node is prepared
# last; only +prepare+ is called on it here.
#
# @return the value returned by the sink node's +prepare+
def prepare
  intermediates_map.each_pair do |id, node|
    node.error_handler = error_handler
    node.node_id = id
    node.prepare
  end

  sink_node.prepare
end
resolve_parameter(node_id, key) click to toggle source
# File lib/arc-furnace/pipeline.rb, line 195
# Looks up +key+ among the pipeline parameters first and the built
# intermediates second.
#
# Only +nil+ counts as "not present". A parameter explicitly set to
# +false+ is returned as-is; with a plain +||+ chain it would be skipped
# and either shadowed by an intermediate or reported as unknown.
#
# @param node_id identifier of the node being configured (used in the
#   error message only)
# @param key name to resolve
# @return the matching parameter value or intermediate
# @raise [RuntimeError] when neither lookup yields a non-nil value
def resolve_parameter(node_id, key)
  param = params[key]
  return param unless param.nil?

  intermediate = intermediates_map[key]
  return intermediate unless intermediate.nil?

  raise "When processing node #{node_id}: Unknown key #{key}!"
end
resolve_parameters(node_id, params_to_resolve) click to toggle source
# File lib/arc-furnace/pipeline.rb, line 179
# Builds a new Hash with every entry of +params_to_resolve+ resolved.
#
# Resolution rules, applied per entry:
# * key +:sources+ - each element of the value is resolved by name
# * Symbol value   - treated as a name and resolved
# * nil value      - the entry's own key is resolved
# * anything else  - kept unchanged
#
# @param node_id identifier of the node being configured; forwarded to
#   +resolve_parameter+
# @param params_to_resolve key/value pairs to resolve
# @return [Hash] resolved values under the original keys
def resolve_parameters(node_id, params_to_resolve)
  resolved = {}

  params_to_resolve.each do |key, value|
    resolved[key] =
      if key == :sources
        value.map { |source| resolve_parameter(node_id, source) }
      else
        case value
        when Symbol then resolve_parameter(node_id, value) # reference to a param or intermediate
        when nil then resolve_parameter(node_id, key)
        else value
        end
      end
  end

  resolved
end
run() click to toggle source
# File lib/arc-furnace/pipeline.rb, line 154
# Feeds the sink until its source stops yielding rows, then finalizes.
#
# Rows are read from +sink_source+ one at a time and handed to
# +sink_node+ until the source returns a falsy value. After that every
# intermediate is finalized, and the sink node last.
#
# @return the value returned by the sink node's +finalize+
def run
  row = sink_source.row
  while row
    sink_node.row(row)
    row = sink_source.row
  end

  intermediates_map.each_value(&:finalize)
  sink_node.finalize
end