class OpenCensus::Trace::Exporters::Datadog::Worker

Constants

SHUTDOWN_TIMEOUT

Public Class Methods

new(transport, max_buffer_size, flush_interval, service_name) click to toggle source
# File lib/opencensus/trace/exporters/datadog/worker.rb, line 11
def initialize(transport, max_buffer_size, flush_interval, service_name)
  @transport = transport
  @flush_interval = flush_interval
  @span_buffer = SpanBuffer.new(max_buffer_size)
  @converter = Converter.new(service_name)
  @shutdown = ConditionVariable.new
  @mutex = Mutex.new

  @worker = nil
  @run = false
end

Public Instance Methods

callback_spans() click to toggle source
# File lib/opencensus/trace/exporters/datadog/worker.rb, line 23
def callback_spans
  return if @span_buffer.empty?

  begin
    spans = @span_buffer.pop()
    hash = Hash.new { |h, k| h[k] = [] }
    spans.each do |span|
      dd_span = @converter.convert_span(span)
      hash[dd_span[:trace_id]] << dd_span
    end
    traces = hash.map {|key,value| value}
    count = traces.length
    @transport.upload(MessagePack.pack(traces), count)
  rescue StandardError => e
    Datadog.log.error("[daatadog-exporter] failed to flush spans: #{e}")
  end
end
enqueue(span) click to toggle source
# File lib/opencensus/trace/exporters/datadog/worker.rb, line 74
def enqueue(span)
  @span_buffer.push(span)
end
join() click to toggle source
# File lib/opencensus/trace/exporters/datadog/worker.rb, line 60
def join
  @worker.join(SHUTDOWN_TIMEOUT)
end
perform() click to toggle source
# File lib/opencensus/trace/exporters/datadog/worker.rb, line 64
def perform
  loop do
    callback_spans
    @mutex.synchronize do
      return if !@run && @span_buffer.empty?
      @shutdown.wait(@mutex, @flush_interval) if @run
    end
  end
end
start() click to toggle source
# File lib/opencensus/trace/exporters/datadog/worker.rb, line 41
def start
  @mutex.synchronize do
    return if @run
    @run = true
    @worker = Thread.new { perform }
  end
end
stop() click to toggle source
# File lib/opencensus/trace/exporters/datadog/worker.rb, line 49
def stop
  @mutex.synchronize do
    return unless @run
    @span_buffer.close
    @run = false
    @shutdown.signal
  end
  join
  true
end