class Google::Cloud::PubSub::Subscriber
Subscriber object used to stream and process messages from a Subscription. See {Google::Cloud::PubSub::Subscription#listen}.
@example
  require "google/cloud/pubsub"

  pubsub = Google::Cloud::PubSub.new

  sub = pubsub.subscription "my-topic-sub"

  subscriber = sub.listen do |received_message|
    # process message
    received_message.acknowledge!
  end

  # Start background threads that will call the block passed to listen.
  subscriber.start

  # Shut down the subscriber when ready to stop receiving messages.
  subscriber.stop!
@attr_reader [String] subscription_name The name of the subscription the messages are pulled from.
@attr_reader [Proc] callback The procedure that will handle the messages received from the subscription.
@attr_reader [Numeric] deadline The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.
@attr_reader [Boolean] message_ordering Whether message ordering has been enabled.
@attr_reader [Integer] streams The number of concurrent streams to open to pull messages from the subscription. Default is 2.
@attr_reader [Integer] callback_threads The number of threads used to handle the received messages. Default is 8.
@attr_reader [Integer] push_threads The number of threads to handle acknowledgement ({ReceivedMessage#ack!}) and delay messages ({ReceivedMessage#nack!}, {ReceivedMessage#modify_ack_deadline!}). Default is 4.
Attributes
@private Implementation attributes.
Public Class Methods
@private Create an empty {Subscriber} object.
  # File lib/google/cloud/pubsub/subscriber.rb, line 81
  def initialize subscription_name, callback, deadline: nil, message_ordering: nil, streams: nil, inventory: nil,
                 threads: {}, service: nil
    super() # to init MonitorMixin

    @callback = callback
    @error_callbacks = []
    @subscription_name = subscription_name
    @deadline = deadline || 60
    @streams = streams || 2
    coerce_inventory inventory
    @message_ordering = message_ordering
    @callback_threads = Integer(threads[:callback] || 8)
    @push_threads = Integer(threads[:push] || 4)

    @service = service

    @started = @stopped = nil

    stream_pool = Array.new @streams do
      Thread.new { Stream.new self }
    end
    @stream_pool = stream_pool.map(&:value)

    @buffer = TimedUnaryBuffer.new self
  end
Public Instance Methods
@private Records an error raised on a stream thread as the last error and passes it to the registered error callbacks, or to the default error callbacks when none are registered.
  # File lib/google/cloud/pubsub/subscriber.rb, line 347
  def error! error
    error_callbacks = synchronize do
      @last_error = error
      @error_callbacks
    end
    error_callbacks = default_error_callbacks if error_callbacks.empty?
    error_callbacks.each { |error_callback| error_callback.call error }
  end
@private
  # File lib/google/cloud/pubsub/subscriber.rb, line 364
  def inspect
    "#<#{self.class.name} #{self}>"
  end
The most recent unhandled error to occur while listening to messages on the subscriber.
If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.
@return [Exception, nil] error The most recent error raised.
@example
  require "google/cloud/pubsub"

  pubsub = Google::Cloud::PubSub.new

  sub = pubsub.subscription "my-topic-sub"

  subscriber = sub.listen do |received_message|
    # process message
    received_message.acknowledge!
  end

  # Start listening for messages and errors.
  subscriber.start

  # If an error was raised, it can be retrieved here:
  subscriber.last_error #=> nil

  # Shut down the subscriber when ready to stop receiving messages.
  subscriber.stop!

  # File lib/google/cloud/pubsub/subscriber.rb, line 274
  def last_error
    synchronize { @last_error }
  end
The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).
@return [Integer] The maximum number of seconds.
  # File lib/google/cloud/pubsub/subscriber.rb, line 330
  def max_duration_per_lease_extension
    @inventory[:max_duration_per_lease_extension]
  end
The total byte size of received messages to be collected by the subscriber. Default is 100,000,000 (100MB).
@return [Integer] The maximum number of bytes.
  # File lib/google/cloud/pubsub/subscriber.rb, line 296
  def max_outstanding_bytes
    @inventory[:max_outstanding_bytes]
  end
The number of received messages to be collected by the subscriber. Default is 1,000.
@return [Integer] The maximum number of messages.
  # File lib/google/cloud/pubsub/subscriber.rb, line 283
  def max_outstanding_messages
    @inventory[:max_outstanding_messages]
  end
The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).
@return [Integer] The maximum number of seconds.
  # File lib/google/cloud/pubsub/subscriber.rb, line 318
  def max_total_lease_duration
    @inventory[:max_total_lease_duration]
  end
Register to be notified of errors when raised.
If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.
Multiple error handlers can be added.
@yield [callback] The block to be called when an error is raised.
@yieldparam [Exception] error The error raised.
@example
  require "google/cloud/pubsub"

  pubsub = Google::Cloud::PubSub.new

  sub = pubsub.subscription "my-topic-sub"

  subscriber = sub.listen do |received_message|
    # process message
    received_message.acknowledge!
  end

  # Register to be notified when unhandled errors occur.
  subscriber.on_error do |error|
    # log error
    puts error
  end

  # Start listening for messages and errors.
  subscriber.start

  # Shut down the subscriber when ready to stop receiving messages.
  subscriber.stop!

  # File lib/google/cloud/pubsub/subscriber.rb, line 238
  def on_error &block
    synchronize do
      @error_callbacks << block
    end
  end
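The way registered handlers and the configured defaults interact can be hard to see from the listings alone. The following is a minimal sketch in plain Ruby, not the library class: `ErrorNotifier` and its `default_callbacks` argument are hypothetical names used only for illustration, and the defaults are passed in directly rather than read from `Google::Cloud::PubSub.configure`. It shows that handlers are stored under the monitor lock, that the most recent error is remembered, and that the defaults run only when no handler has been registered.

```ruby
require "monitor"

# Hypothetical stand-in for the subscriber's error handling; not the library class.
class ErrorNotifier
  include MonitorMixin

  attr_reader :last_error

  def initialize(default_callbacks = [])
    super() # initialise MonitorMixin
    @error_callbacks = []
    @default_callbacks = default_callbacks
    @last_error = nil
  end

  # Register a handler; several handlers may be registered.
  def on_error(&block)
    synchronize { @error_callbacks << block }
  end

  # Record the error under the lock, then call the handlers outside it,
  # falling back to the defaults when no handler was registered.
  def error!(error)
    callbacks = synchronize do
      @last_error = error
      @error_callbacks.dup
    end
    callbacks = @default_callbacks if callbacks.empty?
    callbacks.each { |callback| callback.call error }
  end
end

seen = []
notifier = ErrorNotifier.new [->(e) { seen << "default: #{e.message}" }]

notifier.error! RuntimeError.new("first")   # no handlers yet, so the default runs
notifier.on_error { |e| seen << "a: #{e.message}" }
notifier.on_error { |e| seen << "b: #{e.message}" }
notifier.error! RuntimeError.new("second")  # both handlers run, the default does not
```

Handlers run in registration order, so in this sketch `seen` ends up holding the default entry followed by one entry from each handler.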
Starts the subscriber pulling from the subscription and processing the received messages.
@return [Subscriber] returns self so calls can be chained.
  # File lib/google/cloud/pubsub/subscriber.rb, line 113
  def start
    start_pool = synchronize do
      @started = true
      @stopped = false

      # Start the buffer before the streams are all started
      @buffer.start
      @stream_pool.map do |stream|
        Thread.new { stream.start }
      end
    end
    start_pool.map(&:join)

    self
  end
Whether the subscriber has been started.
@return [Boolean] `true` when started, `false` otherwise.
  # File lib/google/cloud/pubsub/subscriber.rb, line 190
  def started?
    synchronize { @started }
  end
Immediately stops the subscriber. No new messages will be pulled from the subscription. Use {#wait!} to block until all received messages have been processed or released. While the subscriber stops, all actions taken on received messages that have not yet been sent to the API will be sent to the API, and all received but unprocessed messages will be released back to the API and redelivered.
@return [Subscriber] returns self so calls can be chained.
  # File lib/google/cloud/pubsub/subscriber.rb, line 138
  def stop
    synchronize do
      @started = false
      @stopped = true
      @stream_pool.map(&:stop)
      wait_stop_buffer_thread!
      self
    end
  end
Stop this subscriber and block until the subscriber is fully stopped and all received messages have been processed or released, or until `timeout` seconds have passed.
The same as calling {#stop} and {#wait!}.
@param [Numeric, nil] timeout The number of seconds to block until the subscriber is fully stopped. The default (`nil`) blocks indefinitely.
@return [Subscriber] returns self so calls can be chained.
  # File lib/google/cloud/pubsub/subscriber.rb, line 180
  def stop! timeout = nil
    stop
    wait! timeout
  end
Whether the subscriber has been stopped.
@return [Boolean] `true` when stopped, `false` otherwise.
  # File lib/google/cloud/pubsub/subscriber.rb, line 199
  def stopped?
    synchronize { @stopped }
  end
@private
  # File lib/google/cloud/pubsub/subscriber.rb, line 336
  def stream_inventory
    {
      limit:                            @inventory[:max_outstanding_messages].fdiv(@streams).ceil,
      bytesize:                         @inventory[:max_outstanding_bytes].fdiv(@streams).ceil,
      extension:                        @inventory[:max_total_lease_duration],
      max_duration_per_lease_extension: @inventory[:max_duration_per_lease_extension],
      use_legacy_flow_control:          @inventory[:use_legacy_flow_control]
    }
  end
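The per-stream split in `stream_inventory` divides the message and byte limits by the number of streams and rounds up. A small sketch of that arithmetic follows; `per_stream_limits` is a hypothetical helper written for this illustration, not a library method.

```ruby
# Hypothetical helper mirroring the per-stream arithmetic; not part of the library.
def per_stream_limits(max_outstanding_messages:, max_outstanding_bytes:, streams:)
  {
    limit:    max_outstanding_messages.fdiv(streams).ceil,
    bytesize: max_outstanding_bytes.fdiv(streams).ceil
  }
end

# The documented defaults: 1,000 messages and 100,000,000 bytes over 2 streams.
defaults = per_stream_limits(max_outstanding_messages: 1000,
                             max_outstanding_bytes: 100_000_000,
                             streams: 2)

# An uneven split: the same limits over 3 streams.
three = per_stream_limits(max_outstanding_messages: 1000,
                          max_outstanding_bytes: 100_000_000,
                          streams: 3)
```

With the defaults each stream gets 500 messages and 50,000,000 bytes. With 3 streams each gets 334 messages, so because of the rounding the per-stream limits can add up to slightly more than the configured total (1,002 here).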
@private
  # File lib/google/cloud/pubsub/subscriber.rb, line 358
  def to_s
    "(subscription: #{subscription_name}, streams: [#{stream_pool.map(&:to_s).join(', ')}])"
  end
Whether to enforce flow control at the client side only or to enforce it at both the client and the server. For more details about flow control see https://cloud.google.com/pubsub/docs/pull#config.
@return [Boolean] `true` when only client side flow control is enforced, `false` when both client and server side flow control are enforced.
  # File lib/google/cloud/pubsub/subscriber.rb, line 309
  def use_legacy_flow_control?
    @inventory[:use_legacy_flow_control]
  end
Blocks until the subscriber is fully stopped and all received messages have been processed or released, or until `timeout` seconds have passed.
Does not stop the subscriber. To stop the subscriber, first call {#stop} and then call {#wait!} to block until the subscriber is stopped.
@param [Numeric, nil] timeout The number of seconds to block until the subscriber is fully stopped. The default (`nil`) blocks indefinitely.
@return [Subscriber] returns self so calls can be chained.
  # File lib/google/cloud/pubsub/subscriber.rb, line 162
  def wait! timeout = nil
    wait_stop_buffer_thread!
    @wait_stop_buffer_thread.join timeout
    self
  end
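The division of labour between `stop`, `wait!` and `stop!` can be illustrated without the library. The sketch below is a simplified stand-in under stated assumptions: `FakeSubscriber`, `drained?` and the `work_seconds` argument are hypothetical, and a `sleep` takes the place of waiting on real streams. It shows a non-blocking stop, a blocking wait bounded by `Thread#join` with a timeout, and a memoised waiter thread shared by both calls.

```ruby
require "monitor"

# Hypothetical stand-in that mimics the stop / wait! / stop! split; not the library class.
class FakeSubscriber
  include MonitorMixin

  def initialize(work_seconds)
    super() # initialise MonitorMixin
    @work_seconds = work_seconds
    @stopped = false
    @waiter = nil
  end

  # Non-blocking: flag the stop and begin draining in the background.
  def stop
    synchronize do
      @stopped = true
      start_waiter
      self
    end
  end

  # Blocking: wait for the drain to finish, or give up after `timeout` seconds.
  def wait!(timeout = nil)
    start_waiter
    @waiter.join timeout
    self
  end

  # Convenience: stop, then wait.
  def stop!(timeout = nil)
    stop
    wait! timeout
  end

  def stopped?
    synchronize { @stopped }
  end

  # True once the background drain has finished.
  def drained?
    synchronize { !@waiter.nil? && !@waiter.alive? }
  end

  private

  # Memoised so that stop and wait! share a single drain thread.
  def start_waiter
    synchronize { @waiter ||= Thread.new { sleep @work_seconds } }
  end
end

quick = FakeSubscriber.new 0.05
quick.stop      # returns immediately
quick.wait! 1   # blocks until the drain completes, well within the timeout

slow = FakeSubscriber.new 5
slow.stop! 0.01 # the timeout elapses first; the call returns while draining continues
```

In the sketch a timed-out `stop!` still returns `self`, so callers that need to know whether shutdown completed have to check separately, as `drained?` does here.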
Protected Instance Methods
  # File lib/google/cloud/pubsub/subscriber.rb, line 382
  def coerce_inventory inventory
    @inventory = inventory
    if @inventory.is_a? Hash
      @inventory = @inventory.dup
      # Support deprecated field names
      @inventory[:max_outstanding_messages] ||= @inventory.delete :limit
      @inventory[:max_outstanding_bytes]    ||= @inventory.delete :bytesize
      @inventory[:max_total_lease_duration] ||= @inventory.delete :extension
    else
      @inventory = { max_outstanding_messages: @inventory }
    end
    @inventory[:max_outstanding_messages] = Integer(@inventory[:max_outstanding_messages] || 1000)
    @inventory[:max_outstanding_bytes] = Integer(@inventory[:max_outstanding_bytes] || 100_000_000)
    @inventory[:max_total_lease_duration] = Integer(@inventory[:max_total_lease_duration] || 3600)
    @inventory[:max_duration_per_lease_extension] = Integer(@inventory[:max_duration_per_lease_extension] || 0)
    @inventory[:use_legacy_flow_control] = @inventory[:use_legacy_flow_control] || false
  end
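The effect of `coerce_inventory` on different kinds of input is easier to follow with concrete values. The sketch below is a standalone approximation: `coerce_inventory_sketch` is a hypothetical function that returns a new Hash instead of setting `@inventory`, but it applies the same key mapping and defaults as the listing above.

```ruby
# Hypothetical standalone version of the coercion; it returns a new Hash
# instead of setting @inventory.
def coerce_inventory_sketch(inventory)
  inventory = inventory.is_a?(Hash) ? inventory.dup : { max_outstanding_messages: inventory }

  # A deprecated key is consulted (and removed) only when the current key is unset.
  inventory[:max_outstanding_messages] ||= inventory.delete(:limit)
  inventory[:max_outstanding_bytes]    ||= inventory.delete(:bytesize)
  inventory[:max_total_lease_duration] ||= inventory.delete(:extension)

  {
    max_outstanding_messages:         Integer(inventory[:max_outstanding_messages] || 1000),
    max_outstanding_bytes:            Integer(inventory[:max_outstanding_bytes] || 100_000_000),
    max_total_lease_duration:         Integer(inventory[:max_total_lease_duration] || 3600),
    max_duration_per_lease_extension: Integer(inventory[:max_duration_per_lease_extension] || 0),
    use_legacy_flow_control:          inventory[:use_legacy_flow_control] || false
  }
end

from_nil        = coerce_inventory_sketch(nil)   # every default applies
from_integer    = coerce_inventory_sketch(200)   # a bare number is the message limit
from_deprecated = coerce_inventory_sketch({ limit: 50, bytesize: 1_000, extension: 120 })
from_both       = coerce_inventory_sketch({ max_outstanding_messages: 10, limit: 50 })
```

When both the current and the deprecated key are given, as in `from_both`, the current key wins and the deprecated value is ignored.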
  # File lib/google/cloud/pubsub/subscriber.rb, line 400
  def default_error_callbacks
    # This is memoized to reduce calls to the configuration.
    @default_error_callbacks ||= begin
      error_callback = Google::Cloud::PubSub.configure.on_error
      error_callback ||= Google::Cloud.configure.on_error
      if error_callback
        [error_callback]
      else
        []
      end
    end
  end
Starts a new thread to call wait! (blocking) on each Stream and then stop the TimedUnaryBuffer.
  # File lib/google/cloud/pubsub/subscriber.rb, line 372
  def wait_stop_buffer_thread!
    synchronize do
      @wait_stop_buffer_thread ||= Thread.new do
        @stream_pool.map(&:wait!)
        # Shutdown the buffer TimerTask (and flush the buffer) after the streams are all stopped.
        @buffer.stop
      end
    end
  end