class Google::Cloud::Logging::AsyncWriter
AsyncWriter buffers, batches, and transmits log entries efficiently. Writing log entries is asynchronous and will not block.
Batches that cannot be delivered immediately are queued. When the queue is full, new batch requests raise errors that can be consumed using the {#on_error} callback. This provides back pressure in case the writer cannot keep up with requests.
This object is thread-safe; it may accept write requests from multiple threads simultaneously, and will serialize them when executing in the background thread.
@example
  require "google/cloud/logging"

  logging = Google::Cloud::Logging.new

  async = logging.async_writer

  entry1 = logging.entry payload: "Job started."
  entry2 = logging.entry payload: "Job completed."

  labels = { job_size: "large", job_code: "red" }
  resource = logging.resource "gae_app",
                              "module_id" => "1",
                              "version_id" => "20150925t173233"

  async.write_entries [entry1, entry2],
                      log_name: "my_app_log",
                      resource: resource,
                      labels: labels
Attributes
@private Implementation accessors
Public Class Methods
@private Creates a new AsyncWriter instance.
# File lib/google/cloud/logging/async_writer.rb, line 68
def initialize logging, max_count: 10_000, max_bytes: 10_000_000,
               max_queue: 100, interval: 5, threads: 10,
               partial_success: false
  # init MonitorMixin
  super()

  @logging = logging

  @max_count = max_count
  @max_bytes = max_bytes
  @max_queue = max_queue
  @interval = interval
  @threads = threads

  @partial_success = partial_success

  @error_callbacks = []

  @cond = new_cond

  # Make sure all buffered messages are sent when process exits.
  at_exit { stop }
end
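The `max_count` and `max_bytes` options bound the size of a single batch. The library's `Batch` class is not shown on this page, so the following is only a rough sketch of how such limits could work: `SketchBatch`, `split_into_batches`, and the use of plain strings (sized with `bytesize`) in place of log entries are all assumptions made for illustration.

```ruby
# Hypothetical stand-in for a size-limited batch; not the library's Batch class.
class SketchBatch
  attr_reader :entries

  def initialize max_count:, max_bytes:
    @max_count = max_count
    @max_bytes = max_bytes
    @entries = []
    @total_bytes = 0
  end

  # Adds the entry unconditionally.
  def add entry
    @entries << entry
    @total_bytes += entry.bytesize
  end

  # Adds the entry and returns true if it fits within both limits,
  # otherwise leaves the batch untouched and returns false.
  def try_add entry
    return false if @entries.size + 1 > @max_count
    return false if @total_bytes + entry.bytesize > @max_bytes
    add entry
    true
  end
end

# Mirrors the try_add / new-batch pattern used by write_entries:
# when an entry does not fit, start a fresh batch and add it there.
def split_into_batches entries, max_count:, max_bytes:
  batches = [SketchBatch.new(max_count: max_count, max_bytes: max_bytes)]
  entries.each do |entry|
    next if batches.last.try_add entry
    batches << SketchBatch.new(max_count: max_count, max_bytes: max_bytes)
    batches.last.add entry
  end
  batches.map(&:entries).reject(&:empty?)
end

p split_into_batches(%w[aa bb cc dd], max_count: 3, max_bytes: 5)
# => [["aa", "bb"], ["cc", "dd"]]
```

In this sketch an entry larger than `max_bytes` still ends up in a batch of its own, because `add` does not check the limits.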
Public Instance Methods
Forces all entries in the current batch to be published immediately.
@return [AsyncWriter] returns self so calls can be chained.
# File lib/google/cloud/logging/async_writer.rb, line 280
def flush
  synchronize do
    publish_batch!
    @cond.broadcast
  end

  self
end
The most recent unhandled error to occur while transmitting log entries.
If an unhandled error has occurred, the writer will attempt to recover from the error and resume buffering, batching, and transmitting log entries.
@return [Exception, nil] error The most recent error raised.
@example
  require "google/cloud/logging"

  logging = Google::Cloud::Logging.new

  resource = logging.resource "gae_app",
                              module_id: "1",
                              version_id: "20150925t173233"

  async = logging.async_writer

  logger = async.logger "my_app_log", resource, env: :production
  logger.info "Job started."

  # If an error was raised, it can be retrieved here:
  async.last_error #=> nil
# File lib/google/cloud/logging/async_writer.rb, line 371
def last_error
  synchronize { @last_error }
end
Creates a logger instance that is API-compatible with Ruby's standard library [Logger](ruby-doc.org/stdlib/libdoc/logger/rdoc).
The logger will use AsyncWriter to transmit log entries on a background thread.
@param [String] log_name A log resource name to be associated with the
written log entries.
@param [Google::Cloud::Logging::Resource] resource The monitored
resource to be associated with written log entries.
@param [Hash] labels A set of user-defined data to be associated with
written log entries.
@return [Google::Cloud::Logging::Logger] a Logger object that can be
used in place of a Ruby standard library logger object.
@example
  require "google/cloud/logging"

  logging = Google::Cloud::Logging.new

  resource = logging.resource "gae_app",
                              module_id: "1",
                              version_id: "20150925t173233"

  async = logging.async_writer

  logger = async.logger "my_app_log", resource, env: :production
  logger.info "Job started."
# File lib/google/cloud/logging/async_writer.rb, line 195
def logger log_name, resource, labels = {}
  Logger.new self, log_name, resource, labels
end
Register to be notified of errors when raised.
If an unhandled error has occurred, the writer will attempt to recover from the error and resume buffering, batching, and transmitting log entries.
Multiple error handlers can be added.
@yield [callback] The block to be called when an error is raised.
@yieldparam [Exception] error The error raised.
@example
  require "google/cloud/logging"
  require "google/cloud/error_reporting"

  logging = Google::Cloud::Logging.new

  resource = logging.resource "gae_app",
                              module_id: "1",
                              version_id: "20150925t173233"

  async = logging.async_writer

  # Register to be notified when unhandled errors occur.
  async.on_error do |error|
    # error can be an AsyncWriterError or AsyncWriteEntriesError
    Google::Cloud::ErrorReporting.report error
  end

  logger = async.logger "my_app_log", resource, env: :production
  logger.info "Job started."
# File lib/google/cloud/logging/async_writer.rb, line 338
def on_error &block
  synchronize do
    @error_callbacks << block
  end
end
Whether the writer has been started.
@return [boolean] `true` when started, `false` otherwise.
# File lib/google/cloud/logging/async_writer.rb, line 293
def started?
  !stopped?
end
Begins the process of stopping the writer. Entries already in the queue will be published, but no new entries can be added. Use {#wait!} to block until the writer is fully stopped and all pending entries have been published.
@return [AsyncWriter] returns self so calls can be chained.
# File lib/google/cloud/logging/async_writer.rb, line 206
def stop
  synchronize do
    break if @stopped

    @stopped = true
    publish_batch!
    @cond.broadcast
    @thread_pool&.shutdown
  end

  self
end
Stop this asynchronous writer and block until it has been stopped.
@param [Number, nil] timeout The maximum number of seconds to wait for
shutdown to complete. Will wait forever when the value is `nil`. The default value is `nil`.
@param [Boolean] force If set to true, and the writer hasn't stopped
within the given timeout, kill it forcibly by terminating the thread. This should be used with extreme caution, as it can leave RPCs unfinished. Default is false.
@return [Symbol] Returns `:new` if {#write_entries} has never been
called on the AsyncWriter, `:stopped` if it was already stopped at the time of invocation, `:waited` if it stopped during the timeout period, `:timeout` if it is still running after the timeout, or `:forced` if it was forcibly killed.
# File lib/google/cloud/logging/async_writer.rb, line 258
def stop! timeout = nil, force: nil
  return :new unless @thread_pool
  return :stopped if stopped?

  stop
  wait! timeout

  if synchronize { @thread_pool.shutdown? }
    return :waited if timeout
  elsif force
    @thread_pool.kill
    return :forced
  end
  :timeout
end
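Because `stop!` reports its outcome as a Symbol, callers will often want to branch on it during shutdown. The helper below is a hypothetical sketch of that: only the five symbols come from the documentation above, while the method name `describe_shutdown` and the messages are invented for illustration.

```ruby
# Maps a status Symbol, as documented for AsyncWriter#stop!, to a message.
# The helper name and the message wording are assumptions, not library API.
def describe_shutdown status
  case status
  when :new     then "writer was never used; nothing to flush"
  when :stopped then "writer was already stopped"
  when :waited  then "writer stopped within the timeout"
  when :timeout then "writer is still running after the timeout"
  when :forced  then "writer was forcibly killed"
  else
    raise ArgumentError, "unexpected status: #{status.inspect}"
  end
end

# With a real writer this might be called as:
#   describe_shutdown async.stop!(10)
puts describe_shutdown(:waited)
```

Raising on an unknown symbol is a design choice of this sketch; it makes a future change to the documented return values fail loudly instead of being ignored.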
Whether the writer has been stopped.
@return [boolean] `true` when stopped, `false` otherwise.
# File lib/google/cloud/logging/async_writer.rb, line 301
def stopped?
  synchronize { @stopped }
end
Blocks until the writer is fully stopped, all pending entries have been published, and all callbacks have completed. Does not stop the writer. To stop the writer, first call {#stop} and then call {#wait!} to block until the writer is stopped.
@param [Number, nil] timeout The maximum number of seconds to wait for
shutdown to complete. Will wait forever when the value is `nil`. The default value is `nil`.
@return [AsyncWriter] returns self so calls can be chained.
# File lib/google/cloud/logging/async_writer.rb, line 230
def wait! timeout = nil
  synchronize do
    if @thread_pool
      @thread_pool.shutdown
      @thread_pool.wait_for_termination timeout
    end
  end

  self
end
Asynchronously write one or more log entries to the Stackdriver Logging service.
Unlike the main write_entries method, this method usually does not block. The actual write RPCs will happen in the background, and may be batched with related calls. However, if the queue is full, the batch is rejected and the resulting error is passed to the {#on_error} callbacks.
@param [Google::Cloud::Logging::Entry,
Array<Google::Cloud::Logging::Entry>] entries One or more entry objects to write. The log entries must have values for all required fields.
@param [String] log_name A default log ID for those log entries in
`entries` that do not specify their own `log_name`. See also {Entry#log_name=}.
@param [Resource] resource A default monitored resource for those log
entries in `entries` that do not specify their own resource. See also {Entry#resource}.
@param [Hash{Symbol,String => String}] labels User-defined `key:value`
items used as the `labels` field of each log entry in `entries` that has no labels of its own; entries that already carry labels are left unchanged. See also {Entry#labels=}.
@return [Google::Cloud::Logging::AsyncWriter] Returns self.
@example
  require "google/cloud/logging"

  logging = Google::Cloud::Logging.new
  async = logging.async_writer

  entry = logging.entry payload: "Job started.",
                        log_name: "my_app_log"
  entry.resource.type = "gae_app"
  entry.resource.labels[:module_id] = "1"
  entry.resource.labels[:version_id] = "20150925t173233"

  async.write_entries entry
# File lib/google/cloud/logging/async_writer.rb, line 132
def write_entries entries, log_name: nil, resource: nil, labels: nil
  synchronize do
    raise "AsyncWriter has been stopped" if @stopped

    Array(entries).each do |entry|
      # Update the entry to have all the data directly on it
      entry.log_name ||= log_name
      if entry.resource.nil? || entry.resource.empty?
        entry.resource = resource
      end
      entry.labels = labels if entry.labels.nil? || entry.labels.empty?

      # Add the entry to the batch
      @batch ||= Batch.new self
      next if @batch.try_add entry

      # If we can't add to the batch, publish and create a new batch
      publish_batch!
      @batch = Batch.new self
      @batch.add entry
    end

    @thread_pool ||= Concurrent::ThreadPoolExecutor.new \
      max_threads: @threads, max_queue: @max_queue
    @thread ||= Thread.new { run_background }

    publish_batch! if @batch&.ready?

    @cond.broadcast
  end
  self
end
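The first part of `write_entries` copies the call-level defaults onto each entry before batching. The sketch below isolates that step so its effect is easy to see. `SketchEntry` and `apply_defaults` are hypothetical names, and plain hashes stand in for the library's resource and labels objects.

```ruby
# Hypothetical stand-in for a log entry; the real class is
# Google::Cloud::Logging::Entry.
SketchEntry = Struct.new :log_name, :resource, :labels, keyword_init: true

# Applies defaults in the same way as the listing above: each default
# is used only when the entry has no value (nil or empty) of its own.
def apply_defaults entry, log_name: nil, resource: nil, labels: nil
  entry.log_name ||= log_name
  entry.resource = resource if entry.resource.nil? || entry.resource.empty?
  entry.labels = labels if entry.labels.nil? || entry.labels.empty?
  entry
end

defaults = { log_name: "my_app_log", labels: { job_size: "large" } }

bare = apply_defaults SketchEntry.new, **defaults
p bare.labels   # => {:job_size=>"large"} (printed as {job_size: "large"} on Ruby 3.4+)

own = apply_defaults SketchEntry.new(labels: { job_code: "red" }), **defaults
p own.labels    # => {:job_code=>"red"} (printed as {job_code: "red"} on Ruby 3.4+)
```

Note that in this logic the default labels replace an empty label set as a whole; they are not merged key by key into labels an entry already has.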
Protected Instance Methods
# File lib/google/cloud/logging/async_writer.rb, line 416
def default_error_callbacks
  # This is memoized to reduce calls to the configuration.
  @default_error_callbacks ||= begin
    error_callback = Google::Cloud::Logging.configure.on_error
    error_callback ||= Google::Cloud.configure.on_error
    if error_callback
      [error_callback]
    else
      []
    end
  end
end
Sets the last_error and calls all error callbacks.
# File lib/google/cloud/logging/async_writer.rb, line 407
def error! error
  error_callbacks = synchronize do
    @last_error = error
    @error_callbacks
  end
  error_callbacks = default_error_callbacks if error_callbacks.empty?
  error_callbacks.each { |error_callback| error_callback.call error }
end
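The dispatch in `error!` follows a simple rule: registered callbacks win, and the configured defaults are used only when nothing has been registered. A standalone sketch of that rule, using only the standard library, might look like the following. `ErrorNotifier` and its `default_callbacks:` option are hypothetical; the real defaults come from the library configuration.

```ruby
require "monitor"

# Hypothetical sketch of the callback dispatch; not the library's class.
class ErrorNotifier
  include MonitorMixin

  def initialize default_callbacks: []
    super() # init MonitorMixin
    @default_callbacks = default_callbacks
    @error_callbacks = []
    @last_error = nil
  end

  # Registers a block to be called with each error.
  def on_error &block
    synchronize { @error_callbacks << block }
  end

  def last_error
    synchronize { @last_error }
  end

  # Records the error, then calls the registered callbacks, falling
  # back to the defaults when none are registered. The callbacks run
  # outside the lock so a slow handler does not block other threads.
  def error! error
    callbacks = synchronize do
      @last_error = error
      @error_callbacks.dup
    end
    callbacks = @default_callbacks if callbacks.empty?
    callbacks.each { |callback| callback.call error }
  end
end

notifier = ErrorNotifier.new(
  default_callbacks: [->(error) { warn "default: #{error.message}" }]
)
notifier.error! RuntimeError.new("no handler yet")   # handled by the default
notifier.on_error { |error| warn "custom: #{error.message}" }
notifier.error! RuntimeError.new("handler present")  # handled by the block
```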
# File lib/google/cloud/logging/async_writer.rb, line 398
def publish_batch!
  return unless @batch

  batch_to_be_published = @batch
  @batch = nil
  publish_batch_async batch_to_be_published
end
# File lib/google/cloud/logging/async_writer.rb, line 429
def publish_batch_async batch
  Concurrent::Promises.future_on(
    @thread_pool, batch.entries
  ) do |entries|
    write_entries_with entries
  end
rescue Concurrent::RejectedExecutionError => e
  async_error = AsyncWriterError.new(
    "Error writing entries: #{e.message}",
    batch.entries
  )
  # Manually set backtrace so we don't have to raise
  async_error.set_backtrace caller
  error! async_error
end
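The rescue clause above is where back pressure becomes visible to the application: a batch that the executor rejects is turned into an error and handed to the callbacks instead of being raised to the caller. The sketch below imitates that behaviour with the standard library's `SizedQueue` in place of the thread pool. `BoundedSubmitter` is a hypothetical class, and a non-blocking `push` raising `ThreadError` stands in for `Concurrent::RejectedExecutionError`.

```ruby
# Hypothetical stand-in for a bounded work queue; not the library's code.
class BoundedSubmitter
  def initialize max_queue:
    @queue = SizedQueue.new max_queue
    @error_callbacks = []
  end

  # Registers a block to be called when a batch is rejected.
  def on_error &block
    @error_callbacks << block
  end

  # Returns true if the batch was queued. When the queue is full the
  # batch is dropped, the callbacks are notified, and false is returned.
  def submit batch
    @queue.push batch, true # non-blocking: raises ThreadError when full
    true
  rescue ThreadError => e
    @error_callbacks.each { |callback| callback.call e }
    false
  end
end

submitter = BoundedSubmitter.new max_queue: 2
submitter.on_error { |error| warn "rejected: #{error.message}" }

p [1, 2, 3].map { |batch| submitter.submit batch }
# => [true, true, false]
```

Nothing drains the queue in this sketch, so the third submission is always the one rejected; in a real writer the background threads would be freeing capacity concurrently.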
# File lib/google/cloud/logging/async_writer.rb, line 378
def run_background
  synchronize do
    until @stopped
      if @batch.nil?
        @cond.wait
        next
      end

      if @batch.ready?
        # interval met, publish the batch...
        publish_batch!
        @cond.wait
      else
        # still waiting for the interval to publish the batch...
        @cond.wait @batch.publish_wait
      end
    end
  end
end
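The background loop relies on a `MonitorMixin` condition variable: it sleeps indefinitely while there is nothing to send, and sleeps with a timeout while a batch is waiting for its interval to elapse. A much simplified, self-contained sketch of the same pattern follows. `IntervalFlusher` and its methods are hypothetical, and it flushes after a single timed wait rather than tracking a per-batch deadline as the library does.

```ruby
require "monitor"

# Hypothetical sketch of a condition-variable driven flusher.
class IntervalFlusher
  include MonitorMixin

  attr_reader :flushed

  def initialize interval:
    super() # init MonitorMixin
    @interval = interval
    @pending = []
    @flushed = []
    @stopped = false
    @cond = new_cond
    @thread = Thread.new { run_background }
  end

  # Buffers an item and wakes the background thread.
  def add item
    synchronize do
      @pending << item
      @cond.broadcast
    end
  end

  # Stops the loop and waits for the final flush to finish.
  def stop
    synchronize do
      @stopped = true
      @cond.broadcast
    end
    @thread.join
    self
  end

  private

  def run_background
    synchronize do
      until @stopped
        if @pending.empty?
          @cond.wait            # nothing to do: sleep until add or stop
        else
          @cond.wait @interval  # give more items time to arrive
          flush_pending
        end
      end
      flush_pending             # publish whatever is left on shutdown
    end
  end

  def flush_pending
    return if @pending.empty?
    @flushed << @pending
    @pending = []
  end
end

flusher = IntervalFlusher.new interval: 0.05
flusher.add "Job started."
flusher.add "Job completed."
flusher.stop
p flusher.flushed.flatten
# => ["Job started.", "Job completed."]
```

How the two items are grouped into flushes depends on thread timing, which is why the usage example flattens the result before printing it.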
# File lib/google/cloud/logging/async_writer.rb, line 445
def write_entries_with entries
  logging.write_entries entries, partial_success: partial_success
rescue StandardError => e
  write_error = AsyncWriteEntriesError.new(
    "Error writing entries: #{e.message}",
    entries
  )
  # Manually set backtrace so we don't have to raise
  write_error.set_backtrace caller
  error! write_error
end