class PubsubClient::PublisherFactory
Build and memoize the Publisher
, accounting for GRPC's requirements around forking.
Constants
- Memo
Attributes
publishers[RW]
Public Class Methods
new()
click to toggle source
# File lib/pubsub_client/publisher_factory.rb, line 8 def initialize @mutex = Mutex.new @publishers = {} end
Public Instance Methods
build(topic_name)
click to toggle source
@param topic_name [String] @return [Publisher]
# File lib/pubsub_client/publisher_factory.rb, line 15 def build(topic_name) # GRPC fails when attempting to use a connection created in a process that gets # forked with the message # # "grpc cannot be used before and after forking" # # Also creating a new publsher incurs significant overhead as it connects to # PubSub. # # To prevent incurring overhead, memoize the publisher per process. return publishers[topic_name].publisher if publishers[topic_name]&.pid == current_pid # We are in a multi-threaded world and need to be careful not to build the publisher # in multiple threads. Lock the mutex so that only one thread can enter this block # at a time. @mutex.synchronize do # It's possible two threads made it to this point, but since we have a lock we # know that one will have built the publisher before the second is able to enter. # If we detect that case, then bail out so as to not rebuild the publisher. unless publishers[topic_name]&.pid == current_pid publishers[topic_name] = Memo.new(build_publisher(topic_name), Process.pid) end end publishers[topic_name].publisher end
Private Instance Methods
build_publisher(topic_name)
click to toggle source
# File lib/pubsub_client/publisher_factory.rb, line 54 def build_publisher(topic_name) pubsub = Google::Cloud::PubSub.new topic = pubsub.topic(topic_name) raise InvalidTopicError, "The topic #{topic_name} does not exist" unless topic publisher = Publisher.new(topic) at_exit { publisher.flush } publisher end
current_pid()
click to toggle source
Used for testing to simulate when a process is forked. In those cases, this helps us test that the `.build` method creates different publishers.
# File lib/pubsub_client/publisher_factory.rb, line 50 def current_pid Process.pid end