class Google::Cloud::PubSub::Subscriber::Stream
@private
Attributes
callback_thread_pool[R]
@private Implementation attributes.
inventory[R]
@private Inventory.
sequencer[R]
@private Sequencer.
subscriber[R]
@private Subscriber attributes.
Public Class Methods
new(subscriber)
click to toggle source
@private Create an empty Subscriber::Stream object.
Calls superclass method
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 51
##
# @private Create an empty Subscriber::Stream object.
#
# Stores the owning subscriber, resets the stream state flags, and builds
# the collaborators the stream works with: the inventory, an optional
# sequencer (only when the subscriber has message ordering enabled), the
# callback thread pool and a keepalive timer task.
#
# @param subscriber the subscriber that owns this stream; it is asked for
#   its inventory settings, message ordering flag and callback thread count.
def initialize subscriber
  super() # to init MonitorMixin

  @subscriber = subscriber

  # No request queue exists yet, and the stream is neither stopped nor paused.
  @request_queue = nil
  @stopped = nil
  @paused = nil
  @pause_cond = new_cond

  @inventory = Inventory.new self, **@subscriber.stream_inventory

  # The sequencer is only created for ordered delivery; otherwise
  # @sequencer stays unset.
  if subscriber.message_ordering
    @sequencer = Sequencer.new(&method(:perform_callback_async))
  end

  @callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads

  keepalive_task = Concurrent::TimerTask.new(execution_interval: 30) do
    # push empty request every 30 seconds to keep stream alive
    next if inventory.empty?

    push Google::Cloud::PubSub::V1::StreamingPullRequest.new
  end
  @stream_keepalive_task = keepalive_task.execute
end
Public Instance Methods
acknowledge(*messages)
click to toggle source
@private
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 132
##
# @private
# Acknowledges the given messages.
#
# The arguments are coerced to ack ids; when any remain, they are removed
# from the inventory and passed to the subscriber's buffer for
# acknowledgement, both under the stream's monitor.
#
# @param messages messages (or raw ack ids) to acknowledge.
# @return [true] always, including when there was nothing to acknowledge.
def acknowledge *messages
  ids = coerce_ack_ids messages

  unless ids.empty?
    synchronize do
      @inventory.remove ids
      @subscriber.buffer.acknowledge ids
    end
  end

  true
end
inspect()
click to toggle source
@private
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 199
##
# @private
# Builds a debugging representation made of the class name followed by the
# stream's #to_s summary, wrapped in "#<...>".
#
# @return [String]
def inspect
  format "#<%s %s>", self.class.name, self
end
modify_ack_deadline(deadline, *messages)
click to toggle source
@private
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 146
##
# @private
# Changes the ack deadline of the given messages.
#
# The arguments are coerced to ack ids; when any remain, they are removed
# from the inventory and passed, with the deadline, to the subscriber's
# buffer, both under the stream's monitor.
#
# @param deadline the new deadline handed to the buffer.
# @param messages messages (or raw ack ids) to modify.
# @return [true] always, including when there was nothing to modify.
def modify_ack_deadline deadline, *messages
  ids = coerce_ack_ids messages

  unless ids.empty?
    synchronize do
      @inventory.remove ids
      @subscriber.buffer.modify_ack_deadline deadline, ids
    end
  end

  true
end
paused?()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 115
##
# Reads the paused flag while holding the stream's monitor.
#
# @return the current value of the paused flag (truthy when paused).
def paused?
  synchronize do
    @paused
  end
end
push(request)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 174
##
# Enqueues a request on the stream's current request queue while holding
# the stream's monitor.
#
# The request queue starts out as +nil+ and is only created by the
# background run, so a push that arrives before then is skipped instead of
# raising NoMethodError. This matches the nil-safe push already used by
# #stop.
#
# @param request the request object to enqueue.
# @return the result of the queue's +push+, or +nil+ when no queue exists.
def push request
  synchronize { @request_queue&.push request }
