class Hanami::Events::CloudPubsub::Listener

rubocop:disable Metrics/ClassLength: @api private

Attributes

dead_letter_topic[R]
event_name[R]
handler[R]
input_subscriber_opts[R]
logger[R]
middleware[R]
subscriber[R]
subscriber_id[R]
topic[R]

Public Class Methods

new(topic:, logger:, handler:, event_name:, subscriber_id:, subscriber_opts: {}, middleware: CloudPubsub.config.middleware, auto_ack: true, dead_letter_topic: nil) click to toggle source

rubocop:disable Metrics/ParameterLists

# File lib/hanami/events/cloud_pubsub/listener.rb, line 22
def initialize(topic:,
               logger:,
               handler:,
               event_name:,
               subscriber_id:,
               subscriber_opts: {},
               middleware: CloudPubsub.config.middleware,
               auto_ack: true,
               dead_letter_topic: nil)
  @topic = topic
  @logger = logger
  @handler = handler
  @event_name = event_name
  @subscriber_id = subscriber_id
  @input_subscriber_opts = subscriber_opts
  @middleware = middleware
  @dead_letter_topic = dead_letter_topic
  @auto_ack = auto_ack
end

Public Instance Methods

format() click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 79
def format
  subscriber.to_s
end
register() click to toggle source

rubocop:enable Metrics/ParameterLists

# File lib/hanami/events/cloud_pubsub/listener.rb, line 43
def register
  subscription = subscription_for(subscriber_id)
  apply_subscription_options(subscription)
  listener = subscription.listen(**subscriber_options) { |m| handle_message(m) }
  logger.debug("Registered listener for #{subscriber_id} with: #{subscriber_options}")

  @subscriber = listener

  self
end
shutdown() click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 63
def shutdown
  stop
  wait
  self
end
start() click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 54
def start
  ensure_subscriber!
  @subscriber.start
end
started?() click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 59
def started?
  @subscriber&.started?
end
stop() click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 69
def stop
  subscriber.stop
  self
end
wait(timeout = 60) click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 74
def wait(timeout = 60)
  subscriber.wait!(timeout)
  self
end

Private Instance Methods

apply_dead_letter_options(sub) click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 152
def apply_dead_letter_options(sub)
  attempts = CloudPubsub.config.auto_retry.max_attempts

  sub.dead_letter_topic = dead_letter_topic if sub.dead_letter_topic&.name != dead_letter_topic&.name
  sub.dead_letter_max_delivery_attempts = attempts if sub.dead_letter_topic&.name != dead_letter_topic&.name
rescue StandardError => e
  run_error_handlers(e, nil)
end
apply_deadline_options(sub) click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 146
def apply_deadline_options(sub)
  sub.deadline = subscriber_options[:deadline] if sub.deadline != subscriber_options[:deadline]
rescue StandardError => e
  run_error_handlers(e, nil)
end
apply_retry_options(sub) click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 161
def apply_retry_options(sub)
  retry_policy = build_retry_policy
  sub.retry_policy = retry_policy if sub.retry_policy&.to_grpc != retry_policy&.to_grpc
rescue StandardError => e
  run_error_handlers(e, nil)
end
apply_subscription_options(sub) click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 138
def apply_subscription_options(sub)
  apply_deadline_options(sub)
  apply_retry_options(sub)
  apply_dead_letter_options(sub)
rescue StandardError => e
  run_error_handlers(e, nil)
end
build_retry_policy() click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 168
def build_retry_policy
  return unless Hanami::Events::CloudPubsub.config.auto_retry.enabled

  Google::Cloud::PubSub::RetryPolicy.new(
    minimum_backoff: CloudPubsub.config.auto_retry.minimum_backoff,
    maximum_backoff: CloudPubsub.config.auto_retry.maximum_backoff
  )
end
ensure_subscriber!() click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 111
def ensure_subscriber!
  raise Errors::NoSubscriberError, 'No subsriber has been registered' unless @subscriber
end
ensure_topic_names_match!(sub_name, found_subscription) click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 121
def ensure_topic_names_match!(sub_name, found_subscription)
  parsed_name = found_subscription.topic.name.split('/').last

  return true if parsed_name == @event_name

  raise Errors::SubscriptionTopicNameMismatch,
        "a subscription already exists for #{sub_name} " \
        "but its name #{found_subscription.topic.name} does not match #{@event_name}"
end
handle_message(msg) click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 85
def handle_message(msg)
  run_handler(msg)
end
run_error_handlers(err, message) click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 115
def run_error_handlers(err, message)
  CloudPubsub.error_handlers.each do |handler|
    SafeErrorHandler.call(handler, err, message)
  end
end
run_handler(message) click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 89
def run_handler(message)
  middleware.invoke(message) { handler.call(message) }
  message.ack! if @auto_ack
rescue StandardError => e
  run_error_handlers(e, message.message_id.to_s)
  message.nack! if CloudPubsub.config.auto_retry.enabled
  raise
end
subscriber_options() click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 131
def subscriber_options
  @subscriber_options ||= {
    **CloudPubsub.config.subscriber.to_h,
    **input_subscriber_opts
  }
end
subscription_for(name) click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 98
def subscription_for(name)
  found_subscription = topic.find_subscription(name)

  if found_subscription
    ensure_topic_names_match!(name, found_subscription)
    found_subscription
  elsif CloudPubsub.auto_create_subscriptions
    topic.create_subscription(name)
  else
    raise Errors::SubscriptionNotFoundError, "no subscription named: #{name}"
  end
end