class Google::Cloud::PubSub::FlowController

@private

Used to control the flow of messages passing through it.

Attributes

awaiting[R]

@private Implementation accessors

byte_limit[R]
limit_exceeded_behavior[R]
message_limit[R]
outstanding_bytes[R]

@private Implementation accessors

outstanding_messages[R]

@private Implementation accessors

Public Class Methods

new(message_limit: 1000, byte_limit: 10_000_000, limit_exceeded_behavior: :ignore) click to toggle source
# File lib/google/cloud/pubsub/flow_controller.rb, line 35
def initialize message_limit: 1000, byte_limit: 10_000_000, limit_exceeded_behavior: :ignore
  unless [:ignore, :error, :block].include? limit_exceeded_behavior
    raise ArgumentError, "limit_exceeded_behavior must be one of :ignore, :error, :block"
  end
  if [:error, :block].include?(limit_exceeded_behavior) && message_limit < 1
    raise ArgumentError,
          "Flow control message limit (#{message_limit}) exceeded by a single message, would block forever"
  end
  @mutex = Mutex.new
  @message_limit = message_limit
  @byte_limit = byte_limit
  @limit_exceeded_behavior = limit_exceeded_behavior
  @outstanding_messages = 0
  @outstanding_bytes = 0

  @awaiting = []
end

Public Instance Methods

acquire(message_size) click to toggle source
# File lib/google/cloud/pubsub/flow_controller.rb, line 53
def acquire message_size
  return if limit_exceeded_behavior == :ignore
  @mutex.lock
  if limit_exceeded_behavior == :error && would_exceed_message_limit?
    raise FlowControlLimitError, "Flow control message limit (#{message_limit}) would be exceeded"
  end
  if limit_exceeded_behavior == :error && would_exceed_byte_limit?(message_size)
    raise FlowControlLimitError,
          "Flow control byte limit (#{byte_limit}) would be exceeded, message_size: #{message_size}"
  end
  if limit_exceeded_behavior == :block && message_size > byte_limit
    raise FlowControlLimitError,
          "Flow control byte limit (#{byte_limit}) exceeded by a single message, would block forever"
  end

  acquire_or_wait message_size
ensure
  @mutex.unlock if @mutex.owned?
end
release(message_size) click to toggle source
# File lib/google/cloud/pubsub/flow_controller.rb, line 73
def release message_size
  return if limit_exceeded_behavior == :ignore
  @mutex.synchronize do
    raise "Flow control messages count would be negative" if (@outstanding_messages - 1).negative?
    raise "Flow control bytes count would be negative" if (@outstanding_bytes - message_size).negative?

    @outstanding_messages -= 1
    @outstanding_bytes -= message_size
    @awaiting.first.set unless @awaiting.empty?
  end
end

Protected Instance Methods

acquire_or_wait(message_size) click to toggle source

rubocop:disable Style/IdenticalConditionalBranches rubocop:disable Style/GuardClause

# File lib/google/cloud/pubsub/flow_controller.rb, line 90
def acquire_or_wait message_size
  waiter = nil
  while is_new_and_others_wait?(waiter) ||
        would_exceed_byte_limit?(message_size) ||
        would_exceed_message_limit?

    if waiter.nil?
      waiter = Concurrent::Event.new
      # This waiter gets added to the back of the line.
      @awaiting << waiter
    else
      waiter = Concurrent::Event.new
      # This waiter already in line stays at the head of the line.
      @awaiting[0] = waiter
    end
    @mutex.unlock
    waiter.wait
    @mutex.lock
  end
  @outstanding_messages += 1
  @outstanding_bytes += message_size

  @awaiting.shift if waiter # Remove the newly released waiter from the head of the queue.

  # There may be some surplus left; let the next message waiting try to acquire a permit.
  if !@awaiting.empty? && @outstanding_bytes < byte_limit && @outstanding_messages < message_limit
    @awaiting.first.set
  end
end
is_new_and_others_wait?(waiter) click to toggle source

rubocop:enable Style/IdenticalConditionalBranches rubocop:enable Style/GuardClause

# File lib/google/cloud/pubsub/flow_controller.rb, line 123
def is_new_and_others_wait? waiter
  waiter.nil? && !@awaiting.empty?
end
would_exceed_byte_limit?(bytes_requested) click to toggle source
# File lib/google/cloud/pubsub/flow_controller.rb, line 131
def would_exceed_byte_limit? bytes_requested
  @outstanding_bytes + bytes_requested > byte_limit
end
would_exceed_message_limit?() click to toggle source
# File lib/google/cloud/pubsub/flow_controller.rb, line 127
def would_exceed_message_limit?
  @outstanding_messages + 1 > message_limit
end