end
release(*messages)
click to toggle source
@private
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 160
##
# @private
# Releases the given messages from the inventory once their callback has
# finished.
#
# @param messages messages (or raw ack ids) to release.
# @return +nil+ when there is nothing to release, otherwise whatever
#   #unpause_streaming! returns.
def release *messages
  ids = coerce_ack_ids messages

  unless ids.empty?
    synchronize do
      # Drop the ids from the inventory in case the callback neither acked
      # nor nacked the message explicitly.
      @inventory.remove ids

      # The callback is done and its thread is being reclaimed, so this is
      # the point at which to check whether the stream can be unpaused.
      unpause_streaming!
    end
  end
end
renew_lease!()
click to toggle source
@private
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 180
##
# @private
# Renews the lease on every message still held in the inventory.
#
# Does nothing when the inventory is empty. Otherwise, under the stream's
# monitor, expired entries are removed, the remaining ack ids are sent to
# the subscriber's buffer together with the subscriber's deadline, and the
# stream is given the chance to unpause.
#
# @return [true] always.
def renew_lease!
  synchronize do
    unless @inventory.empty?
      @inventory.remove_expired!
      @subscriber.buffer.renew_lease @subscriber.deadline, @inventory.ack_ids
      unpause_streaming!
    end
  end

  true
end
running?()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 119
##
# Whether the stream has not been stopped; the logical negation of
# #stopped?.
#
# @return [Boolean]
def running?
  return false if stopped?

  true
end
start()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 75
##
# Starts the stream. Under the stream's monitor, and only when no
# background thread has been created yet, the inventory is started and
# streaming is kicked off.
#
# @return [self]
def start
  synchronize do
    unless @background_thread
      @inventory.start
      start_streaming!
    end
  end

  self
end
stop()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 87
##
# Stops the stream. The first call performs the shutdown under the
# stream's monitor; later calls are no-ops.
#
# @return [self]
def stop
  synchronize do
    break if @stopped

    # Close the stream by pushing the sentinel value.
    # The unary pusher does not use the stream, so it can close here.
    @request_queue&.push self

    # Signal to the background thread that we are stopped.
    @stopped = true
    @pause_cond.broadcast

    # The keepalive timer is started in the constructor and is of no use
    # once the stream is stopped; shut it down so it does not keep firing
    # for the rest of the process lifetime.
    @stream_keepalive_task&.shutdown

    # Now that the reception thread is stopped, immediately stop the
    # callback thread pool. All queued callbacks will see the stream
    # is stopped and perform a noop.
    @callback_thread_pool.shutdown

    # Once all the callbacks are stopped, we can stop the inventory.
    @inventory.stop
  end

  self
end
stopped?()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 111
##
# Reads the stopped flag while holding the stream's monitor.
#
# @return the current value of the stopped flag (truthy when stopped).
def stopped?
  synchronize do
    @stopped
  end
end
to_s()
click to toggle source
@private
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 193
##
# @private
# Summarises the stream as text: the inventory count, the sequencer (only
# when one is present), the stream status and the background thread status.
#
# @return [String]
def to_s
  seq_str = sequencer ? "sequenced: #{sequencer}, " : ""

  "(inventory: #{@inventory.count}, #{seq_str}status: #{status}, thread: #{thread_status})"
end
wait!(timeout = nil)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 123
##
# Waits for the callback thread pool to terminate, i.e. for all queued
# callbacks to be processed.
#
# @param timeout passed straight through to the pool's
#   +wait_for_termination+; +nil+ by default.
# @return [self]
def wait!(timeout = nil)
  tap { @callback_thread_pool.wait_for_termination(timeout) }
