class Hanami::Events::CloudPubsub::Listener
rubocop:disable Metrics/ClassLength: @api private
Attributes
dead_letter_topic[R]
event_name[R]
handler[R]
input_subscriber_opts[R]
logger[R]
middleware[R]
subscriber[R]
subscriber_id[R]
topic[R]
Public Class Methods
new(topic:, logger:, handler:, event_name:, subscriber_id:, subscriber_opts: {}, middleware: CloudPubsub.config.middleware, auto_ack: true, dead_letter_topic: nil)
click to toggle source
rubocop:disable Metrics/ParameterLists
# File lib/hanami/events/cloud_pubsub/listener.rb, line 22 def initialize(topic:, logger:, handler:, event_name:, subscriber_id:, subscriber_opts: {}, middleware: CloudPubsub.config.middleware, auto_ack: true, dead_letter_topic: nil) @topic = topic @logger = logger @handler = handler @event_name = event_name @subscriber_id = subscriber_id @input_subscriber_opts = subscriber_opts @middleware = middleware @dead_letter_topic = dead_letter_topic @auto_ack = auto_ack end
Public Instance Methods
format()
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 79 def format subscriber.to_s end
register()
click to toggle source
rubocop:enable Metrics/ParameterLists
# File lib/hanami/events/cloud_pubsub/listener.rb, line 43 def register subscription = subscription_for(subscriber_id) apply_subscription_options(subscription) listener = subscription.listen(**subscriber_options) { |m| handle_message(m) } logger.debug("Registered listener for #{subscriber_id} with: #{subscriber_options}") @subscriber = listener self end
shutdown()
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 63 def shutdown stop wait self end
start()
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 54 def start ensure_subscriber! @subscriber.start end
started?()
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 59 def started? @subscriber&.started? end
stop()
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 69 def stop subscriber.stop self end
wait(timeout = 60)
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 74 def wait(timeout = 60) subscriber.wait!(timeout) self end
Private Instance Methods
apply_dead_letter_options(sub)
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 152 def apply_dead_letter_options(sub) attempts = CloudPubsub.config.auto_retry.max_attempts sub.dead_letter_topic = dead_letter_topic if sub.dead_letter_topic&.name != dead_letter_topic&.name sub.dead_letter_max_delivery_attempts = attempts if sub.dead_letter_topic&.name != dead_letter_topic&.name rescue StandardError => e run_error_handlers(e, nil) end
apply_deadline_options(sub)
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 146 def apply_deadline_options(sub) sub.deadline = subscriber_options[:deadline] if sub.deadline != subscriber_options[:deadline] rescue StandardError => e run_error_handlers(e, nil) end
apply_retry_options(sub)
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 161 def apply_retry_options(sub) retry_policy = build_retry_policy sub.retry_policy = retry_policy if sub.retry_policy&.to_grpc != retry_policy&.to_grpc rescue StandardError => e run_error_handlers(e, nil) end
apply_subscription_options(sub)
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 138 def apply_subscription_options(sub) apply_deadline_options(sub) apply_retry_options(sub) apply_dead_letter_options(sub) rescue StandardError => e run_error_handlers(e, nil) end
build_retry_policy()
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 168 def build_retry_policy return unless Hanami::Events::CloudPubsub.config.auto_retry.enabled Google::Cloud::PubSub::RetryPolicy.new( minimum_backoff: CloudPubsub.config.auto_retry.minimum_backoff, maximum_backoff: CloudPubsub.config.auto_retry.maximum_backoff ) end
ensure_subscriber!()
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 111 def ensure_subscriber! raise Errors::NoSubscriberError, 'No subsriber has been registered' unless @subscriber end
ensure_topic_names_match!(sub_name, found_subscription)
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 121 def ensure_topic_names_match!(sub_name, found_subscription) parsed_name = found_subscription.topic.name.split('/').last return true if parsed_name == @event_name raise Errors::SubscriptionTopicNameMismatch, "a subscription already exists for #{sub_name} " \ "but its name #{found_subscription.topic.name} does not match #{@event_name}" end
handle_message(msg)
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 85 def handle_message(msg) run_handler(msg) end
run_error_handlers(err, message)
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 115 def run_error_handlers(err, message) CloudPubsub.error_handlers.each do |handler| SafeErrorHandler.call(handler, err, message) end end
run_handler(message)
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 89 def run_handler(message) middleware.invoke(message) { handler.call(message) } message.ack! if @auto_ack rescue StandardError => e run_error_handlers(e, message.message_id.to_s) message.nack! if CloudPubsub.config.auto_retry.enabled raise end
subscriber_options()
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 131 def subscriber_options @subscriber_options ||= { **CloudPubsub.config.subscriber.to_h, **input_subscriber_opts } end
subscription_for(name)
click to toggle source
# File lib/hanami/events/cloud_pubsub/listener.rb, line 98 def subscription_for(name) found_subscription = topic.find_subscription(name) if found_subscription ensure_topic_names_match!(name, found_subscription) found_subscription elsif CloudPubsub.auto_create_subscriptions topic.create_subscription(name) else raise Errors::SubscriptionNotFoundError, "no subscription named: #{name}" end end