class ParallelMinion::Minion

Constants

INFINITE

Give an infinite amount of time to wait for a Minion to complete a task

Attributes

completed_log_level[R]
enabled[W]
scoped_classes[R]
started_log_level[R]
arguments[R]

Returns [Array<Object>] list of arguments in the order they were passed into the initializer

description[R]

Returns [String] the description supplied on the initializer

duration[R]

Returns [Float] the number of milli-seconds the the minion took to complete Returns nil if the minion is still running

exception[R]

Returns [Exception] the exception that was raised otherwise nil

log_exception[R]
metric[R]

Metrics [String]

on_exception_level[R]
on_timeout[R]
start_time[R]
timeout[R]

Returns [Integer] the maximum duration in milli-seconds that the Minion may take to complete the task

wait_metric[R]

Metrics [String]

Public Class Methods

completed_log_level=(level) click to toggle source

Change the log level for the Completed log message.

Default: :info

Valid levels:

:trace, :debug, :info, :warn, :error, :fatal
# File lib/parallel_minion/minion.rb, line 86
def self.completed_log_level=(level)
  raise(ArgumentError, "Invalid log level: #{level}") unless SemanticLogger::LEVELS.include?(level)
  @completed_log_level = level
end
current_scopes() click to toggle source
# File lib/parallel_minion/minion.rb, line 311
def self.current_scopes
  scoped_classes.collect(&:all)
end
enabled?() click to toggle source

Returns whether minions are enabled to run in their own threads

# File lib/parallel_minion/minion.rb, line 48
def self.enabled?
  @enabled
end
new(*arguments, description: 'Minion', metric: nil, log_exception: nil, on_exception_level: self.class.completed_log_level, enabled: self.class.enabled?, timeout: INFINITE, on_timeout: nil, wait_metric: nil, &block) click to toggle source

Create a new Minion

Creates a new thread and logs the time for the supplied block to complete processing.
The exception without stack trace is logged whenever an exception is thrown in the thread.

Re-raises any unhandled exception in the calling thread when `#result` is called.
Copies the logging tags and specified ActiveRecord scopes to the new thread.

Parameters

*arguments
  Any number of arguments can be supplied that are passed into the block
  in the order they are listed.

  Note:
    All arguments must be supplied by copy and not by reference.
    For example, use `#dup` to create copies of passed data.
    Pass by copy is critical to prevent concurrency issues when multiple threads
    attempt to update the same object at the same time.

Proc / lambda
  A block of code must be supplied that the Minion will execute.

  Note:
    This block will be executed within the scope of the created minion instance
    and _not_ within the scope of where the Proc/lambda was originally created.
    This is done to force all parameters to be passed in explicitly
    and should be read-only or duplicates of the original data.

:description [String]
  Description for this task that the Minion is performing.
  Written to the log file along with the time take to complete the task.

:timeout [Integer]
  Maximum amount of time in milli-seconds that the task may take to complete
  before #result times out.
  Set to 0 to give the thread an infinite amount of time to complete.
  Default: 0 ( Wait forever )

  Notes:
  - :timeout does not affect what happens to the Minion running the
    the task, it only affects how long #result will take to return.
  - The Minion will continue to run even after the timeout has been exceeded
  - If :enabled is false, or ParallelMinion::Minion.enabled is false,
    then :timeout is ignored and assumed to be Minion::INFINITE
    since the code is run in the calling thread when the Minion is created

:on_timeout [Exception]
  The class to raise on the minion when the minion times out.
  By raising the exception on the running thread it ensures that the thread
  ends due to the exception, rather than continuing to execute.
  The exception is only raised on the running minion when #result is called.
  The current call to #result will complete with a result of nil, future
  calls to #result will raise the supplied exception on the current thread
  since the thread will have terminated with that exception.

  Note: :on_timeout has no effect if not #enabled?

:metric [String]
  Name of the metric to forward to Semantic Logger when measuring the minion execution time.
  Example: inquiry/address_cleansing

  When a metric is supplied the following metrics will also be generated:
  - wait
      Duration waiting for a minion to complete.

  The additional metrics are added to the supplied metric name. For example:
  - inquiry/address_cleansing/wait

