class Hanami::Events::Adapter::CloudPubsub

Adapter for Google Cloud Pub/Sub

@api private

Attributes

listeners[R]
logger[R]
middleware[R]
topic_registry[R]

Public Class Methods

new(params) click to toggle source
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 16
# Builds the adapter from an options collection.
#
# @param params [Hash] adapter options; read via `[]` and `fetch`
# @option params [Object] :pubsub client later used to find/create topics
# @option params [Object] :logger logger to use (defaults to a stdout Logger)
# @option params [Object] :listen stored as-is, `false` when absent/falsy
#   (not read by any method visible here -- TODO confirm where it is used)
# @option params [Symbol, String] :serializer serializer key (defaults to :json)
def initialize(params)
  # Caller-supplied collaborators and options.
  @pubsub = params[:pubsub]
  @logger = params[:logger] || Logger.new($stdout)
  @listen = params[:listen] || false
  @serializer_type = params.fetch(:serializer, :json).to_sym

  # Middleware comes from the global CloudPubsub configuration.
  @middleware = ::Hanami::Events::CloudPubsub.config.client_middleware

  # Internal bookkeeping, empty until topics/listeners are registered.
  @listeners = Concurrent::Array.new
  @topic_registry = {}
  @mutex = Mutex.new
end

Public Instance Methods

broadcast(name, input_payload, **message_opts) click to toggle source

Broadcasts the event to all subscribers

@param name [Symbol, String] the event name @param input_payload [Hash] the event data @param message_opts [Hash] additional options forwarded with the message when it is published

# File lib/hanami/events/adapter/cloud_pubsub.rb, line 31
# Serializes the payload and publishes it asynchronously on the topic that
# belongs to the (namespaced) event.
#
# Every message is sent with a freshly generated `id` and the namespaced
# `event_name`; entries in +message_opts+ are merged on top of those and win
# on key collisions. The call is routed through the client middleware, and the
# outcome of the publish is logged once the publish callback fires.
#
# @param name [Symbol, String] the event name (without namespace)
# @param input_payload [Object] the event data handed to the serializer
# @param message_opts [Hash] extra options forwarded to the middleware/publish call
# @return [Object] whatever `middleware.invoke` returns
def broadcast(name, input_payload, **message_opts)
  full_name = namespaced(name)
  destination = topic_for(full_name)
  body = serializer.serialize(input_payload)
  attributes = { id: SecureRandom.uuid, event_name: full_name }.merge(message_opts)

  middleware.invoke(body, **attributes) do |payload, **opts|
    destination.publish_async(payload, **opts) do |result|
      # Raw message fields are attached to the log entry as structured data.
      details = result.message.grpc.to_h

      if result.succeeded?
        logger.info "Published #{name.inspect} published", **details
      else
        logger.warn "Failed to broadcast #{name.inspect} event",
                    error: result.error, **details
      end
    end
  end
end
flush_messages() click to toggle source
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 68
# Stops every async publisher belonging to a registered topic and then waits
# on each of them.
#
# Topics whose `async_publisher` is nil are skipped. All publishers are asked
# to stop first; only afterwards is `wait!` called on each one.
#
# @return [Array] the results of calling `wait!` on each publisher
def flush_messages
  publishers = topic_registry.values.map(&:async_publisher).compact

  publishers.each(&:stop)
  publishers.map(&:wait!)
end
subscribe(name, id:, auto_ack: true, **subscriber_opts, &block) click to toggle source

Subscribes a block to the selected event

@param name [Symbol, String] the event name @param id [String] A unique identifier for the subscriber @param auto_ack [Boolean] passed on to the listener (defaults to true) @param subscriber_opts [Hash] Additional options for the subscriber @param block [Block] to execute when the event is broadcast

# File lib/hanami/events/adapter/cloud_pubsub.rb, line 56
# Registers +block+ as a listener for the given event.
#
# Both the event name and the subscriber id are namespaced before use. The
# block is wrapped in a CloudPubsub::Subscriber and handed, together with the
# event's topic, to #register_listener.
#
# @param name [Symbol, String] the event name (without namespace)
# @param id [Symbol, String] identifier for the subscriber (without namespace)
# @param auto_ack [Boolean] forwarded to the listener (defaults to true)
# @param subscriber_opts [Hash] remaining keyword options, forwarded to the listener
# @param block [Proc] handler wrapped by the subscriber
# @return [Object] the result of #register_listener
def subscribe(name, id:, auto_ack: true, **subscriber_opts, &block)
  full_event_name = namespaced(name)
  full_subscriber_id = namespaced(id)

  # Note: the log line shows the id as given by the caller, not the namespaced one.
  logger.debug("Subscribed listener \"#{id}\" for event \"#{full_event_name}\"")

  subscriber = Hanami::Events::CloudPubsub::Subscriber.new(full_event_name, block, logger)

  register_listener(
    full_event_name,
    topic_for(full_event_name),
    full_subscriber_id,
    auto_ack,
    subscriber_opts,
    subscriber
  )
