class Google::Cloud::Logging::AsyncWriter

# AsyncWriter

AsyncWriter buffers, batches, and transmits log entries efficiently. Writing log entries is asynchronous and will not block.

Batches that cannot be delivered immediately are queued. When the queue is full new batch requests will raise errors that can be consumed using the {#on_error} callback. This provides back pressure in case the writer cannot keep up with requests.

This object is thread-safe; it may accept write requests from multiple threads simultaneously, and will serialize them when executing in the background thread.

@example

require "google/cloud/logging"

logging = Google::Cloud::Logging.new

async = logging.async_writer

entry1 = logging.entry payload: "Job started."
entry2 = logging.entry payload: "Job completed."

labels = { job_size: "large", job_code: "red" }
resource = logging.resource "gae_app",
                            "module_id" => "1",
                            "version_id" => "20150925t173233"

async.write_entries [entry1, entry2],
                    log_name: "my_app_log",
                    resource: resource,
                    labels: labels

Attributes

interval[R]

@private Implementation accessors

logging[R]

@private Implementation accessors

max_bytes[R]

@private Implementation accessors

max_count[R]

@private Implementation accessors

max_queue[R]

@private Implementation accessors

partial_success[R]

@private Implementation accessors

threads[R]

@private Implementation accessors

Public Class Methods

new(logging, max_count: 10_000, max_bytes: 10_000_000, max_queue: 100, interval: 5, threads: 10, partial_success: false) click to toggle source

@private Creates a new AsyncWriter instance.

Calls superclass method
# File lib/google/cloud/logging/async_writer.rb, line 68
def initialize logging, max_count: 10_000, max_bytes: 10_000_000,
               max_queue: 100, interval: 5, threads: 10,
               partial_success: false
  # init MonitorMixin
  super()

  @logging = logging

  @max_count = max_count
  @max_bytes = max_bytes
  @max_queue = max_queue
  @interval  = interval
  @threads   = threads

  @partial_success = partial_success

  @error_callbacks = []

  @cond = new_cond

  # Make sure all buffered messages are sent when process exits.
  at_exit { stop }
end

Public Instance Methods

async_stop!(timeout = nil, force: nil)
Alias for: stop!
flush() click to toggle source

Forces all entries in the current batch to be published immediately.

@return [AsyncWriter] returns self so calls can be chained.

# File lib/google/cloud/logging/async_writer.rb, line 280
def flush
  synchronize do
    publish_batch!
    @cond.broadcast
  end

  self
end
last_error() click to toggle source

The most recent unhandled error to occur while transmitting log entries.

If an unhandled error has occurred the subscriber will attempt to recover from the error and resume buffering, batching, and transmitting log entries.

@return [Exception, nil] error The most recent error raised.

@example

require "google/cloud/logging"

logging = Google::Cloud::Logging.new

resource = logging.resource "gae_app",
                            module_id: "1",
                            version_id: "20150925t173233"

async = logging.async_writer

logger = async.logger "my_app_log", resource, env: :production
logger.info "Job started."

# If an error was raised, it can be retrieved here:
async.last_error #=> nil
# File lib/google/cloud/logging/async_writer.rb, line 371
def last_error
  synchronize { @last_error }
end
Also aliased as: last_exception
last_exception()
Alias for: last_error
logger(log_name, resource, labels = {}) click to toggle source

Creates a logger instance that is API-compatible with Ruby's standard library [Logger](ruby-doc.org/stdlib/libdoc/logger/rdoc).

The logger will use AsyncWriter to transmit log entries on a background thread.

@param [String] log_name A log resource name to be associated with the

written log entries.

@param [Google::Cloud::Logging::Resource] resource The monitored

resource to be associated with written log entries.

@param [Hash] labels A set of user-defined data to be associated with

written log entries.

@return [Google::Cloud::Logging::Logger] a Logger object that can be

used in place of a ruby standard library logger object.

@example

require "google/cloud/logging"

logging = Google::Cloud::Logging.new

resource = logging.resource "gae_app",
                            module_id: "1",
                            version_id: "20150925t173233"

async = logging.async_writer
logger = async.logger "my_app_log", resource, env: :production
logger.info "Job started."
# File lib/google/cloud/logging/async_writer.rb, line 195
def logger log_name, resource, labels = {}
  Logger.new self, log_name, resource, labels
end
on_error(&block) click to toggle source

Register to be notified of errors when raised.

If an unhandled error has occurred the writer will attempt to recover from the error and resume buffering, batching, and transmitting log entries

Multiple error handlers can be added.

@yield [callback] The block to be called when an error is raised. @yieldparam [Exception] error The error raised.

@example

require "google/cloud/logging"
require "google/cloud/error_reporting"

logging = Google::Cloud::Logging.new

resource = logging.resource "gae_app",
                            module_id: "1",
                            version_id: "20150925t173233"

async = logging.async_writer

# Register to be notified when unhandled errors occur.
async.on_error do |error|
  # error can be a AsyncWriterError or AsyncWriteEntriesError
  Google::Cloud::ErrorReporting.report error
end

logger = async.logger "my_app_log", resource, env: :production
logger.info "Job started."
# File lib/google/cloud/logging/async_writer.rb, line 338
def on_error &block
  synchronize do
    @error_callbacks << block
  end
end
started?() click to toggle source

Whether the writer has been started.

@return [boolean] `true` when started, `false` otherwise.

# File lib/google/cloud/logging/async_writer.rb, line 293
def started?
  !stopped?
end
stop() click to toggle source

Begins the process of stopping the writer. Entries already in the queue will be published, but no new entries can be added. Use {#wait!} to block until the writer is fully stopped and all pending entries have been published.

@return [AsyncWriter] returns self so calls can be chained.

# File lib/google/cloud/logging/async_writer.rb, line 206
def stop
  synchronize do
    break if @stopped

    @stopped = true
    publish_batch!
    @cond.broadcast
    @thread_pool&.shutdown
  end

  self
end
stop!(timeout = nil, force: nil) click to toggle source

Stop this asynchronous writer and block until it has been stopped.

@param [Number, nil] timeout The maximum number of seconds to wait for

shutdown to complete. Will wait forever when the value is `nil`. The
default value is `nil`.

@param [Boolean] force If set to true, and the writer hasn't stopped

within the given timeout, kill it forcibly by terminating the
thread. This should be used with extreme caution, as it can
leave RPCs unfinished. Default is false.

@return [Symbol] Returns `:new` if {#write_entries} has never been

called on the AsyncWriter, `:stopped` if it was already stopped
at the time of invocation, `:waited` if it stopped during the
timeout period, `:timeout` if it is still running after the
timeout, or `:forced` if it was forcibly killed.
# File lib/google/cloud/logging/async_writer.rb, line 258
def stop! timeout = nil, force: nil
  return :new unless @thread_pool
  return :stopped if stopped?

  stop
  wait! timeout

  if synchronize { @thread_pool.shutdown? }
    return :waited if timeout
  elsif force
    @thread_pool.kill
    return :forced
  end
  :timeout
end
Also aliased as: async_stop!
stopped?() click to toggle source

Whether the writer has been stopped.

@return [boolean] `true` when stopped, `false` otherwise.

# File lib/google/cloud/logging/async_writer.rb, line 301
def stopped?
  synchronize { @stopped }
end
wait!(timeout = nil) click to toggle source

Blocks until the writer is fully stopped, all pending entries have been published, and all callbacks have completed. Does not stop the writer. To stop the writer, first call {#stop} and then call {#wait!} to block until the writer is stopped.

@param [Number, nil] timeout The maximum number of seconds to wait for

shutdown to complete. Will wait forever when the value is `nil`. The
default value is `nil`.

@return [AsyncWriter] returns self so calls can be chained.

# File lib/google/cloud/logging/async_writer.rb, line 230
def wait! timeout = nil
  synchronize do
    if @thread_pool
      @thread_pool.shutdown
      @thread_pool.wait_for_termination timeout
    end
  end

  self
end
write_entries(entries, log_name: nil, resource: nil, labels: nil) click to toggle source

Asynchronously write one or more log entries to the Stackdriver Logging service.

Unlike the main write_entries method, this method usually does not block. The actual write RPCs will happen in the background, and may be batched with related calls. However, if the queue is full, this method will block until enough space has cleared out.

@param [Google::Cloud::Logging::Entry,

Array<Google::Cloud::Logging::Entry>] entries One or more entry
objects to write. The log entries must have values for all required
fields.

@param [String] log_name A default log ID for those log entries in

`entries` that do not specify their own `log_name`. See also
{Entry#log_name=}.

@param [Resource] resource A default monitored resource for those log

entries in entries that do not specify their own resource. See also
{Entry#resource}.

@param [Hash{Symbol,String => String}] labels User-defined `key:value`

items that are added to the `labels` field of each log entry in
`entries`, except when a log entry specifies its own `key:value`
item with the same key. See also {Entry#labels=}.

@return [Google::Cloud::Logging::AsyncWriter] Returns self.

@example

require "google/cloud/logging"

logging = Google::Cloud::Logging.new
async = logging.async_writer

entry = logging.entry payload: "Job started.",
                      log_name: "my_app_log"
entry.resource.type = "gae_app"
entry.resource.labels[:module_id] = "1"
entry.resource.labels[:version_id] = "20150925t173233"

async.write_entries entry
# File lib/google/cloud/logging/async_writer.rb, line 132
def write_entries entries, log_name: nil, resource: nil, labels: nil
  synchronize do
    raise "AsyncWriter has been stopped" if @stopped

    Array(entries).each do |entry|
      # Update the entry to have all the data directly on it
      entry.log_name ||= log_name
      if entry.resource.nil? || entry.resource.empty?
        entry.resource = resource
      end
      entry.labels = labels if entry.labels.nil? || entry.labels.empty?

      # Add the entry to the batch
      @batch ||= Batch.new self
      next if @batch.try_add entry

      # If we can't add to the batch, publish and create a new batch
      publish_batch!
      @batch = Batch.new self
      @batch.add entry
    end

    @thread_pool ||= Concurrent::ThreadPoolExecutor.new \
      max_threads: @threads, max_queue: @max_queue
    @thread ||= Thread.new { run_background }

    publish_batch! if @batch&.ready?

    @cond.broadcast
  end
  self
end

Protected Instance Methods

default_error_callbacks() click to toggle source
# File lib/google/cloud/logging/async_writer.rb, line 416
def default_error_callbacks
  # This is memoized to reduce calls to the configuration.
  @default_error_callbacks ||= begin
    error_callback = Google::Cloud::Logging.configure.on_error
    error_callback ||= Google::Cloud.configure.on_error
    if error_callback
      [error_callback]
    else
      []
    end
  end
end
error!(error) click to toggle source

Sets the last_error and calls all error callbacks.

# File lib/google/cloud/logging/async_writer.rb, line 407
def error! error
  error_callbacks = synchronize do
    @last_error = error
    @error_callbacks
  end
  error_callbacks = default_error_callbacks if error_callbacks.empty?
  error_callbacks.each { |error_callback| error_callback.call error }
end
publish_batch!() click to toggle source
# File lib/google/cloud/logging/async_writer.rb, line 398
def publish_batch!
  return unless @batch

  batch_to_be_published = @batch
  @batch = nil
  publish_batch_async batch_to_be_published
end
publish_batch_async(batch) click to toggle source
# File lib/google/cloud/logging/async_writer.rb, line 429
def publish_batch_async batch
  Concurrent::Promises.future_on(
    @thread_pool, batch.entries
  ) do |entries|
    write_entries_with entries
  end
rescue Concurrent::RejectedExecutionError => e
  async_error = AsyncWriterError.new(
    "Error writing entries: #{e.message}",
    batch.entries
  )
  # Manually set backtrace so we don't have to raise
  async_error.set_backtrace caller
  error! async_error
end
run_background() click to toggle source
# File lib/google/cloud/logging/async_writer.rb, line 378
def run_background
  synchronize do
    until @stopped
      if @batch.nil?
        @cond.wait
        next
      end

      if @batch.ready?
        # interval met, publish the batch...
        publish_batch!
        @cond.wait
      else
        # still waiting for the interval to publish the batch...
        @cond.wait @batch.publish_wait
      end
    end
  end
end
write_entries_with(entries) click to toggle source
# File lib/google/cloud/logging/async_writer.rb, line 445
def write_entries_with entries
  logging.write_entries entries, partial_success: partial_success
rescue StandardError => e
  write_error = AsyncWriteEntriesError.new(
    "Error writing entries: #{e.message}",
    entries
  )
  # Manually set backtrace so we don't have to raise
  write_error.set_backtrace caller
  error! write_error
end