class Google::Cloud::PubSub::Subscriber::Sequencer

@private The sequencer's job is simple: keep track of all the streams' received messages and deliver the messages with an ordering_key in the order they were received. The sequencer ensures only one callback can be performed at a time per ordering_key.

Public Class Methods

new(&block) click to toggle source

@private Create an empty Subscriber::Sequencer object.

Calls superclass method
# File lib/google/cloud/pubsub/subscriber/sequencer.rb, line 32
# @private
# Create an empty sequencer.
#
# The given block is stored as the processing callback; the other methods
# invoke it with each message that is ready to be handled.
#
# @yield [message] the callback stored for later invocation
# @raise [ArgumentError] if no block is given
def initialize &block
  # Fail with an explanatory message instead of a bare ArgumentError so the
  # caller can tell what is missing.
  raise ArgumentError, "a block is required" if block.nil?

  super() # to init MonitorMixin

  # Maps ordering_key => array of pending messages. The default block
  # creates (and stores) a fresh array per key on first access.
  @seq_hash = Hash.new { |hash, key| hash[key] = [] }
  @process_callback = block
end

Public Instance Methods

add(message) click to toggle source

@private Add a ReceivedMessage to the sequencer.

# File lib/google/cloud/pubsub/subscriber/sequencer.rb, line 43
# @private
# Register a received message with the sequencer.
#
# A message with an empty ordering_key is handed straight to the callback
# and is never queued. Any other message is appended to the queue for its
# ordering_key; the callback is invoked right away only when that message
# is the sole entry in the queue. Otherwise it is left for #next.
#
# @param message the received message; must respond to #ordering_key
def add message
  key = message.ordering_key

  # Unordered messages bypass the sequencer entirely.
  if key.empty?
    @process_callback.call message
    return
  end

  # Enqueue and decide "deliver now or later" in one synchronized step so
  # the two operations are observed atomically.
  first_in_line = synchronize do
    queue = @seq_hash[key]
    queue << message
    queue.size == 1
  end

  return unless first_in_line

  @process_callback.call message
end
inspect() click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber/sequencer.rb, line 106
# @private
# Debugging label: the class name followed by this object's #to_s output
# in parentheses.
#
# @return [String]
def inspect
  format "#<%s (%s)>", self.class.name, self
end
next(message) click to toggle source

@private Indicate a ReceivedMessage was processed, and the next in the queue can now be processed.

# File lib/google/cloud/pubsub/subscriber/sequencer.rb, line 66
# @private
# Indicate that a message was processed, so the next message queued under
# the same ordering_key (if any) can be handed to the callback.
#
# Messages with an empty ordering_key are not tracked and are ignored.
#
# @param message the message that finished processing; must respond to
#   #ordering_key
# @raise [OrderedMessageDeliveryError] if the message is not the first
#   entry in the queue for its ordering_key (including when that key is
#   not tracked at all)
def next message
  key = message.ordering_key

  # Messages without ordering_key are not managed by the sequencer
  return if key.empty?

  next_message = synchronize do
    # Remove the processed message and pick its successor in one
    # synchronized step so the two operations happen atomically.

    # Look the queue up with fetch: plain #[] would run the hash's default
    # block and store an empty array for an unknown key, leaving a stale
    # entry behind when the error below is raised.
    queue = @seq_hash.fetch key, nil

    # The message should be at index 0, so this is a quick check.
    if queue.nil? || queue.first != message
      # Raising this error will stop the other messages with this
      # ordering key from being processed by the callback (delivered).
      raise OrderedMessageDeliveryError, message
    end

    # Remove the message
    queue.shift

    # Retrieve the next message to be processed, or nil if empty
    upcoming = queue.first

    # Remove the ordering_key from hash when empty
    @seq_hash.delete key if upcoming.nil?

    upcoming
  end

  @process_callback.call next_message unless next_message.nil?
end
to_s() click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber/sequencer.rb, line 101
# @private
# Summary of the sequencer contents, formatted as
# "<number of ordering keys>/<total number of queued messages>".
#
# @return [String]
def to_s
  key_total = @seq_hash.size
  message_total = @seq_hash.each_value.sum(&:size)

  "#{key_total}/#{message_total}"
end