module Cloudenvoy::Publisher
Use this module to define publishers. The module provides a simple DSL for transforming and publishing data objects to Pub/Sub.
Publishers must at least implement the `payload` method, which defines how arguments are mapped to a Hash or String message.
E.g.
class UserPublisher
include Cloudenvoy::Publisher # Specify publishing options cloudenvoy_options topic: 'my-topic' # Format message objects def payload(user) { id: user.id, name: user.name } end
end
Public Class Methods
Add class method to including class
# File lib/cloudenvoy/publisher.rb, line 30 def self.included(base) base.extend(ClassMethods) base.attr_accessor :msg_args, :message, :publishing_started_at, :publishing_ended_at # Register subscriber Cloudenvoy.publishers.add(base) end
Build a new publisher instance.
@param [Array<any>] msg_args The list of payload args.
# File lib/cloudenvoy/publisher.rb, line 99 def initialize(msg_args: nil) @msg_args = msg_args || [] end
Public Instance Methods
Return the Cloudenvoy
logger instance.
@return [Logger, any] The cloudenvoy logger.
# File lib/cloudenvoy/publisher.rb, line 136 def logger @logger ||= PublisherLogger.new(self) end
Send the instantiated Publisher
(message) to Pub/Sub.
@return [Cloudenvoy::Message] The created message.
# File lib/cloudenvoy/publisher.rb, line 158 def publish # Format and publish message resp = execute_middleware_chain # Log job completion and return result logger.info("Published message in #{publishing_duration}s") { { duration: publishing_duration } } resp rescue StandardError => e logger.info("Publishing failed after #{publishing_duration}s") { { duration: publishing_duration } } raise(e) end
Return the time taken (in seconds) to format and publish the message. This duration includes the middlewares and the actual publish method.
@return [Float] The time taken in seconds as a floating point number.
# File lib/cloudenvoy/publisher.rb, line 146 def publishing_duration return 0.0 unless publishing_ended_at && publishing_started_at (publishing_ended_at - publishing_started_at).ceil(3) end
Return the topic to publish to. The topic name can be dynamically evaluated at runtime based on publishing arguments.
Defaults to the topic specified via cloudenvoy_options.
@param [Any] *_args The publisher arguments.
@return [String] The topic name.
# File lib/cloudenvoy/publisher.rb, line 114 def topic(*_args) self.class.default_topic end
Private Instance Methods
Execute the subscriber process method through the middleware chain.
@return [Any] The result of the perform method.
# File lib/cloudenvoy/publisher.rb, line 196 def execute_middleware_chain self.publishing_started_at = Time.now Cloudenvoy.config.publisher_middleware.invoke(self) do begin publish_message rescue StandardError => e logger.error([e, e.backtrace.join("\n")].join("\n")) try(:on_error, e) return raise(e) end end ensure self.publishing_ended_at = Time.now end
Internal logic used to build, publish and capture message on the publisher.
@return [Cloudenvoy::Message] The published message
# File lib/cloudenvoy/publisher.rb, line 181 def publish_message # Publish message to pub/sub and attach created # message to publisher self.message = PubSubClient.publish( topic(*msg_args), payload(*msg_args), metadata(*msg_args) ) end