class NxtPipeline::Pipeline

Attributes

constructors[R]
current_arg[RW]
current_step[RW]
default_constructor_name[RW]
error_callbacks[R]
logger[RW]
step_resolvers[R]
steps[RW]

Public Class Methods

execute(**opts, &block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 3
# Convenience entry point: builds a pipeline, passing the block to the
# constructor, and immediately executes it with the given keyword arguments.
def self.execute(**opts, &block)
  pipeline = new(&block)
  pipeline.execute(**opts)
end
new(step_resolvers = default_step_resolvers, &block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 7
# Sets up an empty pipeline: no steps, no error callbacks, no constructors,
# no default constructor, and no current step/argument.
#
# step_resolvers - list of callables used by #step to map an argument to a
#                  constructor name (defaults to default_step_resolvers).
# block          - optional; when given it is handed to #configure.
def initialize(step_resolvers = default_step_resolvers, &block)
  @step_resolvers = step_resolvers
  @constructors = {}
  @default_constructor_name = nil

  @steps = []
  @error_callbacks = []
  @logger = Logger.new

  @current_step = nil
  @current_arg = nil

  configure(&block) if block
end

Public Instance Methods

after_execution(&block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 126
# Registers the given block under the [:after, :execution] callback key.
def after_execution(&block)
  callbacks.register(%i[after execution], block)
end
after_step(&block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 114
# Registers the given block under the [:after, :step] callback key.
def after_step(&block)
  callbacks.register(%i[after step], block)
end
around_execution(&block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 130
# Registers the given block under the [:around, :execution] callback key.
def around_execution(&block)
  callbacks.register(%i[around execution], block)
end
around_step(&block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 118
# Registers the given block under the [:around, :step] callback key.
def around_step(&block)
  callbacks.register(%i[around step], block)
end
before_execution(&block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 122
# Registers the given block under the [:before, :execution] callback key.
def before_execution(&block)
  callbacks.register(%i[before execution], block)
end
before_step(&block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 110
# Registers the given block under the [:before, :step] callback key.
def before_step(&block)
  callbacks.register(%i[before step], block)
end
call(**change_set, &block)
Alias for: execute
constructor(name, **opts, &constructor) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 24
# Registers a step constructor under +name+ (converted to a Symbol).
#
# name        - identifier of the constructor; anything responding to #to_sym.
# opts        - forwarded to Constructor.new; +default: true+ additionally
#               marks this constructor as the pipeline's default.
# constructor - block forwarded to Constructor.new.
#
# Raises StandardError when a constructor with that name already exists.
# Returns nil, or the result of set_default_constructor for a default.
def constructor(name, **opts, &constructor)
  key = name.to_sym
  if constructors[key]
    raise StandardError, "Already registered step :#{key}"
  end

  constructors[key] = Constructor.new(key, **opts, &constructor)

  set_default_constructor(key) if opts.fetch(:default, false)
end
execute(**change_set, &block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 71
# Runs all registered steps in order, threading a change set through them.
#
# The current step/argument are cleared via +reset+ first. A block, when
# given, is handed to +configure+ before anything runs. The before, around
# and after :execution callbacks all receive the change set passed in by the
# caller (not the folded result).
#
# change_set - keyword arguments forming the initial change set.
#
# Returns whatever +callbacks.around+ returns, or the return value of
# +handle_step_error+ when a StandardError escapes the method body.
def execute(**change_set, &block)
  reset

  configure(&block) if block_given?
  callbacks.run(:before, :execution, change_set)

  result = callbacks.around :execution, change_set do
    # Fold the steps over the change set: each step is executed with the
    # value produced for the previous one.
    steps.inject(change_set) do |set, step|
      execute_step(step, **set)
    rescue StandardError => error
      # Attach a #details reader to the error, then let the matching error
      # callback decide; handle_error_of_step re-raises unless a callback
      # exists and reports continue_after_error?.
      decorate_error_with_details(error, set, step, logger)
      handle_error_of_step(error)
      # Continuing after the error: pass the unchanged set to the next step.
      set
    end
  end

  callbacks.run(:after, :execution, change_set)
  result
rescue StandardError => error
  # Covers any StandardError raised in the method body, including errors
  # re-raised by handle_error_of_step.
  handle_step_error(error)
end
Also aliased as: call
handle_step_error(error) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 95
# Logs the current step and dispatches +error+ to the first matching error
# callback.
#
# error - the exception to handle.
#
# Returns the callback's return value.
# Raises +error+ itself when no registered callback applies to it. The error
# is re-raised explicitly (not via a bare +raise+) so the method also behaves
# correctly when it is called outside of a +rescue+ clause, where a bare
# +raise+ would produce an unrelated RuntimeError.
def handle_step_error(error)
  log_step(current_step)
  callback = find_error_callback(error)

  raise error unless callback

  callback.call(current_step, current_arg, error)
end
on_error(*errors, halt_on_error: true, &callback)
Alias for: on_errors
on_errors(*errors, halt_on_error: true, &callback) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 104
# Appends an ErrorCallback built from the given error classes, the
# +halt_on_error+ flag and the callback block to the list of error callbacks.
#
# Returns the list of error callbacks.
def on_errors(*errors, halt_on_error: true, &callback)
  error_callback = ErrorCallback.new(errors, halt_on_error, &callback)
  error_callbacks << error_callback
end
Also aliased as: on_error
raise_duplicate_default_constructor() click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 43
# Raises the ArgumentError used to signal that a default constructor has
# already been defined.
def raise_duplicate_default_constructor
  message = 'Default step already defined'
  raise ArgumentError, message
end
set_default_constructor(default_constructor) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 38
# Records +default_constructor+ as the default constructor name.
#
# Delegates to raise_duplicate_default_constructor when a default constructor
# name is already present. Returns the assigned name.
def set_default_constructor(default_constructor)
  if default_constructor_name.present?
    raise_duplicate_default_constructor
  end

  self.default_constructor_name = default_constructor
end
step(argument = nil, **opts, &block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 47
# Adds a step to the pipeline.
#
# With a block, the step is an inline step: +argument+ falls back to :inline
# and is used as the default :to_s option. Without a block, the step
# resolvers are asked in order for a constructor name; the first truthy
# answer is looked up in the registered constructors. If no resolver
# answers, the default constructor is used when one is defined (and a nil
# +argument+ is replaced by the default constructor's name).
#
# Raises KeyError when a resolver names a constructor that is not registered.
# Raises StandardError when nothing can be resolved and no default exists.
# Returns the result of register_step.
def step(argument = nil, **opts, &block)
  constructor =
    if block_given?
      # Inline steps: the :to_s option defaults to the argument, which in
      # turn defaults to :inline.
      argument ||= :inline
      opts.reverse_merge!(to_s: argument)
      Constructor.new(:inline, **opts, &block)
    else
      # Lazily ask each resolver; stop at the first truthy answer.
      resolved_name = step_resolvers.lazy.map { |resolver| resolver.call(argument) }.find(&:itself)

      if resolved_name
        constructors.fetch(resolved_name) { raise KeyError, "No step :#{argument} registered" }
      elsif default_constructor
        argument ||= default_constructor_name
        default_constructor
      else
        raise StandardError, "Could not resolve step from: #{argument}"
      end
    end

  register_step(argument, constructor, callbacks, **opts)
end
step_resolver(&block) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 34
# Appends the given block to the list of step resolvers and returns that list.
def step_resolver(&block)
  step_resolvers.push(block)
end

Private Instance Methods

callbacks() click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 136
# Memoized NxtPipeline::Callbacks instance bound to this pipeline.
def callbacks
  return @callbacks if @callbacks

  @callbacks = NxtPipeline::Callbacks.new(pipeline: self)
end
decorate_error_with_details(error, change_set, step, logger) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 182
# Adds a singleton #details method to +error+ that returns a new OpenStruct
# exposing the change set, the logger and the step on every call.
#
# Returns the same error object.
def decorate_error_with_details(error, change_set, step, logger)
  error.tap do |decorated|
    decorated.define_singleton_method(:details) do
      OpenStruct.new(change_set: change_set, logger: logger, step: step)
    end
  end
end
default_constructor() click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 145
# Looks up (and memoizes) the constructor registered under the default
# constructor name. Returns nil while no default constructor name is set.
def default_constructor
  key = default_constructor_name
  return unless key

  @default_constructor ||= constructors[key.to_sym]
end
default_step_resolvers() click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 178
# Builds the default resolver list: a single lambda that answers with the
# argument itself when it is a Symbol and with false otherwise.
def default_step_resolvers
  symbol_resolver = lambda do |step_argument|
    step_argument.is_a?(Symbol) ? step_argument : false
  end

  [symbol_resolver]
end
execute_step(step, **change_set) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 151
# Executes a single step with the given change set.
#
# Records the step and change set as the current step/argument before
# running, and logs the step once it has run without raising.
#
# Returns the step's result, or the incoming change set when the step
# returned nil or false.
def execute_step(step, **change_set)
  self.current_step, self.current_arg = step, change_set

  outcome = step.execute(**change_set)
  log_step(step)

  outcome ? outcome : change_set
end
find_error_callback(error) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 159
# Returns the first registered error callback that applies to +error+,
# or nil when none does.
def find_error_callback(error)
  error_callbacks.each do |candidate|
    return candidate if candidate.applies_to_error?(error)
  end

  nil
end
handle_error_of_step(error) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 197
# Decides what happens after the current step raised +error+.
#
# The error is re-raised unless a registered callback applies to it AND that
# callback reports +continue_after_error?+. Otherwise the current step is
# logged and the callback is invoked with the current step, the current
# argument and the error.
#
# error - the exception raised while executing the current step.
#
# Returns the callback's return value.
# Raises +error+ when no callback allows the pipeline to continue.
def handle_error_of_step(error)
  error_callback = find_error_callback(error)
  # This guard already covers the "no callback" case, so no second presence
  # check is needed further down.
  raise error unless error_callback.present? && error_callback.continue_after_error?

  log_step(current_step)
  error_callback.call(current_step, current_arg, error)
end
log_step(step) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 163
# Passes +step+ to the logger when the logger responds to #call;
# otherwise does nothing and returns nil.
def log_step(step)
  logger.call(step) if logger.respond_to?(:call)
end
raise_reserved_type_inline_error() click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 174
# Raises the ArgumentError used to signal that the :inline type is reserved.
def raise_reserved_type_inline_error
  message = 'Type :inline is reserved for inline steps!'
  raise ArgumentError, message
end
register_step(argument, constructor, callbacks, **opts) click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 193
# Builds a Step from the argument, constructor, its position in the list
# (the current step count), this pipeline, the callbacks and the options,
# and appends it to the steps.
#
# Returns the list of steps.
def register_step(argument, constructor, callbacks, **opts)
  position = steps.count
  new_step = Step.new(argument, constructor, position, self, callbacks, **opts)
  steps << new_step
end
reset() click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 169
# Clears the current argument and the current step. Returns nil.
def reset
  self.current_arg, self.current_step = nil, nil
  nil
end