module Datadog::Workers::Async::Thread
Adds threading behavior to workers to run tasks asynchronously. rubocop:disable Metrics/ModuleLength
Constants
- FORK_POLICY_RESTART
- FORK_POLICY_STOP
- SHUTDOWN_TIMEOUT
Attributes
error[R]
fork_policy[W]
pid[R]
result[RW]
Public Class Methods
included(base)
click to toggle source
# File lib/ddtrace/workers/async.rb, line 14 def self.included(base) base.send(:prepend, PrependedMethods) end
Public Instance Methods
completed?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 62 def completed? !worker.nil? && worker.status == false && !error? end
error?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 57 def error? return false unless instance_variable_defined?(:@error) !@error.nil? end
failed?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 66 def failed? !worker.nil? && worker.status.nil? end
fork_policy()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 74 def fork_policy @fork_policy ||= FORK_POLICY_STOP end
forked?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 70 def forked? !pid.nil? && pid != Process.pid end
join(timeout = nil)
click to toggle source
# File lib/ddtrace/workers/async.rb, line 32 def join(timeout = nil) return true unless running? !worker.join(timeout).nil? end
run_async?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 44 def run_async? return false unless instance_variable_defined?(:@run_async) @run_async == true end
running?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 53 def running? !worker.nil? && worker.alive? end
started?()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 49 def started? !(worker.nil? || forked?) end
terminate()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 37 def terminate return false unless running? @run_async = false worker.terminate true end
Protected Instance Methods
after_fork()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 87 def after_fork # Do nothing by default end
mutex()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 83 def mutex @mutex ||= Mutex.new end
Private Instance Methods
mutex_after_fork()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 96 def mutex_after_fork @mutex_after_fork ||= Mutex.new end
restart_after_fork(&block)
click to toggle source
# File lib/ddtrace/workers/async.rb, line 151 def restart_after_fork(&block) mutex_after_fork.synchronize do if forked? # Trigger callback to allow workers to reset themselves accordingly after_fork # Start worker start_worker(&block) end end end
start(&block)
click to toggle source
# File lib/ddtrace/workers/async.rb, line 104 def start(&block) mutex.synchronize do return if running? if forked? case fork_policy when FORK_POLICY_STOP stop_fork when FORK_POLICY_RESTART restart_after_fork(&block) end elsif !run_async? start_worker(&block) end end end
start_worker() { || ... }
click to toggle source
# File lib/ddtrace/workers/async.rb, line 120 def start_worker @run_async = true @pid = Process.pid @error = nil Datadog.logger.debug("Starting thread in the process: #{Process.pid}") @worker = ::Thread.new do begin yield # rubocop:disable Lint/RescueException rescue Exception => e @error = e Datadog.logger.debug("Worker thread error. Cause #{e.message} Location: #{e.backtrace.first}") raise end end end
stop_fork()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 138 def stop_fork mutex_after_fork.synchronize do if forked? # Trigger callback to allow workers to reset themselves accordingly after_fork # Reset and turn off @pid = Process.pid @run_async = false end end end
worker()
click to toggle source
# File lib/ddtrace/workers/async.rb, line 100 def worker @worker ||= nil end