class Google::Cloud::PubSub::AsyncPublisher
Used to publish multiple messages in batches to a topic. See {Google::Cloud::PubSub::Topic#async_publisher}
@example
  require "google/cloud/pubsub"

  pubsub = Google::Cloud::PubSub.new

  topic = pubsub.topic "my-topic"
  topic.publish_async "task completed" do |result|
    if result.succeeded?
      log_publish_success result.data
    else
      log_publish_failure result.data, result.error
    end
  end

  topic.async_publisher.stop!
@attr_reader [String] topic_name
The name of the topic the messages are published to. The value is a
fully-qualified topic name in the form `projects/{project_id}/topics/{topic_id}`.
@attr_reader [Integer] max_bytes
The maximum size of messages to be collected before the batch is published.
Default is 1,000,000 (1MB).
@attr_reader [Integer] max_messages
The maximum number of messages to be collected before the batch is
published. Default is 100.
@attr_reader [Numeric] interval
The number of seconds to collect messages before the batch is published.
Default is 0.01.
@attr_reader [Numeric] publish_threads
The number of threads used to publish messages. Default is 2.
@attr_reader [Numeric] callback_threads
The number of threads to handle the published messages' callbacks.
Default is 4.
Constants
- PUBLISH_RETRY_ERRORS
Exception classes for which a failed publish of a batch with an ordering key is retried (see `publish_batch_error_retryable?`).
Attributes
@private Implementation accessors
@private Implementation accessors
@private Implementation accessors
@private Implementation accessors
@private Implementation accessors
Public Class Methods
@private Create a new instance of the object.
# File lib/google/cloud/pubsub/async_publisher.rb, line 77
#
# Builds the publisher state and starts the background thread that flushes
# batches once `interval` seconds have passed since the first queued message.
def initialize topic_name, service, max_bytes: 1_000_000, max_messages: 100, interval: 0.01, threads: {},
               flow_control: {}
  # init MonitorMixin
  super()
  @topic_name = service.topic_path topic_name
  @service    = service

  @max_bytes        = max_bytes
  @max_messages     = max_messages
  @interval         = interval
  @publish_threads  = (threads[:publish] || 2).to_i
  @callback_threads = (threads[:callback] || 4).to_i
  # Default limits allow ten full batches in flight; caller-supplied
  # flow_control entries override these defaults.
  @flow_control = {
    message_limit: 10 * @max_messages,
    byte_limit: 10 * @max_bytes
  }.merge(flow_control).freeze

  @published_at = nil
  @publish_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @publish_threads
  @callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @callback_threads

  # Initialized explicitly so #stopped? returns false (not nil) before #stop
  # is called, matching its documented boolean return.
  @stopped = false
  @ordered = false
  @batches = {}
  @cond = new_cond
  @flow_controller = FlowController.new(**@flow_control)
  # Started last: run_background reads @stopped, @published_at, @interval
  # and @cond, which must all be set first.
  @thread = Thread.new { run_background }
end
Public Instance Methods
Enables message ordering for messages with ordering keys. When enabled, messages published with the same `ordering_key` will be delivered in the order they were published.
See {#message_ordering?}. See {Topic#publish_async}, {Subscription#listen}, and {Message#ordering_key}.
# File lib/google/cloud/pubsub/async_publisher.rb, line 262
#
# Turns on ordered publishing under the monitor; returns true.
def enable_message_ordering!
  synchronize do
    @ordered = true
  end
end
Forces all messages in the current batch to be published immediately.
@return [AsyncPublisher] returns self so calls can be chained.
# File lib/google/cloud/pubsub/async_publisher.rb, line 229
#
# Publishes every pending batch now and wakes the background thread;
# returns the publisher for chaining.
def flush
  tap do
    synchronize do
      publish_batches!
      @cond.signal
    end
  end
end
Whether message ordering for messages with ordering keys has been enabled. When enabled, messages published with the same `ordering_key` will be delivered in the order they were published. When disabled, messages may be delivered in any order.
See {#enable_message_ordering!}. See {Topic#publish_async}, {Subscription#listen}, and {Message#ordering_key}.
@return [Boolean]
# File lib/google/cloud/pubsub/async_publisher.rb, line 277
#
# Reads the ordering flag under the monitor.
def message_ordering?
  synchronize do
    @ordered
  end
end
Add a message to the async publisher to be published to the topic. Messages will be collected in batches and published together. See {Google::Cloud::PubSub::Topic#publish_async}
@param [String, File] data The message payload. This will be converted
to bytes encoded as ASCII-8BIT.
@param [Hash] attributes Optional attributes for the message.
@param [String] ordering_key Identifies related messages for which
publish order should be respected.
@yield [result] the callback for when the message has been published
@yieldparam [PublishResult] result the result of the asynchronous
publish
@raise [Google::Cloud::PubSub::AsyncPublisherStopped] when the
publisher is stopped. (See {#stop} and {#stopped?}.)
@raise [Google::Cloud::PubSub::OrderedMessagesDisabled] when
publishing a message with an `ordering_key` but ordered messages are not enabled. (See {#message_ordering?} and {#enable_message_ordering!}.)
@raise [Google::Cloud::PubSub::OrderingKeyError] when publishing a
message with an `ordering_key` that has already failed when publishing. Use {#resume_publish} to allow this `ordering_key` to be published again.
# File lib/google/cloud/pubsub/async_publisher.rb, line 134
#
# Adds a message to the batch for its ordering key. Flow-control capacity is
# acquired before taking the lock, and it is released again on every path
# that rejects the message, so a rejected publish never leaks capacity.
# Returns nil.
def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
  msg = Convert.pubsub_message data, attributes, ordering_key, extra_attrs
  # Computed once: the same size is acquired here and released on rejection.
  bytesize = msg.to_proto.bytesize

  begin
    @flow_controller.acquire bytesize
  rescue FlowControlLimitError => e
    # Batches are keyed by msg.ordering_key (an empty string means unordered),
    # so use that same key and never cancel the shared unordered batch.
    stop_publish msg.ordering_key, e unless msg.ordering_key.empty?
    raise
  end

  synchronize do
    if @stopped
      @flow_controller.release bytesize
      raise AsyncPublisherStopped
    end
    if !@ordered && !msg.ordering_key.empty? # default is empty string
      @flow_controller.release bytesize
      raise OrderedMessagesDisabled
    end

    batch = resolve_batch_for_message msg
    if batch.canceled?
      @flow_controller.release bytesize
      raise OrderingKeyError, batch.ordering_key
    end

    batch_action = batch.add msg, callback
    if batch_action == :full
      publish_batches!
    elsif @published_at.nil?
      # Set initial time to now to start the background counter
      @published_at = Time.now
    end
    @cond.signal
  end

  nil
end
Resume publishing ordered messages for the provided ordering key.
@param [String] ordering_key Identifies related messages for which
publish order should be respected.
@return [boolean] `true` when resumed, `false` otherwise.
# File lib/google/cloud/pubsub/async_publisher.rb, line 289
#
# Resumes a batch previously canceled for +ordering_key+. Returns false when
# no batch exists for that key; otherwise returns the result of batch.resume!.
def resume_publish ordering_key
  synchronize do
    batch = resolve_batch_for_ordering_key ordering_key
    # Nothing was ever published with this key, so there is nothing to resume.
    # Return false (not nil) to match the documented boolean result.
    return false if batch.nil?

    batch.resume!
  end
end
Whether the publisher has been started.
@return [boolean] `true` when started, `false` otherwise.
# File lib/google/cloud/pubsub/async_publisher.rb, line 242
#
# A publisher counts as started for as long as it has not been stopped.
def started?
  return false if stopped?

  true
end
Begins the process of stopping the publisher. Messages already in the queue will be published, but no new messages can be added. Use {#wait!} to block until the publisher is fully stopped and all pending messages have been published.
@return [AsyncPublisher] returns self so calls can be chained.
# File lib/google/cloud/pubsub/async_publisher.rb, line 172
#
# Marks the publisher stopped, hands every pending batch off for a final
# publish, wakes the background thread and shuts the publish pool down.
# Repeated calls are no-ops. Returns the publisher for chaining.
def stop
  synchronize do
    unless @stopped
      @stopped = true
      publish_batches! stop: true
      @cond.signal
      @publish_thread_pool.shutdown
    end
  end
  self
end
Stop this publisher and block until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until `timeout` seconds have passed.
The same as calling {#stop} and {#wait!}.
@param [Number, nil] timeout The number of seconds to block until the
publisher is fully stopped. Default will block indefinitely.
@return [AsyncPublisher] returns self so calls can be chained.
# File lib/google/cloud/pubsub/async_publisher.rb, line 219
#
# Convenience wrapper: stop accepting messages first, then block (up to
# +timeout+) until everything has drained. Returns whatever #wait! returns.
def stop! timeout = nil
  stop
  wait!(timeout)
end
Whether the publisher has been stopped.
@return [boolean] `true` when stopped, `false` otherwise.
# File lib/google/cloud/pubsub/async_publisher.rb, line 250
#
# Reads the stopped flag under the monitor.
def stopped?
  synchronize do
    @stopped
  end
end
Blocks until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until `timeout` seconds have passed.
Does not stop the publisher. To stop the publisher, first call {#stop} and then call {#wait!} to block until the publisher is stopped.
@param [Number, nil] timeout The number of seconds to block until the
publisher is fully stopped. Default will block indefinitely.
@return [AsyncPublisher] returns self so calls can be chained.
# File lib/google/cloud/pubsub/async_publisher.rb, line 197
#
# Blocks until the publish pool has terminated, then shuts down the callback
# pool and waits for it too. The wait ends early once +timeout+ seconds have
# elapsed in total; a nil timeout waits indefinitely. Returns the publisher.
def wait! timeout = nil
  # Both pool waits share one monotonic deadline. Previously each wait got
  # the full +timeout+, so the call could block for up to twice as long as
  # requested.
  deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)
  remaining = lambda do
    deadline && [deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
  end

  synchronize do
    @publish_thread_pool.wait_for_termination remaining.call

    @callback_thread_pool.shutdown
    @callback_thread_pool.wait_for_termination remaining.call
  end

  self
end
Protected Instance Methods
# File lib/google/cloud/pubsub/async_publisher.rb, line 424
#
# Runs a user callback with its PublishResult on the callback pool. Returns
# nil without scheduling anything once that pool is no longer running.
def execute_callback_async callback, publish_result
  pool = @callback_thread_pool
  return unless pool.running?

  Concurrent::Promises.future_on pool, callback, publish_result do |user_callback, result|
    user_callback.call result
  end
end
# File lib/google/cloud/pubsub/async_publisher.rb, line 354
#
# Schedules publish_batch_sync for +batch+ on the publish pool. Returns nil
# without scheduling when that pool is no longer running.
def publish_batch_async topic_name, batch
  # TODO: raise unless @publish_thread_pool.running?
  pool = @publish_thread_pool
  return unless pool.running?

  Concurrent::Promises.future_on(pool, topic_name, batch) do |name, pending_batch|
    publish_batch_sync name, pending_batch
  end
end
# File lib/google/cloud/pubsub/async_publisher.rb, line 420
#
# True when +error+ is an instance of (a subclass of) any class listed in
# PUBLISH_RETRY_ERRORS.
def publish_batch_error_retryable? error
  matching_class = PUBLISH_RETRY_ERRORS.find { |error_class| error.is_a? error_class }
  !matching_class.nil?
end
Publishes a batch on the calling thread, retrying while the batch still has unsent items.
# File lib/google/cloud/pubsub/async_publisher.rb, line 365 def publish_batch_sync topic_name, batch # The only batch methods that are safe to call from the loop are # rebalance! and reset! because they are the only methods that are # synchronized. loop do items = batch.rebalance! unless items.empty? grpc = @service.publish topic_name, items.map(&:msg) items.zip Array(grpc.message_ids) do |item, id| @flow_controller.release item.bytesize next unless item.callback item.msg.message_id = id publish_result = PublishResult.from_grpc item.msg execute_callback_async item.callback, publish_result end end break unless batch.reset! end rescue StandardError => e items = batch.items unless batch.ordering_key.empty? retry if publish_batch_error_retryable? e # Cancel the batch if the error is not to be retried. begin raise OrderingKeyError, batch.ordering_key rescue OrderingKeyError => e # The existing e variable is not set to OrderingKeyError # Get all unsent messages for the callback items = batch.cancel! end end items.each do |item| @flow_controller.release item.bytesize next unless item.callback publish_result = PublishResult.from_error item.msg, e execute_callback_async item.callback, publish_result end # publish will retry indefinitely, as long as there are unsent items. retry if batch.reset! end
# File lib/google/cloud/pubsub/async_publisher.rb, line 344
#
# Drops empty batches, then asks each remaining batch whether it is ready to
# publish (forwarding +stop+). Ready batches are scheduled on the publish
# pool. Finally clears @published_at so the background thread idles.
def publish_batches! stop: nil
  @batches.delete_if { |_key, pending| pending.empty? }

  @batches.each do |_key, pending|
    next unless pending.publish! stop: stop

    publish_batch_async @topic_name, pending
  end

  # Set published_at to nil to wait indefinitely
  @published_at = nil
end
# File lib/google/cloud/pubsub/async_publisher.rb, line 321
#
# Returns the batch for the message's ordering key, creating and storing a
# new Batch the first time that key is seen.
def resolve_batch_for_message msg
  key = msg.ordering_key
  @batches[key] ||= Batch.new(self, key)
end
# File lib/google/cloud/pubsub/async_publisher.rb, line 325
#
# Looks up an existing batch for +ordering_key+; nil when none exists.
def resolve_batch_for_ordering_key ordering_key
  @batches.fetch ordering_key, nil
end
# File lib/google/cloud/pubsub/async_publisher.rb, line 299
#
# Background loop, run under the monitor until the publisher is stopped.
# With nothing queued (@published_at nil) it waits for a signal. Once
# @interval seconds have elapsed since the first queued message, it flushes
# all batches and waits for a signal. Otherwise it sleeps only for the rest
# of the interval.
def run_background
  synchronize do
    until @stopped
      if @published_at.nil?
        @cond.wait
      else
        elapsed = Time.now - @published_at
        if elapsed > @interval
          # interval met, flush the batches...
          publish_batches!
          @cond.wait
        else
          # still waiting for the interval to publish the batch...
          @cond.wait(@interval - elapsed)
        end
      end
    end
  end
end
# File lib/google/cloud/pubsub/async_publisher.rb, line 329
#
# Cancels the batch for +ordering_key+ (if any), releasing each canceled
# item's flow-control bytes. Each item callback receives a PublishResult
# built from +err+.
def stop_publish ordering_key, err
  synchronize do
    batch = resolve_batch_for_ordering_key ordering_key
    next if batch.nil?

    canceled = batch.cancel!
    canceled.each do |item|
      @flow_controller.release item.bytesize
      if item.callback
        failure = PublishResult.from_error item.msg, err
        execute_callback_async item.callback, failure
      end
    end
  end
end