end

Private Instance Methods

call_subscriber(sub) click to toggle source

rubocop:enable Metrics/ParameterLists

# File lib/hanami/events/adapter/cloud_pubsub.rb, line 95
# Wraps a subscriber in a proc that accepts a received message.
#
# The returned proc deserializes the message's data, reads the `event_name`
# message attribute and invokes +sub+ with the event name, the decoded
# payload and the message itself.
#
# @param sub [#call] object invoked as `sub.call(event_name, payload, message)`
# @return [Proc] handler taking a single message argument
def call_subscriber(sub)
  proc do |message|
    decoded = serializer.deserialize(message.data)

    sub.call(message.attributes['event_name'], decoded, message)
  end
end
dead_letter_topic() click to toggle source

rubocop:enable Layout/LineLength

# File lib/hanami/events/adapter/cloud_pubsub.rb, line 123
# Looks up the dead-letter topic named in the auto-retry configuration.
#
# @return [Object, nil] the topic for the namespaced dead-letter topic name,
#   or nil when `auto_retry.enabled` is falsy
def dead_letter_topic
  retry_config = Hanami::Events::CloudPubsub.config.auto_retry

  topic_for(namespaced(retry_config.dead_letter_topic_name)) if retry_config.enabled
end
namespaced(val, sep: '.') click to toggle source
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 131
# Prefixes +val+ with the configured CloudPubsub namespace.
#
# Nil parts are dropped, so without a namespace the value is returned on its
# own (as a String), and a nil value yields just the namespace.
#
# @param val [Symbol, String, nil] the value to prefix
# @param sep [String] separator placed between namespace and value
# @return [String] the joined name
def namespaced(val, sep: '.')
  segments = [Hanami::Events::CloudPubsub.namespace, val]

  segments.reject(&:nil?).join(sep)
end
register_listener(event_name, topic, subscriber_id, auto_ack, subscriber_opts, sub) click to toggle source

rubocop:disable Metrics/ParameterLists

# File lib/hanami/events/adapter/cloud_pubsub.rb, line 78
# Builds a CloudPubsub::Listener for a subscription, records it in
# @listeners and registers it.
#
# @param event_name [String] event name passed to the listener
# @param topic [Object] topic passed to the listener
# @param subscriber_id [String] subscriber id passed to the listener
# @param auto_ack [Boolean] passed to the listener as `auto_ack`
# @param subscriber_opts [Hash] passed to the listener as `subscriber_opts`
# @param sub [#call] subscriber; wrapped via #call_subscriber to form the handler
# @return [Object] the result of `listener.register`
def register_listener(event_name, topic, subscriber_id, auto_ack, subscriber_opts, sub)
  listener_options = {
    subscriber_id: subscriber_id,
    event_name: event_name,
    handler: call_subscriber(sub),
    logger: logger,
    topic: topic,
    subscriber_opts: subscriber_opts,
    dead_letter_topic: dead_letter_topic,
    auto_ack: auto_ack
  }
  listener = ::Hanami::Events::CloudPubsub::Listener.new(**listener_options)

  # The listener is tracked before it is registered.
  @listeners << listener
  listener.register
end
serializer() click to toggle source
# File lib/hanami/events/adapter/cloud_pubsub.rb, line 105
# Returns the serializer instance for this adapter, creating it on first use.
#
# The serializer class is looked up in Hanami::Events::Serializer by
# @serializer_type and instantiated once; later calls reuse that instance.
#
# @return [Object] the memoized serializer
def serializer
  return @serializer if @serializer

  serializer_class = Hanami::Events::Serializer[@serializer_type]
  @serializer = serializer_class.new
end
topic_for(name) click to toggle source

rubocop:disable Layout/LineLength

# File lib/hanami/events/adapter/cloud_pubsub.rb, line 110
# Returns the topic for +name+, consulting the local registry first.
#
# On a registry miss the topic is looked up through the pubsub client; if it
# does not exist it is created when `auto_create_topics` is enabled. Message
# ordering is enabled on the resolved topic before it is stored in the
# registry under the stringified name.
#
# @param name [Symbol, String] the (already namespaced) topic name
# @return [Object] the topic
# @raise [Hanami::Events::CloudPubsub::Errors::TopicNotFoundError] when the
#   topic is neither found nor created
def topic_for(name)
  key = name.to_s
  cached = @topic_registry[key]
  return cached if cached

  topic = @pubsub.find_topic(name)
  # Creation is only attempted after the lookup has come back empty.
  topic ||= Hanami::Events::CloudPubsub.auto_create_topics && @pubsub.create_topic(name)
  unless topic
    raise ::Hanami::Events::CloudPubsub::Errors::TopicNotFoundError, "no topic named: #{name}"
  end

  topic.enable_message_ordering!

  @topic_registry[key] = topic
end