class Expeditor::Command
Public Class Methods
const(value)
click to toggle source
# File lib/expeditor/command.rb, line 121
# Builds a ConstCommand from the given value.
#
# @param value [Object] passed straight to ConstCommand.new
# @return [ConstCommand]
def self.const(value)
  ConstCommand.new(value)
end
new(opts = {}, &block)
click to toggle source
# File lib/expeditor/command.rb, line 13
# Sets up a command that has not been started yet.
#
# @param opts [Hash] recognised keys:
#   :service      - defaults to Expeditor::Services.default
#   :timeout      - optional; nil when absent
#   :dependencies - defaults to an empty array
# @param block [Proc] stored as the command's normal (non-fallback) block
def initialize(opts = {}, &block)
  # Options supplied by the caller.
  @service = opts.fetch(:service, Expeditor::Services.default)
  @timeout = opts[:timeout]
  @dependencies = opts.fetch(:dependencies, [])

  # Blocks to run; the fallback is only set later via set_fallback.
  @normal_block = block
  @fallback_block = nil

  # Execution state, filled in once the command is prepared/started.
  @normal_future = nil
  @retryable_options = Concurrent::IVar.new
  @ivar = Concurrent::IVar.new
end
start(opts = {}, &block)
click to toggle source
# File lib/expeditor/command.rb, line 125
# Creates a command from the given options/block and starts it immediately.
#
# @param opts [Hash] forwarded to Command.new
# @param block [Proc] forwarded to Command.new
# @return [Object] whatever the new command's #start returns
def self.start(opts = {}, &block)
  command = Command.new(opts, &block)
  command.start
end
Public Instance Methods
chain(opts = {}, &block)
click to toggle source
XXX: Should raise ArgumentError when the given `opts` contains :dependencies, because `chain` forcibly overrides any :dependencies that were passed in.
`chain` returns a new command that has self as its only dependency.
# File lib/expeditor/command.rb, line 116
# Returns a new command whose only dependency is this command.
#
# Any :dependencies entry in `opts` is overridden. The caller's hash is
# left untouched (a merged copy is passed on), so frozen or shared option
# hashes can be reused safely.
#
# @param opts [Hash] options forwarded to Command.new
# @param block [Proc] block forwarded to Command.new
# @return [Command] the newly built command
def chain(opts = {}, &block)
  Command.new(opts.merge(dependencies: [self]), &block)
end
get()
click to toggle source
# File lib/expeditor/command.rb, line 51
# Returns the result of the command.
#
# The block given to get_or_else produces the result from the fallback
# (when one is set and the service has fallbacks enabled) or re-raises the
# normal future's failure reason.
#
# @raise [NotStartedError] when the command has not been started
# @return [Object] the normal or fallback value
def get
  raise NotStartedError unless started?

  @normal_future.get_or_else do
    fallback_usable = @fallback_block && @service.fallback_enabled?
    raise @normal_future.reason unless fallback_usable

    # Block until the fallback has settled, then surface its outcome.
    @ivar.wait
    raise @ivar.reason if @ivar.rejected?

    @ivar.value
  end
end
on_complete(&block)
click to toggle source
command.on_complete do |success, value, reason|
...
end
# File lib/expeditor/command.rb, line 88
# Registers a callback that runs when the command completes, whether it
# succeeded or failed.
#
# Success is decided with `reason.nil?` rather than `reason == nil`, so an
# error object that overrides `==` cannot be mistaken for success.
#
# @yieldparam success [Boolean] true when there is no failure reason
# @yieldparam value [Object] the value handed to the observer
# @yieldparam reason [Object, nil] the failure reason, nil on success
# @return [Object] whatever #on returns
def on_complete(&block)
  on do |_, value, reason|
    block.call(reason.nil?, value, reason)
  end
end
on_failure(&block)
click to toggle source
command.on_failure do |e|
...
end
# File lib/expeditor/command.rb, line 106
# Registers a callback that runs only when the command finishes with a
# failure reason.
#
# @yieldparam reason [Object] the failure reason
# @return [Object] whatever #on returns
def on_failure(&block)
  on do |_, _, reason|
    next unless reason

    block.call(reason)
  end
