class ArcFurnace::DSL

Constants

ALLOWABLE_PARAM_TYPES

Public Class Methods

filter(node_id, type: BlockFilter, params: {}, &block)

Define a node that filters rows. By default the node is a BlockFilter; when this method is given a block, that block is called with a hash for each row. The result of the block determines whether that row flows to the downstream node.

# File lib/arc-furnace/pipeline.rb, line 90
def self.filter(node_id, type: BlockFilter, params: {}, &block)
  if block_given? && type <= BlockFilter
    params[:block] = block
  end
  raise "Filter #{type} is not a Filter!" unless type <= Filter
  define_intermediate(node_id, type: type, params: params)
end
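
The two checks in the listing above rely on `Class#<=`, which is true when the left-hand class is the same as, or a descendant of, the right-hand one (and `nil` for unrelated classes). A minimal sketch of that guard follows; `Filter`, `BlockFilter`, `EvenFilter`, and `build_filter_params` are hypothetical stand-ins defined here for illustration, not the ArcFurnace classes.

```ruby
# Stand-in classes for illustration only; not the ArcFurnace implementations.
class Filter; end
class BlockFilter < Filter; end
class EvenFilter < Filter; end

# Mirrors the guard logic of the listing and returns the params it would register.
def build_filter_params(type: BlockFilter, params: {}, &block)
  # The block is attached only when the type is block-based.
  params[:block] = block if block_given? && type <= BlockFilter
  # Anything outside the Filter hierarchy is rejected.
  raise "Filter #{type} is not a Filter!" unless type <= Filter
  params
end

# Default type: the block is stored under :block.
with_block = build_filter_params { |row| row[:id].even? }

# Custom Filter subclass that is not block-based: the block is not stored.
custom = build_filter_params(type: EvenFilter) { |_row| true }

# Unrelated class: the guard raises.
begin
  build_filter_params(type: String)
rescue RuntimeError => e
  rejected = e.message
end
```

Under these assumptions, a block passed alongside a non-block type is silently dropped rather than reported, which may be worth keeping in mind when supplying a custom `type:`.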
hash_node(node_id, type: ArcFurnace::Hash, params:)

Define a hash node, processing all rows from its source and caching them in memory.

# File lib/arc-furnace/pipeline.rb, line 33
def self.hash_node(node_id, type: ArcFurnace::Hash, params:)
  define_intermediate(node_id, type: type, params: params)
end
inherited(subclass)

Ensure that subclasses don't overwrite the parent's transform node definitions. Each subclass starts with its own copy of the parent's node map.

# File lib/arc-furnace/pipeline.rb, line 13
def self.inherited(subclass)
  subclass.intermediates_map = intermediates_map.dup
end
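
The pattern in this hook can be sketched in isolation. The classes below (`BasePipeline`, `Parent`, `Child`) and the `define` helper are hypothetical and exist only to show how copying the map in `inherited` keeps a subclass's additions out of the parent; the copy is shallow and happens once, at the moment the subclass is created.

```ruby
# Hypothetical base class illustrating the inherited-hook pattern.
class BasePipeline
  class << self
    attr_writer :intermediates_map

    def intermediates_map
      @intermediates_map ||= {}
    end

    # Called by Ruby when a subclass is created, before its body runs.
    # The subclass receives a shallow copy of the parent's definitions.
    def inherited(subclass)
      super
      subclass.intermediates_map = intermediates_map.dup
    end

    # Hypothetical helper that registers a node definition.
    def define(node_id, value)
      intermediates_map[node_id] = value
    end
  end
end

class Parent < BasePipeline
  define :source, "parent source"
end

class Child < Parent
  define :extra, "child only"
end

Parent.intermediates_map.keys # => [:source]
Child.intermediates_map.keys  # => [:source, :extra]
```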
inner_join(node_id, type: ArcFurnace::InnerJoin, params:)

Define an inner join node, where rows from the source are dropped if no associated entity is found in the hash for the join key.

# File lib/arc-furnace/pipeline.rb, line 46
def self.inner_join(node_id, type: ArcFurnace::InnerJoin, params:)
  define_intermediate(node_id, type: type, params: params)
end
instance(params = {})

Create an instance to run a transformation, passing the parameters used to instantiate it. The resulting instance has a single public method, #execute, which performs the transformation.

# File lib/arc-furnace/pipeline.rb, line 114
def self.instance(params = {})
  @params = params
  PipelineInstance.new(self, params)
end
merge(node_id, type: ArcFurnace::Merge, params:)

Define a merge node, where rows from multiple source nodes are merged into a single row.

# File lib/arc-furnace/pipeline.rb, line 70
def self.merge(node_id, type: ArcFurnace::Merge, params:)
  define_intermediate(node_id, type: type, params: params)
end
observer(node_id, type: BlockObserver, params: {}, &block)

Define a node that observes rows. By default the node is a BlockObserver; when this method is given a block, that block is called with a hash for each row. The result of the block is ignored, and all rows are forwarded to the next node in line.

# File lib/arc-furnace/pipeline.rb, line 102
def self.observer(node_id, type: BlockObserver, params: {}, &block)
  if block_given? && type <= BlockObserver
    params[:block] = block
  end
  raise "Observer #{type} is not an Observer!" unless type <= Observer
  define_intermediate(node_id, type: type, params: params)
end
outer_join(node_id, type: ArcFurnace::OuterJoin, params:)

Define an outer join node, where rows from the source are kept even if no associated entity is found in the hash for the join key.

# File lib/arc-furnace/pipeline.rb, line 52
def self.outer_join(node_id, type: ArcFurnace::OuterJoin, params:)
  define_intermediate(node_id, type: type, params: params)
end
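
The difference between the inner and outer join nodes can be illustrated in plain Ruby. This is an analogy, not the ArcFurnace implementation: `PLAN_LOOKUP` stands in for a hash node keyed by the join key, and merging the matched entity into the row is an assumption made for the example.

```ruby
# Illustrative data only: a lookup keyed by the join key, standing in for a hash node.
PLAN_LOOKUP = { 1 => { plan: "pro" }, 3 => { plan: "free" } }.freeze
JOIN_ROWS = [
  { id: 1, name: "a" },
  { id: 2, name: "b" },
  { id: 3, name: "c" }
].freeze

# Inner join: rows without a match in the lookup are dropped.
def inner_join_rows(rows, lookup, key)
  rows.map { |row|
    match = lookup[row[key]]
    match ? row.merge(match) : nil
  }.compact
end

# Outer join: rows without a match pass through unchanged.
def outer_join_rows(rows, lookup, key)
  rows.map { |row| row.merge(lookup.fetch(row[key], {})) }
end

inner = inner_join_rows(JOIN_ROWS, PLAN_LOOKUP, :id) # two rows; id 2 is dropped
outer = outer_join_rows(JOIN_ROWS, PLAN_LOOKUP, :id) # three rows; id 2 has no :plan
```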
sink(type:, source:, params:)

Define the sink for this transformation. Only a single sink may be specified per transformation. The sink receives one hash per row or entity, fed to it from the graph of nodes above it.

# File lib/arc-furnace/pipeline.rb, line 20
def self.sink(type:, source:, params:)
  if sink_node
    raise 'Sink already defined!'
  end

  @sink_node = -> do
    type.new(resolve_parameters(:sink, params))
  end
  @sink_source = source
end
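
Two details of this listing are worth seeing in isolation: the sink is stored as a lambda, so nothing is constructed at class-definition time, and a second definition raises. The sketch below uses hypothetical classes (`RecordingSink`, `TinyPipeline`, `Report`) and omits parameter resolution entirely, so it should be read as an illustration of the pattern rather than of the library's behavior.

```ruby
# Hypothetical sink that counts how many times it has been constructed.
class RecordingSink
  class << self
    attr_accessor :created
  end
  self.created = 0

  attr_reader :options

  def initialize(options)
    self.class.created += 1
    @options = options
  end
end

# Hypothetical miniature of the sink-definition logic; parameter resolution is omitted.
class TinyPipeline
  class << self
    attr_reader :sink_node, :sink_source

    def sink(type:, source:, params:)
      raise 'Sink already defined!' if sink_node

      # Wrap construction in a lambda so the sink is built only on demand.
      @sink_node = -> { type.new(params) }
      @sink_source = source
    end
  end
end

class Report < TinyPipeline
  sink type: RecordingSink, source: :rows, params: { filename: 'out.csv' }
end

count_before = RecordingSink.created # 0: defining the sink built nothing
built_sink = Report.sink_node.call   # construction happens here
```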
source(node_id, type:, params:)

Define a source node with row semantics: it delivers one hash per row (or per entity) read from the source.

# File lib/arc-furnace/pipeline.rb, line 39
def self.source(node_id, type:, params:)
  raise "Source #{type} is not a Source!" unless type <= Source
  define_intermediate(node_id, type: type, params: params)
end
transform(node_id, type: BlockTransform, params: {}, &block)

Define a node that transforms rows. By default the node is a BlockTransform; when this method is given a block, that block is called with a hash for each row. The result of the block becomes the row for the next downstream node.

# File lib/arc-furnace/pipeline.rb, line 60
def self.transform(node_id, type: BlockTransform, params: {}, &block)
  if block_given? && type <= BlockTransform
    params[:block] = block
  end
  raise "Transform #{type} is not a Transform!" unless type <= Transform
  define_intermediate(node_id, type: type, params: params)
end
unfold(node_id, type: BlockUnfold, params: {}, &block)

Define a node that unfolds rows. By default the node is a BlockUnfold; when this method is given a block, that block is called with a hash for each row. The result of the block becomes the set of rows for the next downstream node.

# File lib/arc-furnace/pipeline.rb, line 78
def self.unfold(node_id, type: BlockUnfold, params: {}, &block)
  if block_given? && type <= BlockUnfold
    params[:block] = block
  end
  raise "Unfold #{type} is not an Unfold!" unless type <= Unfold
  define_intermediate(node_id, type: type, params: params)
end
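
The four block-based node types (filter, transform, unfold, observer) differ only in how the block's return value is used. A plain-Ruby analogy, using hypothetical helper methods and sample data rather than ArcFurnace nodes, makes the contrast concrete:

```ruby
# Plain-Ruby analogy for the block-based node types; not the ArcFurnace implementation.
SAMPLE_ROWS = [
  { id: 1, tags: "red,blue" },
  { id: 2, tags: "" }
].freeze

# filter: the block's truthiness decides whether the row continues downstream.
def run_filter(rows)
  rows.select { |row| yield(row) }
end

# transform: the block's return value replaces the row.
def run_transform(rows)
  rows.map { |row| yield(row) }
end

# unfold: the block returns a collection, and each element becomes its own row.
def run_unfold(rows)
  rows.flat_map { |row| yield(row) }
end

# observer: the block's return value is ignored and every row is forwarded.
def run_observer(rows)
  rows.each { |row| yield(row) }
end

filtered    = run_filter(SAMPLE_ROWS) { |row| !row[:tags].empty? }
transformed = run_transform(SAMPLE_ROWS) { |row| row.merge(tag_count: row[:tags].split(",").size) }
unfolded    = run_unfold(SAMPLE_ROWS) { |row| row[:tags].split(",").map { |tag| { id: row[:id], tag: tag } } }

seen_ids = []
observed = run_observer(SAMPLE_ROWS) { |row| seen_ids << row[:id] }
```

Here the row with an empty `tags` string is removed by the filter, gains `tag_count: 0` in the transform, and produces no rows at all in the unfold, while the observer passes both rows along untouched.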

Private Class Methods

define_intermediate(node_id, type:, params:)
# File lib/arc-furnace/pipeline.rb, line 123
def self.define_intermediate(node_id, type:, params:)
  intermediates_map[node_id] = -> do
    resolved_params = resolve_parameters(node_id, params)
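    # Method#parameters ignores a block, so the filtering must go through #select.
    # Array#second is provided by ActiveSupport.
    key_parameters = type.instance_method(:initialize).parameters.select do |param|
      ALLOWABLE_PARAM_TYPES.include?(param.first)
    end.map(&:second)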
    # Allow params to be passed that are not in the initializer
    instance = create_instance_with_error_handling(type, resolved_params, key_parameters)
    instance.params = resolved_params
    instance
  end
end
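
The parameter inspection in this listing can be tried on its own. The sketch below makes two assumptions that this page does not confirm: that the allowable parameter types are the keyword kinds `:key` and `:keyreq`, and that surplus params are simply dropped before instantiation. `ExampleNode`, `KEYWORD_PARAM_TYPES`, and both helper methods are hypothetical names introduced for the example.

```ruby
# Assumed value for illustration; the real constant's contents are not shown on this page.
KEYWORD_PARAM_TYPES = %i[key keyreq].freeze

# Hypothetical node class with one required and one optional keyword argument.
class ExampleNode
  attr_reader :source, :limit

  def initialize(source:, limit: 10)
    @source = source
    @limit = limit
  end
end

# Returns the names of the keyword parameters accepted by a class's initializer.
# Method#parameters yields [kind, name] pairs such as [:keyreq, :source].
def keyword_parameter_names(type)
  type.instance_method(:initialize).parameters
      .select { |kind, _name| KEYWORD_PARAM_TYPES.include?(kind) }
      .map { |_kind, name| name }
end

# Keeps only the params the initializer can accept (an assumed policy).
def accepted_params(type, params)
  params.slice(*keyword_parameter_names(type))
end

names = keyword_parameter_names(ExampleNode) # => [:source, :limit]
node = ExampleNode.new(**accepted_params(ExampleNode, { source: :rows, unused: true }))
```

Filtering before instantiation means a params hash shared across several nodes does not trigger an unknown-keyword error in an initializer that accepts only some of its keys.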