class Google::Cloud::PubSub::Subscriber::Stream

@private

Attributes

callback_thread_pool[R]

@private Implementation attributes.

inventory[R]

@private Inventory.

sequencer[R]

@private Sequencer.

subscriber[R]

@private Subscriber attributes.

Public Class Methods

new(subscriber) click to toggle source

@private Create an empty Subscriber::Stream object.

Calls superclass method
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 51
def initialize subscriber
  super() # to init MonitorMixin

  @subscriber = subscriber

  @request_queue = nil
  @stopped = nil
  @paused  = nil
  @pause_cond = new_cond

  @inventory = Inventory.new self, **@subscriber.stream_inventory

  @sequencer = Sequencer.new(&method(:perform_callback_async)) if subscriber.message_ordering

  @callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads

  @stream_keepalive_task = Concurrent::TimerTask.new(
    execution_interval: 30
  ) do
    # push empty request every 30 seconds to keep stream alive
    push Google::Cloud::PubSub::V1::StreamingPullRequest.new unless inventory.empty?
  end.execute
end

Public Instance Methods

acknowledge(*messages) click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber/stream.rb, line 132
def acknowledge *messages
  ack_ids = coerce_ack_ids messages
  return true if ack_ids.empty?

  synchronize do
    @inventory.remove ack_ids
    @subscriber.buffer.acknowledge ack_ids
  end

  true
end
inspect() click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber/stream.rb, line 199
def inspect
  "#<#{self.class.name} #{self}>"
end
modify_ack_deadline(deadline, *messages) click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber/stream.rb, line 146
def modify_ack_deadline deadline, *messages
  mod_ack_ids = coerce_ack_ids messages
  return true if mod_ack_ids.empty?

  synchronize do
    @inventory.remove mod_ack_ids
    @subscriber.buffer.modify_ack_deadline deadline, mod_ack_ids
  end

  true
end
paused?() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 115
def paused?
  synchronize { @paused }
end
push(request) click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 174
def push request
  synchronize { @request_queue.push request }
end
release(*messages) click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber/stream.rb, line 160
def release *messages
  ack_ids = coerce_ack_ids messages
  return if ack_ids.empty?

  synchronize do
    # Remove from inventory if the message was not explicitly acked or
    # nacked in the callback
    @inventory.remove ack_ids
    # Check whether to unpause the stream only after the callback is
    # completed and the thread is being reclaimed.
    unpause_streaming!
  end
end
renew_lease!() click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber/stream.rb, line 180
def renew_lease!
  synchronize do
    return true if @inventory.empty?

    @inventory.remove_expired!
    @subscriber.buffer.renew_lease @subscriber.deadline, @inventory.ack_ids
    unpause_streaming!
  end

  true
end
running?() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 119
def running?
  !stopped?
end
start() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 75
def start
  synchronize do
    break if @background_thread

    @inventory.start

    start_streaming!
  end

  self
end
stop() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 87
def stop
  synchronize do
    break if @stopped

    # Close the stream by pushing the sentinel value.
    # The unary pusher does not use the stream, so it can close here.
    @request_queue&.push self

    # Signal to the background thread that we are stopped.
    @stopped = true
    @pause_cond.broadcast

    # Now that the reception thread is stopped, immediately stop the
    # callback thread pool. All queued callbacks will see the stream
    # is stopped and perform a noop.
    @callback_thread_pool.shutdown

    # Once all the callbacks are stopped, we can stop the inventory.
    @inventory.stop
  end

  self
end
stopped?() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 111
def stopped?
  synchronize { @stopped }
end
to_s() click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber/stream.rb, line 193
def to_s
  seq_str = "sequenced: #{sequencer}, " if sequencer
  "(inventory: #{@inventory.count}, #{seq_str}status: #{status}, thread: #{thread_status})"
end
wait!(timeout = nil) click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 123
def wait! timeout = nil
  # Wait for all queued callbacks to be processed.
  @callback_thread_pool.wait_for_termination timeout

  self
end

Protected Instance Methods

background_run() click to toggle source

rubocop:disable all

