class Google::Cloud::PubSub::AsyncPublisher

Used to publish multiple messages in batches to a topic. See {Google::Cloud::PubSub::Topic#async_publisher}.

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

topic.async_publisher.stop!

@attr_reader [String] topic_name The name of the topic the messages are published to. The value is a

fully-qualified topic name in the form `projects/{project_id}/topics/{topic_id}`.

@attr_reader [Integer] max_bytes The maximum size of messages to be collected before the batch is published.

Default is 1,000,000 (1MB).

@attr_reader [Integer] max_messages The maximum number of messages to be collected before the batch is

published. Default is 100

@attr_reader [Numeric] interval The number of seconds to collect messages before the batch is published. Default

is 0.01.

@attr_reader [Numeric] publish_threads The number of threads used to publish messages. Default is 2.

@attr_reader [Numeric] callback_threads The number of threads to handle the published messages' callbacks.

Default is 4

Constants

PUBLISH_RETRY_ERRORS

rubocop:enable Metrics/AbcSize

Attributes

batch[R]

@private Implementation accessors

callback_thread_pool[R]

@private Implementation accessors

callback_threads[R]
flow_control[R]
flow_controller[R]

@private Implementation accessors

interval[R]
max_bytes[R]
max_messages[R]
publish_thread_pool[R]

@private Implementation accessors

publish_threads[R]
service[R]

@private Implementation accessors

topic_name[R]

Public Class Methods

new(topic_name, service, max_bytes: 1_000_000, max_messages: 100, interval: 0.01, threads: {}, flow_control: {}) click to toggle source

@private Create a new instance of the object.

Calls superclass method
# File lib/google/cloud/pubsub/async_publisher.rb, line 77
# @private Builds the publisher, its worker pools and its background
# flushing thread.
#
# @param [String] topic_name topic identifier, expanded through
#   `service.topic_path`
# @param [Object] service the service object used to publish messages
# @param [Integer] max_bytes batch size threshold in bytes
# @param [Integer] max_messages batch size threshold in messages
# @param [Numeric] interval seconds to collect messages before flushing
# @param [Hash] threads optional `:publish` and `:callback` pool sizes
# @param [Hash] flow_control overrides merged over the default limits
def initialize topic_name,
               service,
               max_bytes: 1_000_000,
               max_messages: 100,
               interval: 0.01,
               threads: {},
               flow_control: {}
  # The monitor (MonitorMixin) has to be set up before #new_cond is used.
  super()
  @topic_name = service.topic_path topic_name
  @service    = service

  # Batching thresholds.
  @interval     = interval
  @max_messages = max_messages
  @max_bytes    = max_bytes

  # Pool sizes fall back to 2 publish threads and 4 callback threads.
  @callback_threads = (threads[:callback] || 4).to_i
  @publish_threads  = (threads[:publish] || 2).to_i

  # Default limits are ten batches' worth; caller-supplied values win.
  default_limits = {
    message_limit: 10 * max_messages,
    byte_limit: 10 * max_bytes
  }
  @flow_control = default_limits.merge(flow_control).freeze

  # No batch is waiting yet, so the interval timer is not running.
  @published_at = nil
  @publish_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @publish_threads
  @callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @callback_threads

  @batches = {}
  @ordered = false
  @cond = new_cond
  @flow_controller = FlowController.new(**@flow_control)
  # Started last so the background loop sees fully initialised state.
  @thread = Thread.new { run_background }
end

Public Instance Methods

enable_message_ordering!() click to toggle source

Enables message ordering for messages with ordering keys. When enabled, messages published with the same `ordering_key` will be delivered in the order they were published.

See {#message_ordering?}. See {Topic#publish_async}, {Subscription#listen}, and {Message#ordering_key}.

# File lib/google/cloud/pubsub/async_publisher.rb, line 262
# Turns on message ordering for this publisher. The flag is written while
# holding the publisher's lock.
#
# @return [Boolean] always `true`
def enable_message_ordering!
  synchronize do
    @ordered = true
  end
end
flush() click to toggle source

Forces all messages in the current batch to be published immediately.

@return [AsyncPublisher] returns self so calls can be chained.

# File lib/google/cloud/pubsub/async_publisher.rb, line 229
# Publishes every pending batch right away and wakes the background
# thread, all under the publisher's lock.
#
# @return [AsyncPublisher] self, so calls can be chained
def flush
  synchronize do
    publish_batches!
    # Let the background loop re-evaluate now that the batches were sent.
    @cond.signal
  end
  self
end
message_ordering?() click to toggle source

Whether message ordering for messages with ordering keys has been enabled. When enabled, messages published with the same `ordering_key` will be delivered in the order they were published. When disabled, messages may be delivered in any order.

See {#enable_message_ordering!}. See {Topic#publish_async}, {Subscription#listen}, and {Message#ordering_key}.

@return [Boolean]

# File lib/google/cloud/pubsub/async_publisher.rb, line 277
# Reports the message-ordering flag, read under the publisher's lock.
#
# @return [Boolean] the current value of the ordering flag
def message_ordering?
  synchronize do
    @ordered
  end
end
publish(data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback) click to toggle source

Add a message to the async publisher to be published to the topic. Messages will be collected in batches and published together. See {Google::Cloud::PubSub::Topic#publish_async}

@param [String, File] data The message payload. This will be converted

to bytes encoded as ASCII-8BIT.

@param [Hash] attributes Optional attributes for the message.

@param [String] ordering_key Identifies related messages for which

publish order should be respected.

@yield [result] the callback for when the message has been published

@yieldparam [PublishResult] result the result of the asynchronous

publish

@raise [Google::Cloud::PubSub::AsyncPublisherStopped] when the

publisher is stopped. (See {#stop} and {#stopped?}.)

@raise [Google::Cloud::PubSub::OrderedMessagesDisabled] when

publishing a message with an `ordering_key` but ordered messages are
not enabled. (See {#message_ordering?} and
{#enable_message_ordering!}.)

@raise [Google::Cloud::PubSub::OrderingKeyError] when publishing a

message with an `ordering_key` that has already failed when
publishing. Use {#resume_publish} to allow this `ordering_key` to be
published again.
# File lib/google/cloud/pubsub/async_publisher.rb, line 134
# Adds a message to the publisher so it is sent with the next batch.
#
# Flow-control capacity for the message is acquired up front. If the
# message is then rejected (publisher stopped, ordering disabled, or the
# ordering key's batch was canceled) that capacity is released again
# before the error is raised, so a rejected message never keeps holding
# flow-control bytes.
#
# @param [String, File] data the message payload
# @param [Hash] attributes optional attributes for the message
# @param [String] ordering_key identifies related messages whose publish
#   order should be respected
# @yield [result] called once the message has been published
# @return [nil]
# @raise [FlowControlLimitError] re-raised from the flow controller; when
#   an ordering key was given, #stop_publish is called for it first
# @raise [AsyncPublisherStopped] when the publisher is stopped
# @raise [OrderedMessagesDisabled] when the message has an ordering key
#   but ordering is not enabled
# @raise [OrderingKeyError] when the batch for the ordering key is
#   canceled
def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
  msg = Convert.pubsub_message data, attributes, ordering_key, extra_attrs
  # Computed once so that acquire and release always use the same amount.
  msg_bytes = msg.to_proto.bytesize
  begin
    @flow_controller.acquire msg_bytes
  rescue FlowControlLimitError => e
    stop_publish ordering_key, e if ordering_key
    raise
  end

  synchronize do
    begin
      raise AsyncPublisherStopped if @stopped
      raise OrderedMessagesDisabled if !@ordered && !msg.ordering_key.empty? # default is empty string

      batch = resolve_batch_for_message msg
      raise OrderingKeyError, batch.ordering_key if batch.canceled?
    rescue AsyncPublisherStopped, OrderedMessagesDisabled, OrderingKeyError
      # The message never entered a batch, so nothing else will release
      # the bytes acquired above.
      @flow_controller.release msg_bytes
      raise
    end

    batch_action = batch.add msg, callback
    if batch_action == :full
      publish_batches!
    elsif @published_at.nil?
      # Set initial time to now to start the background counter
      @published_at = Time.now
    end
    @cond.signal
  end

  nil
end
resume_publish(ordering_key) click to toggle source

Resume publishing ordered messages for the provided ordering key.

@param [String] ordering_key Identifies related messages for which

publish order should be respected.

@return [boolean] `true` when resumed, `false` otherwise.

# File lib/google/cloud/pubsub/async_publisher.rb, line 289
# Resumes publishing for the batch registered under +ordering_key+.
#
# @param [String] ordering_key identifies the batch to resume
# @return [boolean] `false` when no batch exists for the key; otherwise
#   whatever the batch's `resume!` returns
def resume_publish ordering_key
  synchronize do
    batch = resolve_batch_for_ordering_key ordering_key
    # Nothing to resume: answer with an explicit false rather than nil so
    # the return value matches the documented boolean contract.
    return false if batch.nil?
    batch.resume!
  end
end
started?() click to toggle source

Whether the publisher has been started.

@return [boolean] `true` when started, `false` otherwise.

# File lib/google/cloud/pubsub/async_publisher.rb, line 242
# Whether the publisher is still accepting work, i.e. it has not been
# stopped.
#
# @return [boolean] `true` unless #stopped? reports a truthy value
def started?
  return false if stopped?

  true
end
stop() click to toggle source

Begins the process of stopping the publisher. Messages already in the queue will be published, but no new messages can be added. Use {#wait!} to block until the publisher is fully stopped and all pending messages have been published.

@return [AsyncPublisher] returns self so calls can be chained.

# File lib/google/cloud/pubsub/async_publisher.rb, line 172
# Begins shutting the publisher down. On the first call it marks the
# publisher stopped, hands the remaining batches to #publish_batches!,
# wakes the background thread and shuts down the publish thread pool.
# Later calls do nothing.
#
# @return [AsyncPublisher] self, so calls can be chained
def stop
  synchronize do
    unless @stopped
      @stopped = true
      publish_batches! stop: true
      @cond.signal
      @publish_thread_pool.shutdown
    end
  end

  self
end
stop!(timeout = nil) click to toggle source

Stop this publisher and block until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until `timeout` seconds have passed.

The same as calling {#stop} and {#wait!}.

@param [Number, nil] timeout The number of seconds to block until the

publisher is fully stopped. Default will block indefinitely.

@return [AsyncPublisher] returns self so calls can be chained.

# File lib/google/cloud/pubsub/async_publisher.rb, line 219
# Convenience for calling #stop followed by #wait!.
#
# @param [Number, nil] timeout forwarded unchanged to #wait!
# @return the value returned by #wait!
def stop! timeout = nil
  stop
  wait!(timeout)
end
stopped?() click to toggle source

Whether the publisher has been stopped.

@return [boolean] `true` when stopped, `false` otherwise.

# File lib/google/cloud/pubsub/async_publisher.rb, line 250
# Whether the publisher has been stopped. The flag is read under the
# publisher's lock.
#
# @return [boolean] `true` when stopped, `false` otherwise
def stopped?
  # The flag may still be unset (nil) before the first #stop; normalise it
  # so callers always receive a real boolean.
  synchronize { @stopped ? true : false }
end
wait!(timeout = nil) click to toggle source

Blocks until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until `timeout` seconds have passed.

Does not stop the publisher. To stop the publisher, first call {#stop} and then call {#wait!} to block until the publisher is stopped.

@param [Number, nil] timeout The number of seconds to block until the

publisher is fully stopped. Default will block indefinitely.

@return [AsyncPublisher] returns self so calls can be chained.

# File lib/google/cloud/pubsub/async_publisher.rb, line 197
# Blocks until the publish thread pool has terminated and the callback
# thread pool has been shut down and has terminated, or until `timeout`
# seconds have passed in total.
#
# Does not stop the publisher; call #stop first.
#
# @param [Number, nil] timeout overall number of seconds to block. `nil`
#   blocks indefinitely.
# @return [AsyncPublisher] self, so calls can be chained
def wait! timeout = nil
  # One shared deadline: handing the full timeout to each wait below would
  # let this method block for up to twice the requested time.
  deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)

  synchronize do
    @publish_thread_pool.wait_for_termination timeout

    @callback_thread_pool.shutdown
    # Only the time left until the deadline (never negative) is given to
    # the second wait; nil keeps the "block indefinitely" behaviour.
    remaining = deadline && [deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
    @callback_thread_pool.wait_for_termination remaining
  end

  self
end

Protected Instance Methods

execute_callback_async(callback, publish_result) click to toggle source
# File lib/google/cloud/pubsub/async_publisher.rb, line 424
# Schedules +callback+ on the callback thread pool, passing it
# +publish_result+. Does nothing when that pool is no longer running.
#
# @param [#call] callback the user-supplied publish callback
# @param [PublishResult] publish_result the value handed to the callback
# @return the future created for the callback, or nil when skipped
def execute_callback_async callback, publish_result
  pool = @callback_thread_pool
  return unless pool.running?

  Concurrent::Promises.future_on(pool, callback, publish_result) do |handler, outcome|
    handler.call outcome
  end
end
publish_batch_async(topic_name, batch) click to toggle source
# File lib/google/cloud/pubsub/async_publisher.rb, line 354
# Schedules #publish_batch_sync for +batch+ on the publish thread pool.
# Does nothing when that pool is no longer running.
#
# @param [String] topic_name the topic the batch is published to
# @param [Batch] batch the batch to publish
# @return the future created for the publish job, or nil when skipped
def publish_batch_async topic_name, batch
  # TODO: raise unless @publish_thread_pool.running?
  pool = @publish_thread_pool
  return unless pool.running?

  Concurrent::Promises.future_on(pool, topic_name, batch) do |topic, pending_batch|
    publish_batch_sync topic, pending_batch
  end
end
publish_batch_error_retryable?(error) click to toggle source
# File lib/google/cloud/pubsub/async_publisher.rb, line 420
# Whether +error+ is an instance of one of the classes listed in
# PUBLISH_RETRY_ERRORS.
#
# @param [Exception] error the error raised while publishing
# @return [Boolean]
def publish_batch_error_retryable? error
  PUBLISH_RETRY_ERRORS.any? do |retryable_class|
    error.is_a? retryable_class
  end
end
publish_batch_sync(topic_name, batch) click to toggle source

rubocop:disable Metrics/AbcSize

# File lib/google/cloud/pubsub/async_publisher.rb, line 365
# Publishes the pending items of +batch+ to +topic_name+ on the calling
# thread, repeating for as long as `batch.reset!` returns a truthy value.
#
# For every item handled, its bytes are released back to the flow
# controller and, when the item has a callback, the callback is scheduled
# with a PublishResult (built from the returned message id on success, or
# from the error on failure).
#
# When publishing raises and the batch has a non-empty ordering key, the
# whole method body is retried if the error is one of
# PUBLISH_RETRY_ERRORS; otherwise the batch is canceled and its items are
# failed with an OrderingKeyError.
#
# NOTE(review): no retry limit or backoff is visible in this method.
#
# @param [String] topic_name the topic passed to `@service.publish`
# @param [Batch] batch the batch whose items are published
def publish_batch_sync topic_name, batch
  # The only batch methods that are safe to call from the loop are
  # rebalance! and reset! because they are the only methods that are
  # synchronized.
  loop do
    items = batch.rebalance!

    unless items.empty?
      grpc = @service.publish topic_name, items.map(&:msg)
      # Pair each item with the message id returned at the same position.
      items.zip Array(grpc.message_ids) do |item, id|
        @flow_controller.release item.bytesize
        next unless item.callback

        item.msg.message_id = id
        publish_result = PublishResult.from_grpc item.msg
        execute_callback_async item.callback, publish_result
      end
    end

    break unless batch.reset!
  end
rescue StandardError => e
  # By default, fail whatever the batch reports through #items.
  items = batch.items

  unless batch.ordering_key.empty?
    # `retry` re-runs the method body from the top.
    retry if publish_batch_error_retryable? e
    # Cancel the batch if the error is not to be retried.
    begin
      raise OrderingKeyError, batch.ordering_key
    rescue OrderingKeyError => e
      # The existing e variable is not set to OrderingKeyError
      # Get all unsent messages for the callback
      items = batch.cancel!
    end
  end

  # Release flow-control bytes and report the failure for each item.
  items.each do |item|
    @flow_controller.release item.bytesize
    next unless item.callback

    publish_result = PublishResult.from_error item.msg, e
    execute_callback_async item.callback, publish_result
  end

  # publish will retry indefinitely, as long as there are unsent items.
  retry if batch.reset!
end
publish_batches!(stop: nil) click to toggle source
# File lib/google/cloud/pubsub/async_publisher.rb, line 344
# Drops empty batches, then asks every remaining batch whether it is ready
# to publish and schedules the ready ones. Finally clears the interval
# timer.
#
# @param [Boolean, nil] stop forwarded to each batch's `publish!`
# @return [nil]
def publish_batches! stop: nil
  @batches.delete_if { |_key, pending| pending.empty? }
  @batches.each_value do |pending|
    next unless pending.publish!(stop: stop)

    publish_batch_async @topic_name, pending
  end
  # Set published_at to nil to wait indefinitely
  @published_at = nil
end
resolve_batch_for_message(msg) click to toggle source
# File lib/google/cloud/pubsub/async_publisher.rb, line 321
# Returns the batch registered for the message's ordering key, creating
# and registering a new one when none exists yet.
#
# @param [Object] msg a message responding to `ordering_key`
# @return [Batch]
def resolve_batch_for_message msg
  key = msg.ordering_key
  @batches[key] ||= Batch.new(self, key)
end
resolve_batch_for_ordering_key(ordering_key) click to toggle source
# File lib/google/cloud/pubsub/async_publisher.rb, line 325
# Looks up the batch currently registered under +ordering_key+.
#
# @param [String] ordering_key the key used to index the batches hash
# @return the batch stored for that key, or nil when none is registered
def resolve_batch_for_ordering_key ordering_key
  @batches[ordering_key]
end
run_background() click to toggle source
# File lib/google/cloud/pubsub/async_publisher.rb, line 299
# Background loop that flushes batches once the collection interval has
# elapsed. Runs under the publisher's lock until the publisher is stopped,
# sleeping on the condition variable between checks.
def run_background
  synchronize do
    until @stopped
      if @published_at.nil?
        # No timer running: sleep until signaled.
        @cond.wait
      else
        elapsed = Time.now - @published_at
        if elapsed > @interval
          # interval met, flush the batches...
          publish_batches!
          @cond.wait
        else
          # still waiting for the interval to publish the batch...
          @cond.wait(@interval - elapsed)
        end
      end
    end
  end
end
stop_publish(ordering_key, err) click to toggle source
# File lib/google/cloud/pubsub/async_publisher.rb, line 329
# Cancels the batch registered under +ordering_key+ and fails each of its
# items with +err+: the item's bytes are released back to the flow
# controller and, when present, its callback is scheduled with an error
# result. Does nothing when no batch exists for the key.
#
# @param [String] ordering_key identifies the batch to cancel
# @param [Exception] err the error reported to the callbacks
def stop_publish ordering_key, err
  synchronize do
    canceled_batch = resolve_batch_for_ordering_key ordering_key
    next if canceled_batch.nil?

    canceled_batch.cancel!.each do |unsent|
      @flow_controller.release unsent.bytesize
      next unless unsent.callback

      failure = PublishResult.from_error unsent.msg, err
      execute_callback_async unsent.callback, failure
    end
  end
end