module Datadog::Workers::Async::Thread

Adds threading behavior to workers to run tasks asynchronously. rubocop:disable Metrics/ModuleLength

Constants

FORK_POLICY_RESTART
FORK_POLICY_STOP
SHUTDOWN_TIMEOUT

Attributes

error[R]
fork_policy[W]
pid[R]
result[RW]

Public Class Methods

included(base) click to toggle source
# File lib/ddtrace/workers/async.rb, line 14
def self.included(base)
  base.send(:prepend, PrependedMethods)
end

Public Instance Methods

completed?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 62
def completed?
  !worker.nil? && worker.status == false && !error?
end
error?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 57
def error?
  return false unless instance_variable_defined?(:@error)
  !@error.nil?
end
failed?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 66
def failed?
  !worker.nil? && worker.status.nil?
end
fork_policy() click to toggle source
# File lib/ddtrace/workers/async.rb, line 74
def fork_policy
  @fork_policy ||= FORK_POLICY_STOP
end
forked?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 70
def forked?
  !pid.nil? && pid != Process.pid
end
join(timeout = nil) click to toggle source
# File lib/ddtrace/workers/async.rb, line 32
def join(timeout = nil)
  return true unless running?
  !worker.join(timeout).nil?
end
run_async?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 44
def run_async?
  return false unless instance_variable_defined?(:@run_async)
  @run_async == true
end
running?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 53
def running?
  !worker.nil? && worker.alive?
end
started?() click to toggle source
# File lib/ddtrace/workers/async.rb, line 49
def started?
  !(worker.nil? || forked?)
end
terminate() click to toggle source
# File lib/ddtrace/workers/async.rb, line 37
def terminate
  return false unless running?
  @run_async = false
  worker.terminate
  true
end

Protected Instance Methods

after_fork() click to toggle source
# File lib/ddtrace/workers/async.rb, line 87
def after_fork
  # Do nothing by default
end
mutex() click to toggle source
# File lib/ddtrace/workers/async.rb, line 83
def mutex
  @mutex ||= Mutex.new
end

Private Instance Methods

mutex_after_fork() click to toggle source
# File lib/ddtrace/workers/async.rb, line 96
def mutex_after_fork
  @mutex_after_fork ||= Mutex.new
end
restart_after_fork(&block) click to toggle source
# File lib/ddtrace/workers/async.rb, line 151
def restart_after_fork(&block)
  mutex_after_fork.synchronize do
    if forked?
      # Trigger callback to allow workers to reset themselves accordingly
      after_fork

      # Start worker
      start_worker(&block)
    end
  end
end
start(&block) click to toggle source
# File lib/ddtrace/workers/async.rb, line 104
def start(&block)
  mutex.synchronize do
    return if running?
    if forked?
      case fork_policy
      when FORK_POLICY_STOP
        stop_fork
      when FORK_POLICY_RESTART
        restart_after_fork(&block)
      end
    elsif !run_async?
      start_worker(&block)
    end
  end
end
start_worker() { || ... } click to toggle source
# File lib/ddtrace/workers/async.rb, line 120
def start_worker
  @run_async = true
  @pid = Process.pid
  @error = nil
  Datadog.logger.debug("Starting thread in the process: #{Process.pid}")

  @worker = ::Thread.new do
    begin
      yield
    # rubocop:disable Lint/RescueException
    rescue Exception => e
      @error = e
      Datadog.logger.debug("Worker thread error. Cause #{e.message} Location: #{e.backtrace.first}")
      raise
    end
  end
end
stop_fork() click to toggle source
# File lib/ddtrace/workers/async.rb, line 138
def stop_fork
  mutex_after_fork.synchronize do
    if forked?
      # Trigger callback to allow workers to reset themselves accordingly
      after_fork

      # Reset and turn off
      @pid = Process.pid
      @run_async = false
    end
  end
end
worker() click to toggle source
# File lib/ddtrace/workers/async.rb, line 100
def worker
  @worker ||= nil
end