module Cloudenvoy::Subscriber
Use this module to define subscribers. Subscribers must implement the message processsing logic in the `process` method.
E.g.
class UserSubscriber
include Cloudenvoy::Subscriber # Specify subscription options cloudenvoy_options topics: ['my-topic'] # Process message objects def process(message) ...do something... end
end
Public Class Methods
Execute a subscriber from a payload object received from Pub/Sub.
@param [Hash] input_payload The Pub/Sub webhook hash describing the message to process.
@return [Any] The subscriber processing return value.
# File lib/cloudenvoy/subscriber.rb, line 68 def self.execute_from_descriptor(input_payload) message = Message.from_descriptor(input_payload) subscriber = message.subscriber || raise(InvalidSubscriberError) subscriber.execute end
Return the subscriber class for the provided class name.
@param [String] sub_uri The subscription uri.
@return [Class] The subscriber class
# File lib/cloudenvoy/subscriber.rb, line 39 def self.from_sub_uri(sub_uri) klass_name = Subscriber.parse_sub_uri(sub_uri)[0] # Check that subscriber class is a valid subscriber sub_klass = Object.const_get(klass_name.camelize) sub_klass.include?(self) ? sub_klass : nil end
Add class method to including class
# File lib/cloudenvoy/subscriber.rb, line 23 def self.included(base) base.extend(ClassMethods) base.attr_accessor :message, :process_started_at, :process_ended_at # Register subscriber Cloudenvoy.subscribers.add(base) end
Build a new subscriber instance.
@param [Cloudenvoy::Message] message The message to process.
# File lib/cloudenvoy/subscriber.rb, line 142 def initialize(message:) @message = message end
Parse the subscription name and return the subscriber name and topic.
@param [String] sub_uri The subscription URI
@return [Array<String,String>] A tuple [subscriber_name, topic ]
# File lib/cloudenvoy/subscriber.rb, line 55 def self.parse_sub_uri(sub_uri) sub_uri.split('/').last.split('.').last(2) end
Public Instance Methods
Equality operator.
@param [Any] other The object to compare.
@return [Boolean] True if the object is equal.
# File lib/cloudenvoy/subscriber.rb, line 193 def ==(other) other.is_a?(self.class) && other.message == message end
Execute the subscriber's logic.
@return [Any] The logic return value
# File lib/cloudenvoy/subscriber.rb, line 172 def execute logger.info('Processing message...') # Process message resp = execute_middleware_chain # Log processing completion and return result logger.info("Processing done after #{process_duration}s") { { duration: process_duration } } resp rescue StandardError => e logger.info("Processing failed after #{process_duration}s") { { duration: process_duration } } raise(e) end
Return the Cloudenvoy
logger instance.
@return [Logger, any] The cloudenvoy logger.
# File lib/cloudenvoy/subscriber.rb, line 151 def logger @logger ||= SubscriberLogger.new(self) end
Return the time taken (in seconds) to process the message. This duration includes the middlewares and the actual process method.
@return [Float] The time taken in seconds as a floating point number.
# File lib/cloudenvoy/subscriber.rb, line 161 def process_duration return 0.0 unless process_ended_at && process_started_at (process_ended_at - process_started_at).ceil(3) end
Private Instance Methods
Execute the subscriber process method through the middleware chain.
@return [Any] The result of the perform method.
# File lib/cloudenvoy/subscriber.rb, line 207 def execute_middleware_chain self.process_started_at = Time.now Cloudenvoy.config.subscriber_middleware.invoke(self) do begin process(message) rescue StandardError => e logger.error([e, e.backtrace.join("\n")].join("\n")) try(:on_error, e) raise(e) end end ensure self.process_ended_at = Time.now end