class Google::Cloud::PubSub::AsyncPublisher::Batch

@private

Constants

Item

Attributes

items[R]
ordering_key[R]

Public Class Methods

new(publisher, ordering_key) click to toggle source
Calls superclass method
# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 31
# Builds an empty batch bound to a publisher and an ordering key.
#
# @param publisher The owning publisher; its `topic_name` is read here
#   to compute the starting byte count.
# @param ordering_key The ordering key this batch is associated with.
def initialize publisher, ordering_key
  # init MonitorMixin (must happen before any other setup)
  super()

  @publisher = publisher
  @ordering_key = ordering_key

  # Byte accounting: every batch starts from the topic name's size
  # plus two bytes, and is reset to that same value later on.
  @default_message_bytes = publisher.topic_name.bytesize + 2
  @total_message_bytes = @default_message_bytes

  # Active items, and the overflow queue that sits behind them.
  @items = []
  @queue = []

  # Lifecycle flags; a new batch is idle.
  @publishing = false
  @stopping = false
  @canceled = false
end

Public Instance Methods

add(msg, callback) click to toggle source

Adds a message and callback to the batch.

The method will indicate how the message is added. It will either be added to the active list of items, it will be queued to be picked up once the active publishing job has been completed, or it will indicate that the batch is full and a publishing job should be created.

@param [Google::Cloud::PubSub::V1::PubsubMessage] msg The message.
@param [Proc, nil] callback The callback.

@return [Symbol] The state of the batch.

* `:added` - Added to the active list of items to be published.
* `:queued` - Batch is publishing, and the message is queued.
* `:full` - Batch is full and ready to be published, and the
  message is queued.
# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 65
# Adds a message and its callback to the batch and reports where it
# ended up.
#
# @param msg The message to add.
# @param callback The callback stored alongside the message.
# @return [Symbol] `:added`, `:queued` or `:full`.
# @raise [AsyncPublisherStopped] if the batch is stopping.
# @raise [OrderingKeyError] if the batch has been canceled.
def add msg, callback
  synchronize do
    raise AsyncPublisherStopped if @stopping
    raise OrderingKeyError, @ordering_key if @canceled

    # The active list is only tried when no job is publishing it.
    accepted = !@publishing && try_add(msg, callback)
    next :added if accepted

    # Otherwise the message waits in the queue; the symbol tells the
    # caller whether that was because of an active job or a full batch.
    queue_add msg, callback
    @publishing ? :queued : :full
  end
end
cancel!() click to toggle source

