class Google::Cloud::PubSub::Subscriber::Sequencer
@private The sequencer's job is simple: keep track of all the streams' received messages and deliver the messages with an ordering_key in the order they were received. The sequencer ensures only one callback can be performed at a time per ordering_key.
Public Class Methods
new(&block)
click to toggle source
@private Create an empty Subscriber::Sequencer
object.
Calls superclass method
# File lib/google/cloud/pubsub/subscriber/sequencer.rb, line 32
##
# @private Create an empty Subscriber::Sequencer object.
#
# @yield [message] The block is stored as the callback used to process
#   messages.
# @raise [ArgumentError] if no block is given.
def initialize &block
  # Name the problem so the caller knows a block is mandatory.
  raise ArgumentError, "a block is required" if block.nil?

  super() # to init MonitorMixin

  # One queue per ordering_key; a key's array is created on first access.
  @seq_hash = Hash.new { |hash, key| hash[key] = [] }
  @process_callback = block
end
Public Instance Methods
add(message)
click to toggle source
@private Add a ReceivedMessage
to the sequencer.
# File lib/google/cloud/pubsub/subscriber/sequencer.rb, line 43
##
# @private Add a ReceivedMessage to the sequencer.
#
# A message with an empty ordering_key is handed straight to the callback.
# A keyed message is queued, and is handed to the callback right away only
# when it is the sole entry in its key's queue; otherwise it stays queued
# until #next releases it.
def add message
  ordering_key = message.ordering_key

  # Unordered messages bypass the sequencer entirely.
  if ordering_key.empty?
    @process_callback.call message
    return
  end

  # Enqueue the message and decide whether it may be delivered immediately
  # in a single synchronized step, so the two cannot be interleaved with
  # another caller's add or next.
  deliver_now = synchronize do
    queue = @seq_hash[ordering_key]
    queue.push message
    queue.count == 1
  end

  @process_callback.call message if deliver_now
end
inspect()
click to toggle source
@private
# File lib/google/cloud/pubsub/subscriber/sequencer.rb, line 106
##
# @private Render the object as "#<ClassName (summary)>", where the summary
# is this object's string form.
def inspect
  format "#<%s (%s)>", self.class.name, self
end
next(message)
click to toggle source
@private Indicate a ReceivedMessage
was processed, and the next in the queue can now be processed.
# File lib/google/cloud/pubsub/subscriber/sequencer.rb, line 66
##
# @private Indicate a ReceivedMessage was processed, and the next in the
# queue can now be processed.
#
# @raise [OrderedMessageDeliveryError] if the message is not at the head of
#   the queue for its ordering_key (including when no queue exists for that
#   key).
def next message
  # Messages without ordering_key are not managed by the sequencer
  return if message.ordering_key.empty?

  next_message = synchronize do
    # The purpose of this block is to remove the message that was
    # processed from the sequencer, and to return the next message to
    # be processed. We want to ensure that these operations happen
    # atomically.
    key = message.ordering_key

    # Use fetch rather than [] so that an untracked key does not trigger
    # the hash's default block and leave an empty queue behind.
    queue = @seq_hash.fetch key, nil

    # The message should be at index 0, so this should be a very quick
    # operation.
    if queue.nil? || queue.first != message
      # Raising this error will stop the other messages with this
      # ordering key from being processed by the callback (delivered).
      raise OrderedMessageDeliveryError, message
    end

    # Remove the message
    queue.shift

    # Retrieve the next message to be processed, or nil if empty
    upcoming = queue.first

    # Remove the ordering_key from hash when empty
    @seq_hash.delete key if upcoming.nil?

    # Return the next message to be processed, or nil if empty
    upcoming
  end

  @process_callback.call next_message unless next_message.nil?
end
to_s()
click to toggle source
@private
# File lib/google/cloud/pubsub/subscriber/sequencer.rb, line 101
##
# @private Summarize the sequencer as "<keys>/<messages>": the number of
# entries in the queue hash, then the combined count of all their values.
def to_s
  key_total = @seq_hash.count
  message_total = @seq_hash.each_value.sum(&:count)
  "#{key_total}/#{message_total}"
end