class NewRelic::Agent::InfiniteTracing::StreamingBuffer

Constants

DEFAULT_QUEUE_SIZE
FLUSH_DELAY
MAX_FLUSH_WAIT

Attributes

queue[R]

Public Class Methods

new(max_size = DEFAULT_QUEUE_SIZE) click to toggle source
# File lib/infinite_tracing/streaming_buffer.rb, line 25
# Sets up an empty streaming buffer.
#
# max_size - stored in @max_size; #<< compares the queue length against it
#            before every push. Defaults to DEFAULT_QUEUE_SIZE.
def initialize(max_size = DEFAULT_QUEUE_SIZE)
  @batch = []            # spans accumulated by batch_enumerator between yields
  @queue = Queue.new     # segments waiting to be streamed
  @lock = Mutex.new      # guards pushes, close and transfer
  @max_size = max_size
end

Public Instance Methods

<<(segment) click to toggle source

Pushes the segment given onto the queue.

If the queue is at capacity, it is dumped and a supportability metric is recorded for the event.

When a restart signal is received, the queue is locked with a mutex, blocking the push until the queue has restarted.

# File lib/infinite_tracing/streaming_buffer.rb, line 49
# Appends a segment to the queue while holding @lock.
#
# If the queue already holds @max_size or more entries, clear_queue is
# called before the new segment is pushed. SPANS_SEEN_METRIC is incremented
# on every call, whether or not a dump happened.
def <<(segment)
  @lock.synchronize do
    at_capacity = @queue.size >= @max_size
    clear_queue if at_capacity
    NewRelic::Agent.increment_metric(SPANS_SEEN_METRIC)
    @queue << segment
  end
end
batch_enumerator() { |span_batch(spans: batch)| ... } click to toggle source

Returns the blocking enumerator that will pop items off the queue while any items are present.

Yielding is deferred until BATCH_SIZE spans have been collected.

If nil is popped, the queue is closing. A final yield on non-empty batch is fired.

The segment is transformed into a serializable span here so processing is taking place within the gRPC call's thread rather than in the main application thread.

# File lib/infinite_tracing/streaming_buffer.rb, line 120
# Pops items off the queue (blocking while it is open and empty) and yields
# them to the caller grouped into SpanBatch messages.
#
# Each popped item is passed through transform and appended to @batch. Once
# @batch holds BATCH_SIZE entries it is yielded as SpanBatch.new(spans: @batch)
# and then emptied. A falsy pop (nil) ends the stream: any partial batch is
# yielded, then ClosedQueueError is raised. Assuming that constant resolves
# to Ruby's core ClosedQueueError (a StopIteration subclass), Kernel#loop
# rescues it and the method returns normally.
#
# Without a block, returns an Enumerator over this same method, so block-less
# callers also receive SpanBatch objects.
def batch_enumerator
  # Must name this method: enum_for(:enumerator) would hand block-less
  # callers the single-span stream instead of batches.
  return enum_for(:batch_enumerator) unless block_given?

  loop do
    proc_or_segment = @queue.pop(false)

    unless proc_or_segment
      # Queue is closing: flush whatever has been accumulated so far.
      yield SpanBatch.new(spans: @batch) unless @batch.empty?
      raise ClosedQueueError
    end

    NewRelic::Agent.increment_metric SPANS_SENT_METRIC
    @batch << transform(proc_or_segment)
    next if @batch.size < BATCH_SIZE

    yield SpanBatch.new(spans: @batch)
    @batch.clear
  end
end
clear_queue() click to toggle source

Drops all segments from the queue and records a supportability metric for the event.

# File lib/infinite_tracing/streaming_buffer.rb, line 59
# Empties the queue, discarding everything in it, and then increments
# QUEUE_DUMPED_METRIC to record that a dump happened.
def clear_queue
  @queue.clear
  NewRelic::Agent.increment_metric(QUEUE_DUMPED_METRIC)
end
close_queue() click to toggle source
# File lib/infinite_tracing/streaming_buffer.rb, line 82
# Closes the underlying queue while holding @lock, so the close cannot
# interleave with a push made through #<< (which takes the same lock).
def close_queue
  @lock.synchronize do
    @queue.close
  end
end
enumerator() { |transform(segment)| ... } click to toggle source

Returns the blocking enumerator that will pop items off the queue while any items are present. If nil is popped, the queue is closing.

The segment is transformed into a serializable span here so processing is taking place within the gRPC call's thread rather than in the main application thread.

# File lib/infinite_tracing/streaming_buffer.rb, line 94
# Pops items off the queue one at a time (blocking while it is open and
# empty) and yields each one after passing it through transform.
#
# SPANS_SENT_METRIC is incremented for every item yielded. A falsy pop (nil)
# means the queue is closing, and ClosedQueueError is raised to end the loop.
#
# Without a block, returns an Enumerator over this method.
def enumerator
  return enum_for(:enumerator) unless block_given?

  loop do
    popped = @queue.pop(false)
    raise ClosedQueueError unless popped

    NewRelic::Agent.increment_metric(SPANS_SENT_METRIC)
    yield transform(popped)
  end
end
flush_queue() click to toggle source

Waits for the queue to be fully consumed or for the waiting consumers to release.

# File lib/infinite_tracing/streaming_buffer.rb, line 66
# Shuts the queue down and gives consumers a bounded amount of time to
# drain it.
#
# Steps, in order:
# 1. Push one nil for every consumer currently blocked on the queue.
# 2. Close the queue via close_queue.
# 3. If items remain and no consumer is waiting, log a warning with the
#    number of discarded segments and return.
# 4. Otherwise sleep FLUSH_DELAY at a time until the queue is empty or
#    MAX_FLUSH_WAIT has elapsed.
def flush_queue
  @queue.num_waiting.times { @queue << nil }
  close_queue

  # Nothing is waiting to take the remaining spans off the queue, so they
  # are being thrown away; say so in the log.
  if @queue.num_waiting.zero? && !@queue.empty?
    NewRelic::Agent.logger.warn "Discarding #{@queue.size} segments on Streaming Buffer"
    return
  end

  # Only wait a short while for the queue to flush.
  deadline = Time.now + MAX_FLUSH_WAIT
  sleep(FLUSH_DELAY) until @queue.empty? || Time.now >= deadline
end
transfer(new_buffer) click to toggle source

Dumps the contents of this streaming buffer onto the given buffer and closes the queue.

# File lib/infinite_tracing/streaming_buffer.rb, line 34
# Moves every queued item, in order, onto new_buffer and then closes this
# buffer's queue. The whole operation runs while holding @lock.
#
# new_buffer - destination; it must respond to push.
def transfer(new_buffer)
  @lock.synchronize do
    new_buffer.push(@queue.pop) until @queue.empty?
    @queue.close
  end
end

Private Instance Methods

span_event(proc_or_segment) click to toggle source
# File lib/infinite_tracing/streaming_buffer.rb, line 140
# Resolves a queued item into a span event.
#
# A Proc is called and its result returned; anything else is handed to
# SpanEventPrimitive.for_segment.
def span_event(proc_or_segment)
  return proc_or_segment.call if proc_or_segment.is_a?(Proc)

  SpanEventPrimitive.for_segment(proc_or_segment)
end
transform(proc_or_segment) click to toggle source
# File lib/infinite_tracing/streaming_buffer.rb, line 148
# Converts a queued item into a Span: the item is resolved with span_event,
# the result is passed through Transformer.transform, and that output is
# used to construct the Span.
def transform(proc_or_segment)
  event = span_event(proc_or_segment)
  payload = Transformer.transform(event)
  Span.new(payload)
end