module Cloudenvoy::Subscriber

Use this module to define subscribers. Subscribers must implement the message processsing logic in the `process` method.

E.g.

class UserSubscriber

include Cloudenvoy::Subscriber

# Specify subscription options
cloudenvoy_options topics: ['my-topic']

# Process message objects
def process(message)
  ...do something...
end

end

Public Class Methods

execute_from_descriptor(input_payload) click to toggle source

Execute a subscriber from a payload object received from Pub/Sub.

@param [Hash] input_payload The Pub/Sub webhook hash describing the message to process.

@return [Any] The subscriber processing return value.

# File lib/cloudenvoy/subscriber.rb, line 68
def self.execute_from_descriptor(input_payload)
  message = Message.from_descriptor(input_payload)
  subscriber = message.subscriber || raise(InvalidSubscriberError)
  subscriber.execute
end
from_sub_uri(sub_uri) click to toggle source

Return the subscriber class for the provided class name.

@param [String] sub_uri The subscription uri.

@return [Class] The subscriber class

# File lib/cloudenvoy/subscriber.rb, line 39
def self.from_sub_uri(sub_uri)
  klass_name = Subscriber.parse_sub_uri(sub_uri)[0]

  # Check that subscriber class is a valid subscriber
  sub_klass = Object.const_get(klass_name.camelize)

  sub_klass.include?(self) ? sub_klass : nil
end
included(base) click to toggle source

Add class method to including class

# File lib/cloudenvoy/subscriber.rb, line 23
def self.included(base)
  base.extend(ClassMethods)
  base.attr_accessor :message, :process_started_at, :process_ended_at

  # Register subscriber
  Cloudenvoy.subscribers.add(base)
end
new(message:) click to toggle source

Build a new subscriber instance.

@param [Cloudenvoy::Message] message The message to process.

# File lib/cloudenvoy/subscriber.rb, line 142
def initialize(message:)
  @message = message
end
parse_sub_uri(sub_uri) click to toggle source

Parse the subscription name and return the subscriber name and topic.

@param [String] sub_uri The subscription URI

@return [Array<String,String>] A tuple [subscriber_name, topic ]

# File lib/cloudenvoy/subscriber.rb, line 55
def self.parse_sub_uri(sub_uri)
  sub_uri.split('/').last.split('.').last(2)
end

Public Instance Methods

==(other) click to toggle source

Equality operator.

@param [Any] other The object to compare.

@return [Boolean] True if the object is equal.

# File lib/cloudenvoy/subscriber.rb, line 193
def ==(other)
  other.is_a?(self.class) && other.message == message
end
execute() click to toggle source

Execute the subscriber's logic.

@return [Any] The logic return value

# File lib/cloudenvoy/subscriber.rb, line 172
def execute
  logger.info('Processing message...')

  # Process message
  resp = execute_middleware_chain

  # Log processing completion and return result
  logger.info("Processing done after #{process_duration}s") { { duration: process_duration } }
  resp
rescue StandardError => e
  logger.info("Processing failed after #{process_duration}s") { { duration: process_duration } }
  raise(e)
end
logger() click to toggle source

Return the Cloudenvoy logger instance.

@return [Logger, any] The cloudenvoy logger.

# File lib/cloudenvoy/subscriber.rb, line 151
def logger
  @logger ||= SubscriberLogger.new(self)
end
process_duration() click to toggle source

Return the time taken (in seconds) to process the message. This duration includes the middlewares and the actual process method.

@return [Float] The time taken in seconds as a floating point number.

# File lib/cloudenvoy/subscriber.rb, line 161
def process_duration
  return 0.0 unless process_ended_at && process_started_at

  (process_ended_at - process_started_at).ceil(3)
end

Private Instance Methods

execute_middleware_chain() click to toggle source

Execute the subscriber process method through the middleware chain.

@return [Any] The result of the perform method.

# File lib/cloudenvoy/subscriber.rb, line 207
def execute_middleware_chain
  self.process_started_at = Time.now

  Cloudenvoy.config.subscriber_middleware.invoke(self) do
    begin
      process(message)
    rescue StandardError => e
      logger.error([e, e.backtrace.join("\n")].join("\n"))
      try(:on_error, e)
      raise(e)
    end
  end
ensure
  self.process_ended_at = Time.now
end