# File lib/google/cloud/pubsub/subscriber/stream.rb, line 210
def background_run
  synchronize do
    # Don't allow a stream to restart if already stopped
    return if @stopped

    @stopped = false
    @paused  = false

    # signal to the previous queue to shut down
    old_queue = []
    old_queue = @request_queue.quit_and_dump_queue if @request_queue

    # Always create a new request queue
    @request_queue = EnumeratorQueue.new self
    @request_queue.push initial_input_request
    old_queue.each { |obj| @request_queue.push obj }
  end

  # Call the StreamingPull API to get the response enumerator
  enum = @subscriber.service.streaming_pull @request_queue.each

  loop do
    synchronize do
      if @paused && !@stopped
        @pause_cond.wait
        next
      end
    end

    # Break loop, close thread if stopped
    break if synchronize { @stopped }

    begin
      # Cannot syncronize the enumerator, causes deadlock
      response = enum.next

      # Use synchronize so both changes happen atomically
      synchronize do
        # Create receipt of received messages reception
        @subscriber.buffer.modify_ack_deadline @subscriber.deadline, response.received_messages.map(&:ack_id)

        # Add received messages to inventory
        @inventory.add response.received_messages
      end

      response.received_messages.each do |rec_msg_grpc|
        rec_msg = ReceivedMessage.from_grpc(rec_msg_grpc, self)
        # No need to synchronize the callback future
        register_callback rec_msg
      end
      synchronize { pause_streaming! }
    rescue StopIteration
      break
    end
  end

  # Has the loop broken but we aren't stopped?
  # Could be GRPC has thrown an internal error, so restart.
  raise RestartStream unless synchronize { @stopped }

  # We must be stopped, tell the stream to quit.
  stop
rescue GRPC::Cancelled, GRPC::DeadlineExceeded, GRPC::Internal,
       GRPC::ResourceExhausted, GRPC::Unauthenticated,
       GRPC::Unavailable
  # Restart the stream with an incremental back for a retriable error.

  retry
rescue RestartStream
  retry
rescue StandardError => e
  @subscriber.error! e

  retry
end
coerce_ack_ids(messages) click to toggle source

Makes sure the values are the `ack_id`. If given several {ReceivedMessage} objects extract the `ack_id` values.

# File lib/google/cloud/pubsub/subscriber/stream.rb, line 374
def coerce_ack_ids messages
  Array(messages).flatten.map do |msg|
    msg.respond_to?(:ack_id) ? msg.ack_id : msg.to_s
  end
end
initial_input_request() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 359
def initial_input_request
  Google::Cloud::PubSub::V1::StreamingPullRequest.new.tap do |req|
    req.subscription = @subscriber.subscription_name
    req.stream_ack_deadline_seconds = @subscriber.deadline
    req.modify_deadline_ack_ids += @inventory.ack_ids
    req.modify_deadline_seconds += @inventory.ack_ids.map { @subscriber.deadline }
    req.client_id = @subscriber.service.client_id
    req.max_outstanding_messages = @inventory.use_legacy_flow_control ? 0 : @inventory.limit
    req.max_outstanding_bytes = @inventory.use_legacy_flow_control ? 0 : @inventory.bytesize
  end
end
pause_streaming!() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 331
def pause_streaming!
  return unless pause_streaming?

  @paused = true
end
pause_streaming?() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 337
def pause_streaming?
  return if @stopped
  return if @paused

  @inventory.full?
end
perform_callback_async(rec_msg) click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 298
def perform_callback_async rec_msg
  return unless callback_thread_pool.running?

  Concurrent::Promises.future_on(
    callback_thread_pool, rec_msg, &method(:perform_callback_sync)
  )
end
perform_callback_sync(rec_msg) click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 306
def perform_callback_sync rec_msg
  @subscriber.callback.call rec_msg unless stopped?
rescue StandardError => e
  @subscriber.error! e
ensure
  release rec_msg
  if @sequencer && running?
    begin
      @sequencer.next rec_msg
    rescue OrderedMessageDeliveryError => e
      @subscriber.error! e
    end
  end
end
register_callback(rec_msg) click to toggle source

rubocop:enable all

# File lib/google/cloud/pubsub/subscriber/stream.rb, line 288
def register_callback rec_msg
  if @sequencer
    # Add the message to the sequencer to invoke the callback.
    @sequencer.add rec_msg
  else
    # Call user provided code for received message
    perform_callback_async rec_msg
  end
end
start_streaming!() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 321
def start_streaming!
  # A Stream will only ever have one background thread. If the thread
  # dies because it was stopped, or because of an unhandled error that
  # could not be recovered from, so be it.
  return if @background_thread

  # create new background thread to handle new enumerator
  @background_thread = Thread.new { background_run }
end
status() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 380
def status
  return "stopped" if stopped?
  return "paused" if paused?
  "running"
end
thread_status() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 386
def thread_status
  return "not started" if @background_thread.nil?

  status = @background_thread.status
  return "error" if status.nil?
  return "stopped" if status == false
  status
end
unpause_streaming!() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 344
def unpause_streaming!
  return unless unpause_streaming?

  @paused = nil
  # signal to the background thread that we are unpaused
  @pause_cond.broadcast
end
unpause_streaming?() click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 352
def unpause_streaming?
  return if @stopped
  return if @paused.nil?

  @inventory.count < @inventory.limit * 0.8
end