class Hanami::Events::Adapter::CloudPubsub
Adapter
for Google Cloud Pub/Sub
@api private
Attributes
listeners[R]
logger[R]
middleware[R]
topic_registry[R]
Public Class Methods
new(params)
click to toggle source
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 16
# Sets up the adapter state from an options hash.
#
# @param params [Hash] adapter options
# @option params [Object] :pubsub stored as the Pub/Sub client
# @option params [Logger] :logger defaults to a Logger writing to $stdout
# @option params [Boolean] :listen stored as @listen, defaults to false
# @option params [Symbol, String] :serializer serializer type, defaults to :json
def initialize(params)
  @pubsub = params[:pubsub]

  @logger = params[:logger]
  @logger ||= Logger.new($stdout)

  listen_flag = params[:listen]
  @listen = listen_flag || false

  @listeners = Concurrent::Array.new

  serializer_name = params.fetch(:serializer, :json)
  @serializer_type = serializer_name.to_sym

  @topic_registry = {}
  @mutex = Mutex.new
  @middleware = ::Hanami::Events::CloudPubsub.config.client_middleware
end
Public Instance Methods
broadcast(name, input_payload, **message_opts)
click to toggle source
Broadcasts the event to all subscribers
@param name [Symbol, String] the event name @param input_payload [Hash] the event data @param message_opts [Hash] additional message options forwarded to the client middleware
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 31
# Broadcasts an event to the topic looked up for its namespaced name.
#
# The payload is serialized, tagged with a random +id+ and the namespaced
# +event_name+, passed through the client middleware and then published
# asynchronously. The publish result is logged from the publish callback.
#
# @param name [Symbol, String] the event name (before namespacing)
# @param input_payload [Object] the event data handed to the serializer
# @param message_opts [Hash] extra options merged over the default
#   attributes (+id+, +event_name+) and forwarded to the middleware
def broadcast(name, input_payload, **message_opts)
  event_name = namespaced(name)
  topic = topic_for(event_name)
  serialized_payload = serializer.serialize(input_payload)
  attrs = { id: SecureRandom.uuid, event_name: event_name }

  middleware.invoke(serialized_payload, **attrs, **message_opts) do |payload, **opts|
    topic.publish_async(payload, **opts) do |result|
      msg = result.message.grpc.to_h

      if result.succeeded?
        # The message used to read "Published <name> published"; the
        # duplicated word was a typo.
        logger.info "Published #{name.inspect} event", **msg
      else
        logger.warn "Failed to broadcast #{name.inspect} event",
                    error: result.error, **msg
      end
    end
  end
end
flush_messages()
click to toggle source
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 68
# Stops the async publisher of every topic in the registry, then calls
# +wait!+ on each of them. Topics whose +async_publisher+ is nil are skipped.
#
# @return [Array] the results of the +wait!+ calls
def flush_messages
  publishers = topic_registry.each_value.map(&:async_publisher).compact
  publishers.each(&:stop)
  publishers.map(&:wait!)
end
subscribe(name, id:, auto_ack: true, **subscriber_opts, &block)
click to toggle source
Subscribes a block to the selected event
@param name [Symbol, String] the event name @param id [String] A unique identifier for the subscriber @param auto_ack [Boolean] passed through to the listener (defaults to true) @param subscriber_opts [Hash] Additional options for the subscriber @param block [Block] to execute when the event is broadcast
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 56
# Registers +block+ as a listener for the given event.
#
# Both the event name and the subscriber id are namespaced before use.
#
# @param name [Symbol, String] the event name
# @param id [String] identifier for the subscriber
# @param auto_ack [Boolean] forwarded to the listener
# @param subscriber_opts [Hash] extra options forwarded to the listener
# @param block [Proc] handler wrapped in a Subscriber
def subscribe(name, id:, auto_ack: true, **subscriber_opts, &block)
  event_name = namespaced(name)
  listener_id = namespaced(id)

  # Note: the log line uses the raw id, not the namespaced one.
  logger.debug("Subscribed listener \"#{id}\" for event \"#{event_name}\"")

  subscriber = Hanami::Events::CloudPubsub::Subscriber.new(event_name, block, logger)

  register_listener(
    event_name,
    topic_for(event_name),
    listener_id,
    auto_ack,
    subscriber_opts,
    subscriber
  )