end
Protected Instance Methods
background_run()
click to toggle source
rubocop:disable all
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 210 def background_run synchronize do # Don't allow a stream to restart if already stopped return if @stopped @stopped = false @paused = false # signal to the previous queue to shut down old_queue = [] old_queue = @request_queue.quit_and_dump_queue if @request_queue # Always create a new request queue @request_queue = EnumeratorQueue.new self @request_queue.push initial_input_request old_queue.each { |obj| @request_queue.push obj } end # Call the StreamingPull API to get the response enumerator enum = @subscriber.service.streaming_pull @request_queue.each loop do synchronize do if @paused && !@stopped @pause_cond.wait next end end # Break loop, close thread if stopped break if synchronize { @stopped } begin # Cannot syncronize the enumerator, causes deadlock response = enum.next # Use synchronize so both changes happen atomically synchronize do # Create receipt of received messages reception @subscriber.buffer.modify_ack_deadline @subscriber.deadline, response.received_messages.map(&:ack_id) # Add received messages to inventory @inventory.add response.received_messages end response.received_messages.each do |rec_msg_grpc| rec_msg = ReceivedMessage.from_grpc(rec_msg_grpc, self) # No need to synchronize the callback future register_callback rec_msg end synchronize { pause_streaming! } rescue StopIteration break end end # Has the loop broken but we aren't stopped? # Could be GRPC has thrown an internal error, so restart. raise RestartStream unless synchronize { @stopped } # We must be stopped, tell the stream to quit. stop rescue GRPC::Cancelled, GRPC::DeadlineExceeded, GRPC::Internal, GRPC::ResourceExhausted, GRPC::Unauthenticated, GRPC::Unavailable # Restart the stream with an incremental back for a retriable error. retry rescue RestartStream retry rescue StandardError => e @subscriber.error! e retry end
coerce_ack_ids(messages)
click to toggle source
Makes sure the values are the `ack_id`. If given several {ReceivedMessage} objects extract the `ack_id` values.
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 374
##
# Normalises the given values into a flat array of ack ids.
#
# Anything that responds to +ack_id+ contributes that value; every other
# element is converted with +to_s+. Nested arrays are flattened first.
#
# @param messages a single value, an array, or nested arrays of messages
#   and/or raw ack ids.
# @return [Array] the ack ids, in input order.
def coerce_ack_ids messages
  flattened = Array(messages).flatten

  flattened.map do |item|
    if item.respond_to?(:ack_id)
      item.ack_id
    else
      item.to_s
    end
  end
end
initial_input_request()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 359
##
# Builds the first StreamingPullRequest sent on a new request queue.
#
# The request names the subscription, carries the subscriber's deadline
# and client id, re-leases every ack id currently in the inventory with
# that deadline, and sets the flow-control limits from the inventory (both
# zero when the inventory uses legacy flow control).
#
# @return [Google::Cloud::PubSub::V1::StreamingPullRequest]
def initial_input_request
  request = Google::Cloud::PubSub::V1::StreamingPullRequest.new

  request.subscription = @subscriber.subscription_name
  request.stream_ack_deadline_seconds = @subscriber.deadline

  # One deadline entry per ack id already held in the inventory.
  request.modify_deadline_ack_ids += @inventory.ack_ids
  request.modify_deadline_seconds += @inventory.ack_ids.map { @subscriber.deadline }

  request.client_id = @subscriber.service.client_id

  request.max_outstanding_messages = @inventory.use_legacy_flow_control ? 0 : @inventory.limit
  request.max_outstanding_bytes = @inventory.use_legacy_flow_control ? 0 : @inventory.bytesize

  request
end
pause_streaming!()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 331
##
# Marks the stream as paused when #pause_streaming? says it should be.
#
# @return [true, nil] +true+ when the stream was paused, +nil+ otherwise.
def pause_streaming!
  @paused = true if pause_streaming?
end
pause_streaming?()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 337
##
# Whether the stream should be paused now: never when it is already
# stopped or paused, otherwise exactly when the inventory reports full.
#
# @return +nil+ when stopped or already paused, else the inventory's
#   +full?+ result.
def pause_streaming?
  return if @stopped || @paused

  @inventory.full?
