module Stackdriver::Core::AsyncActor
@private A module that gives a class asynchronous capability when
included. It can create a child thread to run background jobs, and it helps make sure the child thread terminates properly when the process is interrupted.
To use AsyncActor, the classes that are including this module need to define a run_backgrounder method that does the background job. The classes can then control the child thread job through instance methods like async_start, async_stop, etc.
@example
  class Foo
    include AsyncActor

    def initialize
      super()
    end

    def run_backgrounder
      # run async job
    end
  end

  foo = Foo.new
  foo.async_start
Constants
- CLEANUP_TIMEOUT
- WAIT_INTERVAL
Attributes
@private The async actor state
Public Class Methods
@private Constructor to initialize MonitorMixin
# File lib/stackdriver/core/async_actor.rb, line 273
def initialize
  super()
  @startup_lock = Mutex.new
  @cleanup_options = {
    wait_interval: WAIT_INTERVAL,
    timeout: CLEANUP_TIMEOUT,
    force: true
  }
end
@private Clean up this async job when the process exits
# File lib/stackdriver/core/async_actor.rb, line 237
def self.register_for_cleanup actor
  @exit_lock.synchronize do
    unless @cleanup_list
      @cleanup_list = []
      at_exit { run_cleanup }
    end
    @cleanup_list.push actor
  end
end
@private Take this async job off exit cleanup list
# File lib/stackdriver/core/async_actor.rb, line 250
def self.unregister_for_cleanup actor
  @exit_lock.synchronize do
    @cleanup_list&.delete actor
  end
end
Private Class Methods
@private Cleanup the async job
# File lib/stackdriver/core/async_actor.rb, line 259
def self.run_cleanup
  @exit_lock.synchronize do
    return unless @cleanup_list
    @cleanup_list.shift.async_stop! until @cleanup_list.empty?
  end
end
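The three class methods above form a registry that installs its at_exit hook lazily, on the first registration. The following is a standalone sketch of that pattern, not the library's code: CleanupRegistry, FakeActor, stop! and size are hypothetical names, and using a Monitor for the lock is an assumption.

```ruby
require "monitor"

# Hypothetical stand-in for the module-level registry (not the library's code).
module CleanupRegistry
  @exit_lock = Monitor.new # assumption: a re-entrant lock guards the list
  @cleanup_list = nil

  def self.register(actor)
    @exit_lock.synchronize do
      unless @cleanup_list
        @cleanup_list = []
        # The exit hook is installed only once, on the first registration.
        at_exit { run_cleanup }
      end
      @cleanup_list.push actor
    end
  end

  def self.unregister(actor)
    @exit_lock.synchronize { @cleanup_list&.delete actor }
  end

  def self.run_cleanup
    @exit_lock.synchronize do
      return unless @cleanup_list
      @cleanup_list.shift.stop! until @cleanup_list.empty?
    end
  end

  def self.size
    @exit_lock.synchronize { @cleanup_list ? @cleanup_list.size : 0 }
  end
end

# Hypothetical actor that only records whether it was asked to stop.
class FakeActor
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop!
    @stopped = true
  end
end

a = FakeActor.new
b = FakeActor.new
CleanupRegistry.register a
CleanupRegistry.register b
CleanupRegistry.unregister a # a finished on its own, so it leaves the list
CleanupRegistry.run_cleanup  # normally triggered by at_exit
```

In this sketch only b is stopped by the cleanup pass, because a was unregistered first. The hook still fires at process exit, where it finds an empty list and does nothing.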
Public Instance Methods
Set the state to :running if the current state is :suspended.
@return [Boolean] True if the state has been set to :running.
Otherwise return false.
# File lib/stackdriver/core/async_actor.rb, line 112
def async_resume
  ensure_thread

  synchronize do
    if async_state == :suspended
      @async_state = :running
      async_state_change
      true
    else
      false
    end
  end
end
Check if async job is running
@return [Boolean] True if state equals :running. Otherwise false.
# File lib/stackdriver/core/async_actor.rb, line 129
def async_running?
  synchronize do
    async_state == :running
  end
end
Starts the child thread and asynchronous job
# File lib/stackdriver/core/async_actor.rb, line 64
def async_start
  ensure_thread
end
Nicely ask the child thread to stop by setting the state to :stopping if it's not stopped already.
@return [Boolean] False if child thread is already stopped. Otherwise
true.
# File lib/stackdriver/core/async_actor.rb, line 74
def async_stop
  ensure_thread

  synchronize do
    if async_state == :stopped
      false
    else
      @async_state = :stopping
      async_state_change
      true
    end
  end
end
Ask the async job to stop. If it doesn't stop within the timeout and the :force cleanup option is set, forcefully kill the thread.
@return [Symbol] :stopped if async job already stopped. :waited if
async job terminates within timeout range. :timeout if async job doesn't terminate after timeout. :forced if thread is killed by force after timeout.
# File lib/stackdriver/core/async_actor.rb, line 184
def async_stop!
  ensure_thread

  timeout = @cleanup_options[:timeout]
  force = @cleanup_options[:force]

  return :stopped unless async_stop

  return :waited if timeout.to_f.positive? && wait_until_async_stopped(timeout)

  return :timeout unless force

  @thread.kill
  @thread.join
  :forced
end
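The escalation in async_stop! (ask politely, wait, then kill) can be tried outside the module. The sketch below is an illustration under assumptions: stop_with_escalation is a hypothetical helper, and it waits with Thread#join(limit) rather than the module's condition variable.

```ruby
# Hypothetical helper, not part of AsyncActor. The block passed in is the
# cooperative stop request; Thread#join(limit) replaces the module's
# condition-variable wait.
def stop_with_escalation(thread, timeout:, force:)
  return :stopped unless thread.alive?

  yield # ask the job to stop cooperatively

  # Thread#join(limit) returns nil if the limit expires first.
  return :waited if timeout.to_f.positive? && thread.join(timeout)

  return :timeout unless force

  thread.kill
  thread.join
  :forced
end

# A cooperative job that polls a flag.
stop_requested = false
polite = Thread.new { sleep 0.01 until stop_requested }
polite_result = stop_with_escalation(polite, timeout: 1, force: true) do
  stop_requested = true
end

# A job that never checks for a stop request.
stubborn = Thread.new { sleep }
stubborn_result = stop_with_escalation(stubborn, timeout: 0.05, force: true) {}
```

Here polite_result is :waited and stubborn_result is :forced. With force: false, the second call would return :timeout and leave the thread running.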
Check if async job has stopped
@return [Boolean] True if state equals :stopped. Otherwise false.
# File lib/stackdriver/core/async_actor.rb, line 160
def async_stopped?
  synchronize do
    async_state == :stopped
  end
end
Check if async job is stopping
@return [Boolean] True if state equals :stopping. Otherwise false.
# File lib/stackdriver/core/async_actor.rb, line 170
def async_stopping?
  synchronize do
    async_state == :stopping
  end
end
Set the state to :suspended if the current state is :running.
@return [Boolean] Returns true if the state has been set to
:suspended. Otherwise return false.
# File lib/stackdriver/core/async_actor.rb, line 93
def async_suspend
  ensure_thread

  synchronize do
    if async_state == :running
      @async_state = :suspended
      async_state_change
      true
    else
      false
    end
  end
end
Check if async job is suspended
@return [Boolean] True if state equals :suspended. Otherwise false.
# File lib/stackdriver/core/async_actor.rb, line 139
def async_suspended?
  synchronize do
    async_state == :suspended
  end
end
Check if async job is working.
@return [Boolean] True if state is either :running or :suspended.
Otherwise false.
# File lib/stackdriver/core/async_actor.rb, line 150
def async_working?
  synchronize do
    async_state == :suspended || async_state == :running
  end
end
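The state methods documented so far describe a small state machine over :running, :suspended, :stopping and :stopped. A minimal sketch of just those transitions follows, with no thread involved. StateBox and its method names are hypothetical and only mirror the documented return values.

```ruby
require "monitor"

# Hypothetical state holder; it mirrors the documented transitions only.
class StateBox
  include MonitorMixin

  def initialize
    super() # initializes the monitor
    @state = :running
  end

  # :running -> :suspended, true only if the transition happened
  def suspend
    transition from: :running, to: :suspended
  end

  # :suspended -> :running, true only if the transition happened
  def resume
    transition from: :suspended, to: :running
  end

  # Any state except :stopped -> :stopping
  def stop
    synchronize do
      return false if @state == :stopped
      @state = :stopping
      true
    end
  end

  # Stands in for the job loop finishing.
  def mark_stopped
    synchronize { @state = :stopped }
  end

  def working?
    synchronize { @state == :running || @state == :suspended }
  end

  private

  def transition(from:, to:)
    synchronize do
      return false unless @state == from
      @state = to
      true
    end
  end
end

box = StateBox.new
results = [box.suspend, box.suspend, box.working?, box.resume, box.stop, box.working?]
box.mark_stopped
stop_again = box.stop
```

In this sketch results is [true, false, true, true, true, false]: the second suspend is rejected because the state is already :suspended, and a :stopping actor no longer counts as working. stop_again is false because the state is already :stopped.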
Abstract method that the including classes should implement.
This should be the main job, which is run asynchronously and repeatedly.
# File lib/stackdriver/core/async_actor.rb, line 230
def run_backgrounder
  raise "#{self.class} class should override #run_backgrounder method"
end
Block current thread until the async job is fully stopped.
@param [Integer] timeout If given, lift the blocking when the time has
exceeded the timeout. If nil, block indefinitely.
@return [Boolean] True if async job is fully stopped. False if timeout.
# File lib/stackdriver/core/async_actor.rb, line 209
def wait_until_async_stopped timeout = nil
  ensure_thread

  deadline = timeout ? ::Time.new.to_f + timeout : nil
  synchronize do
    until async_state == :stopped
      cur_time = ::Time.new.to_f
      return false if deadline && cur_time >= deadline
      max_interval = @cleanup_options[:wait_interval]
      interval = deadline ? deadline - cur_time : max_interval
      interval = max_interval if interval > max_interval
      @lock_cond.wait interval
    end
  end
  true
end
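The waiting loop above sleeps on a condition variable in slices no longer than the wait interval and rechecks the deadline after each wake-up. The sketch below reproduces that shape in isolation. StopLatch, finish! and the WAIT_INTERVAL value of 0.05 are assumptions made for illustration, and it reads a monotonic clock where the method above uses ::Time.new.to_f.

```ruby
require "monitor"

# Hypothetical latch, not part of the library: waits for #finish! in
# bounded slices, rechecking the deadline after every wake-up.
class StopLatch
  include MonitorMixin

  WAIT_INTERVAL = 0.05 # assumed value, for illustration only

  def initialize
    super()
    @done = false
    @cond = new_cond
  end

  def finish!
    synchronize do
      @done = true
      @cond.broadcast # wake every waiter
    end
  end

  def wait_until_done(timeout = nil)
    deadline = timeout ? now + timeout : nil
    synchronize do
      until @done
        cur_time = now
        return false if deadline && cur_time >= deadline
        # Never sleep past the deadline or longer than one interval.
        interval = deadline ? [deadline - cur_time, WAIT_INTERVAL].min : WAIT_INTERVAL
        @cond.wait interval
      end
    end
    true
  end

  private

  # Monotonic clock, so wall-clock adjustments do not affect the deadline.
  def now
    Process.clock_gettime Process::CLOCK_MONOTONIC
  end
end

latch = StopLatch.new
timed_out = latch.wait_until_done(0.1) # nobody finishes the latch yet

Thread.new do
  sleep 0.05
  latch.finish!
end
finished = latch.wait_until_done(2)
```

The first call returns false after roughly 0.1 seconds. The second returns true as soon as the helper thread broadcasts.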
Private Instance Methods
@private Wrapper method for running the async job. It runs a loop while the condition allows, and it calls the run_backgrounder method. This method also ensures the state variable gets set to :stopped when it exits.
# File lib/stackdriver/core/async_actor.rb, line 288
def async_run_job
  until async_stopped?
    run_backgrounder

    return if async_stopping? && backgrounder_stoppable?
  end
ensure
  @async_state = :stopped
  async_state_change
end
@private Handler when the async actor's state changes. Call the `on_async_state_change` callback function if it's defined.
# File lib/stackdriver/core/async_actor.rb, line 342
def async_state_change
  on_async_state_change

  synchronize do
    @lock_cond.broadcast
  end
end
@private Default backgrounder stop condition when asked to be stopped gracefully. Called from async_run_job when async actor state changes to :stopping
# File lib/stackdriver/core/async_actor.rb, line 335
def backgrounder_stoppable?
  true
end
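Since async_run_job only exits a :stopping actor once backgrounder_stoppable? returns true, overriding that predicate lets a job finish pending work first. The following single-threaded sketch shows that interaction. DrainingWorker and run_job are hypothetical, and the worker starts in :stopping to simulate a stop request that has already arrived.

```ruby
# Hypothetical mini worker mirroring the loop shape; not the library's code.
class DrainingWorker
  attr_reader :processed, :state

  def initialize(items)
    @queue = items.dup
    @processed = []
    @state = :stopping # pretend a graceful stop was already requested
  end

  # Same shape as the wrapper loop: run the job, then check the stop condition.
  def run_job
    until @state == :stopped
      run_backgrounder

      return if @state == :stopping && backgrounder_stoppable?
    end
  ensure
    # Runs on normal exit, early return, and exceptions alike.
    @state = :stopped
  end

  # One unit of work per call.
  def run_backgrounder
    item = @queue.shift
    @processed << item if item
  end

  # Allow the graceful stop only once the queue is drained.
  def backgrounder_stoppable?
    @queue.empty?
  end
end

worker = DrainingWorker.new([1, 2, 3])
worker.run_job
```

Although the stop was requested before the first iteration, worker.processed ends up as [1, 2, 3] and worker.state as :stopped. With the default predicate, which always returns true, the loop would have exited after the first item.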
@private Ensures the child thread is started and kicks off the job
to run async_run_job. Also calls register_for_cleanup on the async job to make sure it exits properly.
# File lib/stackdriver/core/async_actor.rb, line 303
def ensure_thread
  raise "async_actor not initialized" if @startup_lock.nil?
  @startup_lock.synchronize do
    if (@thread.nil? || !@thread.alive?) && @async_state != :stopped
      @lock_cond = new_cond
      AsyncActor.register_for_cleanup self

      @async_state = :running
      async_state_change

      @thread = Thread.new do
        async_run_job
        AsyncActor.unregister_for_cleanup self
      end
    end
  end
end
@private Default abstract definition of this function that's a no-op. The extending classes can override this method to handle state changing logic.
# File lib/stackdriver/core/async_actor.rb, line 354
def on_async_state_change
end
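The no-op default above is a hook: state-changing code always calls it, and a class that cares about transitions overrides it. A small sketch of the pattern, using hypothetical BaseActor and LoggingActor classes in place of the module:

```ruby
# Hypothetical base class standing in for the module; the hook is a no-op.
class BaseActor
  attr_reader :state

  def initialize
    @state = :running
  end

  def change_state(new_state)
    @state = new_state
    on_state_change # always called; does nothing unless overridden
  end

  private

  def on_state_change
  end
end

# Hypothetical subclass that records every transition it is told about.
class LoggingActor < BaseActor
  attr_reader :history

  def initialize
    super
    @history = []
  end

  private

  def on_state_change
    @history << state
  end
end

actor = LoggingActor.new
actor.change_state :suspended
actor.change_state :running
actor.change_state :stopping
```

After these calls actor.history is [:suspended, :running, :stopping], while a plain BaseActor would go through the same transitions silently.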
@private Set cleanup options.
@param [Hash] kwargs Hash of cleanup options. `:timeout` is the cleanup
wait timeout. `:wait_interval` is the cleanup wait interval. `:force` forcefully terminates the actor when all other options fail.
# File lib/stackdriver/core/async_actor.rb, line 327
def set_cleanup_options **kwargs
  @cleanup_options.merge! kwargs
end