class NxtPipeline::Pipeline
Attributes
constructors[R]
current_arg[RW]
current_step[RW]
default_constructor_name[RW]
error_callbacks[R]
logger[RW]
step_resolvers[R]
steps[RW]
Public Class Methods
execute(**opts, &block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 3 def self.execute(**opts, &block) new(&block).execute(**opts) end
new(step_resolvers = default_step_resolvers, &block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 7 def initialize(step_resolvers = default_step_resolvers, &block) @steps = [] @error_callbacks = [] @logger = Logger.new @current_step = nil @current_arg = nil @default_constructor_name = nil @constructors = {} @step_resolvers = step_resolvers configure(&block) if block_given? end
Public Instance Methods
after_execution(&block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 126 def after_execution(&block) callbacks.register([:after, :execution], block) end
after_step(&block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 114 def after_step(&block) callbacks.register([:after, :step], block) end
around_execution(&block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 130 def around_execution(&block) callbacks.register([:around, :execution], block) end
around_step(&block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 118 def around_step(&block) callbacks.register([:around, :step], block) end
before_execution(&block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 122 def before_execution(&block) callbacks.register([:before, :execution], block) end
before_step(&block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 110 def before_step(&block) callbacks.register([:before, :step], block) end
constructor(name, **opts, &constructor)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 24 def constructor(name, **opts, &constructor) name = name.to_sym raise StandardError, "Already registered step :#{name}" if constructors[name] constructors[name] = Constructor.new(name, **opts, &constructor) return unless opts.fetch(:default, false) set_default_constructor(name) end
execute(**change_set, &block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 71 def execute(**change_set, &block) reset configure(&block) if block_given? callbacks.run(:before, :execution, change_set) result = callbacks.around :execution, change_set do steps.inject(change_set) do |set, step| execute_step(step, **set) rescue StandardError => error decorate_error_with_details(error, set, step, logger) handle_error_of_step(error) set end end callbacks.run(:after, :execution, change_set) result rescue StandardError => error handle_step_error(error) end
Also aliased as: call
handle_step_error(error)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 95 def handle_step_error(error) log_step(current_step) callback = find_error_callback(error) raise unless callback callback.call(current_step, current_arg, error) end
on_errors(*errors, halt_on_error: true, &callback)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 104 def on_errors(*errors, halt_on_error: true, &callback) error_callbacks << ErrorCallback.new(errors, halt_on_error, &callback) end
Also aliased as: on_error
raise_duplicate_default_constructor()
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 43 def raise_duplicate_default_constructor raise ArgumentError, 'Default step already defined' end
set_default_constructor(default_constructor)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 38 def set_default_constructor(default_constructor) raise_duplicate_default_constructor if default_constructor_name.present? self.default_constructor_name = default_constructor end
step(argument = nil, **opts, &block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 47 def step(argument = nil, **opts, &block) constructor = if block_given? # make type the :to_s of inline steps fall back to :inline if no type is given argument ||= :inline opts.reverse_merge!(to_s: argument) Constructor.new(:inline, **opts, &block) else constructor = step_resolvers.lazy.map do |resolver| resolver.call(argument) end.find(&:itself) if constructor constructor && constructors.fetch(constructor) { raise KeyError, "No step :#{argument} registered" } elsif default_constructor argument ||= default_constructor_name default_constructor else raise StandardError, "Could not resolve step from: #{argument}" end end register_step(argument, constructor, callbacks, **opts) end
step_resolver(&block)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 34 def step_resolver(&block) step_resolvers << block end
Private Instance Methods
callbacks()
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 136 def callbacks @callbacks ||= NxtPipeline::Callbacks.new(pipeline: self) end
decorate_error_with_details(error, change_set, step, logger)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 182 def decorate_error_with_details(error, change_set, step, logger) error.define_singleton_method :details do OpenStruct.new( change_set: change_set, logger: logger, step: step ) end error end
default_constructor()
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 145 def default_constructor return unless default_constructor_name @default_constructor ||= constructors[default_constructor_name.to_sym] end
default_step_resolvers()
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 178 def default_step_resolvers [->(step_argument) { step_argument.is_a?(Symbol) && step_argument }] end
execute_step(step, **change_set)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 151 def execute_step(step, **change_set) self.current_step = step self.current_arg = change_set result = step.execute(**change_set) log_step(step) result || change_set end
find_error_callback(error)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 159 def find_error_callback(error) error_callbacks.find { |callback| callback.applies_to_error?(error) } end
handle_error_of_step(error)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 197 def handle_error_of_step(error) error_callback = find_error_callback(error) raise error unless error_callback.present? && error_callback.continue_after_error? log_step(current_step) raise error unless error_callback.present? error_callback.call(current_step, current_arg, error) end
log_step(step)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 163 def log_step(step) return unless logger.respond_to?(:call) logger.call(step) end
raise_reserved_type_inline_error()
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 174 def raise_reserved_type_inline_error raise ArgumentError, 'Type :inline is reserved for inline steps!' end
register_step(argument, constructor, callbacks, **opts)
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 193 def register_step(argument, constructor, callbacks, **opts) steps << Step.new(argument, constructor, steps.count, self, callbacks, **opts) end
reset()
click to toggle source
# File lib/nxt_pipeline/pipeline.rb, line 169 def reset self.current_arg = nil self.current_step = nil end