class Google::Cloud::PubSub::Subscriber::Inventory

@private

Constants

InventoryItem

Attributes

bytesize[R]
extension[R]
limit[R]
max_duration_per_lease_extension[R]
stream[R]
use_legacy_flow_control[R]

Public Class Methods

new(stream, limit:, bytesize:, extension:, max_duration_per_lease_extension:, use_legacy_flow_control:) click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 40
# Builds an inventory bound to the given stream.
#
# All keyword arguments are required and are stored verbatim in the
# instance variables of the same name. The inventory itself starts out as
# an empty hash and a condition variable is created through `new_cond`.
#
# NOTE(review): `new_cond` is not defined in the visible source; it is
# presumably supplied by a monitor mixin that `super()` initializes, which
# is why `super()` must run before `new_cond` -- confirm.
def initialize stream, limit:, bytesize:, extension:, max_duration_per_lease_extension:,
               use_legacy_flow_control:
  super()

  # Internal state.
  @inventory = {}
  @wait_cond = new_cond

  # Collaborator and settings handed in by the caller.
  @stream = stream
  @limit = limit
  @bytesize = bytesize
  @extension = extension
  @max_duration_per_lease_extension = max_duration_per_lease_extension
  @use_legacy_flow_control = use_legacy_flow_control
end

Public Instance Methods

ack_ids() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 53
# Lists the ack IDs currently held, i.e. the keys of the inventory hash.
#
# Note that, unlike `count` or `empty?`, this read is not wrapped in
# `synchronize`.
#
# @return [Array] a new array containing the inventory keys
def ack_ids
  @inventory.each_key.to_a
end
add(*rec_msgs) click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 57
# Records received messages in the inventory, keyed by their ack ID, and
# then broadcasts on the wait condition.
#
# Arguments may be single messages, (nested) arrays of messages, or nil;
# nested arrays are flattened and nils are discarded. Each remaining
# message is converted with `InventoryItem.from` before being stored.
#
# @param rec_msgs [Array] messages responding to `ack_id`
# @return [nil] when nothing is left after flattening/compacting;
#   otherwise the result of the `synchronize` block
def add *rec_msgs
  incoming = rec_msgs.flatten.compact
  return if incoming.empty?

  synchronize do
    incoming.each do |message|
      @inventory[message.ack_id] = InventoryItem.from(message)
    end
    @wait_cond.broadcast
  end
end
count() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 89
# Number of entries currently in the inventory, read under `synchronize`.
#
# @return [Integer]
def count
  synchronize { @inventory.size }
end
empty?() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 101
# Whether the inventory currently holds no entries, read under
# `synchronize`.
#
# @return [Boolean]
def empty?
  synchronize { @inventory.empty? }
end
full?() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 126
# Whether the inventory has reached either of its configured bounds:
# the number of entries is at least `limit`, or the summed `bytesize` of
# all held items is at least `bytesize`. Evaluated under `synchronize`.
#
# @return [Boolean]
def full?
  synchronize do
    at_count_limit = @inventory.size >= limit
    # The byte total is only computed when the count check did not already
    # decide the answer.
    at_count_limit || @inventory.each_value.sum(&:bytesize) >= bytesize
  end
end
remove(*ack_ids) click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 70
# Drops the given ack IDs from the inventory and then broadcasts on the
# wait condition.
#
# Arguments may be single ack IDs, (nested) arrays of ack IDs, or nil;
# nested arrays are flattened and nils are discarded. IDs that are not in
# the inventory are ignored.
#
# Each ID is removed with a direct hash delete, so the cost is proportional
# to the number of IDs passed rather than to inventory size times the
# number of IDs. Keys are matched with hash semantics (`eql?`/`hash`).
#
# @param ack_ids [Array] ack IDs to remove
# @return [nil] when nothing is left after flattening/compacting;
#   otherwise the result of the `synchronize` block
def remove *ack_ids
  ack_ids.flatten!
  ack_ids.compact!
  return if ack_ids.empty?

  synchronize do
    ack_ids.each { |ack_id| @inventory.delete ack_id }
    @wait_cond.broadcast
  end
end
remove_expired!() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 81
# Discards every inventory entry whose `pulled_at` is earlier than the
# current time minus `extension`, then broadcasts on the wait condition.
# Runs under `synchronize`.
#
# @return the result of the `synchronize` block
def remove_expired!
  synchronize do
    cutoff = Time.now - extension
    @inventory.reject! { |_ack_id, item| item.pulled_at < cutoff }
    @wait_cond.broadcast
  end
end
start() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 107
# Launches the thread that runs `background_run`, unless one has already
# been stored in `@background_thread`. Calling this again keeps the
# existing thread.
#
# @return [self]
def start
  @background_thread = Thread.new { background_run } unless @background_thread

  self
end
stop() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 113
# Marks the inventory as stopped and broadcasts on the wait condition so
# that any thread blocked on it gets a chance to observe the flag. Both
# steps happen under `synchronize`.
#
# @return [self]
def stop
  synchronize do
    @stopped = true
    @wait_cond.broadcast
  end
  self
end
stopped?() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 122
# Reads the stopped flag under `synchronize`.
#
# @return the current value of `@stopped` (true once `stop` has run; nil
#   if it was never assigned)
def stopped?
  synchronize do
    @stopped
  end
end
total_bytesize() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 95
# Sum of `bytesize` over every item held in the inventory, computed under
# `synchronize`.
#
# @return [Integer] 0 when the inventory is empty
def total_bytesize
  synchronize { @inventory.each_value.sum(&:bytesize) }
end

Protected Instance Methods

background_run() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 134
# Loop body of the background thread: keeps calling `stream.renew_lease!`
# whenever the current delay target has passed, until `stopped?` is true.
#
# While the inventory is empty the loop parks on the wait condition with
# no timeout; otherwise it waits at most until the next delay target
# (obtained from `calc_target`).
#
# The "is there still a reason to wait?" predicates are re-evaluated while
# holding the lock, immediately before waiting. Without that re-check, a
# broadcast issued between the unlocked check and the wait would be missed
# and an untimed wait could block indefinitely.
#
# @return [nil]
def background_run
  delay_target = nil

  until stopped?
    if empty?
      delay_target = nil

      # Wait for a broadcast, but only if nothing changed since the
      # unlocked checks above; otherwise fall through and re-evaluate.
      synchronize { @wait_cond.wait if empty? && !stopped? }
      next
    end

    delay_target ||= calc_target
    delay_gap = delay_target - Time.now

    unless delay_gap.positive?
      # Target reached: schedule the next one before renewing.
      delay_target = calc_target
      stream.renew_lease!
      next
    end

    # Timed wait until the next target; skipped when a stop slipped in
    # after the loop condition was checked.
    synchronize { @wait_cond.wait delay_gap unless stopped? }
  end
end
calc_delay() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 162
# Computes how long to wait before the next lease renewal.
#
# The base value is the subscriber deadline minus 3, scaled by a random
# factor drawn from 0.8..0.9. When `max_duration_per_lease_extension` is
# positive, the result is capped at that value.
#
# @return [Numeric] the delay
def calc_delay
  jittered = (stream.subscriber.deadline - 3) * rand(0.8..0.9)
  cap = max_duration_per_lease_extension
  return jittered unless cap.positive?

  [jittered, cap].min
end
calc_target() click to toggle source
# File lib/google/cloud/pubsub/subscriber/inventory.rb, line 158
# Point in time at which the next lease renewal is due: the current time
# plus the value returned by `calc_delay`.
#
# @return [Time]
def calc_target
  now = Time.now
  now + calc_delay
end