class Google::Cloud::Trace::AsyncReporter

# AsyncReporter

@private Used by the {Google::Cloud::Trace::Middleware} to asynchronously buffer traces and push batches to Stackdriver Trace service when used in a Rack-based application.

Attributes

interval[R]

@private Implementation accessors

max_bytes[R]

@private Implementation accessors

max_count[R]

@private Implementation accessors

max_queue[R]

@private Implementation accessors

service[R]

@private Implementation accessors

threads[R]

@private Implementation accessors

Public Class Methods

new(service, max_count: 1000, max_bytes: 4_000_000, max_queue: 100, interval: 5, threads: 10) click to toggle source

@private Creates a new AsyncReporter instance.

Calls superclass method
# File lib/google/cloud/trace/async_reporter.rb, line 39
# @private Creates a new AsyncReporter instance.
#
# @param service The object traces are sent through; it must respond to
#   `patch_traces` and `project`.
# @param [Integer] max_count Stored and exposed through its reader;
#   presumably a per-batch limit consumed by Batch - TODO confirm.
# @param [Integer] max_bytes Stored and exposed through its reader;
#   presumably a per-batch limit consumed by Batch - TODO confirm.
# @param [Integer] max_queue Passed to the thread pool as `max_queue`.
# @param [Numeric] interval Stored and exposed through its reader;
#   presumably the publish interval used by Batch - TODO confirm.
# @param [Integer] threads Passed to the thread pool as `max_threads`.
def initialize service, max_count: 1000, max_bytes: 4_000_000,
               max_queue: 100, interval: 5, threads: 10
  # MonitorMixin has to be set up before new_cond can be used.
  super()

  @cond            = new_cond
  @error_callbacks = []

  @service = service

  @threads   = threads
  @interval  = interval
  @max_queue = max_queue
  @max_bytes = max_bytes
  @max_count = max_count

  # Stop (and thereby flush) the reporter before the process terminates.
  at_exit { stop! }
end

Public Instance Methods

flush!() click to toggle source

Forces all traces in the current batch to be patched to the API immediately.

@return [AsyncReporter] returns self so calls can be chained.

# File lib/google/cloud/trace/async_reporter.rb, line 154
# Hands the current batch (if there is one) over for publishing right away
# instead of waiting for it to become ready.
#
# @return [AsyncReporter] returns self so calls can be chained.
def flush!
  tap do
    synchronize do
      patch_batch!
      # Wake up whoever is waiting on the condition variable.
      @cond.broadcast
    end
  end
end
on_error(&block) click to toggle source

Register to be notified of errors when raised.

If an unhandled error has occurred the reporter will attempt to recover from the error and resume buffering, batching, and patching traces.

Multiple error handlers can be added.

@yield [callback] The block to be called when an error is raised.

@yieldparam [Exception] error The error raised.

# File lib/google/cloud/trace/async_reporter.rb, line 193
# Register a callback to be notified of errors raised while reporting.
# Multiple callbacks can be added; each one receives the error object.
#
# @yield [error] The block to be called when an error is raised.
# @yieldparam [Exception] error The error raised.
# @raise [ArgumentError] if no block is given.
# @return [Array<Proc>] the list of registered callbacks.
def on_error &block
  # A nil entry would only blow up later, when error! tries to call it.
  raise ArgumentError, "on_error requires a block" unless block

  synchronize do
    @error_callbacks << block
  end
end
patch_traces(traces) click to toggle source

Add the traces to the queue to be reported to Stackdriver Trace asynchronously. Signal the child thread to start processing the queue.

@param [Google::Cloud::Trace::TraceRecord, Array<Google::Cloud::Trace::TraceRecord>] traces Either a single trace object or an array of trace objects.
# File lib/google/cloud/trace/async_reporter.rb, line 67
# Add the traces to the current batch so they are reported asynchronously,
# then signal the background thread.
#
# When the reporter is already stopped nothing is queued; the error
# callbacks are invoked with an AsyncReporterError instead.
#
# @param [Google::Cloud::Trace::TraceRecord,
#   Array<Google::Cloud::Trace::TraceRecord>] traces Either a single
#   trace object or an array of trace objects.
#
# @return [AsyncReporter, nil] self, or nil when the reporter is stopped.
def patch_traces traces
  if synchronize { @stopped }
    raise_stopped_error traces
    return
  end

  synchronize do
    # The thread pool must exist before the loop: a batch that overflows
    # is published from inside the loop, and publishing uses @thread_pool.
    init_resources!

    Array(traces).each do |trace|
      # Add the trace to the batch
      @batch ||= Batch.new self
      next if @batch.try_add trace

      # If we can't add to the batch, publish and create a new batch
      patch_batch!
      @batch = Batch.new self
      @batch.add trace
    end

    # @batch is still nil when no trace was given and nothing is buffered.
    patch_batch! if @batch&.ready?

    @cond.broadcast
  end
  self
