class Google::Cloud::PubSub::Project
# Project
Represents the project that pubsub messages are pushed to and pulled from. {Topic} is a named resource to which messages are sent by publishers. {Subscription} is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application. {Message} is a combination of data and attributes that a publisher sends to a topic and is eventually delivered to subscribers.
See {Google::Cloud#pubsub}
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new topic = pubsub.topic "my-topic" topic.publish "task completed"
Attributes
@private The Service
object.
Public Class Methods
@private Creates a new Pub/Sub Project
instance.
# File lib/google/cloud/pubsub/project.rb, line 55 def initialize service @service = service end
Public Instance Methods
Creates a new schema.
@param [String] schema_id The ID to use for the schema, which will
become the final component of the schema's resource name. Required. The schema ID (relative name) must start with a letter, and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`), underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent signs (`%`). It must be between 3 and 255 characters in length, and it must not start with `goog`.
@param [String, Symbol] type The type of the schema. Required. Possible
values are case-insensitive and include: * `PROTOCOL_BUFFER` - A Protocol Buffer schema definition. * `AVRO` - An Avro schema definition.
@param [String] definition The definition of the schema. Required. This
should be a string representing the full definition of the schema that is a valid schema definition of the type specified in `type`.
@param [String] project If the schema belongs to a project other
than the one currently connected to, the alternate project ID can be specified here. Optional.
@return [Google::Cloud::PubSub::Schema]
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new definition = "..." schema = pubsub.create_schema "my-schema", :avro, definition schema.name #=> "projects/my-project/schemas/my-schema"
# File lib/google/cloud/pubsub/project.rb, line 548 def create_schema schema_id, type, definition, project: nil ensure_service! type = type.to_s.upcase grpc = service.create_schema schema_id, type, definition, project: project Schema.from_grpc grpc, service end
Creates a new topic.
@param [String] topic_name Name of a topic. Required.
The value can be a simple topic ID (relative name), in which case the current project ID will be supplied, or a fully-qualified topic name in the form `projects/{project_id}/topics/{topic_id}`. The topic ID (relative name) must start with a letter, and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`), underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent signs (`%`). It must be between 3 and 255 characters in length, and it must not start with `goog`.
@param [Hash] labels A hash of user-provided labels associated with
the topic. You can use these to organize and group your topics. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See [Creating and Managing Labels](https://cloud.google.com/pubsub/docs/labels).
@param [String] kms_key The Cloud
KMS encryption key that will be used
to protect access to messages published on this topic. Optional. For example: `projects/a/locations/b/keyRings/c/cryptoKeys/d`
@param [Array<String>] persistence_regions The list of GCP region IDs
where messages that are published to the topic may be persisted in storage. Optional.
@param [Hash] async A hash of values to configure the topic's
{AsyncPublisher} that is created when {Topic#publish_async} is called. Optional. Hash keys and values may include the following: * `:max_bytes` (Integer) The maximum size of messages to be collected before the batch is published. Default is 1,000,000 (1MB). * `:max_messages` (Integer) The maximum number of messages to be collected before the batch is published. Default is 100. * `:interval` (Numeric) The number of seconds to collect messages before the batch is published. Default is 0.01. * `:threads` (Hash) The number of threads to create to handle concurrent calls by the publisher: * `:publish` (Integer) The number of threads used to publish messages. Default is 2. * `:callback` (Integer) The number of threads to handle the published messages' callbacks. Default is 4. * `:flow_control` (Hash) The client flow control settings for message publishing: * `:message_limit` (Integer) The maximum number of messages allowed to wait to be published. Default is `10 * max_messages`. * `:byte_limit` (Integer) The maximum total size of messages allowed to wait to be published. Default is `10 * max_bytes`. * `:limit_exceeded_behavior` (Symbol) The action to take when publish flow control limits are exceeded. Possible values include: `:ignore` - Flow control is disabled. `:error` - Calls to {Topic#publish_async} will raise {FlowControlLimitError} when publish flow control limits are exceeded. `:block` - Calls to {Topic#publish_async} will block until capacity is available when publish flow control limits are exceeded. The default value is `:ignore`.
@param [String] schema_name The name of the schema that messages
published should be validated against. Optional. The value can be a simple schema ID (relative name), in which case the current project ID will be supplied, or a fully-qualified schema name in the form `projects/{project_id}/schemas/{schema_id}`. If provided, `message_encoding` must also be provided.
@param [String, Symbol] message_encoding The encoding of messages validated
against the schema identified by `schema_name`. Optional. Values include: * `JSON` - JSON encoding. * `BINARY` - Binary encoding, as defined by the schema type. For some schema types, binary encoding may not be available.
@param [Numeric] retention Indicates the minimum number of seconds to retain a message
after it is published to the topic. If this field is set, messages published to the topic within the `retention` number of seconds are always available to subscribers. For instance, it allows any attached subscription to [seek to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) that is up to `retention` number of seconds in the past. If this field is not set, message retention is controlled by settings on individual subscriptions. Cannot be less than 600 (10 minutes) or more than 604,800 (7 days).
@return [Google::Cloud::PubSub::Topic]
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new topic = pubsub.create_topic "my-topic"
# File lib/google/cloud/pubsub/project.rb, line 258 def create_topic topic_name, labels: nil, kms_key: nil, persistence_regions: nil, async: nil, schema_name: nil, message_encoding: nil, retention: nil ensure_service! grpc = service.create_topic topic_name, labels: labels, kms_key_name: kms_key, persistence_regions: persistence_regions, schema_name: schema_name, message_encoding: message_encoding, retention: retention Topic.from_grpc grpc, service, async: async end
The Pub/Sub project connected to.
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new( project_id: "my-project", credentials: "/path/to/keyfile.json" ) pubsub.project_id #=> "my-project"
# File lib/google/cloud/pubsub/project.rb, line 71 def project_id service.project end
Retrieves schema by name.
@param [String] schema_name Name of a schema. The value can
be a simple schema ID, in which case the current project ID will be supplied, or a fully-qualified schema name in the form `projects/{project_id}/schemas/{schema_id}`.
@param view [Symbol, String, nil] Possible values:
* `BASIC` - Include the `name` and `type` of the schema, but not the `definition`. * `FULL` - Include all Schema object fields. The default value is `FULL`.
@param [String] project If the schema belongs to a project other
than the one currently connected to, the alternate project ID can be specified here. Not used if a fully-qualified schema name is provided for `schema_name`.
@param [Boolean] skip_lookup Optionally create a {Schema} object
without verifying the schema resource exists on the Pub/Sub service. Calls made on this object will raise errors if the service resource does not exist. Default is `false`.
@return [Google::Cloud::PubSub::Schema, nil] Returns `nil` if
the schema does not exist.
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new schema = pubsub.schema "my-schema" schema.name #=> "projects/my-project/schemas/my-schema" schema.type #=> :PROTOCOL_BUFFER schema.definition # The schema definition
@example Skip the lookup against the service with `skip_lookup`:
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new # No API call is made to retrieve the schema information. # The default project is used in the name. schema = pubsub.schema "my-schema", skip_lookup: true schema.name #=> "projects/my-project/schemas/my-schema" schema.type #=> nil schema.definition #=> nil
@example Omit the schema definition with `view: :basic`:
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new schema = pubsub.schema "my-schema", view: :basic schema.name #=> "projects/my-project/schemas/my-schema" schema.type #=> :PROTOCOL_BUFFER schema.definition #=> nil
# File lib/google/cloud/pubsub/project.rb, line 501 def schema schema_name, view: nil, project: nil, skip_lookup: nil ensure_service! options = { project: project } return Schema.from_name schema_name, view, service, options if skip_lookup view ||= :FULL grpc = service.get_schema schema_name, view, options Schema.from_grpc grpc, service rescue Google::Cloud::NotFoundError nil end
Retrieves a list of schemas for the given project.
@param view [String, Symbol, nil] The set of fields to return in the response. Possible values:
* `BASIC` - Include the `name` and `type` of the schema, but not the `definition`. * `FULL` - Include all Schema object fields. The default value is `FULL`.
@param [String] token A previously-returned page token representing
part of the larger set of results to view.
@param [Integer] max Maximum number of schemas to return.
@return [Array<Google::Cloud::PubSub::Schema>] (See
{Google::Cloud::PubSub::Schema::List})
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new schemas = pubsub.schemas schemas.each do |schema| puts schema.name end
@example Retrieve all schemas: (See {Schema::List#all})
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new schemas = pubsub.schemas schemas.all do |schema| puts schema.name end
# File lib/google/cloud/pubsub/project.rb, line 592 def schemas view: nil, token: nil, max: nil ensure_service! view ||= :FULL options = { token: token, max: max } grpc = service.list_schemas view, options Schema::List.from_grpc grpc, service, view, max end
Retrieves a list of snapshots for the given project.
@param [String] token A previously-returned page token representing
part of the larger set of results to view.
@param [Integer] max Maximum number of snapshots to return.
@return [Array<Google::Cloud::PubSub::Snapshot>] (See
{Google::Cloud::PubSub::Snapshot::List})
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new snapshots = pubsub.snapshots snapshots.each do |snapshot| puts snapshot.name end
@example Retrieve all snapshots: (See {Snapshot::List#all})
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new snapshots = pubsub.snapshots snapshots.all do |snapshot| puts snapshot.name end
# File lib/google/cloud/pubsub/project.rb, line 436 def snapshots token: nil, max: nil ensure_service! options = { token: token, max: max } grpc = service.list_snapshots options Snapshot::List.from_grpc grpc, service, max end
Retrieves subscription by name.
@param [String] subscription_name Name of a subscription. The value can
be a simple subscription ID, in which case the current project ID will be supplied, or a fully-qualified subscription name in the form `projects/{project_id}/subscriptions/{subscription_id}`.
@param [String] project If the subscription belongs to a project other
than the one currently connected to, the alternate project ID can be specified here. Not used if a fully-qualified subscription name is provided for `subscription_name`.
@param [Boolean] skip_lookup Optionally create a {Subscription} object
without verifying the subscription resource exists on the Pub/Sub service. Calls made on this object will raise errors if the service resource does not exist. Default is `false`.
@return [Google::Cloud::PubSub::Subscription, nil] Returns `nil` if
the subscription does not exist
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new sub = pubsub.subscription "my-sub" sub.name #=> "projects/my-project/subscriptions/my-sub"
@example Skip the lookup against the service with `skip_lookup`:
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new # No API call is made to retrieve the subscription information. sub = pubsub.subscription "my-sub", skip_lookup: true sub.name #=> "projects/my-project/subscriptions/my-sub"
# File lib/google/cloud/pubsub/project.rb, line 354 def subscription subscription_name, project: nil, skip_lookup: nil ensure_service! options = { project: project } return Subscription.from_name subscription_name, service, options if skip_lookup grpc = service.get_subscription subscription_name, options Subscription.from_grpc grpc, service rescue Google::Cloud::NotFoundError nil end
Retrieves a list of subscriptions for the given project.
@param [String] token A previously-returned page token representing
part of the larger set of results to view.
@param [Integer] max Maximum number of subscriptions to return.
@return [Array<Google::Cloud::PubSub::Subscription>] (See
{Google::Cloud::PubSub::Subscription::List})
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new subs = pubsub.subscriptions subs.each do |sub| puts sub.name end
@example Retrieve all subscriptions: (See {Subscription::List#all})
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new subs = pubsub.subscriptions subs.all do |sub| puts sub.name end
# File lib/google/cloud/pubsub/project.rb, line 396 def subscriptions token: nil, max: nil ensure_service! options = { token: token, max: max } grpc = service.list_subscriptions options Subscription::List.from_grpc grpc, service, max end
Retrieves topic by name.
@param [String] topic_name Name of a topic. The value can be a simple
topic ID (relative name), in which case the current project ID will be supplied, or a fully-qualified topic name in the form `projects/{project_id}/topics/{topic_id}`.
@param [String] project If the topic belongs to a project other than
the one currently connected to, the alternate project ID can be specified here. Optional. Not used if a fully-qualified topic name is provided for `topic_name`.
@param [Boolean] skip_lookup Optionally create a {Topic} object
without verifying the topic resource exists on the Pub/Sub service. Calls made on this object will raise errors if the topic resource does not exist. Default is `false`. Optional.
@param [Hash] async A hash of values to configure the topic's
{AsyncPublisher} that is created when {Topic#publish_async} is called. Optional. Hash keys and values may include the following: * `:max_bytes` (Integer) The maximum size of messages to be collected before the batch is published. Default is 1,000,000 (1MB). * `:max_messages` (Integer) The maximum number of messages to be collected before the batch is published. Default is 100. * `:interval` (Numeric) The number of seconds to collect messages before the batch is published. Default is 0.01. * `:threads` (Hash) The number of threads to create to handle concurrent calls by the publisher: * `:publish` (Integer) The number of threads used to publish messages. Default is 2. * `:callback` (Integer) The number of threads to handle the published messages' callbacks. Default is 4. * `:flow_control` (Hash) The client flow control settings for message publishing: * `:message_limit` (Integer) The maximum number of messages allowed to wait to be published. Default is `10 * max_messages`. * `:byte_limit` (Integer) The maximum total size of messages allowed to wait to be published. Default is `10 * max_bytes`. * `:limit_exceeded_behavior` (Symbol) The action to take when publish flow control limits are exceeded. Possible values include: `:ignore` - Flow control is disabled. `:error` - Calls to {Topic#publish_async} will raise {FlowControlLimitError} when publish flow control limits are exceeded. `:block` - Calls to {Topic#publish_async} will block until capacity is available when publish flow control limits are exceeded. The default value is `:ignore`.
@return [Google::Cloud::PubSub::Topic, nil] Returns `nil` if topic
does not exist.
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new topic = pubsub.topic "existing-topic"
@example By default `nil` will be returned if topic does not exist.
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new topic = pubsub.topic "non-existing-topic" # nil
@example Create topic in a different project with the `project` flag.
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new topic = pubsub.topic "another-topic", project: "another-project"
@example Skip the lookup against the service with `skip_lookup`:
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new topic = pubsub.topic "another-topic", skip_lookup: true
@example Configuring AsyncPublisher
to increase concurrent callbacks:
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new topic = pubsub.topic "my-topic", async: { threads: { callback: 16 } } topic.publish_async "task completed" do |result| if result.succeeded? log_publish_success result.data else log_publish_failure result.data, result.error end end topic.async_publisher.stop!
# File lib/google/cloud/pubsub/project.rb, line 161 def topic topic_name, project: nil, skip_lookup: nil, async: nil ensure_service! options = { project: project } return Topic.from_name topic_name, service, options if skip_lookup grpc = service.get_topic topic_name, options Topic.from_grpc grpc, service, async: async rescue Google::Cloud::NotFoundError nil end
Retrieves a list of topics for the given project.
@param [String] token The `token` value returned by the last call to
`topics`; indicates that this is a continuation of a call, and that the system should return the next page of data.
@param [Integer] max Maximum number of topics to return.
@return [Array<Google::Cloud::PubSub::Topic>] (See
{Google::Cloud::PubSub::Topic::List})
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new topics = pubsub.topics topics.each do |topic| puts topic.name end
@example Retrieve all topics: (See {Topic::List#all})
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new topics = pubsub.topics topics.all do |topic| puts topic.name end
# File lib/google/cloud/pubsub/project.rb, line 309 def topics token: nil, max: nil ensure_service! options = { token: token, max: max } grpc = service.list_topics options Topic::List.from_grpc grpc, service, max end
Validates a schema type and definition.
@param [String, Symbol] type The type of the schema. Required. Possible
values are case-insensitive and include: * `PROTOCOL_BUFFER` - A Protocol Buffer schema definition. * `AVRO` - An Avro schema definition.
@param [String] definition The definition of the schema. Required. This
should be a string representing the full definition of the schema that is a valid schema definition of the type specified in `type`.
@param [String] project If the schema belongs to a project other
than the one currently connected to, the alternate project ID can be specified here. Optional.
@return [Boolean] `true` if the schema is valid, `false` otherwise.
@example
require "google/cloud/pubsub" pubsub = Google::Cloud::PubSub.new definition = "..." pubsub.validate_schema :avro, definition #=> true
# File lib/google/cloud/pubsub/project.rb, line 627 def valid_schema? type, definition, project: nil ensure_service! type = type.to_s.upcase service.validate_schema type, definition, project: project # return type is empty true rescue Google::Cloud::InvalidArgumentError false end
Protected Instance Methods
@private Raise an error unless an active connection to the service is available.
# File lib/google/cloud/pubsub/project.rb, line 642 def ensure_service! raise "Must have active connection to service" unless service end
Call the publish API with arrays of data data and attrs.
# File lib/google/cloud/pubsub/project.rb, line 648 def publish_batch_messages topic_name, batch grpc = service.publish topic_name, batch.messages batch.to_gcloud_messages Array(grpc.message_ids) end