module Cloudenvoy::Backend::GooglePubSub

Interface to GCP Pub/Sub and the Pub/Sub local emulator.

Public Instance Methods

backend()

Return the backend to use for sending messages.

@return [Google::Cloud::PubSub::Project] The low-level client instance.

# File lib/cloudenvoy/backend/google_pub_sub.rb, line 34
def backend
  @backend ||= Google::Cloud::PubSub.new({
    project_id: config.gcp_project_id,
    emulator_host: development? ? Cloudenvoy::Config::EMULATOR_HOST : nil
  }.compact)
end
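
The option hash passed to the client relies on `Hash#compact` to drop the `emulator_host` key outside development, so the client falls back to its default endpoint. A minimal sketch of that pattern, assuming a hypothetical `EMULATOR_HOST` value and a hypothetical `client_options` helper (neither is part of the library):

```ruby
# Hypothetical stand-in for Cloudenvoy::Config::EMULATOR_HOST.
# The real value is defined by the library and may differ.
EMULATOR_HOST = 'localhost:8085'

# Hypothetical helper that builds the option hash the same way #backend does.
def client_options(project_id, development)
  {
    project_id: project_id,
    emulator_host: development ? EMULATOR_HOST : nil
  }.compact # removes keys whose value is nil
end

dev_opts = client_options('my-project', true)
prod_opts = client_options('my-project', false)

p dev_opts
p prod_opts
```

In the non-development case the resulting hash has no `:emulator_host` key at all, rather than a key set to `nil`.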

config()

Return the cloudenvoy configuration. See Cloudenvoy#configure.

@return [Cloudenvoy::Config] The library configuration.

# File lib/cloudenvoy/backend/google_pub_sub.rb, line 16
def config
  Cloudenvoy.config
end

development?()

Return true if the current config mode is development.

@return [Boolean] True if Cloudenvoy is run in development mode.

# File lib/cloudenvoy/backend/google_pub_sub.rb, line 25
def development?
  config.mode == :development
end

publish(topic, payload, metadata = {})

Publish a message to a topic.

@param [String] topic The name of the topic.
@param [Hash, String] payload The message content.
@param [Hash] metadata The message attributes.

@return [Cloudenvoy::Message] The created message.

# File lib/cloudenvoy/backend/google_pub_sub.rb, line 59
def publish(topic, payload, metadata = {})
  # Retrieve the topic
  ps_topic = backend.topic(topic, skip_lookup: true)

  # Publish the message
  ps_msg = ps_topic.publish(payload.to_json, metadata.to_h)

  # Return formatted message
  Message.new(
    id: ps_msg.message_id,
    payload: payload,
    metadata: metadata,
    topic: topic
  )
end
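
The listing above JSON-encodes the payload before sending it, then returns a message that carries the original, unencoded payload. The following sketch illustrates that flow without a Pub/Sub connection. `FakeTopic`, `FakePubSubMessage`, the simplified `Message` struct and the `publish_to` helper are all hypothetical stand-ins, not library classes.

```ruby
require 'json'

# Hypothetical stand-in for the message object returned by the Pub/Sub client.
FakePubSubMessage = Struct.new(:message_id, :data, :attributes)

# Hypothetical stand-in for a Pub/Sub topic. It records what was sent.
class FakeTopic
  attr_reader :sent

  def initialize
    @sent = []
  end

  def publish(data, attributes = {})
    msg = FakePubSubMessage.new("msg-#{@sent.size + 1}", data, attributes)
    @sent << msg
    msg
  end
end

# Simplified stand-in for Cloudenvoy::Message.
Message = Struct.new(:id, :payload, :metadata, :topic, keyword_init: true)

# Mirrors the steps of #publish: encode, send, wrap the result.
def publish_to(ps_topic, topic, payload, metadata = {})
  ps_msg = ps_topic.publish(payload.to_json, metadata.to_h)
  Message.new(id: ps_msg.message_id, payload: payload, metadata: metadata, topic: topic)
end

fake_topic = FakeTopic.new
message = publish_to(fake_topic, 'user-events', { 'user_id' => 42 }, { 'kind' => 'signup' })

puts fake_topic.sent.first.data
puts message.payload.inspect
```

Because `to_json` is applied unconditionally, a String payload is also JSON-encoded and therefore sent wrapped in double quotes.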

upsert_subscription(topic, name, opts = {})

Create or update a subscription for a specific topic.

@param [String] topic The name of the topic.
@param [String] name The name of the subscription.
@param [Hash] opts The subscription configuration options.

@option opts [Integer] :deadline The maximum number of seconds after a subscriber receives a message
before the subscriber should acknowledge the message.

@option opts [Boolean] :retain_acked Indicates whether to retain acknowledged messages. If true,
then messages are not expunged from the subscription's backlog, even if they are acknowledged,
until they fall out of the retention window. Default is false.

@option opts [<Type>] :retention How long to retain unacknowledged messages in the subscription's
backlog, from the moment a message is published. If retain_acked is true, then this also configures
the retention of acknowledged messages, and thus configures how far back in time a Subscription#seek
can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes).
Default is 604,800 seconds (7 days).

@option opts [String] :filter An expression written in the Cloud Pub/Sub filter language.
If non-empty, then only Message instances whose attributes field matches the filter are delivered
on this subscription. If empty, then no messages are filtered out. Optional.

@return [Cloudenvoy::Subscription] The upserted subscription.

# File lib/cloudenvoy/backend/google_pub_sub.rb, line 97
def upsert_subscription(topic, name, opts = {})
  sub_config = opts.to_h.merge(endpoint: webhook_url)

  # Auto-create topic in development. In non-development environments
  # the create subscription action raises an error if the topic does
  # not exist
  upsert_topic(topic) if development?

  # Create subscription
  ps_sub =
    begin
      # Retrieve the topic
      ps_topic = backend.topic(topic, skip_lookup: true)

      # Attempt to create the subscription
      ps_topic.subscribe(name, sub_config)
    rescue Google::Cloud::AlreadyExistsError
      # Update endpoint on subscription
      # Topic is not updated as it is name-dependent
      backend.subscription(name).tap do |e|
        sub_config.each do |k, v|
          e.send("#{k}=", v)
        end
      end
    end

  # Return formatted subscription
  Subscription.new(name: ps_sub.name, original: ps_sub)
end
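
The create-then-update flow above (attempt the creation, and on an "already exists" error fetch the existing subscription and assign each option through its setter) can be sketched in isolation. Everything in this example is a hypothetical stand-in: `AlreadyExistsError` replaces `Google::Cloud::AlreadyExistsError`, and `FakeBackend` and `FakeSubscription` replace the Pub/Sub client objects.

```ruby
# Hypothetical error class standing in for Google::Cloud::AlreadyExistsError.
class AlreadyExistsError < StandardError; end

# Hypothetical subscription object exposing a setter per option.
FakeSubscription = Struct.new(:name, :endpoint, :deadline, keyword_init: true)

# Hypothetical in-memory backend that refuses duplicate subscription names.
class FakeBackend
  def initialize
    @subscriptions = {}
  end

  def subscribe(name, opts)
    raise AlreadyExistsError, name if @subscriptions.key?(name)

    @subscriptions[name] = FakeSubscription.new(name: name, **opts)
  end

  def subscription(name)
    @subscriptions[name]
  end
end

# Create the subscription, or update the existing one in place.
def upsert(backend, name, opts)
  backend.subscribe(name, opts)
rescue AlreadyExistsError
  backend.subscription(name).tap do |sub|
    opts.each { |key, value| sub.public_send("#{key}=", value) }
  end
end

backend = FakeBackend.new
first = upsert(backend, 'reports.sub', { endpoint: 'https://a.example/hook', deadline: 20 })
second = upsert(backend, 'reports.sub', { endpoint: 'https://b.example/hook', deadline: 60 })

puts second.endpoint
puts first.equal?(second)
```

The second call does not create a new object. It returns the existing subscription with its options overwritten, which is what makes the operation safe to repeat.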

upsert_topic(topic)

Create a topic, or retrieve it if it already exists.

@param [String] topic The topic name.

@return [Cloudenvoy::Topic] The upserted topic.

# File lib/cloudenvoy/backend/google_pub_sub.rb, line 134
def upsert_topic(topic)
  ps_topic =
    begin
      backend.create_topic(topic)
    rescue Google::Cloud::AlreadyExistsError
      backend.topic(topic)
    end

  # Return formatted topic
  Topic.new(name: ps_topic.name, original: ps_topic)
end

webhook_url()

Return an authenticated endpoint for processing Pub/Sub webhooks.

@return [String] An authenticated endpoint.

# File lib/cloudenvoy/backend/google_pub_sub.rb, line 46
def webhook_url
  "#{config.processor_url}?token=#{Authenticator.verification_token}"
end
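
The endpoint is the processor URL with the verification token appended as a `token` query parameter. A small sketch of the resulting shape, using a hypothetical `build_webhook_url` helper and made-up URL and token values (the real values come from the configuration and from `Authenticator.verification_token`):

```ruby
require 'uri'

# Hypothetical helper mirroring the string interpolation in #webhook_url.
def build_webhook_url(processor_url, token)
  "#{processor_url}?token=#{token}"
end

# Both values below are illustrative assumptions.
url = build_webhook_url('https://app.example.com/cloudenvoy/receive', 'abc123')

# Parse the query string back out, as a receiving endpoint might.
query = URI.decode_www_form(URI.parse(url).query).to_h

puts url
puts query['token']
```

Note that plain interpolation assumes the processor URL has no query string of its own and that the token contains only URL-safe characters.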