class Subserver::Listener
The Listener
is a standalone thread which:
-
Starts Google
Pubsub
subscription threads which:
a. Instantiate the Subscription class b. Run the middleware chain c. call subscriber #perform
A Listener
can exit due to shutdown (listner_stopped) or due to an error during message processing (listener_died)
If an error occurs during message processing, the Listener
calls the Manager
to create a new one to replace itself and exits.
Attributes
subscriber[R]
thread[R]
Public Class Methods
new(mgr, subscriber)
click to toggle source
# File lib/subserver/listener.rb, line 30 def initialize(mgr, subscriber) @mgr = mgr @valid = true @done = false @thread = nil @reloader = Subserver.options[:reloader] @subscriber = subscriber @subscription = retrieve_subscription @logging = (mgr.options[:message_logger] || Subserver::MessageLogger).new end
Public Instance Methods
kill()
click to toggle source
# File lib/subserver/listener.rb, line 54 def kill @done = true return if !@thread # Hard stop the listener and shutdown thread after timeout passes. @pubsub_listener.stop @thread.raise ::Subserver::Shutdown end
name()
click to toggle source
# File lib/subserver/listener.rb, line 41 def name @subscriber.name end
start()
click to toggle source
# File lib/subserver/listener.rb, line 62 def start @thread ||= safe_thread("listener", &method(:run)) end
stop()
click to toggle source
# File lib/subserver/listener.rb, line 45 def stop @done = true return if !@thread # Stop the listener and wait for current messages to finish processing. @pubsub_listener.stop.wait! @mgr.listener_stopped(self) end
valid?()
click to toggle source
# File lib/subserver/listener.rb, line 66 def valid? @valid end
Private Instance Methods
connect_subscriber()
click to toggle source
# File lib/subserver/listener.rb, line 82 def connect_subscriber options = @subscriber.get_subserver_options logger.debug("Connecting to subscription with options: #{options}") @pubsub_listener = @subscription.listen streams: options[:streams], threads: options[:threads] do |received_message| logger.debug("Message Received: #{received_message}") process_message(received_message) end end
execute(subscriber, received_message)
click to toggle source
# File lib/subserver/listener.rb, line 124 def execute(subscriber, received_message) subscriber.new.perform(received_message) end
process_message(received_message)
click to toggle source
# File lib/subserver/listener.rb, line 104 def process_message(received_message) begin logger.debug("Executing Middleware") @reloader.call do Subserver.middleware.invoke(@subscriber, received_message) do execute(@subscriber, received_message) end end rescue Subserver::Shutdown # Reject message if shutdown received_message.reject! rescue StandardError => error handle_exception error, { context: 'Exception raised during message processing.', message: received_message } raise end end
retrieve_subscription()
click to toggle source
# File lib/subserver/listener.rb, line 72 def retrieve_subscription subscription_name = @subscriber.get_subserver_options[:subscription] subscription = Pubsub.client.subscription subscription_name rescue nil if subscription.nil? logger.error "ArgumentError: Invalid Subscription name: #{subscription_name} in subscriber #{@subscriber.name}. Please ensure your Pubsub subscription exists." @valid = false end subscription end
run()
click to toggle source
# File lib/subserver/listener.rb, line 91 def run begin # This begins the listener process in a forked thread fire_event(:listener_startup, reverse: false, reraise: true) connect_subscriber @pubsub_listener.start rescue Subserver::Shutdown @mgr.listener_stopped(self) rescue Exception => ex @mgr.listener_died(self, @subscriber, ex) end end