class Google::Cloud::PubSub::Project

# Project

Represents the project that pubsub messages are pushed to and pulled from. {Topic} is a named resource to which messages are sent by publishers. {Subscription} is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application. {Message} is a combination of data and attributes that a publisher sends to a topic and is eventually delivered to subscribers.

See {Google::Cloud#pubsub}

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.publish "task completed"

Attributes

service[RW]

@private The Service object.

Public Class Methods

new(service) click to toggle source

@private Creates a new Pub/Sub Project instance.

# File lib/google/cloud/pubsub/project.rb, line 55
def initialize service
  @service = service
end

Public Instance Methods

create_schema(schema_id, type, definition, project: nil) click to toggle source

Creates a new schema.

@param [String] schema_id The ID to use for the schema, which will

become the final component of the schema's resource name. Required.

The schema ID (relative name) must start with a letter, and
contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
signs (`%`). It must be between 3 and 255 characters in length, and
it must not start with `goog`.

@param [String, Symbol] type The type of the schema. Required. Possible

values are case-insensitive and include:

  * `PROTOCOL_BUFFER` - A Protocol Buffer schema definition.
  * `AVRO` - An Avro schema definition.

@param [String] definition The definition of the schema. Required. This

should be a string representing the full definition of the schema that
is a valid schema definition of the type specified in `type`.

@param [String] project If the schema belongs to a project other

than the one currently connected to, the alternate project ID can be
specified here. Optional.

@return [Google::Cloud::PubSub::Schema]

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

definition = "..."
schema = pubsub.create_schema "my-schema", :avro, definition
schema.name #=> "projects/my-project/schemas/my-schema"
# File lib/google/cloud/pubsub/project.rb, line 548
def create_schema schema_id, type, definition, project: nil
  ensure_service!
  type = type.to_s.upcase
  grpc = service.create_schema schema_id, type, definition, project: project
  Schema.from_grpc grpc, service
end
Also aliased as: new_schema
create_topic(topic_name, labels: nil, kms_key: nil, persistence_regions: nil, async: nil, schema_name: nil, message_encoding: nil, retention: nil) click to toggle source

Creates a new topic.

@param [String] topic_name Name of a topic. Required.

The value can be a simple topic ID (relative name), in which
case the current project ID will be supplied, or a fully-qualified
topic name in the form `projects/{project_id}/topics/{topic_id}`.

The topic ID (relative name) must start with a letter, and
contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
signs (`%`). It must be between 3 and 255 characters in length, and
it must not start with `goog`.

@param [Hash] labels A hash of user-provided labels associated with

the topic. You can use these to organize and group your topics.
Label keys and values can be no longer than 63 characters, can only
contain lowercase letters, numeric characters, underscores and
dashes. International characters are allowed. Label values are
optional. Label keys must start with a letter and each label in the
list must have a different key. See [Creating and Managing
Labels](https://cloud.google.com/pubsub/docs/labels).

@param [String] kms_key The Cloud KMS encryption key that will be used

to protect access to messages published on this topic. Optional.
For example: `projects/a/locations/b/keyRings/c/cryptoKeys/d`

@param [Array<String>] persistence_regions The list of GCP region IDs

where messages that are published to the topic may be persisted in
storage. Optional.

@param [Hash] async A hash of values to configure the topic's

{AsyncPublisher} that is created when {Topic#publish_async}
is called. Optional.

Hash keys and values may include the following:

* `:max_bytes` (Integer) The maximum size of messages to be collected
  before the batch is published. Default is 1,000,000 (1MB).
* `:max_messages` (Integer) The maximum number of messages to be
  collected before the batch is published. Default is 100.
* `:interval` (Numeric) The number of seconds to collect messages before
  the batch is published. Default is 0.01.
* `:threads` (Hash) The number of threads to create to handle concurrent
  calls by the publisher:

  * `:publish` (Integer) The number of threads used to publish messages.
    Default is 2.
  * `:callback` (Integer) The number of threads to handle the published
    messages' callbacks. Default is 4.
* `:flow_control` (Hash) The client flow control settings for message publishing:
  * `:message_limit` (Integer) The maximum number of messages allowed to wait to be published. Default is
    `10 * max_messages`.
  * `:byte_limit` (Integer) The maximum total size of messages allowed to wait to be published. Default is
    `10 * max_bytes`.
  * `:limit_exceeded_behavior` (Symbol) The action to take when publish flow control limits are exceeded.
    Possible values include: `:ignore` - Flow control is disabled. `:error` - Calls to {Topic#publish_async}
    will raise {FlowControlLimitError} when publish flow control limits are exceeded. `:block` - Calls to
    {Topic#publish_async} will block until capacity is available when publish flow control limits are
    exceeded. The default value is `:ignore`.

@param [String] schema_name The name of the schema that messages

published should be validated against. Optional. The value can be a
simple schema ID (relative name), in which case the current project
ID will be supplied, or a fully-qualified schema name in the form
`projects/{project_id}/schemas/{schema_id}`. If provided,
`message_encoding` must also be provided.

@param [String, Symbol] message_encoding The encoding of messages validated

against the schema identified by `schema_name`. Optional. Values include:

* `JSON` - JSON encoding.
* `BINARY` - Binary encoding, as defined by the schema type. For some
  schema types, binary encoding may not be available.

@param [Numeric] retention Indicates the minimum number of seconds to retain a message

after it is published to the topic. If this field is set, messages published
to the topic within the `retention` number of seconds are always available to
subscribers. For instance, it allows any attached subscription to [seek to a
timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
that is up to `retention` number of seconds in the past. If this field is
not set, message retention is controlled by settings on individual
subscriptions. Cannot be less than 600 (10 minutes) or more than 604,800 (7 days).

@return [Google::Cloud::PubSub::Topic]

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.create_topic "my-topic"
# File lib/google/cloud/pubsub/project.rb, line 258
def create_topic topic_name,
                 labels: nil,
                 kms_key: nil,
                 persistence_regions: nil,
                 async: nil,
                 schema_name: nil,
                 message_encoding: nil,
                 retention: nil
  ensure_service!
  grpc = service.create_topic topic_name,
                              labels:              labels,
                              kms_key_name:        kms_key,
                              persistence_regions: persistence_regions,
                              schema_name:         schema_name,
                              message_encoding:    message_encoding,
                              retention:           retention
  Topic.from_grpc grpc, service, async: async
end
Also aliased as: new_topic
find_schema(schema_name, view: nil, project: nil, skip_lookup: nil)
Alias for: schema
find_schemas(view: nil, token: nil, max: nil)
Alias for: schemas
find_snapshots(token: nil, max: nil)
Alias for: snapshots
find_subscription(subscription_name, project: nil, skip_lookup: nil)
Alias for: subscription
find_subscriptions(token: nil, max: nil)
Alias for: subscriptions
find_topic(topic_name, project: nil, skip_lookup: nil, async: nil)
Alias for: topic
find_topics(token: nil, max: nil)
Alias for: topics
get_schema(schema_name, view: nil, project: nil, skip_lookup: nil)
Alias for: schema
get_subscription(subscription_name, project: nil, skip_lookup: nil)
Alias for: subscription
get_topic(topic_name, project: nil, skip_lookup: nil, async: nil)
Alias for: topic
list_schemas(view: nil, token: nil, max: nil)
Alias for: schemas
list_snapshots(token: nil, max: nil)
Alias for: snapshots
list_subscriptions(token: nil, max: nil)
Alias for: subscriptions
list_topics(token: nil, max: nil)
Alias for: topics
new_schema(schema_id, type, definition, project: nil)
Alias for: create_schema
new_topic(topic_name, labels: nil, kms_key: nil, persistence_regions: nil, async: nil, schema_name: nil, message_encoding: nil, retention: nil)
Alias for: create_topic
project()
Alias for: project_id
project_id() click to toggle source

The Pub/Sub project connected to.

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new(
  project_id: "my-project",
  credentials: "/path/to/keyfile.json"
)

pubsub.project_id #=> "my-project"
# File lib/google/cloud/pubsub/project.rb, line 71
def project_id
  service.project
end
Also aliased as: project
schema(schema_name, view: nil, project: nil, skip_lookup: nil) click to toggle source

Retrieves schema by name.

@param [String] schema_name Name of a schema. The value can

be a simple schema ID, in which case the current project ID
will be supplied, or a fully-qualified schema name in the form
`projects/{project_id}/schemas/{schema_id}`.

@param view [Symbol, String, nil] Possible values:

* `BASIC` - Include the `name` and `type` of the schema, but not the `definition`.
* `FULL` - Include all Schema object fields.

The default value is `FULL`.

@param [String] project If the schema belongs to a project other

than the one currently connected to, the alternate project ID can be
specified here. Not used if a fully-qualified schema name is
provided for `schema_name`.

@param [Boolean] skip_lookup Optionally create a {Schema} object

without verifying the schema resource exists on the Pub/Sub
service. Calls made on this object will raise errors if the service
resource does not exist. Default is `false`.

@return [Google::Cloud::PubSub::Schema, nil] Returns `nil` if

the schema does not exist.

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

schema = pubsub.schema "my-schema"
schema.name #=> "projects/my-project/schemas/my-schema"
schema.type #=> :PROTOCOL_BUFFER
schema.definition # The schema definition

@example Skip the lookup against the service with `skip_lookup`:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

# No API call is made to retrieve the schema information.
# The default project is used in the name.
schema = pubsub.schema "my-schema", skip_lookup: true
schema.name #=> "projects/my-project/schemas/my-schema"
schema.type #=> nil
schema.definition #=> nil

@example Omit the schema definition with `view: :basic`:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

schema = pubsub.schema "my-schema", view: :basic
schema.name #=> "projects/my-project/schemas/my-schema"
schema.type #=> :PROTOCOL_BUFFER
schema.definition #=> nil
# File lib/google/cloud/pubsub/project.rb, line 501
def schema schema_name, view: nil, project: nil, skip_lookup: nil
  ensure_service!
  options = { project: project }
  return Schema.from_name schema_name, view, service, options if skip_lookup
  view ||= :FULL
  grpc = service.get_schema schema_name, view, options
  Schema.from_grpc grpc, service
rescue Google::Cloud::NotFoundError
  nil
end
Also aliased as: get_schema, find_schema
schemas(view: nil, token: nil, max: nil) click to toggle source

Retrieves a list of schemas for the given project.

@param view [String, Symbol, nil] The set of fields to return in the response. Possible values:

  * `BASIC` - Include the `name` and `type` of the schema, but not the `definition`.
  * `FULL` - Include all Schema object fields.

The default value is `FULL`.

@param [String] token A previously-returned page token representing

part of the larger set of results to view.

@param [Integer] max Maximum number of schemas to return.

@return [Array<Google::Cloud::PubSub::Schema>] (See

{Google::Cloud::PubSub::Schema::List})

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

schemas = pubsub.schemas
schemas.each do |schema|
  puts schema.name
end

@example Retrieve all schemas: (See {Schema::List#all})

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

schemas = pubsub.schemas
schemas.all do |schema|
  puts schema.name
end
# File lib/google/cloud/pubsub/project.rb, line 592
def schemas view: nil, token: nil, max: nil
  ensure_service!
  view ||= :FULL
  options = { token: token, max: max }
  grpc = service.list_schemas view, options
  Schema::List.from_grpc grpc, service, view, max
end
Also aliased as: find_schemas, list_schemas
snapshots(token: nil, max: nil) click to toggle source

Retrieves a list of snapshots for the given project.

@param [String] token A previously-returned page token representing

part of the larger set of results to view.

@param [Integer] max Maximum number of snapshots to return.

@return [Array<Google::Cloud::PubSub::Snapshot>] (See

{Google::Cloud::PubSub::Snapshot::List})

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

snapshots = pubsub.snapshots
snapshots.each do |snapshot|
  puts snapshot.name
end

@example Retrieve all snapshots: (See {Snapshot::List#all})

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

snapshots = pubsub.snapshots
snapshots.all do |snapshot|
  puts snapshot.name
end
# File lib/google/cloud/pubsub/project.rb, line 436
def snapshots token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_snapshots options
  Snapshot::List.from_grpc grpc, service, max
end
Also aliased as: find_snapshots, list_snapshots
subscription(subscription_name, project: nil, skip_lookup: nil) click to toggle source

Retrieves subscription by name.

@param [String] subscription_name Name of a subscription. The value can

be a simple subscription ID, in which case the current project ID
will be supplied, or a fully-qualified subscription name in the form
`projects/{project_id}/subscriptions/{subscription_id}`.

@param [String] project If the subscription belongs to a project other

than the one currently connected to, the alternate project ID can be
specified here. Not used if a fully-qualified subscription name is
provided for `subscription_name`.

@param [Boolean] skip_lookup Optionally create a {Subscription} object

without verifying the subscription resource exists on the Pub/Sub
service. Calls made on this object will raise errors if the service
resource does not exist. Default is `false`.

@return [Google::Cloud::PubSub::Subscription, nil] Returns `nil` if

the subscription does not exist

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-sub"
sub.name #=> "projects/my-project/subscriptions/my-sub"

@example Skip the lookup against the service with `skip_lookup`:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

# No API call is made to retrieve the subscription information.
sub = pubsub.subscription "my-sub", skip_lookup: true
sub.name #=> "projects/my-project/subscriptions/my-sub"
# File lib/google/cloud/pubsub/project.rb, line 354
def subscription subscription_name, project: nil, skip_lookup: nil
  ensure_service!
  options = { project: project }
  return Subscription.from_name subscription_name, service, options if skip_lookup
  grpc = service.get_subscription subscription_name, options
  Subscription.from_grpc grpc, service
rescue Google::Cloud::NotFoundError
  nil
end
subscriptions(token: nil, max: nil) click to toggle source

Retrieves a list of subscriptions for the given project.

@param [String] token A previously-returned page token representing

part of the larger set of results to view.

@param [Integer] max Maximum number of subscriptions to return.

@return [Array<Google::Cloud::PubSub::Subscription>] (See

{Google::Cloud::PubSub::Subscription::List})

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subs = pubsub.subscriptions
subs.each do |sub|
  puts sub.name
end

@example Retrieve all subscriptions: (See {Subscription::List#all})

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subs = pubsub.subscriptions
subs.all do |sub|
  puts sub.name
end
# File lib/google/cloud/pubsub/project.rb, line 396
def subscriptions token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_subscriptions options
  Subscription::List.from_grpc grpc, service, max
end
topic(topic_name, project: nil, skip_lookup: nil, async: nil) click to toggle source

Retrieves topic by name.

@param [String] topic_name Name of a topic. The value can be a simple

topic ID (relative name), in which case the current project ID will
be supplied, or a fully-qualified topic name in the form
`projects/{project_id}/topics/{topic_id}`.

@param [String] project If the topic belongs to a project other than

the one currently connected to, the alternate project ID can be
specified here. Optional. Not used if a fully-qualified topic name
is provided for `topic_name`.

@param [Boolean] skip_lookup Optionally create a {Topic} object

without verifying the topic resource exists on the Pub/Sub service.
Calls made on this object will raise errors if the topic resource
does not exist. Default is `false`. Optional.

@param [Hash] async A hash of values to configure the topic's

{AsyncPublisher} that is created when {Topic#publish_async}
is called. Optional.

Hash keys and values may include the following:

* `:max_bytes` (Integer) The maximum size of messages to be collected before the batch is published. Default
  is 1,000,000 (1MB).
* `:max_messages` (Integer) The maximum number of messages to be collected before the batch is published.
  Default is 100.
* `:interval` (Numeric) The number of seconds to collect messages before the batch is published. Default is
  0.01.
* `:threads` (Hash) The number of threads to create to handle concurrent calls by the publisher:
  * `:publish` (Integer) The number of threads used to publish messages. Default is 2.
  * `:callback` (Integer) The number of threads to handle the published messages' callbacks. Default is 4.
* `:flow_control` (Hash) The client flow control settings for message publishing:
  * `:message_limit` (Integer) The maximum number of messages allowed to wait to be published. Default is
    `10 * max_messages`.
  * `:byte_limit` (Integer) The maximum total size of messages allowed to wait to be published. Default is
    `10 * max_bytes`.
  * `:limit_exceeded_behavior` (Symbol) The action to take when publish flow control limits are exceeded.
    Possible values include: `:ignore` - Flow control is disabled. `:error` - Calls to {Topic#publish_async}
    will raise {FlowControlLimitError} when publish flow control limits are exceeded. `:block` - Calls to
    {Topic#publish_async} will block until capacity is available when publish flow control limits are
    exceeded. The default value is `:ignore`.

@return [Google::Cloud::PubSub::Topic, nil] Returns `nil` if topic

does not exist.

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "existing-topic"

@example By default `nil` will be returned if topic does not exist.

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "non-existing-topic" # nil

@example Create topic in a different project with the `project` flag.

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "another-topic", project: "another-project"

@example Skip the lookup against the service with `skip_lookup`:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "another-topic", skip_lookup: true

@example Configuring AsyncPublisher to increase concurrent callbacks:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic",
                     async: { threads: { callback: 16 } }

topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

topic.async_publisher.stop!
# File lib/google/cloud/pubsub/project.rb, line 161
def topic topic_name, project: nil, skip_lookup: nil, async: nil
  ensure_service!
  options = { project: project }
  return Topic.from_name topic_name, service, options if skip_lookup
  grpc = service.get_topic topic_name, options
  Topic.from_grpc grpc, service, async: async
rescue Google::Cloud::NotFoundError
  nil
end
Also aliased as: get_topic, find_topic
topics(token: nil, max: nil) click to toggle source

Retrieves a list of topics for the given project.

@param [String] token The `token` value returned by the last call to

`topics`; indicates that this is a continuation of a call, and that
the system should return the next page of data.

@param [Integer] max Maximum number of topics to return.

@return [Array<Google::Cloud::PubSub::Topic>] (See

{Google::Cloud::PubSub::Topic::List})

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topics = pubsub.topics
topics.each do |topic|
  puts topic.name
end

@example Retrieve all topics: (See {Topic::List#all})

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topics = pubsub.topics
topics.all do |topic|
  puts topic.name
end
# File lib/google/cloud/pubsub/project.rb, line 309
def topics token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_topics options
  Topic::List.from_grpc grpc, service, max
end
Also aliased as: find_topics, list_topics
valid_schema?(type, definition, project: nil) click to toggle source

Validates a schema type and definition.

@param [String, Symbol] type The type of the schema. Required. Possible

values are case-insensitive and include:

  * `PROTOCOL_BUFFER` - A Protocol Buffer schema definition.
  * `AVRO` - An Avro schema definition.

@param [String] definition The definition of the schema. Required. This

should be a string representing the full definition of the schema that
is a valid schema definition of the type specified in `type`.

@param [String] project If the schema belongs to a project other

than the one currently connected to, the alternate project ID can be
specified here. Optional.

@return [Boolean] `true` if the schema is valid, `false` otherwise.

@example

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

definition = "..."
pubsub.validate_schema :avro, definition #=> true
# File lib/google/cloud/pubsub/project.rb, line 627
def valid_schema? type, definition, project: nil
  ensure_service!
  type = type.to_s.upcase
  service.validate_schema type, definition, project: project # return type is empty
  true
rescue Google::Cloud::InvalidArgumentError
  false
end
Also aliased as: validate_schema
validate_schema(type, definition, project: nil)
Alias for: valid_schema?

Protected Instance Methods

ensure_service!() click to toggle source

@private Raise an error unless an active connection to the service is available.

# File lib/google/cloud/pubsub/project.rb, line 642
def ensure_service!
  raise "Must have active connection to service" unless service
end
publish_batch_messages(topic_name, batch) click to toggle source

Call the publish API with arrays of data data and attrs.

# File lib/google/cloud/pubsub/project.rb, line 648
def publish_batch_messages topic_name, batch
  grpc = service.publish topic_name, batch.messages
  batch.to_gcloud_messages Array(grpc.message_ids)
end