module Cloudenvoy::Publisher

Use this module to define publishers. The module provides a simple DSL for transforming and publishing data objects to Pub/Sub.

Publishers must at least implement the `payload` method, which defines how arguments are mapped to a Hash or String message.

Example:

class UserPublisher
  include Cloudenvoy::Publisher

  # Specify publishing options
  cloudenvoy_options topic: 'my-topic'

  # Format message objects
  def payload(user)
    {
      id: user.id,
      name: user.name
    }
  end
end

Public Class Methods

included(base)

Add the class methods to the including class and register it as a publisher.

# File lib/cloudenvoy/publisher.rb, line 30
def self.included(base)
  base.extend(ClassMethods)
  base.attr_accessor :msg_args, :message, :publishing_started_at, :publishing_ended_at

  # Register publisher
  Cloudenvoy.publishers.add(base)
end

new(msg_args: nil)

Build a new publisher instance.

@param [Array<any>] msg_args The list of payload args.

# File lib/cloudenvoy/publisher.rb, line 99
def initialize(msg_args: nil)
  @msg_args = msg_args || []
end

Public Instance Methods

logger()

Return the Cloudenvoy logger instance.

@return [Logger, any] The Cloudenvoy logger.

# File lib/cloudenvoy/publisher.rb, line 136
def logger
  @logger ||= PublisherLogger.new(self)
end

metadata(*_args)

Publishers can optionally define message attributes (metadata) by overriding this method. Message attributes are sent to Pub/Sub alongside the payload and can be used for filtering. The default implementation returns an empty Hash.

@param [Any] *_args The publisher arguments.

@return [Hash] The message attributes.

# File lib/cloudenvoy/publisher.rb, line 127
def metadata(*_args)
  {}
end
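
As a minimal sketch of overriding `metadata`, the hypothetical publisher below derives attributes from its argument. To stay runnable without the gem it is a plain class; a real publisher would also `include Cloudenvoy::Publisher`. The `SketchUser` struct and the attribute names are assumptions made for illustration only.

```ruby
# Hypothetical data object used only for this illustration.
SketchUser = Struct.new(:id, :name, :plan)

# Plain-class sketch; in an application this class would also
# `include Cloudenvoy::Publisher` and declare `cloudenvoy_options`.
class UserMetadataSketch
  # Map the publisher argument to the message body.
  def payload(user)
    { id: user.id, name: user.name }
  end

  # Receives the same arguments as `payload` and returns the
  # message attributes. Values are converted to strings here.
  def metadata(user)
    { 'plan' => user.plan.to_s, 'source' => 'user-service' }
  end
end
```

Pub/Sub attributes are string key-value pairs, which is why the sketch converts the plan to a string before returning it.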

publish()

Send the instantiated Publisher (message) to Pub/Sub.

@return [Cloudenvoy::Message] The created message.

# File lib/cloudenvoy/publisher.rb, line 158
def publish
  # Format and publish message
  resp = execute_middleware_chain

  # Log publishing completion and return result
  logger.info("Published message in #{publishing_duration}s") { { duration: publishing_duration } }
  resp
rescue StandardError => e
  logger.info("Publishing failed after #{publishing_duration}s") { { duration: publishing_duration } }
  raise(e)
end

publishing_duration()

Return the time taken (in seconds) to format and publish the message. This duration includes the middlewares and the actual publish method.

@return [Float] The time taken in seconds as a floating point number.

# File lib/cloudenvoy/publisher.rb, line 146
def publishing_duration
  return 0.0 unless publishing_ended_at && publishing_started_at

  (publishing_ended_at - publishing_started_at).ceil(3)
end
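
The calculation can be tried in isolation. The sketch below is a stand-alone copy of the same logic, with the hypothetical parameters `started_at` and `ended_at` standing in for `publishing_started_at` and `publishing_ended_at`.

```ruby
# Stand-alone version of the duration calculation.
def sketch_publishing_duration(started_at, ended_at)
  # No measurement is available until both timestamps are set.
  return 0.0 unless ended_at && started_at

  # Time - Time yields a Float number of seconds; ceil(3) rounds
  # up to the next millisecond.
  (ended_at - started_at).ceil(3)
end

started = Time.at(100.0)
puts sketch_publishing_duration(started, Time.at(100.1234)) # 0.124
puts sketch_publishing_duration(started, nil)               # 0.0
```

Because the value is rounded up rather than to nearest, any completed publish reports at least one millisecond unless the two timestamps are identical.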

topic(*_args)

Return the topic to publish to. The topic name can be dynamically evaluated at runtime based on publishing arguments.

Defaults to the topic specified via cloudenvoy_options.

@param [Any] *_args The publisher arguments.

@return [String] The topic name.

# File lib/cloudenvoy/publisher.rb, line 114
def topic(*_args)
  self.class.default_topic
end
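
A dynamic topic might look like the following sketch. It is a plain class so that it runs without the gem; `RegionalUser`, the `region` field and the topic naming scheme are hypothetical, and `default_topic` is stubbed here to stand in for the value normally configured through `cloudenvoy_options`.

```ruby
# Hypothetical data object used only for this illustration.
RegionalUser = Struct.new(:id, :region)

class RegionalTopicSketch
  # Stand-in for the class-level default that `cloudenvoy_options`
  # would normally provide.
  def self.default_topic
    'users'
  end

  # Route messages to a per-region topic when a region is known,
  # otherwise fall back to the class default.
  def topic(user)
    return self.class.default_topic if user.region.nil?

    "#{self.class.default_topic}-#{user.region}"
  end
end
```

The override receives the same arguments as `payload`, so routing decisions can be based on the object being published.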

Private Instance Methods

execute_middleware_chain()

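Execute the publish_message method through the publisher middleware chain. Errors are logged, passed to the optional on_error hook if the publisher defines one, and then re-raised.

@return [Any] The result of the publish_message method.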

# File lib/cloudenvoy/publisher.rb, line 196
def execute_middleware_chain
  self.publishing_started_at = Time.now

  Cloudenvoy.config.publisher_middleware.invoke(self) do
    begin
      publish_message
    rescue StandardError => e
      logger.error([e, e.backtrace.join("\n")].join("\n"))
      try(:on_error, e)
      return raise(e)
    end
  end
ensure
  self.publishing_ended_at = Time.now
end
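
The control flow above (timestamp, rescue, optional hook, re-raise, `ensure`) can be sketched without the gem. `ErrorHookSketch` and its `run` method are hypothetical names, the middleware chain is replaced by a plain block, and `respond_to?` is used as a rough plain-Ruby equivalent of `try(:on_error, e)`.

```ruby
# Hypothetical publisher-like class that mirrors the control flow
# of execute_middleware_chain without depending on the gem.
class ErrorHookSketch
  attr_reader :started_at, :ended_at, :errors

  def initialize(&work)
    @work = work
    @errors = []
  end

  # Optional hook: invoked with the exception when publishing fails.
  def on_error(error)
    @errors << error.message
  end

  def run
    @started_at = Time.now
    begin
      @work.call
    rescue StandardError => e
      # Call the hook only if the class defines it, then re-raise.
      on_error(e) if respond_to?(:on_error)
      raise
    end
  ensure
    # Runs on success and on failure, so a duration is always available.
    @ended_at = Time.now
  end
end
```

Since the end timestamp is set in `ensure`, a failure message can still report how long the attempt took.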

publish_message()

Internal logic used to build and publish the message, then store the created message on the publisher. The same msg_args are splatted into topic, payload and metadata.

@return [Cloudenvoy::Message] The published message

# File lib/cloudenvoy/publisher.rb, line 181
def publish_message
  # Publish message to pub/sub and attach created
  # message to publisher
  self.message = PubSubClient.publish(
    topic(*msg_args),
    payload(*msg_args),
    metadata(*msg_args)
  )
end