class OpenTelemetry::Exporters::Datadog::DatadogSpanProcessor

Implementation of the duck-typed SpanProcessor interface that groups spans reported by the SDK into complete traces, then pushes those traces to the exporter pipeline.

All spans reported by the SDK implementation are first added to a synchronized in-memory trace storage. That storage holds at most {max_queue_size} spans in total and at most {max_trace_size} spans per trace.

- Once a trace reaches {max_trace_size} spans, further spans for it are dropped.
- Once the total reaches {max_queue_size}, a random unfinished trace is evicted to make room. If no trace can be evicted, the new span is dropped.

When a trace is designated “complete” (all of its tracked spans have ended), it is added to a queue. That queue is exported every schedule_delay_millis to the exporter pipeline, in batches of completed traces. The Datadog writer and transport supplied to the exporter handle the bulk of the timeout and retry logic.

Constants

MAX_QUEUE_SIZE
MAX_TRACE_SIZE
SCHEDULE_DELAY_MILLIS

Attributes

check_traces_queue[R]
max_queue_size[R]
max_trace_size[R]
traces[R]
traces_spans_count[R]
traces_spans_ended_count[R]

Public Class Methods

new(exporter:, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_trace_size: MAX_TRACE_SIZE) click to toggle source
# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 31
# Builds the processor and starts the background thread that exports
# completed traces.
#
# @param exporter receives each completed trace via +export+ and is shut down
#   by #shutdown
# @param schedule_delay_millis [Numeric] wait between export passes, in milliseconds
# @param max_queue_size [Integer] maximum number of buffered spans across all traces
# @param max_trace_size [Integer] maximum number of buffered spans for a single trace
# @raise [ArgumentError] if max_trace_size is larger than max_queue_size
def initialize(exporter:,
               schedule_delay_millis: SCHEDULE_DELAY_MILLIS,
               max_queue_size: MAX_QUEUE_SIZE,
               max_trace_size: MAX_TRACE_SIZE)
  raise ArgumentError, 'max_trace_size must not exceed max_queue_size' if max_trace_size > max_queue_size

  @exporter = exporter
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay_millis / 1000.0
  @max_queue_size = max_queue_size
  @max_trace_size = max_trace_size
  @spans = []

  @traces = {}
  @traces_spans_count = {}
  @traces_spans_ended_count = {}
  @check_traces_queue = []
  @_spans_dropped = false

  # Start the worker last. #work reads the trace buffers assigned above, so
  # they must exist before the thread can run.
  @thread = Thread.new { work }
end

Public Instance Methods

force_flush() click to toggle source

TODO: test this explicitly. Exports every completed trace that has not yet been exported to the configured `Exporter`.

This method should only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the `Processor` exports the completed spans.

# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 131
# Synchronously exports, on the calling thread, every trace that has fully
# ended but has not been exported yet. The batch is taken while holding the
# lock; the export call runs after the lock has been released.
def force_flush
  export_batch(lock { fetch_batch })
end
on_finish(span) click to toggle source

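Records that a span has ended. Once every tracked span of its trace has ended, the trace is queued for export. Thread-safe; may block while acquiring the lock.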

# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 96
# Records that +span+ has ended. Once every span tracked for its trace has
# ended, the trace id is pushed onto check_traces_queue for the export thread.
# Thread-safe; may block on the lock.
#
# Some spans were never stored by on_start: they were dropped because a size
# limit was hit, or their trace was evicted. Those spans are ignored here.
# Counting them would either make a trace look complete while tracked spans
# are still open, or leave a stale traces_spans_ended_count entry that is
# never cleaned up.
#
# @param span the finished span; must respond to +context.trace_id+
def on_finish(span)
  unless @keep_running
    OpenTelemetry.logger.warn('Already shutdown, dropping span')
    return
  end

  # TODO: determine if all "not-sampled" spans still get passed to on_finish?
  # If so then we don't need to account for Probability Sampling
  # and can likely incorporate Priority Sampling from DD
  # If not, then we need to ensure the rate from OpenTelemetry.tracer_provider.active_trace_config.sampler
  # can be exposed to the span or attached to spanData in some way
  # return unless span.context.trace_flags.sampled?

  trace_id = span.context.trace_id

  lock do
    # Identity comparison: only the exact span object on_start buffered counts.
    # This is linear in the trace size, which is bounded by max_trace_size.
    next unless traces[trace_id]&.any? { |tracked| tracked.equal?(span) }

    traces_spans_ended_count[trace_id] = traces_spans_ended_count.fetch(trace_id, 0) + 1
    check_traces_queue.unshift(trace_id) if trace_exportable?(trace_id)
  end
end
on_start(span, _parent_context) click to toggle source

Buffers a started span under its trace, because the Datadog trace-agent endpoint requires complete traces to be sent. Thread-safe; may block while acquiring the lock.

# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 56
# Buffers a newly started span under its trace id, so the whole trace can be
# exported at once (the Datadog trace-agent endpoint expects complete traces).
# Thread-safe; may block on the lock.
#
# Limits:
# * Per trace: once a trace holds max_trace_size spans, further spans for it
#   are dropped. This is checked first, so a span that would be dropped anyway
#   never causes another trace to be evicted.
# * Overall: when the buffer holds max_queue_size spans, a random unfinished
#   trace is evicted to make room. If every buffered trace is already queued
#   for export, the new span is dropped instead.
#
# @param span the started span; must respond to +context.trace_id+
# @param _parent_context unused
def on_start(span, _parent_context)
  trace_id = span.context.trace_id

  lock do
    trace_spans = traces[trace_id]
    if trace_spans && trace_spans.size >= max_trace_size
      OpenTelemetry.logger.warn('Max spans for trace, spans will be dropped')
      @_spans_dropped = true
      return
    end

    if all_spans_count(traces_spans_count) >= max_queue_size
      # instead of just dropping all new spans, dd-trace-rb drops a random trace
      # https://github.com/DataDog/dd-trace-rb/blob/c6fbf2410a60495f1b2d8912bf7ea7dc63422141/lib/ddtrace/buffer.rb#L34-L36
      # It allows for a more fair usage of the queue when under stress load,
      # and will create proportional representation of code paths being instrumented at stress time.
      unfinished_trace_id = fetch_unfinished_trace_id

      # if there are no unfinished traces able to be dropped, don't add more spans, and return early
      if unfinished_trace_id.nil?
        OpenTelemetry.logger.warn('Max spans for all traces, spans will be dropped')
        @_spans_dropped = true
        return
      end

      drop_unfinished_trace(unfinished_trace_id)
      OpenTelemetry.logger.warn('Max spans for all traces, traces will be dropped')
    end

    # Look the trace up again: the eviction above may have removed this span's own trace.
    (traces[trace_id] ||= []) << span
    traces_spans_count[trace_id] = traces_spans_count.fetch(trace_id, 0) + 1
  end
end
shutdown(timeout: nil) click to toggle source

Shuts the consumer thread down and flushes the currently accumulated buffer. Blocks while waiting for the consumer thread to finish.

# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 138
# Stops the consumer thread, exports every completed trace still buffered,
# then shuts the exporter down.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait for the
#   consumer thread to finish. nil (the default) waits indefinitely, because
#   Thread#join(nil) never times out. The timeout only bounds the join; the
#   final flush and the exporter shutdown still run afterwards.
# @return whatever the exporter's +shutdown+ returns
def shutdown(timeout: nil)
  lock do
    @keep_running = false
    # Wake the worker if it is waiting on the condition variable.
    @condition.signal
  end

  # If the join times out, the worker may still be mid-export. force_flush
  # takes its own snapshot under the lock, and fetch_batch removes what it
  # returns, so no trace is fetched twice.
  @thread.join(timeout)
  force_flush
  @exporter.shutdown
end

Private Instance Methods

all_spans_count(traces_spans_count) click to toggle source
# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 181
# Total number of spans currently buffered across all traces.
#
# @param traces_spans_count [Hash] trace id => number of started spans
# @return [Integer] the sum of all counts (0 for an empty hash)
def all_spans_count(traces_spans_count)
  traces_spans_count.each_value.sum
end
drop_unfinished_trace(trace_id) click to toggle source
# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 210
# Forgets all bookkeeping for +trace_id+: its buffered spans, its
# started-span count and its ended-span count. on_start uses this to evict
# an unfinished trace when the buffer is full.
#
# @return the removed ended-span count, or nil if there was none
def drop_unfinished_trace(trace_id)
  [traces, traces_spans_count, traces_spans_ended_count]
    .map { |store| store.delete(trace_id) }
    .last
end
export_batch(trace_spans) click to toggle source
# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 167
# Hands each trace (an array of span data produced by fetch_batch) to the
# exporter, one +export+ call per trace. If exporting one trace raises, the
# failure is logged and the remaining traces are still exported.
#
# @param trace_spans [Array<Array>] one entry per completed trace
def export_batch(trace_spans)
  return if trace_spans.empty?

  trace_spans.each do |trace|
    begin
      @exporter.export(trace)
    rescue StandardError => error
      OpenTelemetry.logger.warn("Exception while exporting Span batch. #{error.message} , #{error.backtrace}")
    end
  end
end
fetch_batch() click to toggle source
# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 185
# Removes every exportable trace from the in-memory storage and returns the
# spans of each, converted by fetch_spans (one array per trace). Its callers
# (force_flush and work) invoke it inside lock.
#
# check_traces_queue is filled with unshift, so its tail holds the oldest
# entries; traces are returned oldest first. Queue entries whose trace is not
# (or no longer) exportable are left in the queue.
def fetch_batch
  ready_ids = check_traces_queue.reverse.uniq.select { |trace_id| trace_exportable?(trace_id) }
  return [] if ready_ids.empty?

  # Rebuild the queue in place instead of deleting from it while iterating.
  # Array#- drops every occurrence of an exported id in a single O(n) pass.
  check_traces_queue.replace(check_traces_queue - ready_ids)

  ready_ids.map do |trace_id|
    traces_spans_count.delete(trace_id)
    traces_spans_ended_count.delete(trace_id)
    fetch_spans(traces.delete(trace_id))
  end
end
fetch_spans(spans) click to toggle source
# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 200
# Converts a trace's buffered spans into span data for export, replacing
# the array's contents in place and returning that same array.
def fetch_spans(spans)
  spans.map! { |span| span.to_span_data }
end
fetch_unfinished_trace_id() click to toggle source
# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 204
# Picks a random buffered trace id that is not already queued for export,
# so on_start can evict it when the buffer is full.
#
# @return the chosen trace id, or nil when nothing is buffered or every
#   buffered trace is already awaiting export
def fetch_unfinished_trace_id
  # don't delete potentially finished trace awaiting export
  # Array#sample returns nil for an empty array. The previous rand(length)
  # called rand(0) here, which returns a Float; indexing with it yielded nil
  # only by accident.
  (traces.keys - check_traces_queue).sample
end
lock() { || ... } click to toggle source
# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 216
# Runs the given block while holding @mutex and returns the block's value.
# A +return+ inside the block still exits the calling method (on_start and
# work rely on this); Mutex#synchronize releases the mutex on the way out.
def lock
  @mutex.synchronize { yield }
end
trace_exportable?(trace_id) click to toggle source
# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 177
# Whether every span started for +trace_id+ has also ended.
#
# @return [true, false, nil] true when the ended count has caught up with the
#   started count, false while spans are still open, and nil when either
#   counter has no entry for the trace
def trace_exportable?(trace_id)
  return unless traces_spans_count.key?(trace_id) && traces_spans_ended_count.key?(trace_id)

  traces_spans_ended_count[trace_id] >= traces_spans_count[trace_id]
end
work() click to toggle source
# File lib/opentelemetry/exporters/datadog/datadog_span_processor.rb, line 153
# Body of the background thread created in initialize (Thread.new { work }).
#
# Each pass, while holding the lock:
# - waits on @condition for up to @delay_seconds;
# - keeps waiting in @delay_seconds slices while check_traces_queue is empty;
# - pulls the completed traces with fetch_batch.
# The batch is then exported outside the lock.
#
# The loop ends once @keep_running is false; shutdown clears it and signals
# @condition. Traces still queued at that point are not fetched here.
# shutdown calls force_flush after joining this thread.
def work
  while @keep_running
    trace_spans = lock do
      # Pacing wait: even with a non-empty queue, a pass happens at most about
      # once per delay unless the condition is signalled.
      @condition.wait(@mutex, @delay_seconds) if @keep_running
      # Nothing queued yet: keep sleeping in delay-sized slices.
      @condition.wait(@mutex, @delay_seconds) while check_traces_queue.empty? && @keep_running
      # This `return` exits #work itself (ending the thread); lock's
      # Mutex#synchronize releases the mutex on the way out.
      return unless @keep_running

      fetch_batch
    end

    # Export without holding the lock.
    export_batch(trace_spans)
  end
end