end
on_success(&block)
click to toggle source
command.on_success do |value|
...
end
# File lib/expeditor/command.rb, line 97
# Registers a callback that runs only when the command finishes without a
# failure reason.
#
# @yieldparam value [Object] the resulting value
# @return [Object] whatever #on returns
def on_success(&block)
  on do |_, value, reason|
    next if reason

    block.call(value)
  end
end
set_fallback(&block)
click to toggle source
# File lib/expeditor/command.rb, line 67
# Installs the fallback block. Must be called before the command starts.
#
# @param block [Proc] the fallback block
# @raise [AlreadyStartedError] when the command has already been started
# @return [self]
def set_fallback(&block)
  raise AlreadyStartedError, "Do not allow set_fallback call after command is started" if started?

  reset_fallback(&block)
  self
end
start(current_thread: false)
click to toggle source
@param current_thread [Boolean] Execute the task on the current thread (blocking)
# File lib/expeditor/command.rb, line 26
# Starts the command unless it has already been started.
#
# @param current_thread [Boolean] when true, prepare with a
#   Concurrent::ImmediateExecutor instead of the default executor
# @return [self]
def start(current_thread: false)
  return self if started?

  current_thread ? prepare(Concurrent::ImmediateExecutor.new) : prepare
  @normal_future.safe_execute
  self
end
start_with_retry(current_thread: false, **retryable_options)
click to toggle source
Equivalent to retryable gem options
# File lib/expeditor/command.rb, line 39
# Starts the command with retry options, unless it has already been started.
#
# @param current_thread [Boolean] forwarded to #start
# @param retryable_options [Hash] stored for later use by the retry wrapper
# @return [self]
def start_with_retry(current_thread: false, **retryable_options)
  return self if started?

  @retryable_options.set(retryable_options)
  start(current_thread: current_thread)
  self
end
started?()
click to toggle source
# File lib/expeditor/command.rb, line 47
# Tells whether the command has been started.
#
# @return [Boolean, nil] nil while no normal future exists; otherwise the
#   result of the future's executed? check
def started?
  future = @normal_future
  future ? future.executed? : future
end
wait()
click to toggle source
# File lib/expeditor/command.rb, line 80
# Waits on the command's result ivar.
#
# @raise [NotStartedError] when the command has not been started
# @return [Object] whatever @ivar.wait returns
def wait
  if started?
    @ivar.wait
  else
    raise NotStartedError
  end
end
with_fallback(&block)
click to toggle source
# File lib/expeditor/command.rb, line 75
# Deprecated alias of #set_fallback; prints a deprecation warning first.
#
# @deprecated Use {#set_fallback} instead.
# @param block [Proc] the fallback block
# @return [self]
def with_fallback(&block)
  warn('Expeditor::Command#with_fallback is deprecated. Please use set_fallback instead')
  set_fallback(&block)
end
Private Instance Methods
breakable_block(args, &block)
click to toggle source
# File lib/expeditor/command.rb, line 229
# Runs the block with the given arguments through the service's
# run_if_allowed gate.
#
# @param args [Array] arguments splatted into the block
# @return [Object] whatever run_if_allowed returns
def breakable_block(args, &block)
  @service.run_if_allowed { block.call(*args) }
end
initial_normal(executor, &block)
click to toggle source
The block is executed through nested wrappers, from outermost to innermost:
timeout_block do retryable_block do breakable_block do block.call end end end
# File lib/expeditor/command.rb, line 166
# Builds the future that runs the normal block.
#
# When executed, the future first collects the dependency results and then
# passes them to the block through timeout_block. An observer records
# metrics for the outcome.
#
# @param executor [Object] executor handed to RichFuture
# @return [RichFuture]
def initial_normal(executor, &block)
  normal = RichFuture.new(executor: executor) { timeout_block(wait_dependencies, &block) }
  normal.add_observer { |_, _, reason| metrics(reason) }
  normal
end
metrics(reason)
click to toggle source
# File lib/expeditor/command.rb, line 235
# Reports the outcome of an execution to the service.
#
# @param reason [Exception, nil] nil for success, otherwise the error that
#   decides which service counter method is called
# @return [Object] whatever the selected service method returns
def metrics(reason)
  event =
    case reason
    when nil then :success
    when Timeout::Error then :timeout
    when RejectedExecutionError then :rejection
    when CircuitBreakError then :break
    when DependencyError then :dependency
    else :failure
    end

  @service.public_send(event)