end
Private Instance Methods
call_subscriber(sub)
click to toggle source
rubocop:enable Metrics/ParameterLists
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 95
# Wraps a subscriber in a proc that accepts a received message.
#
# The proc deserializes the message data, reads the 'event_name' attribute
# and calls the subscriber with the event name, payload and raw message.
#
# @param sub [#call] the subscriber to invoke
# @return [Proc]
def call_subscriber(sub)
  proc do |message|
    payload = serializer.deserialize(message.data)
    sub.call(message.attributes['event_name'], payload, message)
  end
end
dead_letter_topic()
click to toggle source
rubocop:enable Layout/LineLength
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 123
# Looks up the dead letter topic when auto retry is enabled.
#
# @return the topic for the namespaced dead letter topic name, or nil when
#   the auto_retry configuration is not enabled
def dead_letter_topic
  retry_config = Hanami::Events::CloudPubsub.config.auto_retry

  if retry_config.enabled
    topic_name = namespaced(retry_config.dead_letter_topic_name)
    topic_for(topic_name)
  end
end
namespaced(val, sep: '.')
click to toggle source
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 131
# Prefixes a value with the configured namespace.
#
# Nil parts are dropped, so with no namespace the value is returned on its
# own (as a String).
#
# @param val [#to_s] the value to prefix
# @param sep [String] separator placed between namespace and value
# @return [String]
def namespaced(val, sep: '.')
  parts = [Hanami::Events::CloudPubsub.namespace, val]
  parts.compact.join(sep)
end
register_listener(event_name, topic, subscriber_id, auto_ack, subscriber_opts, sub)
click to toggle source
rubocop:disable Metrics/ParameterLists
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 78
# Builds a Listener for the subscriber, records it in @listeners and
# registers it.
#
# @param event_name [String] the namespaced event name
# @param topic the topic the listener is attached to
# @param subscriber_id [String] the namespaced subscriber id
# @param auto_ack [Boolean] forwarded to the listener
# @param subscriber_opts [Hash] forwarded to the listener
# @param sub [#call] subscriber wrapped into the message handler
# @return the result of +listener.register+
def register_listener(event_name, topic, subscriber_id, auto_ack, subscriber_opts, sub)
  listener_options = {
    subscriber_id: subscriber_id,
    event_name: event_name,
    handler: call_subscriber(sub),
    logger: logger,
    topic: topic,
    subscriber_opts: subscriber_opts,
    dead_letter_topic: dead_letter_topic,
    auto_ack: auto_ack
  }
  listener = ::Hanami::Events::CloudPubsub::Listener.new(**listener_options)

  @listeners << listener
  listener.register
end
serializer()
click to toggle source
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 105
# Returns the serializer instance, building it on first use from the
# serializer type chosen at construction time.
def serializer
  @serializer ||= begin
    serializer_class = Hanami::Events::Serializer[@serializer_type]
    serializer_class.new
  end
end
topic_for(name)
click to toggle source
rubocop:disable Layout/LineLength
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 110
# Returns the topic for +name+, caching it in the topic registry.
#
# On a cache miss the topic is looked up through the Pub/Sub client; if it
# is not found and auto_create_topics is set, it is created. Message
# ordering is enabled on the topic before it is cached.
#
# @param name [#to_s] the topic name
# @raise [Hanami::Events::CloudPubsub::Errors::TopicNotFoundError] when the
#   topic can neither be found nor created
def topic_for(name)
  key = name.to_s
  cached = @topic_registry[key]
  return cached if cached

  topic = @pubsub.find_topic(name)
  if !topic && Hanami::Events::CloudPubsub.auto_create_topics
    topic = @pubsub.create_topic(name)
  end

  unless topic
    raise ::Hanami::Events::CloudPubsub::Errors::TopicNotFoundError,
          "no topic named: #{name}"
  end

  topic.enable_message_ordering!
  @topic_registry[key] = topic
end