class NewRelic::Agent::InfiniteTracing::StreamingBuffer
Constants
- DEFAULT_QUEUE_SIZE
- FLUSH_DELAY
- MAX_FLUSH_WAIT
Attributes
Public Class Methods
# File lib/infinite_tracing/streaming_buffer.rb, line 25
def initialize max_size = DEFAULT_QUEUE_SIZE
  @max_size = max_size
  @lock = Mutex.new
  @queue = Queue.new
  @batch = Array.new
end
Public Instance Methods
Pushes the segment given onto the queue.
If the queue is at capacity, it is dumped and a supportability metric is recorded for the event.
When a restart signal is received, the queue is locked with a mutex, blocking the push until the queue has restarted.
# File lib/infinite_tracing/streaming_buffer.rb, line 49
def << segment
  @lock.synchronize do
    clear_queue if @queue.size >= @max_size
    NewRelic::Agent.increment_metric SPANS_SEEN_METRIC
    @queue.push segment
  end
end
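The dump-at-capacity behaviour can be sketched outside the agent. This is a minimal illustration rather than the agent's code: `DemoBuffer` and `dumped_count` are hypothetical names, and a plain counter stands in for the supportability metric.

```ruby
# Hypothetical stand-in for the buffer's push path. DemoBuffer and
# dumped_count are illustrative names, not part of the agent.
class DemoBuffer
  attr_reader :dumped_count

  def initialize(max_size)
    @max_size = max_size
    @lock = Mutex.new
    @queue = Queue.new
    @dumped_count = 0
  end

  def <<(item)
    @lock.synchronize do
      # At capacity: drop everything queued so far, then accept the new item.
      if @queue.size >= @max_size
        @queue.clear
        @dumped_count += 1 # stands in for the supportability metric
      end
      @queue.push(item)
    end
    self
  end

  def size
    @queue.size
  end
end

buffer = DemoBuffer.new(3)
4.times { |i| buffer << i }
```

With a capacity of 3, the fourth push finds the queue full, clears it, and then enqueues the new item, so only that item remains.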
Returns a blocking enumerator that pops items off the queue while any are present. Yielding is deferred until BATCH_SIZE spans have accumulated.
If nil is popped, the queue is closing. A final yield is fired for any non-empty batch, and then ClosedQueueError is raised.
The segment is transformed into a serializable span here so that processing takes place within the gRPC call's thread rather than in the main application thread.
# File lib/infinite_tracing/streaming_buffer.rb, line 120
def batch_enumerator
  return enum_for(:enumerator) unless block_given?
  loop do
    if proc_or_segment = @queue.pop(false)
      NewRelic::Agent.increment_metric SPANS_SENT_METRIC
      @batch << transform(proc_or_segment)
      if @batch.size >= BATCH_SIZE
        yield SpanBatch.new(spans: @batch)
        @batch.clear
      end
    else
      yield SpanBatch.new(spans: @batch) unless @batch.empty?
      raise ClosedQueueError
    end
  end
end
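The batching pattern (accumulate until a threshold, then flush the remainder when nil signals a closed queue) can be tried with a plain Ruby Queue. This is a rough sketch under assumptions: `each_batch` and `batch_size` are hypothetical names, arrays stand in for SpanBatch, and the loop simply ends instead of raising ClosedQueueError.

```ruby
# Hypothetical helper: each_batch and batch_size are illustrative only.
def each_batch(queue, batch_size)
  batch = []
  # Queue#pop blocks while the queue is open and empty, and returns nil
  # once the queue has been closed and drained.
  while (item = queue.pop)
    batch << item
    if batch.size >= batch_size
      yield batch.dup
      batch.clear
    end
  end
  # Final yield for a partially filled batch.
  yield batch.dup unless batch.empty?
end

queue = Queue.new
5.times { |i| queue << i }
queue.close

batches = []
each_batch(queue, 2) { |batch| batches << batch }
```

Five items with a batch size of 2 produce two full batches and one trailing batch holding the last item.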
Drops all segments from the queue and records a supportability metric for the event.
# File lib/infinite_tracing/streaming_buffer.rb, line 59
def clear_queue
  @queue.clear
  NewRelic::Agent.increment_metric QUEUE_DUMPED_METRIC
end
# File lib/infinite_tracing/streaming_buffer.rb, line 82
def close_queue
  @lock.synchronize { @queue.close }
end
Returns a blocking enumerator that pops items off the queue while any are present.
If nil is popped, the queue is closing and ClosedQueueError is raised.
The segment is transformed into a serializable span here so that processing takes place within the gRPC call's thread rather than in the main application thread.
# File lib/infinite_tracing/streaming_buffer.rb, line 94
def enumerator
  return enum_for(:enumerator) unless block_given?
  loop do
    if segment = @queue.pop(false)
      NewRelic::Agent.increment_metric SPANS_SENT_METRIC
      yield transform(segment)
    else
      raise ClosedQueueError
    end
  end
end
Waits for the queue to be fully consumed or for the waiting consumers to release.
# File lib/infinite_tracing/streaming_buffer.rb, line 66
def flush_queue
  @queue.num_waiting.times { @queue.push nil }
  close_queue

  # Logs if we're throwing away spans because nothing's
  # waiting to take them off the queue.
  if @queue.num_waiting == 0 && !@queue.empty?
    NewRelic::Agent.logger.warn "Discarding #{@queue.size} segments on Streaming Buffer"
    return
  end

  # Only wait a short while for queue to flush
  cutoff = Time.now + MAX_FLUSH_WAIT
  until @queue.empty? || Time.now >= cutoff do sleep(FLUSH_DELAY) end
end
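The bounded wait at the end of flush_queue can be sketched in isolation. This is only an illustration: `wait_until_empty`, `max_wait`, and `delay` are hypothetical names standing in for the polling loop, MAX_FLUSH_WAIT, and FLUSH_DELAY, and the timing values are arbitrary.

```ruby
# Hypothetical helper: polls until the queue drains or the deadline passes.
# Returns true if the queue emptied in time, false if the wait timed out.
def wait_until_empty(queue, max_wait:, delay:)
  cutoff = Time.now + max_wait
  sleep(delay) until queue.empty? || Time.now >= cutoff
  queue.empty?
end

# Case 1: a consumer thread drains the queue, so the wait ends early.
queue = Queue.new
3.times { |i| queue << i }
consumer = Thread.new { 3.times { queue.pop } }
drained = wait_until_empty(queue, max_wait: 1.0, delay: 0.005)
consumer.join

# Case 2: nothing consumes the queue, so the wait gives up at the cutoff.
stalled = Queue.new
stalled << :span
timed_out = !wait_until_empty(stalled, max_wait: 0.05, delay: 0.005)
```

Polling with a short sleep keeps the flush from blocking shutdown indefinitely when no consumer is taking items off the queue.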
Dumps the contents of this streaming buffer onto the given buffer and closes the queue.
# File lib/infinite_tracing/streaming_buffer.rb, line 34
def transfer new_buffer
  @lock.synchronize do
    until @queue.empty? do new_buffer.push @queue.pop end
    @queue.close
  end
end
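The drain-and-close pattern used by transfer can be sketched with two plain queues. This is an assumption-laden illustration: `drain_into` is a hypothetical helper, and a bare Queue stands in for the receiving buffer.

```ruby
# Hypothetical helper mirroring the drain-and-close pattern.
def drain_into(source, target, lock)
  lock.synchronize do
    # Move items one at a time so FIFO order is preserved.
    target.push(source.pop) until source.empty?
    # Closing the source rejects further pushes to the old queue.
    source.close
  end
  target
end

old_queue = Queue.new
new_queue = Queue.new
[:a, :b, :c].each { |segment| old_queue << segment }

drain_into(old_queue, new_queue, Mutex.new)
```

Holding the lock for the whole drain means a concurrent push cannot slip an item into the old queue between the last pop and the close.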
Private Instance Methods
# File lib/infinite_tracing/streaming_buffer.rb, line 140
def span_event proc_or_segment
  if proc_or_segment.is_a?(Proc)
    proc_or_segment.call
  else
    SpanEventPrimitive.for_segment(proc_or_segment)
  end
end
# File lib/infinite_tracing/streaming_buffer.rb, line 148
def transform proc_or_segment
  Span.new Transformer.transform(span_event proc_or_segment)
end