Cancel the batch and halt further batches until resumed. See {#resume!} and {#canceled?}.

@return [Array<Item>] All items, including queued items.

# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 209
# Marks the batch as canceled and hands back everything it holds.
#
# @return [Array] A new array with the active items followed by the
#   queued items. The batch's own lists are left untouched.
def cancel!
  synchronize do
    @canceled = true
    [*@items, *@queue]
  end
end
canceled?() click to toggle source

Indicates whether the batch has been canceled due to an error while publishing. See {#cancel!} and {#resume!}.

@return [Boolean]

# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 242
# Reports whether the batch is currently canceled.
#
# @return [Boolean]
def canceled?
  # Read under the monitor: the flag is set by #cancel! and cleared
  # again by #resume!, so it can change in both directions.
  synchronize do
    @canceled
  end
end
empty?() click to toggle source

Determines whether the batch is empty and ready to be culled.

# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 250
# Tells whether the batch holds nothing and is idle.
#
# @return [Boolean] `false` whenever the batch is publishing, canceled
#   or stopping; otherwise `true` only if both the active list and the
#   queue are empty.
def empty?
  synchronize do
    busy = @publishing || @canceled || @stopping
    !busy && @items.empty? && @queue.empty?
  end
end
publish!(stop: nil) click to toggle source

Marks the batch to be published.

The method will indicate whether a new publishing job should be started to publish the batch. See {#publishing?}.

@param [Boolean] stop Indicates whether the batch should also be

marked for stopping, and any existing publish job should publish
all items until the batch is empty.

@return [Boolean] Returns whether a new publishing job should be

started to publish the batch. If the batch is already being
published then this will return `false`.
# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 96
# Marks the batch for publishing and says whether the caller should
# start a new publishing job.
#
# @param [Boolean, nil] stop When truthy, the batch is also flagged as
#   stopping. This happens before any other check.
# @return [Boolean] `false` if the batch is canceled or already
#   publishing; otherwise whether there is anything (active or queued)
#   to publish.
def publish! stop: nil
  synchronize do
    @stopping = true if stop

    if @canceled || @publishing
      # Canceled batches do not publish, and an active job must not be
      # duplicated.
      false
    else
      @publishing = !@items.empty? || !@queue.empty?
    end
  end
end
publishing?() click to toggle source

Indicates whether the batch has an active publishing job.

@return [Boolean]

# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 115
# Reports the current value of the publishing flag.
#
# @return [Boolean]
def publishing?
  # This probably does not need to be synchronized: it is a single
  # instance-variable read. Note the flag is written under the monitor
  # by #publish!, #reset! and #resume!.
  @publishing
end
rebalance!() click to toggle source

Fills the batch by sequentially moving the queued items that will fit into the active item list.

This method is only intended to be used by the active publishing job.

# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 138
# Tops up the active list from the front of the queue, preserving
# order, and stops at the first queued item that does not fit.
#
# @return [Array] The active item list, or an empty array when the
#   batch is canceled (in which case nothing is moved).
def rebalance!
  synchronize do
    if @canceled
      []
    else
      loop do
        break if @queue.empty?

        head = @queue.first
        # Only drop the head from the queue once it has been accepted.
        break unless try_add head.msg, head.callback

        @queue.shift
      end

      @items
    end
  end
end
reset!() click to toggle source

Resets the batch after a successful publish. This clears the active item list and moves the queued items that will fit into the active item list.

If the batch has enough queued items to fill the batch again, the publishing job should continue to publish the reset batch until the batch indicates it should stop.

This method is only intended to be used by the active publishing job.

@return [Boolean] Whether the active publishing job should continue

publishing after the reset.
# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 170
# Clears the active list after a publish, refills it from the queue,
# and decides whether the running job should publish again.
#
# @return [Boolean] `true` when the job should keep publishing;
#   `false` otherwise.
def reset!
  synchronize do
    # Start from an empty active list and the baseline byte count.
    @items = []
    @total_message_bytes = @default_message_bytes

    # A canceled batch throws away whatever was queued and goes idle.
    if @canceled
      @queue = []
      @publishing = false
      return false
    end

    # Refill in order until the queue runs dry or the head no longer fits.
    loop do
      break if @queue.empty?

      head = @queue.first
      break unless try_add head.msg, head.callback

      @queue.shift
    end

    return false unless @publishing

    # While stopping, keep going as long as there is something to send.
    return true if !@items.empty? && stopping?

    # Otherwise continue only if the refill left items behind in the
    # queue as well as in the active list.
    return true unless @items.empty? || @queue.empty?

    @publishing = false
    false
  end
end
resume!() click to toggle source

Resume the batch and proceed to publish messages. See {#cancel!} and {#canceled?}.

@return [Boolean] Whether the batch was resumed.

# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 222
# Lifts a cancellation so the batch can accept and publish messages
# again. All held items are discarded in the process.
#
# @return [Boolean] `true` if the batch was canceled and has now been
#   resumed; `false` (with no state change) if it was not canceled.
def resume!
  synchronize do
    if @canceled
      # Back to a pristine, idle batch.
      @items = []
      @queue = []
      @total_message_bytes = @default_message_bytes
      @publishing = false
      @canceled = false
      true
    else
      false
    end
  end
end
stopping?() click to toggle source

Indicates whether the batch has been stopped and all items will be published until the batch is empty.

@return [Boolean]

# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 126
# Reports the current value of the stopping flag.
#
# @return [Boolean]
def stopping?
  # This does not need to be synchronized because nothing un-stops:
  # after construction the flag is only ever set to true (by #publish!
  # with stop:).
  @stopping
end

Protected Instance Methods

items_add(msg, callback) click to toggle source
# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 260
# Wraps the message and callback in an Item, appends it to the active
# list and grows the running byte total by the item's size plus two.
#
# Callers are expected to hold the monitor.
#
# @return [Integer] The updated byte total.
def items_add msg, callback
  entry = Item.new msg, callback
  @items.push entry
  @total_message_bytes = @total_message_bytes + entry.bytesize + 2
end
queue_add(msg, callback) click to toggle source
# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 282
# Wraps the message and callback in an Item and appends it to the
# queue. The byte total is not touched here.
#
# @return [Array] The queue.
def queue_add msg, callback
  @queue << Item.new(msg, callback)
end
total_message_bytes() click to toggle source
# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 291
# Returns the running byte total for the active item list. It starts
# at the per-batch default and is increased by #items_add.
def total_message_bytes
  @total_message_bytes
end
total_message_count() click to toggle source
# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 287
# Returns how many items are in the active list (queued items are not
# counted).
#
# @return [Integer]
def total_message_count
  @items.size
end
try_add(msg, callback) click to toggle source
# File lib/google/cloud/pubsub/async_publisher/batch.rb, line 266
# Adds the message to the active list if it fits within the
# publisher's limits.
#
# Callers are expected to hold the monitor.
#
# @return [Boolean] `true` if the message was added, `false` if adding
#   it would exceed `max_messages` or reach `max_bytes`.
def try_add msg, callback
  # Limits only apply once the batch already holds something; an empty
  # batch always accepts, even if bytesize is bigger than the total.
  unless @items.empty?
    count_after = total_message_count + 1
    bytes_after = total_message_bytes + msg.to_proto.bytesize + 2

    return false if count_after > @publisher.max_messages
    return false if bytes_after >= @publisher.max_bytes
  end

  items_add msg, callback
  true
end