class EvilEvents::Core::Events::Notifier::Worker::Executor

@api private @since 0.3.0

Constants

FALLBACK_POLICIES

@api public @since 0.3.0

Attributes

options[R]

@return [Hash]

@api private @since 0.3.0

raw_executor[R]

@return [Concurrent::ThreadPoolExecutor]

@api private @since 0.3.0

Public Class Methods

new(min_threads:, max_threads:, max_queue:, fallback_policy:) click to toggle source

@option min_threads [Integer] @option max_threads [Integer] @option max_queue [Integer] @option fallback_policy [Symbol] @raise [EvilEvents::IncorrectFallbackPolicyError]

@api private @since 0.3.0

# File lib/evil_events/core/events/notifier/worker/executor.rb, line 37
# Builds the executor wrapper: validates the fallback policy, remembers the
# (frozen) thread pool options and creates the underlying raw executor.
#
# @param min_threads [Integer]
# @param max_threads [Integer]
# @param max_queue [Integer]
# @param fallback_policy [Symbol] key looked up in FALLBACK_POLICIES
# @raise [EvilEvents::IncorrectFallbackPolicyError]
#   when FALLBACK_POLICIES has no truthy entry for the given policy
def initialize(min_threads:, max_threads:, max_queue:, fallback_policy:)
  resolved_policy = FALLBACK_POLICIES[fallback_policy]
  raise EvilEvents::IncorrectFallbackPolicyError unless resolved_policy

  # Frozen so that a later restart re-creates the pool with the very same settings.
  @options = {
    min_threads: min_threads,
    max_threads: max_threads,
    max_queue: max_queue,
    fallback_policy: resolved_policy
  }.freeze

  initialize_raw_executor!(**@options)
end

Public Instance Methods

execute(job) click to toggle source

@param job [EvilEvents::Core::Events::Notifier::Job] @raise [EvilEvents::WorkerDisabledOrBusyError] @return [Concurrent::Promise]

@api private @since 0.3.0 rubocop:disable Metrics/AbcSize, Style/MultilineBlockChain

# File lib/evil_events/core/events/notifier/worker/executor.rb, line 57
# Schedules the job on the raw executor through a promise chain:
# the job is performed, a success is logged, and a failure is logged and
# forwarded to the event's error hooks.
#
# @param job [EvilEvents::Core::Events::Notifier::Job]
# @raise [EvilEvents::WorkerDisabledOrBusyError]
#   when Concurrent::RejectedExecutionError is raised while building or executing the chain
# @return [Concurrent::Promise] the value returned by #execute on the last promise of the chain
def execute(job)
  performing = Concurrent::Promise.new(executor: raw_executor) { job.perform }

  # job.event / job.subscriber are read inside the callbacks, not up front.
  reporting_success = performing.on_success { log_success(job.event, job.subscriber) }

  reporting_failure = reporting_success.on_error do |error|
    log_failure(job.event, job.subscriber)
    job.event.__call_on_error_hooks__(error)
  end

  reporting_failure.execute
rescue Concurrent::RejectedExecutionError
  raise EvilEvents::WorkerDisabledOrBusyError
end
restart!() click to toggle source

@return void

@api private @since 0.3.0

# File lib/evil_events/core/events/notifier/worker/executor.rb, line 84
# Shuts the current executor down and then builds a new one from the
# options remembered by this object.
#
# @return void
def restart!
  shutdown!

  remembered_options = options
  initialize_raw_executor!(**remembered_options)
end
shutdown!() click to toggle source

@return void

@api private @since 0.3.0

# File lib/evil_events/core/events/notifier/worker/executor.rb, line 75
# Asks the raw executor to shut down and then waits for its termination.
#
# @return void
def shutdown!
  pool = raw_executor
  pool.shutdown
  pool.wait_for_termination
end

Private Instance Methods

initialize_raw_executor!(**options) click to toggle source

@option min_threads [Integer] @option max_threads [Integer] @option max_queue [Integer] @option fallback_policy [Symbol] @return [Concurrent::ThreadPoolExecutor]

@api private @since 0.3.0

# File lib/evil_events/core/events/notifier/worker/executor.rb, line 99
# Creates a new Concurrent::ThreadPoolExecutor from the given keyword
# options and stores it in @raw_executor.
#
# @param executor_options [Hash] keyword options passed through unchanged
#   to Concurrent::ThreadPoolExecutor.new
# @return [Concurrent::ThreadPoolExecutor] the newly created executor
def initialize_raw_executor!(**executor_options)
  fresh_executor = Concurrent::ThreadPoolExecutor.new(**executor_options)
  @raw_executor = fresh_executor
end