end
project() click to toggle source

Get the project id from underlying service object.

# File lib/google/cloud/trace/async_reporter.rb, line 96
# Get the project id from the underlying service object.
#
# @return whatever `service.project` returns.
def project
  backend = service
  backend.project
end
started?() click to toggle source

Whether the reporter has been started.

@return [boolean] `true` when started, `false` otherwise.

# File lib/google/cloud/trace/async_reporter.rb, line 168
# Whether the reporter has been started. This is simply the negation of
# {#stopped?}.
#
# @return [boolean] `true` when started, `false` otherwise.
def started?
  return false if stopped?

  true
end
stop() click to toggle source

Begins the process of stopping the reporter. Traces already in the queue will be published, but no new traces can be added. Use {#wait!} to block until the reporter is fully stopped and all pending traces have been pushed to the API.

@return [AsyncReporter] returns self so calls can be chained.

# File lib/google/cloud/trace/async_reporter.rb, line 107
# Begins the process of stopping the reporter: marks it stopped, hands the
# current batch over for publishing, wakes up waiters and shuts down the
# thread pool if one was created. Calling it again is a no-op.
#
# @return [AsyncReporter] returns self so calls can be chained.
def stop
  synchronize do
    unless @stopped
      @stopped = true
      patch_batch!
      @cond.broadcast
      # The pool only exists once resources have been initialized.
      @thread_pool&.shutdown
    end
  end

  self
end
stop!(timeout = nil) click to toggle source

Stop this asynchronous reporter and block until it has been stopped.

@param [Number] timeout Timeout in seconds.

# File lib/google/cloud/trace/async_reporter.rb, line 125
# Stop this asynchronous reporter and block until it has been stopped.
#
# @param [Number] timeout Timeout in seconds, forwarded to {#wait!}.
# @return the result of {#wait!}.
def stop! timeout = nil
  # stop returns the reporter itself, so the wait can be chained onto it.
  stop.wait! timeout
end
stopped?() click to toggle source

Whether the reporter has been stopped.

@return [boolean] `true` when stopped, `false` otherwise.

# File lib/google/cloud/trace/async_reporter.rb, line 177
# Whether the reporter has been stopped.
#
# @return [boolean] `true` when stopped, `false` otherwise.
def stopped?
  # @stopped is nil until the reporter is stopped; always answer with a
  # real boolean, as documented.
  synchronize { @stopped ? true : false }
end
wait!(timeout = nil) click to toggle source

Blocks until the reporter is fully stopped, all pending traces have been published, and all callbacks have completed. Does not stop the reporter. To stop the reporter, first call {#stop} and then call {#wait!} to block until the reporter is stopped.

@return [AsyncReporter] returns self so calls can be chained.

# File lib/google/cloud/trace/async_reporter.rb, line 137
# Shuts down the thread pool (if one was created) and blocks until it has
# terminated or the timeout elapses.
#
# @param [Number] timeout Timeout in seconds, passed to the thread pool's
#   `wait_for_termination`. `nil` is passed through unchanged.
# @return [AsyncReporter] returns self so calls can be chained.
def wait! timeout = nil
  # Only the lookup needs the monitor. Waiting while holding it would stall
  # every other synchronized method until the pool has drained.
  pool = synchronize { @thread_pool }

  if pool
    pool.shutdown
    pool.wait_for_termination timeout
  end

  self
end

Protected Instance Methods

default_error_callbacks() click to toggle source
# File lib/google/cloud/trace/async_reporter.rb, line 282
# Error callbacks taken from configuration. The Trace-level `on_error`
# setting wins; the global Google::Cloud one is the fallback.
#
# @return [Array] a one-element array holding the configured callback, or
#   an empty array when none is configured.
def default_error_callbacks
  # Memoized so the configuration is consulted only once.
  @default_error_callbacks ||= begin
    configured = Google::Cloud::Trace.configure.on_error ||
                 Google::Cloud.configure.on_error
    configured ? [configured] : []
  end
end
error!(error) click to toggle source

Calls all error callbacks.

# File lib/google/cloud/trace/async_reporter.rb, line 275
# Calls all error callbacks with the given error. When no callback has been
# registered, the callbacks from {#default_error_callbacks} are used.
#
# @param error The error handed to every callback.
# @return [Array] the callbacks that were invoked.
def error! error
  # Read without synchronizing, as the original implementation does.
  registered = @error_callbacks
  handlers = registered.empty? ? default_error_callbacks : registered
  handlers.each { |handler| handler.call error }
end
init_resources!() click to toggle source
# File lib/google/cloud/trace/async_reporter.rb, line 201
# Lazily creates the thread pool and the background thread running
# {#run_background}. Existing ones are kept.
#
# @return [nil]
def init_resources!
  @thread_pool ||= Concurrent::ThreadPoolExecutor.new(
    max_threads: @threads, max_queue: @max_queue
  )
  @thread ||= Thread.new { run_background }

  # Deliberately nil, as in the original implementation.
  nil
end
patch_batch!() click to toggle source
# File lib/google/cloud/trace/async_reporter.rb, line 228
# Detaches the current batch, if any, and hands it to
# {#patch_traces_async}. Does nothing when there is no batch.
#
# @return the result of {#patch_traces_async}, or nil without a batch.
def patch_batch!
  pending = @batch
  return unless pending

  # Clear the slot first so later traces start a fresh batch.
  @batch = nil
  patch_traces_async pending
end
patch_traces_async(batch) click to toggle source
# File lib/google/cloud/trace/async_reporter.rb, line 236
# Schedules the batch's traces on the thread pool, where they are sent via
# {#patch_traces_with}. If the pool rejects the work, the error callbacks
# are invoked with an AsyncReporterError carrying the traces.
#
# @param batch The batch whose `traces` should be published.
def patch_traces_async batch
  Concurrent::Promises.future_on @thread_pool, batch.traces do |queued|
    patch_traces_with queued
  end
rescue Concurrent::RejectedExecutionError => e
  message = "Error writing traces: #{e.message}"
  rejected = AsyncReporterError.new message, batch.traces
  # Attach a backtrace by hand, since the error is reported, not raised.
  rejected.set_backtrace caller
  error! rejected
end
patch_traces_with(traces) click to toggle source
# File lib/google/cloud/trace/async_reporter.rb, line 252
# Sends the traces through the service. Any StandardError is wrapped in an
# AsyncPatchTracesError (carrying the traces) and passed to the error
# callbacks instead of being raised.
#
# @param traces The traces to send.
def patch_traces_with traces
  service.patch_traces traces
rescue StandardError => e
  message = "Error writing traces: #{e.message}"
  failure = AsyncPatchTracesError.new message, traces
  # Attach a backtrace by hand, since the error is reported, not raised.
  failure.set_backtrace caller
  error! failure
end
raise_stopped_error(traces) click to toggle source
# File lib/google/cloud/trace/async_reporter.rb, line 264
# Reports, through the error callbacks, that traces were submitted after
# the reporter was stopped. Despite the name, nothing is raised here.
#
# @param traces The traces that could not be accepted.
def raise_stopped_error traces
  message = "AsyncReporter is stopped. Cannot patch traces."
  stopped = AsyncReporterError.new message, traces
  # Attach a backtrace by hand, since the error is reported, not raised.
  stopped.set_backtrace caller
  error! stopped
end
run_background() click to toggle source
# File lib/google/cloud/trace/async_reporter.rb, line 208
# Body of the background thread. Runs under the monitor until the reporter
# is stopped, publishing the current batch once it reports itself ready.
def run_background
  synchronize do
    until @stopped
      if @batch.nil?
        # Nothing buffered: sleep until signalled.
        @cond.wait
      elsif @batch.ready?
        # Interval met: publish, then sleep until signalled again.
        patch_batch!
        @cond.wait
      else
        # Not due yet: sleep for the time the batch asks for.
        @cond.wait @batch.publish_wait
      end
    end
  end
end