:log_exception [Symbol]
  Control whether or how an exception thrown in the block is
  reported by Semantic Logger. Values:
   :full
     Log the exception class, message, and backtrace
   :partial
     Log the exception class and message. The backtrace will not be logged
   :off
     Any unhandled exception raised in the block will not be logged
   Default: :partial

:on_exception_level [:trace | :debug | :info | :warn | :error | :fatal]
  Override the log level only when an exception occurs.
  Default: ParallelMinion::Minion.completed_log_level

:enabled [Boolean]
  Override the global setting: `ParallelMinion::Minion.enabled?` for this minion instance.

The overhead for moving the task to a Minion (separate thread) vs running it sequentially is about 0.3 ms if performing other tasks in-between starting the task and requesting its result.

The following call adds 0.5 ms to total processing time vs running the code in-line:

ParallelMinion::Minion.new(description: 'Count', timeout: 5) { 1 }.result

Note:

On JRuby it is recommended to add the following setting to .jrubyrc
  thread.pool.enabled=true

Example:

ParallelMinion::Minion.new(
  10.days.ago,
  description: 'Doing something else in parallel',
  timeout:     1000
) do |date|
  MyTable.where('created_at <= ?', date).count
end

Example, when the result is being ignored, log full exception as an error:

ParallelMinion::Minion.new(
  customer,
  description:        "We don't care about the result",
  log_exception:      :full,
  on_exception_level: :error
) do |customer|
  customer.save!
end
# File lib/parallel_minion/minion.rb, line 217
def initialize(*arguments,
               description: 'Minion',
               metric: nil,
               log_exception: nil,
               on_exception_level: self.class.completed_log_level,
               enabled: self.class.enabled?,
               timeout: INFINITE,
               on_timeout: nil,
               wait_metric: nil,
               &block)
  raise 'Missing mandatory block that Minion must perform' unless block
  @start_time         = Time.now
  @exception          = nil
  @arguments          = arguments
  @timeout            = timeout.to_f
  @description        = description.to_s
  @metric             = metric
  @log_exception      = log_exception
  @on_exception_level = on_exception_level
  @enabled            = enabled
  @on_timeout         = on_timeout

  @wait_metric        = (wait_metric || "#{metric}/wait") if @metric

  # When minion is disabled it is obvious in the logs since the name will now be 'Inline' instead of 'Minion'
  unless @enabled
    l           = self.class.logger.dup
    l.name      = 'Inline'
    self.logger = l
  end

  @enabled ? run(&block) : run_inline(&block)
end
scoped_classes=(scoped_classes) click to toggle source
# File lib/parallel_minion/minion.rb, line 61
def self.scoped_classes=(scoped_classes)
  @scoped_classes = scoped_classes.dup
end
started_log_level=(level) click to toggle source

Change the log level for the Started log message.

Default: :info

Valid levels:

:trace, :debug, :info, :warn, :error, :fatal
# File lib/parallel_minion/minion.rb, line 71
def self.started_log_level=(level)
  raise(ArgumentError, "Invalid log level: #{level}") unless SemanticLogger::LEVELS.include?(level)
  @started_log_level = level
end

Public Instance Methods

completed?() click to toggle source

Returns [Boolean] whether the minion has completed working on the task

# File lib/parallel_minion/minion.rb, line 284
def completed?
  enabled? ? @thread.stop? : true
end
enabled?() click to toggle source

Returns [Boolean] whether this minion is enabled to run in a separate thread

# File lib/parallel_minion/minion.rb, line 303
def enabled?
  @enabled
end
failed?() click to toggle source

Returns [Boolean] whether the minion failed while performing the assigned task

# File lib/parallel_minion/minion.rb, line 289
def failed?
  !exception.nil?
end
result() click to toggle source

Returns the result when the thread completes Returns nil if the thread has not yet completed Raises any unhandled exception in the thread, if any