end
perform_callback_async(rec_msg)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 298
##
# Schedules the synchronous callback for a received message on the
# callback thread pool, as a Concurrent::Promises future. Nothing is
# scheduled when the pool is no longer running.
#
# @param rec_msg the received message to hand to the callback.
# @return the future created for the callback, or +nil+ when the pool is
#   not running.
def perform_callback_async rec_msg
  if callback_thread_pool.running?
    Concurrent::Promises.future_on(
      callback_thread_pool, rec_msg, &method(:perform_callback_sync)
    )
  end
end
perform_callback_sync(rec_msg)
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 306
##
# Runs the subscriber's callback for a received message on the current
# thread, unless the stream is stopped.
#
# A StandardError raised by the callback is reported through the
# subscriber's +error!+. Whatever happens, the message is released
# afterwards and, when a sequencer exists and the stream is still running,
# the sequencer is advanced past this message.
#
# @param rec_msg the received message to process.
def perform_callback_sync rec_msg
  begin
    @subscriber.callback.call rec_msg unless stopped?
  rescue StandardError => callback_error
    @subscriber.error! callback_error
  ensure
    release rec_msg

    if @sequencer && running?
      begin
        @sequencer.next rec_msg
      rescue OrderedMessageDeliveryError => ordering_error
        @subscriber.error! ordering_error
      end
    end
  end
end
register_callback(rec_msg)
click to toggle source
rubocop:enable all
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 288
##
# Routes a received message towards the user callback: through the
# sequencer when one exists, otherwise straight to the asynchronous
# callback runner.
#
# @param rec_msg the received message.
# @return whatever the sequencer's +add+ or #perform_callback_async
#   returns.
def register_callback rec_msg
  # With a sequencer present, it decides when the callback is invoked.
  return @sequencer.add(rec_msg) if @sequencer

  # Call user provided code for received message
  perform_callback_async rec_msg
end
start_streaming!()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 321
##
# Creates the background thread that runs #background_run, unless one has
# already been created.
#
# @return [Thread, nil] the new thread, or +nil+ when one already existed.
def start_streaming!
  # A Stream only ever gets one background thread. If that thread dies,
  # whether because the stream was stopped or because of an error that
  # could not be recovered from, no replacement is created.
  unless @background_thread
    # create new background thread to handle new enumerator
    @background_thread = Thread.new do
      background_run
    end
  end
end
status()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 380
##
# Describes the stream state as a word. Stopped takes precedence over
# paused; anything else counts as running.
#
# @return [String] "stopped", "paused" or "running".
def status
  if stopped?
    "stopped"
  elsif paused?
    "paused"
  else
    "running"
  end
end
thread_status()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 386
##
# Describes the state of the background thread.
#
# @return "not started" when no thread exists, "error" when the thread's
#   status is +nil+, "stopped" when it is +false+, and otherwise the
#   thread's own status value.
def thread_status
  return "not started" if @background_thread.nil?

  state = @background_thread.status
  case state
  when nil then "error"
  when false then "stopped"
  else state
  end
end
unpause_streaming!()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 344
##
# Clears the paused flag and wakes the background thread when
# #unpause_streaming? says the stream may resume.
#
# @return +nil+ when nothing was done, otherwise the result of
#   broadcasting on the pause condition.
def unpause_streaming!
  if unpause_streaming?
    @paused = nil
    # signal to the background thread that we are unpaused
    @pause_cond.broadcast
  end
end
unpause_streaming?()
click to toggle source
# File lib/google/cloud/pubsub/subscriber/stream.rb, line 352
##
# Whether a paused stream may resume: never when it is stopped or when the
# paused flag is +nil+, otherwise once the inventory count has dropped
# below 80% of the inventory limit.
#
# @return +nil+ when stopped or when the paused flag is +nil+, else a
#   Boolean.
def unpause_streaming?
  return if @stopped || @paused.nil?

  held = @inventory.count
  held < @inventory.limit * 0.8
end