end
on(&callback)
click to toggle source
# File lib/expeditor/command.rb, line 256
# Adds the given callback as an observer of the result ivar.
#
# @param callback [Proc] observer block
# @return [Object] whatever @ivar.add_observer returns
def on(&callback)
  @ivar.add_observer(&callback)
end
prepare(executor = @service.executor)
click to toggle source
Sets the normal future, registers the fallback future as an observer of it, and starts the dependencies.
# File lib/expeditor/command.rb, line 134
# Wires up the command before execution:
#   1. builds the normal future,
#   2. attaches an observer that settles @ivar (running the fallback block
#      in its own future when the normal block failed and a fallback is set),
#   3. starts every dependency.
#
# @param executor [Object] executor for the normal and fallback futures;
#   defaults to the service's executor
def prepare(executor = @service.executor)
  @normal_future = initial_normal(executor, &@normal_block)

  @normal_future.add_observer do |_, value, reason|
    # Success: publish the value as-is.
    unless reason
      @ivar.set(value)
      next
    end

    # Failure without a fallback: publish the failure.
    unless @fallback_block
      @ivar.fail(reason)
      next
    end

    # Failure with a fallback: run the fallback and publish its outcome.
    fallback_future = RichFuture.new(executor: executor) do
      runner = Concurrent::SafeTaskExecutor.new(@fallback_block, rescue_exception: true)
      succeeded, fallback_value, fallback_error = runner.execute(reason)
      if succeeded
        @ivar.set(fallback_value)
      else
        @ivar.fail(fallback_error)
      end
    end
    fallback_future.safe_execute
  end

  @dependencies.each(&:start)
end
reset_fallback(&block)
click to toggle source
# File lib/expeditor/command.rb, line 252
# Replaces the stored fallback block without any started? check.
#
# @param block [Proc, nil] the new fallback block (nil when none is given)
# @return [Proc, nil] the block that was stored
def reset_fallback(&block)
  @fallback_block = block
end
retryable_block(args, &block)
click to toggle source
# File lib/expeditor/command.rb, line 218
# Runs the block through breakable_block, wrapped in Retryable.retryable
# when retry options have been set.
#
# @param args [Array] arguments forwarded to breakable_block
# @return [Object] the result of the (possibly retried) execution
def retryable_block(args, &block)
  return breakable_block(args, &block) unless @retryable_options.fulfilled?

  Retryable.retryable(@retryable_options.value) do |retries, exception|
    # On a retry, record metrics for the error that triggered it.
    metrics(exception) if retries > 0
    breakable_block(args, &block)
  end
end
timeout_block(args, &block)
click to toggle source
# File lib/expeditor/command.rb, line 208
# Runs the block through retryable_block, under Timeout.timeout when a
# timeout has been configured.
#
# @param args [Array] arguments forwarded to retryable_block
# @return [Object] the result of retryable_block
def timeout_block(args, &block)
  return retryable_block(args, &block) unless @timeout

  Timeout.timeout(@timeout) { retryable_block(args, &block) }
end
wait_dependencies()
click to toggle source
# File lib/expeditor/command.rb, line 177
# Collects the results of all dependencies, in dependency order.
#
# Each dependency's #get is posted to a dedicated thread pool. When one of
# them raises, the pool is shut down and a DependencyError wrapping that
# error is raised in the thread that called this method.
#
# @return [Array] dependency results, or an empty array when there are none
def wait_dependencies
  return [] unless @dependencies.count > 0

  calling_thread = Thread.current
  pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: 0,
    max_threads: 5,
    max_queue: 0,
  )

  # Holds the error reported by a dependency; its observer interrupts the caller.
  failure = Concurrent::IVar.new
  failure.add_observer do |_, cause, _|
    pool.shutdown
    calling_thread.raise(DependencyError.new(cause))
  end

  results = []
  @dependencies.each_with_index do |dependency, index|
    pool.post do
      begin
        results[index] = dependency.get
      rescue => e
        failure.set(e)
      end
    end
  end

  pool.shutdown
  pool.wait_for_termination
  results
end