class Google::Cloud::PubSub::Subscriber::Inventory
@private
Constants
- InventoryItem
Attributes
bytesize[R]
extension[R]
limit[R]
max_duration_per_lease_extension[R]
stream[R]
use_legacy_flow_control[R]
Public Class Methods
new(stream, limit:, bytesize:, extension:, max_duration_per_lease_extension:, use_legacy_flow_control:)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 40
##
# Creates an empty inventory bound to +stream+.
#
# Every keyword argument is required and stored verbatim in the instance
# variable of the same name:
#
# * +limit+ is compared against the number of held entries by +full?+.
# * +bytesize+ is compared against the summed item +bytesize+ by +full?+.
# * +extension+ is subtracted from the current time by +remove_expired!+
#   to find the cut-off for expired entries.
# * +max_duration_per_lease_extension+ caps the delay computed by
#   +calc_delay+ when it is positive.
# * +use_legacy_flow_control+ is only stored here.
#
# The parameter list is parenthesized on purpose: without parentheses the
# trailing required keyword followed by +super()+ reads as if +super()+
# were the keyword's default value.
def initialize(stream, limit:, bytesize:, extension:, max_duration_per_lease_extension:,
               use_legacy_flow_control:)
  # Explicit empty argument list: the parent initializer is invoked with no
  # arguments. Presumably this sets up the monitor that backs `synchronize`
  # and `new_cond` -- the mixin is not visible in this excerpt.
  super()
  @stream = stream
  @limit = limit
  @bytesize = bytesize
  @extension = extension
  @max_duration_per_lease_extension = max_duration_per_lease_extension
  @use_legacy_flow_control = use_legacy_flow_control
  # ack_id => InventoryItem
  @inventory = {}
  # Signalled by add/remove/remove_expired!/stop; awaited by background_run.
  @wait_cond = new_cond
end
Public Instance Methods
ack_ids()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 53
##
# Returns a new Array holding the ack IDs (the keys of the internal
# inventory hash) that are currently tracked.
#
# The snapshot is taken while holding the inventory lock, like every other
# reader of +@inventory+ in this class (+count+, +empty?+, +full?+,
# +total_bytesize+), so it cannot observe the hash while +add+ or +remove+
# is in the middle of changing it.
def ack_ids
  synchronize { @inventory.keys }
end
add(*rec_msgs)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 57
##
# Records received messages in the inventory.
#
# Accepts any mix of messages and (nested) arrays of messages; +nil+
# entries are dropped. Each message is stored under its +ack_id+ as the
# value returned by +InventoryItem.from+. Returns +nil+ without taking the
# lock when nothing is left to add.
def add *rec_msgs
  incoming = rec_msgs.flatten.compact
  return if incoming.empty?

  synchronize do
    incoming.each do |message|
      @inventory[message.ack_id] = InventoryItem.from(message)
    end
    # Wake anything blocked on the condition (e.g. the background loop).
    @wait_cond.broadcast
  end
end
count()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 89
##
# Number of entries currently held, read under the inventory lock.
def count
  synchronize { @inventory.size }
end
empty?()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 101
##
# Whether the inventory holds no entries, read under the inventory lock.
def empty?
  synchronize { @inventory.empty? }
end
full?()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 126
##
# Whether the inventory has reached either of its configured bounds:
# the entry count is at least +limit+, or the summed +bytesize+ of all held
# items is at least +bytesize+. Both checks run under the inventory lock.
def full?
  synchronize do
    at_count_limit = @inventory.size >= limit
    # The byte total is only computed when the count check did not already
    # decide the answer.
    at_count_limit || @inventory.each_value.sum(&:bytesize) >= bytesize
  end
end
remove(*ack_ids)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 70
##
# Drops the entries stored under the given ack IDs.
#
# Accepts any mix of IDs and (nested) arrays of IDs; +nil+ entries are
# ignored and IDs that are not in the inventory are skipped silently.
# Returns +nil+ without taking the lock when no IDs remain.
def remove *ack_ids
  ack_ids.flatten!
  ack_ids.compact!
  return if ack_ids.empty?

  synchronize do
    # Delete by key rather than scanning every inventory entry against the
    # whole ID list: the time spent holding the lock then depends only on
    # how many IDs are being removed.
    ack_ids.each { |ack_id| @inventory.delete ack_id }
    # Wake anything blocked on the condition (e.g. the background loop).
    @wait_cond.broadcast
  end
end
remove_expired!()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 81
##
# Discards every entry whose +pulled_at+ is earlier than the current time
# minus +extension+, then signals the wait condition. The whole operation
# runs under the inventory lock.
def remove_expired!
  synchronize do
    cutoff = Time.now - extension
    @inventory.reject! { |_ack_id, item| item.pulled_at < cutoff }
    @wait_cond.broadcast
  end
end
start()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 107
##
# Launches the background thread running +background_run+, unless one has
# already been created by an earlier call. Returns +self+.
def start
  @background_thread = Thread.new { background_run } unless @background_thread

  self
end
stop()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 113
##
# Flags the inventory as stopped and signals the wait condition so that a
# thread blocked in +background_run+ can re-check +stopped?+.
# Returns +self+.
def stop
  synchronize do
    @stopped = true
    @wait_cond.broadcast
  end

  self
end
stopped?()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 122
##
# Current value of the stopped flag, read under the inventory lock.
# The flag is only assigned by +stop+, so this is +nil+ (falsy) until
# +stop+ has been called.
def stopped?
  synchronize do
    @stopped
  end
end
total_bytesize()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 95
##
# Sum of +bytesize+ over every item currently held, computed under the
# inventory lock. Returns 0 for an empty inventory.
def total_bytesize
  synchronize { @inventory.each_value.sum(&:bytesize) }
end
Protected Instance Methods
background_run()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 134 def background_run delay_target = nil until stopped? if empty? delay_target = nil synchronize { @wait_cond.wait } # wait until broadcast next end delay_target ||= calc_target delay_gap = delay_target - Time.now unless delay_gap.positive? delay_target = calc_target stream.renew_lease! next end synchronize { @wait_cond.wait delay_gap } end end
calc_delay()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 162
##
# Computes how long to wait before the next lease renewal.
#
# Starts from +stream.subscriber.deadline+ minus 3 and scales it by a
# random factor drawn from 0.8..0.9. When
# +max_duration_per_lease_extension+ is positive the result is capped at
# that value. Presumably expressed in seconds, since +calc_target+ adds it
# to a Time.
def calc_delay
  jittered = (stream.subscriber.deadline - 3) * rand(0.8..0.9)
  return jittered unless max_duration_per_lease_extension.positive?

  [jittered, max_duration_per_lease_extension].min
end
calc_target()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 158
##
# Time at which the next renewal is due: the current time plus the delay
# returned by +calc_delay+.
def calc_target
  now = Time.now
  now + calc_delay
end