class Google::Cloud::PubSub::FlowController
@private
Used to control the flow of messages passing through it, by tracking the number of outstanding messages and bytes against configured limits.
Attributes
awaiting[R]
@private Implementation accessors
byte_limit[R]
limit_exceeded_behavior[R]
message_limit[R]
outstanding_bytes[R]
@private Implementation accessors
outstanding_messages[R]
@private Implementation accessors
Public Class Methods
new(message_limit: 1000, byte_limit: 10_000_000, limit_exceeded_behavior: :ignore)
click to toggle source
# File lib/google/cloud/pubsub/flow_controller.rb, line 35
##
# Builds a flow controller with zeroed counters and an empty waiter queue.
#
# @param message_limit [Integer] maximum number of outstanding messages.
# @param byte_limit [Integer] maximum number of outstanding bytes.
# @param limit_exceeded_behavior [Symbol] one of `:ignore`, `:error`, `:block`.
#
# @raise [ArgumentError] when `limit_exceeded_behavior` is not one of the
#   three supported symbols, or when it is `:error`/`:block` and
#   `message_limit` is below 1.
#
def initialize message_limit: 1000, byte_limit: 10_000_000, limit_exceeded_behavior: :ignore
  case limit_exceeded_behavior
  when :ignore
    # Limits are not validated in this mode.
  when :error, :block
    if message_limit < 1
      raise ArgumentError,
            "Flow control message limit (#{message_limit}) exceeded by a single message, would block forever"
    end
  else
    raise ArgumentError, "limit_exceeded_behavior must be one of :ignore, :error, :block"
  end

  # Configuration.
  @limit_exceeded_behavior = limit_exceeded_behavior
  @message_limit = message_limit
  @byte_limit = byte_limit

  # Mutable state; the methods that change it take @mutex first.
  @mutex = Mutex.new
  @outstanding_messages = 0
  @outstanding_bytes = 0
  @awaiting = []
end
Public Instance Methods
acquire(message_size)
click to toggle source
# File lib/google/cloud/pubsub/flow_controller.rb, line 53
##
# Requests a permit for one message of `message_size` bytes.
#
# Does nothing when the behavior is `:ignore`. Otherwise the mutex is taken,
# the mode-specific checks run, and the request is handed to
# `acquire_or_wait`.
#
# @param message_size [Integer] size of the message in bytes.
#
# @raise [FlowControlLimitError] in `:error` mode when the message or byte
#   limit would be exceeded; in `:block` mode when `message_size` alone is
#   larger than `byte_limit`.
#
def acquire message_size
  return if limit_exceeded_behavior == :ignore

  @mutex.lock

  case limit_exceeded_behavior
  when :error
    if would_exceed_message_limit?
      raise FlowControlLimitError, "Flow control message limit (#{message_limit}) would be exceeded"
    end

    if would_exceed_byte_limit? message_size
      raise FlowControlLimitError,
            "Flow control byte limit (#{byte_limit}) would be exceeded, message_size: #{message_size}"
    end
  when :block
    if message_size > byte_limit
      raise FlowControlLimitError,
            "Flow control byte limit (#{byte_limit}) exceeded by a single message, would block forever"
    end
  end

  acquire_or_wait message_size
ensure
  # The lock is not held on the early :ignore return, and acquire_or_wait
  # releases it while waiting, so only unlock when this thread owns it.
  @mutex.unlock if @mutex.owned?
end
release(message_size)
click to toggle source
# File lib/google/cloud/pubsub/flow_controller.rb, line 73
##
# Returns the permit held for one message of `message_size` bytes and
# signals the waiter at the head of the queue, if there is one.
#
# Does nothing when the behavior is `:ignore`.
#
# @param message_size [Integer] size of the released message in bytes.
#
# @raise [RuntimeError] when either counter would drop below zero; the
#   counters are left unchanged in that case.
#
def release message_size
  return if limit_exceeded_behavior == :ignore

  @mutex.synchronize do
    messages_left = @outstanding_messages - 1
    raise "Flow control messages count would be negative" if messages_left.negative?

    bytes_left = @outstanding_bytes - message_size
    raise "Flow control bytes count would be negative" if bytes_left.negative?

    @outstanding_messages = messages_left
    @outstanding_bytes = bytes_left

    # Only the head of the line is signalled.
    someone_waiting = !@awaiting.empty?
    @awaiting.first.set if someone_waiting
  end
end
Protected Instance Methods
acquire_or_wait(message_size)
click to toggle source
rubocop:disable Style/IdenticalConditionalBranches rubocop:disable Style/GuardClause
# File lib/google/cloud/pubsub/flow_controller.rb, line 90
##
# Takes a permit for `message_size` bytes, waiting in line while the limits
# would be exceeded or while earlier requests are still queued.
#
# The caller must already hold `@mutex`; it is released only for the
# duration of each wait and is held again when the method returns.
#
# @param message_size [Integer] size of the message in bytes.
#
def acquire_or_wait message_size
  waiter = nil

  while is_new_and_others_wait?(waiter) ||
        would_exceed_byte_limit?(message_size) ||
        would_exceed_message_limit?
    first_attempt = waiter.nil?
    waiter = Concurrent::Event.new

    if first_attempt
      # This waiter gets added to the back of the line.
      @awaiting << waiter
    else
      # This waiter already in line stays at the head of the line, with a
      # fresh event to wait on.
      @awaiting[0] = waiter
    end

    @mutex.unlock
    waiter.wait
    @mutex.lock
  end

  @outstanding_messages += 1
  @outstanding_bytes += message_size

  # Remove the newly released waiter from the head of the queue.
  @awaiting.shift if waiter

  return if @awaiting.empty?

  # There may be some surplus left; let the next message waiting try to
  # acquire a permit.
  surplus_left = @outstanding_bytes < byte_limit && @outstanding_messages < message_limit
  @awaiting.first.set if surplus_left
end
is_new_and_others_wait?(waiter)
click to toggle source
rubocop:enable Style/IdenticalConditionalBranches rubocop:enable Style/GuardClause
# File lib/google/cloud/pubsub/flow_controller.rb, line 123
##
# Tells whether a request that has not queued yet (`waiter` is nil) must
# line up behind requests that are already waiting.
#
# @param waiter [Object, nil] the caller's own waiter, or nil if it has
#   not created one yet.
# @return [Boolean] true only when `waiter` is nil and the queue is
#   non-empty.
#
def is_new_and_others_wait? waiter
  return false unless waiter.nil?

  !@awaiting.empty?
end
would_exceed_byte_limit?(bytes_requested)
click to toggle source
# File lib/google/cloud/pubsub/flow_controller.rb, line 131
##
# Tells whether adding `bytes_requested` to the outstanding byte count
# would go past `byte_limit`. Reaching the limit exactly is allowed.
#
# @param bytes_requested [Integer] number of bytes the caller wants to add.
# @return [Boolean]
#
def would_exceed_byte_limit? bytes_requested
  projected_bytes = @outstanding_bytes + bytes_requested
  projected_bytes > byte_limit
end
would_exceed_message_limit?()
click to toggle source
# File lib/google/cloud/pubsub/flow_controller.rb, line 127
##
# Tells whether one more outstanding message would go past
# `message_limit`. Reaching the limit exactly is allowed.
#
# @return [Boolean]
#
def would_exceed_message_limit?
  projected_messages = @outstanding_messages + 1
  projected_messages > message_limit
end