Note: The result of any thread cannot be nil

# File lib/parallel_minion/minion.rb, line 256
def result
  # Return nil if Minion is still working and has time left to finish
  if working?
    ms = time_left
    logger.measure(
      self.class.completed_log_level,
      "Waited for Minion to complete: #{description}",
      min_duration: 0.01,
      metric:       wait_metric
    ) do
      if @thread.join(ms.nil? ? nil : ms / 1000).nil?
        @thread.raise(@on_timeout.new("Minion: #{description} timed out")) if @on_timeout
        logger.warn("Timed out waiting for: #{description}")
        return
      end
    end
  end

  # Return the exception, if any, otherwise the task result
  exception.nil? ? @result : Kernel.raise(exception)
end
time_left() click to toggle source

Returns the amount of time left in milli-seconds that this Minion has to finish its task Returns 0 if no time is left Returns nil if their is no time limit. I.e. :timeout was set to Minion::INFINITE (infinite time left)

# File lib/parallel_minion/minion.rb, line 296
def time_left
  return nil if timeout.zero? || (timeout == -1)
  duration = timeout - (Time.now - start_time) * 1000
  duration <= 0 ? 0 : duration
end
working?() click to toggle source

Returns [Boolean] whether the minion is still working on the assigned task

# File lib/parallel_minion/minion.rb, line 279
def working?
  enabled? ? @thread.alive? : false
end

Private Instance Methods

run(&block) click to toggle source

rubocop:enable Lint/RescueException

# File lib/parallel_minion/minion.rb, line 346
def run(&block)
  # Capture tags from current thread
  tags = SemanticLogger.tags
  tags = tags.nil? || tags.empty? ? nil : tags.dup

  named_tags = SemanticLogger.named_tags
  named_tags = named_tags.nil? || named_tags.empty? ? nil : named_tags.dup

  # Captures scopes from current thread. Only applicable for AR models
  scopes     = self.class.current_scopes if defined?(ActiveRecord::Base)

  @thread = Thread.new(*arguments) do
    Thread.current.name = "#{description}-#{Thread.current.object_id}"

    # Copy logging tags from parent thread, if any
    SemanticLogger.tagged(*tags) do
      SemanticLogger.named_tagged(named_tags) do
        logger.public_send(self.class.started_log_level, "Started #{description}")
        # rubocop:disable Lint/RescueException
        begin
          proc = proc { run_in_scope(scopes, &block) }
          logger.measure(
            self.class.completed_log_level,
            "Completed #{description}",
            log_exception:      log_exception,
            on_exception_level: on_exception_level,
            metric:             metric,
            &proc
          )
        rescue Exception => exc
          @exception = exc
          nil
        ensure
          @duration = Time.now - start_time
          # Return any database connections used by this thread back to the pool
          ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
        end
        # rubocop:enable Lint/RescueException
      end
    end
  end
end
run_in_scope(scopes, &block) click to toggle source
# File lib/parallel_minion/minion.rb, line 389
def run_in_scope(scopes, &block)
  if scopes.nil? || scopes.empty?
    @result = instance_exec(*@arguments, &block)
  else
    # Use the captured scope when running the block.
    # Each Class to scope requires passing a block to .scoping.
    proc  = proc { instance_exec(*@arguments, &block) }
    first = scopes.shift
    scopes.each { |scope| proc = proc { scope.scoping(&proc) } }
    @result = first.scoping(&proc)
  end
end
run_inline(&block) click to toggle source

Run the supplied block of code in the current thread. Useful for debugging, testing, and when running in batch environments.

# File lib/parallel_minion/minion.rb, line 327
def run_inline(&block)
  logger.public_send(self.class.started_log_level, "Started #{description}")
  logger.measure(
    self.class.completed_log_level,
    "Completed #{description}",
    log_exception:      log_exception,
    on_exception_level: on_exception_level,
    metric:             metric
  ) do
    @result = instance_exec(*arguments, &block)
  end
rescue Exception => exc
  @exception = exc
ensure
  @duration = Time.now - start_time
end