class Google::Cloud::PubSub::Service
@private Represents the Pub/Sub service API, including IAM mixins.
Attributes
The same client_id
is used across all streaming pull connections that are created by this client. This is intentional, as it indicates to the server that any guarantees, such as message ordering, made for a stream that is disconnected will be made for the stream that is created to replace it. The attr_accessor allows the value to be replaced for unit testing.
Public Class Methods
Creates a new Service
instance.
# File lib/google/cloud/pubsub/service.rb, line 42 def initialize project, credentials, host: nil, timeout: nil @project = project @credentials = credentials @host = host @timeout = timeout @client_id = SecureRandom.uuid.freeze end
Public Instance Methods
Acknowledges receipt of a message.
# File lib/google/cloud/pubsub/service.rb, line 257 def acknowledge subscription, *ack_ids subscriber.acknowledge subscription: subscription_path(subscription), ack_ids: ack_ids end
Creates a schema in the current (or given) project.
# File lib/google/cloud/pubsub/service.rb, line 344 def create_schema schema_id, type, definition, options = {} schema = Google::Cloud::PubSub::V1::Schema.new( type: type, definition: definition ) schemas.create_schema parent: project_path(options), schema: schema, schema_id: schema_id end
Creates a snapshot on a given subscription.
# File lib/google/cloud/pubsub/service.rb, line 295 def create_snapshot subscription, snapshot_name, labels: nil subscriber.create_snapshot name: snapshot_path(snapshot_name), subscription: subscription_path(subscription), labels: labels end
Creates a subscription on a given topic for a given subscriber.
# File lib/google/cloud/pubsub/service.rb, line 206 def create_subscription topic, subscription_name, options = {} subscriber.create_subscription \ name: subscription_path(subscription_name, options), topic: topic_path(topic), push_config: options[:push_config], ack_deadline_seconds: options[:deadline], retain_acked_messages: options[:retain_acked], message_retention_duration: Convert.number_to_duration(options[:retention]), labels: options[:labels], enable_message_ordering: options[:message_ordering], filter: options[:filter], dead_letter_policy: dead_letter_policy(options), retry_policy: options[:retry_policy] end
Creates the given topic with the given name.
# File lib/google/cloud/pubsub/service.rb, line 124 def create_topic topic_name, labels: nil, kms_key_name: nil, persistence_regions: nil, schema_name: nil, message_encoding: nil, retention: nil, options: {} if persistence_regions message_storage_policy = Google::Cloud::PubSub::V1::MessageStoragePolicy.new( allowed_persistence_regions: Array(persistence_regions) ) end if schema_name || message_encoding unless schema_name && message_encoding raise ArgumentError, "Schema settings must include both schema_name and message_encoding." end schema_settings = Google::Cloud::PubSub::V1::SchemaSettings.new( schema: schema_path(schema_name), encoding: message_encoding.to_s.upcase ) end publisher.create_topic \ name: topic_path(topic_name, options), labels: labels, kms_key_name: kms_key_name, message_storage_policy: message_storage_policy, schema_settings: schema_settings, message_retention_duration: Convert.number_to_duration(retention) end
Delete a schema.
# File lib/google/cloud/pubsub/service.rb, line 368 def delete_schema schema_name schemas.delete_schema name: schema_path(schema_name) end
Deletes an existing snapshot. All pending messages in the snapshot are immediately dropped.
# File lib/google/cloud/pubsub/service.rb, line 309 def delete_snapshot snapshot subscriber.delete_snapshot snapshot: snapshot_path(snapshot) end
Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
# File lib/google/cloud/pubsub/service.rb, line 228 def delete_subscription subscription subscriber.delete_subscription subscription: subscription_path(subscription) end
Deletes the topic with the given name. All subscriptions to this topic are also deleted. Raises GRPC status code 5 if the topic does not exist. After a topic is deleted, a new topic may be created with the same name.
# File lib/google/cloud/pubsub/service.rb, line 167 def delete_topic topic_name publisher.delete_topic topic: topic_path(topic_name) end
Detaches a subscription from its topic. All messages retained in the subscription are dropped. Subsequent `Pull` and `StreamingPull` requests will raise `FAILED_PRECONDITION`. If the subscription is a push subscription, pushes to the endpoint will stop.
# File lib/google/cloud/pubsub/service.rb, line 236 def detach_subscription subscription publisher.detach_subscription subscription: subscription_path(subscription) end
Gets the details of a schema. @param view [String, Symbol, nil] The set of fields to return in the response. Possible values:
* `BASIC` - Include the name and type of the schema, but not the definition. * `FULL` - Include all Schema object fields.
# File lib/google/cloud/pubsub/service.rb, line 360 def get_schema schema_name, view, options = {} schema_view = Google::Cloud::PubSub::V1::SchemaView.const_get view.to_s.upcase schemas.get_schema name: schema_path(schema_name, options), view: schema_view end
Gets the details of a subscription.
# File lib/google/cloud/pubsub/service.rb, line 182 def get_subscription subscription_name, options = {} subscriber.get_subscription subscription: subscription_path(subscription_name, options) end
# File lib/google/cloud/pubsub/service.rb, line 421 def get_subscription_policy subscription_name, options = {} iam.get_iam_policy resource: subscription_path(subscription_name, options) end
Gets the configuration of a topic. Since the topic only has the name attribute, this method is only useful to check the existence of a topic. If other attributes are added in the future, they will be returned here.
# File lib/google/cloud/pubsub/service.rb, line 108 def get_topic topic_name, options = {} publisher.get_topic topic: topic_path(topic_name, options) end
Helper methods
# File lib/google/cloud/pubsub/service.rb, line 409 def get_topic_policy topic_name, options = {} iam.get_iam_policy resource: topic_path(topic_name, options) end
# File lib/google/cloud/pubsub/service.rb, line 76 def iam return mocked_iam if mocked_iam @iam ||= V1::IAMPolicy::Client.new do |config| config.credentials = credentials if credentials config.timeout = timeout if timeout config.endpoint = host if host config.lib_name = "gccl" config.lib_version = Google::Cloud::PubSub::VERSION config.metadata = { "google-cloud-resource-prefix" => "projects/#{@project}" } end end
# File lib/google/cloud/pubsub/service.rb, line 458 def inspect "#<#{self.class.name} (#{@project})>" end
Lists schemas in the current (or given) project. @param view [String, Symbol, nil] Possible values:
* `BASIC` - Include the name and type of the schema, but not the definition. * `FULL` - Include all Schema object fields.
# File lib/google/cloud/pubsub/service.rb, line 332 def list_schemas view, options = {} schema_view = Google::Cloud::PubSub::V1::SchemaView.const_get view.to_s.upcase paged_enum = schemas.list_schemas parent: project_path(options), view: schema_view, page_size: options[:max], page_token: options[:token] paged_enum.response end
Lists snapshots by project.
# File lib/google/cloud/pubsub/service.rb, line 285 def list_snapshots options = {} paged_enum = subscriber.list_snapshots project: project_path(options), page_size: options[:max], page_token: options[:token] paged_enum.response end
Lists matching subscriptions by project.
# File lib/google/cloud/pubsub/service.rb, line 196 def list_subscriptions options = {} paged_enum = subscriber.list_subscriptions project: project_path(options), page_size: options[:max], page_token: options[:token] paged_enum.response end
Lists matching topics.
# File lib/google/cloud/pubsub/service.rb, line 114 def list_topics options = {} paged_enum = publisher.list_topics project: project_path(options), page_size: options[:max], page_token: options[:token] paged_enum.response end
Lists matching subscriptions by project and topic.
# File lib/google/cloud/pubsub/service.rb, line 188 def list_topics_subscriptions topic, options = {} publisher.list_topic_subscriptions topic: topic_path(topic, options), page_size: options[:max], page_token: options[:token] end
Modifies the ack deadline for a specific message.
# File lib/google/cloud/pubsub/service.rb, line 277 def modify_ack_deadline subscription, ids, deadline subscriber.modify_ack_deadline subscription: subscription_path(subscription), ack_ids: Array(ids), ack_deadline_seconds: deadline end
Modifies the PushConfig for a specified subscription.
# File lib/google/cloud/pubsub/service.rb, line 263 def modify_push_config subscription, endpoint, attributes # Convert attributes to strings to match the protobuf definition attributes = Hash[attributes.map { |k, v| [String(k), String(v)] }] push_config = Google::Cloud::PubSub::V1::PushConfig.new( push_endpoint: endpoint, attributes: attributes ) subscriber.modify_push_config subscription: subscription_path(subscription), push_config: push_config end
# File lib/google/cloud/pubsub/service.rb, line 433 def project_path options = {} project_name = options[:project] || project "projects/#{project_name}" end
Adds one or more messages to the topic. Raises GRPC status code 5 if the topic does not exist. The messages parameter is an array of arrays. The first element is the data, second is attributes hash.
# File lib/google/cloud/pubsub/service.rb, line 176 def publish topic, messages publisher.publish topic: topic_path(topic), messages: messages end
# File lib/google/cloud/pubsub/service.rb, line 63 def publisher return mocked_publisher if mocked_publisher @publisher ||= V1::Publisher::Client.new do |config| config.credentials = credentials if credentials config.timeout = timeout if timeout config.endpoint = host if host config.lib_name = "gccl" config.lib_version = Google::Cloud::PubSub::VERSION config.metadata = { "google-cloud-resource-prefix": "projects/#{@project}" } end end
Pulls a single message from the server.
# File lib/google/cloud/pubsub/service.rb, line 242 def pull subscription, options = {} max_messages = options.fetch(:max, 100).to_i return_immediately = !(!options.fetch(:immediate, true)) subscriber.pull subscription: subscription_path(subscription, options), max_messages: max_messages, return_immediately: return_immediately end
# File lib/google/cloud/pubsub/service.rb, line 453 def schema_path schema_name, options = {} return schema_name if schema_name.nil? || schema_name.to_s.include?("/") "#{project_path options}/schemas/#{schema_name}" end
# File lib/google/cloud/pubsub/service.rb, line 89 def schemas return mocked_schemas if mocked_schemas @schemas ||= V1::SchemaService::Client.new do |config| config.credentials = credentials if credentials config.timeout = timeout if timeout config.endpoint = host if host config.lib_name = "gccl" config.lib_version = Google::Cloud::PubSub::VERSION config.metadata = { "google-cloud-resource-prefix" => "projects/#{@project}" } end end
Adjusts the given subscription to a time or snapshot.
# File lib/google/cloud/pubsub/service.rb, line 315 def seek subscription, time_or_snapshot if a_time? time_or_snapshot time = Convert.time_to_timestamp time_or_snapshot subscriber.seek subscription: subscription, time: time else time_or_snapshot = time_or_snapshot.name if time_or_snapshot.is_a? Snapshot subscriber.seek subscription: subscription_path(subscription), snapshot: snapshot_path(time_or_snapshot) end end
# File lib/google/cloud/pubsub/service.rb, line 425 def set_subscription_policy subscription_name, new_policy, options = {} iam.set_iam_policy resource: subscription_path(subscription_name, options), policy: new_policy end
# File lib/google/cloud/pubsub/service.rb, line 413 def set_topic_policy topic_name, new_policy, options = {} iam.set_iam_policy resource: topic_path(topic_name, options), policy: new_policy end
# File lib/google/cloud/pubsub/service.rb, line 448 def snapshot_path snapshot_name, options = {} return snapshot_name if snapshot_name.nil? || snapshot_name.to_s.include?("/") "#{project_path options}/snapshots/#{snapshot_name}" end
# File lib/google/cloud/pubsub/service.rb, line 251 def streaming_pull request_enum subscriber.streaming_pull request_enum end
# File lib/google/cloud/pubsub/service.rb, line 50 def subscriber return mocked_subscriber if mocked_subscriber @subscriber ||= V1::Subscriber::Client.new do |config| config.credentials = credentials if credentials config.timeout = timeout if timeout config.endpoint = host if host config.lib_name = "gccl" config.lib_version = Google::Cloud::PubSub::VERSION config.metadata = { "google-cloud-resource-prefix": "projects/#{@project}" } end end
# File lib/google/cloud/pubsub/service.rb, line 443 def subscription_path subscription_name, options = {} return subscription_name if subscription_name.to_s.include? "/" "#{project_path options}/subscriptions/#{subscription_name}" end
# File lib/google/cloud/pubsub/service.rb, line 429 def test_subscription_permissions subscription_name, permissions, options = {} iam.test_iam_permissions resource: subscription_path(subscription_name, options), permissions: permissions end
# File lib/google/cloud/pubsub/service.rb, line 417 def test_topic_permissions topic_name, permissions, options = {} iam.test_iam_permissions resource: topic_path(topic_name, options), permissions: permissions end
# File lib/google/cloud/pubsub/service.rb, line 438 def topic_path topic_name, options = {} return topic_name if topic_name.to_s.include? "/" "#{project_path options}/topics/#{topic_name}" end
# File lib/google/cloud/pubsub/service.rb, line 301 def update_snapshot snapshot_obj, *fields mask = Google::Protobuf::FieldMask.new paths: fields.map(&:to_s) subscriber.update_snapshot snapshot: snapshot_obj, update_mask: mask end
# File lib/google/cloud/pubsub/service.rb, line 221 def update_subscription subscription_obj, *fields mask = Google::Protobuf::FieldMask.new paths: fields.map(&:to_s) subscriber.update_subscription subscription: subscription_obj, update_mask: mask end
# File lib/google/cloud/pubsub/service.rb, line 157 def update_topic topic_obj, *fields mask = Google::Protobuf::FieldMask.new paths: fields.map(&:to_s) publisher.update_topic topic: topic_obj, update_mask: mask end
Validates a message against a schema.
@param message_data [String] Message
to validate against the provided `schema_spec`. @param message_encoding [Google::Cloud::PubSub::V1::Encoding] The encoding expected for messages. @param schema_name [String] Name of the schema against which to validate. @param project [String] Name of the project if not the default project. @param type [String] Ad-hoc schema type against which to validate. @param definition [String] Ad-hoc schema definition against which to validate.
# File lib/google/cloud/pubsub/service.rb, line 393 def validate_message message_data, message_encoding, schema_name: nil, project: nil, type: nil, definition: nil if type && definition schema = Google::Cloud::PubSub::V1::Schema.new( type: type, definition: definition ) end schemas.validate_message parent: project_path(project: project), name: schema_path(schema_name), schema: schema, message: message_data, encoding: message_encoding end
Validate the definition string intended for a schema.
# File lib/google/cloud/pubsub/service.rb, line 374 def validate_schema type, definition, options = {} schema = Google::Cloud::PubSub::V1::Schema.new( type: type, definition: definition ) schemas.validate_schema parent: project_path(options), schema: schema end
Protected Instance Methods
# File lib/google/cloud/pubsub/service.rb, line 464 def a_time? obj return false unless obj.respond_to? :to_time # Rails' String#to_time returns nil if the string doesn't parse. return false if obj.to_time.nil? true end
# File lib/google/cloud/pubsub/service.rb, line 471 def dead_letter_policy options return nil unless options[:dead_letter_topic_name] policy = Google::Cloud::PubSub::V1::DeadLetterPolicy.new dead_letter_topic: options[:dead_letter_topic_name] if options[:dead_letter_max_delivery_attempts] policy.max_delivery_attempts = options[:dead_letter_max_delivery_attempts] end policy end