class Google::Cloud::PubSub::Subscriber

Subscriber object used to stream and process messages from a Subscription. See {Google::Cloud::PubSub::Subscription#listen}

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

@attr_reader [String] subscription_name The name of the subscription the

messages are pulled from.

@attr_reader [Proc] callback The procedure that will handle the messages

received from the subscription.

@attr_reader [Numeric] deadline The default number of seconds the stream

will hold received messages before modifying the message's ack
deadline. The minimum is 10, the maximum is 600. Default is 60.

@attr_reader [Boolean] message_ordering Whether message ordering has

been enabled.

@attr_reader [Integer] streams The number of concurrent streams to open

to pull messages from the subscription. Default is 4

@attr_reader [Integer] callback_threads The number of threads used to

handle the received messages. Default is 8

@attr_reader [Integer] push_threads The number of threads to handle

acknowledgement ({ReceivedMessage#ack!}) and delay messages
({ReceivedMessage#nack!}, {ReceivedMessage#modify_ack_deadline!}).
Default is 4.

Attributes

buffer[R]

@private Implementation attributes.

callback[R]
callback_threads[R]
deadline[R]
message_ordering[R]
push_threads[R]
service[R]

@private Implementation attributes.

stream_pool[R]

@private Implementation attributes.

streams[R]
subscription_name[R]
thread_pool[R]

@private Implementation attributes.

Public Class Methods

new(subscription_name, callback, deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}) click to toggle source

@private Create an empty {Subscriber} object.

Calls superclass method
# File lib/google/cloud/pubsub/subscriber.rb, line 81
def initialize subscription_name, callback, deadline: nil, message_ordering: nil, streams: nil, inventory: nil,
               threads: {}, service: nil
  super() # to init MonitorMixin

  @callback = callback
  @error_callbacks = []
  @subscription_name = subscription_name
  @deadline = deadline || 60
  @streams = streams || 2
  coerce_inventory inventory
  @message_ordering = message_ordering
  @callback_threads = Integer(threads[:callback] || 8)
  @push_threads = Integer(threads[:push] || 4)

  @service = service

  @started = @stopped = nil

  stream_pool = Array.new @streams do
    Thread.new { Stream.new self }
  end
  @stream_pool = stream_pool.map(&:value)

  @buffer = TimedUnaryBuffer.new self
end

Public Instance Methods

error!(error) click to toggle source

@private returns error object from the stream thread.

# File lib/google/cloud/pubsub/subscriber.rb, line 347
def error! error
  error_callbacks = synchronize do
    @last_error = error
    @error_callbacks
  end
  error_callbacks = default_error_callbacks if error_callbacks.empty?
  error_callbacks.each { |error_callback| error_callback.call error }
end
inspect() click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber.rb, line 364
def inspect
  "#<#{self.class.name} #{self}>"
end
inventory()

@deprecated Use {#max_outstanding_messages}.

inventory_bytesize()

@deprecated Use {#max_outstanding_bytes}.

inventory_extension()

@deprecated Use {#max_total_lease_duration}.

inventory_limit()

@deprecated Use {#max_outstanding_messages}.

last_error() click to toggle source

The most recent unhandled error to occur while listening to messages on the subscriber.

If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.

@return [Exception, nil] error The most recent error raised.

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start listening for messages and errors.
subscriber.start

# If an error was raised, it can be retrieved here:
subscriber.last_error #=> nil

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
# File lib/google/cloud/pubsub/subscriber.rb, line 274
def last_error
  synchronize { @last_error }
end
max_duration_per_lease_extension() click to toggle source

The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).

@return [Integer] The maximum number of seconds.

# File lib/google/cloud/pubsub/subscriber.rb, line 330
def max_duration_per_lease_extension
  @inventory[:max_duration_per_lease_extension]
end
max_outstanding_bytes() click to toggle source

The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB).

@return [Integer] The maximum number of bytes.

# File lib/google/cloud/pubsub/subscriber.rb, line 296
def max_outstanding_bytes
  @inventory[:max_outstanding_bytes]
end
Also aliased as: inventory_bytesize
max_outstanding_messages() click to toggle source

The number of received messages to be collected by subscriber. Default is 1,000.

@return [Integer] The maximum number of messages.

# File lib/google/cloud/pubsub/subscriber.rb, line 283
def max_outstanding_messages
  @inventory[:max_outstanding_messages]
end
Also aliased as: inventory_limit, inventory
max_total_lease_duration() click to toggle source

The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).

@return [Integer] The maximum number of seconds.

# File lib/google/cloud/pubsub/subscriber.rb, line 318
def max_total_lease_duration
  @inventory[:max_total_lease_duration]
end
Also aliased as: inventory_extension
on_error(&block) click to toggle source

Register to be notified of errors when raised.

If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.

Multiple error handlers can be added.

@yield [callback] The block to be called when an error is raised. @yieldparam [Exception] error The error raised.

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Register to be notified when unhandled errors occur.
subscriber.on_error do |error|
  # log error
  puts error
end

# Start listening for messages and errors.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
# File lib/google/cloud/pubsub/subscriber.rb, line 238
def on_error &block
  synchronize do
    @error_callbacks << block
  end
end
start() click to toggle source

Starts the subscriber pulling from the subscription and processing the received messages.

@return [Subscriber] returns self so calls can be chained.

# File lib/google/cloud/pubsub/subscriber.rb, line 113
def start
  start_pool = synchronize do
    @started = true
    @stopped = false

    # Start the buffer before the streams are all started
    @buffer.start
    @stream_pool.map do |stream|
      Thread.new { stream.start }
    end
  end
  start_pool.map(&:join)

  self
end
started?() click to toggle source

Whether the subscriber has been started.

@return [boolean] `true` when started, `false` otherwise.

# File lib/google/cloud/pubsub/subscriber.rb, line 190
def started?
  synchronize { @started }
end
stop() click to toggle source

Immediately stops the subscriber. No new messages will be pulled from the subscription. Use {#wait!} to block until all received messages have been processed or released: All actions taken on received messages that have not yet been sent to the API will be sent to the API. All received but unprocessed messages will be released back to the API and redelivered.

@return [Subscriber] returns self so calls can be chained.

# File lib/google/cloud/pubsub/subscriber.rb, line 138
def stop
  synchronize do
    @started = false
    @stopped = true
    @stream_pool.map(&:stop)
    wait_stop_buffer_thread!
    self
  end
end
stop!(timeout = nil) click to toggle source

Stop this subscriber and block until the subscriber is fully stopped and all received messages have been processed or released, or until `timeout` seconds have passed.

The same as calling {#stop} and {#wait!}.

@param [Number, nil] timeout The number of seconds to block until the

subscriber is fully stopped. Default will block indefinitely.

@return [Subscriber] returns self so calls can be chained.

# File lib/google/cloud/pubsub/subscriber.rb, line 180
def stop! timeout = nil
  stop
  wait! timeout
end
stopped?() click to toggle source

Whether the subscriber has been stopped.

@return [boolean] `true` when stopped, `false` otherwise.

# File lib/google/cloud/pubsub/subscriber.rb, line 199
def stopped?
  synchronize { @stopped }
end
stream_inventory() click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber.rb, line 336
def stream_inventory
  {
    limit:                            @inventory[:max_outstanding_messages].fdiv(@streams).ceil,
    bytesize:                         @inventory[:max_outstanding_bytes].fdiv(@streams).ceil,
    extension:                        @inventory[:max_total_lease_duration],
    max_duration_per_lease_extension: @inventory[:max_duration_per_lease_extension],
    use_legacy_flow_control:          @inventory[:use_legacy_flow_control]
  }
end
to_s() click to toggle source

@private

# File lib/google/cloud/pubsub/subscriber.rb, line 358
def to_s
  "(subscription: #{subscription_name}, streams: [#{stream_pool.map(&:to_s).join(', ')}])"
end
use_legacy_flow_control?() click to toggle source

Whether to enforce flow control at the client side only or to enforce it at both the client and the server. For more details about flow control see cloud.google.com/pubsub/docs/pull#config.

@return [Boolean] `true` when only client side flow control is enforced, `false` when both client and server side flow control are enforced.

# File lib/google/cloud/pubsub/subscriber.rb, line 309
def use_legacy_flow_control?
  @inventory[:use_legacy_flow_control]
end
wait!(timeout = nil) click to toggle source

Blocks until the subscriber is fully stopped and all received messages have been processed or released, or until `timeout` seconds have passed.

Does not stop the subscriber. To stop the subscriber, first call {#stop} and then call {#wait!} to block until the subscriber is stopped.

@param [Number, nil] timeout The number of seconds to block until the

subscriber is fully stopped. Default will block indefinitely.

@return [Subscriber] returns self so calls can be chained.

# File lib/google/cloud/pubsub/subscriber.rb, line 162
def wait! timeout = nil
  wait_stop_buffer_thread!
  @wait_stop_buffer_thread.join timeout
  self
end

Protected Instance Methods

coerce_inventory(inventory) click to toggle source
# File lib/google/cloud/pubsub/subscriber.rb, line 382
def coerce_inventory inventory
  @inventory = inventory
  if @inventory.is_a? Hash
    @inventory = @inventory.dup
    # Support deprecated field names
    @inventory[:max_outstanding_messages] ||= @inventory.delete :limit
    @inventory[:max_outstanding_bytes] ||= @inventory.delete :bytesize
    @inventory[:max_total_lease_duration] ||= @inventory.delete :extension
  else
    @inventory = { max_outstanding_messages: @inventory }
  end
  @inventory[:max_outstanding_messages] = Integer(@inventory[:max_outstanding_messages] || 1000)
  @inventory[:max_outstanding_bytes] = Integer(@inventory[:max_outstanding_bytes] || 100_000_000)
  @inventory[:max_total_lease_duration] = Integer(@inventory[:max_total_lease_duration] || 3600)
  @inventory[:max_duration_per_lease_extension] = Integer(@inventory[:max_duration_per_lease_extension] || 0)
  @inventory[:use_legacy_flow_control] = @inventory[:use_legacy_flow_control] || false
end
default_error_callbacks() click to toggle source
# File lib/google/cloud/pubsub/subscriber.rb, line 400
def default_error_callbacks
  # This is memoized to reduce calls to the configuration.
  @default_error_callbacks ||= begin
    error_callback = Google::Cloud::PubSub.configure.on_error
    error_callback ||= Google::Cloud.configure.on_error
    if error_callback
      [error_callback]
    else
      []
    end
  end
end
wait_stop_buffer_thread!() click to toggle source

Starts a new thread to call wait! (blocking) on each Stream and then stop the TimedUnaryBuffer.

# File lib/google/cloud/pubsub/subscriber.rb, line 372
def wait_stop_buffer_thread!
  synchronize do
    @wait_stop_buffer_thread ||= Thread.new do
      @stream_pool.map(&:wait!)
      # Shutdown the buffer TimerTask (and flush the buffer) after the streams are all stopped.
      @buffer.stop
    end
  end
end