class ParallelMinion::Minion
Constants
- INFINITE
Give an infinite amount of time to wait for a
Minion
to complete a task
Attributes
Returns [Array<Object>] list of arguments in the order they were passed into the initializer
Returns [String] the description supplied on the initializer
Returns [Float] the number of milli-seconds the the minion took to complete Returns nil if the minion is still running
Returns [Exception] the exception that was raised otherwise nil
Metrics [String]
Returns [Integer] the maximum duration in milli-seconds that the Minion
may take to complete the task
Metrics [String]
Public Class Methods
Change the log level for the Completed log message.
Default: :info
Valid levels:
:trace, :debug, :info, :warn, :error, :fatal
# File lib/parallel_minion/minion.rb, line 86 def self.completed_log_level=(level) raise(ArgumentError, "Invalid log level: #{level}") unless SemanticLogger::LEVELS.include?(level) @completed_log_level = level end
# File lib/parallel_minion/minion.rb, line 311 def self.current_scopes scoped_classes.collect(&:all) end
Returns whether minions are enabled to run in their own threads
# File lib/parallel_minion/minion.rb, line 48 def self.enabled? @enabled end
Create a new Minion
Creates a new thread and logs the time for the supplied block to complete processing. The exception without stack trace is logged whenever an exception is thrown in the thread. Re-raises any unhandled exception in the calling thread when `#result` is called. Copies the logging tags and specified ActiveRecord scopes to the new thread.
Parameters
*arguments Any number of arguments can be supplied that are passed into the block in the order they are listed. Note: All arguments must be supplied by copy and not by reference. For example, use `#dup` to create copies of passed data. Pass by copy is critical to prevent concurrency issues when multiple threads attempt to update the same object at the same time. Proc / lambda A block of code must be supplied that the Minion will execute. Note: This block will be executed within the scope of the created minion instance and _not_ within the scope of where the Proc/lambda was originally created. This is done to force all parameters to be passed in explicitly and should be read-only or duplicates of the original data. :description [String] Description for this task that the Minion is performing. Written to the log file along with the time take to complete the task. :timeout [Integer] Maximum amount of time in milli-seconds that the task may take to complete before #result times out. Set to 0 to give the thread an infinite amount of time to complete. Default: 0 ( Wait forever ) Notes: - :timeout does not affect what happens to the Minion running the the task, it only affects how long #result will take to return. - The Minion will continue to run even after the timeout has been exceeded - If :enabled is false, or ParallelMinion::Minion.enabled is false, then :timeout is ignored and assumed to be Minion::INFINITE since the code is run in the calling thread when the Minion is created :on_timeout [Exception] The class to raise on the minion when the minion times out. By raising the exception on the running thread it ensures that the thread ends due to the exception, rather than continuing to execute. The exception is only raised on the running minion when #result is called. The current call to #result will complete with a result of nil, future calls to #result will raise the supplied exception on the current thread since the thread will have terminated with that exception. Note: :on_timeout has no effect if not #enabled? :metric [String] Name of the metric to forward to Semantic Logger when measuring the minion execution time. Example: inquiry/address_cleansing When a metric is supplied the following metrics will also be generated: - wait Duration waiting for a minion to complete. The additional metrics are added to the supplied metric name. For example: - inquiry/address_cleansing/wait :log_exception [Symbol] Control whether or how an exception thrown in the block is reported by Semantic Logger. Values: :full Log the exception class, message, and backtrace :partial Log the exception class and message. The backtrace will not be logged :off Any unhandled exception raised in the block will not be logged Default: :partial :on_exception_level [:trace | :debug | :info | :warn | :error | :fatal] Override the log level only when an exception occurs. Default: ParallelMinion::Minion.completed_log_level :enabled [Boolean] Override the global setting: `ParallelMinion::Minion.enabled?` for this minion instance.
The overhead for moving the task to a Minion
(separate thread) vs running it sequentially is about 0.3 ms if performing other tasks in-between starting the task and requesting its result.
The following call adds 0.5 ms to total processing time vs running the code in-line:
ParallelMinion::Minion.new(description: 'Count', timeout: 5) { 1 }.result
Note:
On JRuby it is recommended to add the following setting to .jrubyrc thread.pool.enabled=true
Example:
ParallelMinion::Minion.new( 10.days.ago, description: 'Doing something else in parallel', timeout: 1000 ) do |date| MyTable.where('created_at <= ?', date).count end
Example, when the result is being ignored, log full exception as an error:
ParallelMinion::Minion.new( customer, description: "We don't care about the result", log_exception: :full, on_exception_level: :error ) do |customer| customer.save! end
# File lib/parallel_minion/minion.rb, line 217 def initialize(*arguments, description: 'Minion', metric: nil, log_exception: nil, on_exception_level: self.class.completed_log_level, enabled: self.class.enabled?, timeout: INFINITE, on_timeout: nil, wait_metric: nil, &block) raise 'Missing mandatory block that Minion must perform' unless block @start_time = Time.now @exception = nil @arguments = arguments @timeout = timeout.to_f @description = description.to_s @metric = metric @log_exception = log_exception @on_exception_level = on_exception_level @enabled = enabled @on_timeout = on_timeout @wait_metric = (wait_metric || "#{metric}/wait") if @metric # When minion is disabled it is obvious in the logs since the name will now be 'Inline' instead of 'Minion' unless @enabled l = self.class.logger.dup l.name = 'Inline' self.logger = l end @enabled ? run(&block) : run_inline(&block) end
# File lib/parallel_minion/minion.rb, line 61 def self.scoped_classes=(scoped_classes) @scoped_classes = scoped_classes.dup end
Change the log level for the Started log message.
Default: :info
Valid levels:
:trace, :debug, :info, :warn, :error, :fatal
# File lib/parallel_minion/minion.rb, line 71 def self.started_log_level=(level) raise(ArgumentError, "Invalid log level: #{level}") unless SemanticLogger::LEVELS.include?(level) @started_log_level = level end
Public Instance Methods
Returns [Boolean] whether the minion has completed working on the task
# File lib/parallel_minion/minion.rb, line 284 def completed? enabled? ? @thread.stop? : true end
Returns [Boolean] whether this minion is enabled to run in a separate thread
# File lib/parallel_minion/minion.rb, line 303 def enabled? @enabled end
Returns [Boolean] whether the minion failed while performing the assigned task
# File lib/parallel_minion/minion.rb, line 289 def failed? !exception.nil? end
Returns the result when the thread completes Returns nil if the thread has not yet completed Raises any unhandled exception in the thread, if any
Note: The result of any thread cannot be nil
# File lib/parallel_minion/minion.rb, line 256 def result # Return nil if Minion is still working and has time left to finish if working? ms = time_left logger.measure( self.class.completed_log_level, "Waited for Minion to complete: #{description}", min_duration: 0.01, metric: wait_metric ) do if @thread.join(ms.nil? ? nil : ms / 1000).nil? @thread.raise(@on_timeout.new("Minion: #{description} timed out")) if @on_timeout logger.warn("Timed out waiting for: #{description}") return end end end # Return the exception, if any, otherwise the task result exception.nil? ? @result : Kernel.raise(exception) end
Returns the amount of time left in milli-seconds that this Minion
has to finish its task Returns 0 if no time is left Returns nil if their is no time limit. I.e. :timeout was set to Minion::INFINITE (infinite time left)
# File lib/parallel_minion/minion.rb, line 296 def time_left return nil if timeout.zero? || (timeout == -1) duration = timeout - (Time.now - start_time) * 1000 duration <= 0 ? 0 : duration end
Returns [Boolean] whether the minion is still working on the assigned task
# File lib/parallel_minion/minion.rb, line 279 def working? enabled? ? @thread.alive? : false end
Private Instance Methods
rubocop:enable Lint/RescueException
# File lib/parallel_minion/minion.rb, line 346 def run(&block) # Capture tags from current thread tags = SemanticLogger.tags tags = tags.nil? || tags.empty? ? nil : tags.dup named_tags = SemanticLogger.named_tags named_tags = named_tags.nil? || named_tags.empty? ? nil : named_tags.dup # Captures scopes from current thread. Only applicable for AR models scopes = self.class.current_scopes if defined?(ActiveRecord::Base) @thread = Thread.new(*arguments) do Thread.current.name = "#{description}-#{Thread.current.object_id}" # Copy logging tags from parent thread, if any SemanticLogger.tagged(*tags) do SemanticLogger.named_tagged(named_tags) do logger.public_send(self.class.started_log_level, "Started #{description}") # rubocop:disable Lint/RescueException begin proc = proc { run_in_scope(scopes, &block) } logger.measure( self.class.completed_log_level, "Completed #{description}", log_exception: log_exception, on_exception_level: on_exception_level, metric: metric, &proc ) rescue Exception => exc @exception = exc nil ensure @duration = Time.now - start_time # Return any database connections used by this thread back to the pool ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) end # rubocop:enable Lint/RescueException end end end end
# File lib/parallel_minion/minion.rb, line 389 def run_in_scope(scopes, &block) if scopes.nil? || scopes.empty? @result = instance_exec(*@arguments, &block) else # Use the captured scope when running the block. # Each Class to scope requires passing a block to .scoping. proc = proc { instance_exec(*@arguments, &block) } first = scopes.shift scopes.each { |scope| proc = proc { scope.scoping(&proc) } } @result = first.scoping(&proc) end end
Run the supplied block of code in the current thread. Useful for debugging, testing, and when running in batch environments.
# File lib/parallel_minion/minion.rb, line 327 def run_inline(&block) logger.public_send(self.class.started_log_level, "Started #{description}") logger.measure( self.class.completed_log_level, "Completed #{description}", log_exception: log_exception, on_exception_level: on_exception_level, metric: metric ) do @result = instance_exec(*arguments, &block) end rescue Exception => exc @exception = exc ensure @duration = Time.now - start_time end