class Hanami::Events::CloudPubsub::Runner
Responsible for starting and managing the work processes
Attributes
Public Class Methods
# File lib/hanami/events/cloud_pubsub/runner.rb, line 12 def initialize(adapter:, logger:, sleep_time: 30) @logger = logger @adapter = adapter @sleep_time = sleep_time end
Public Instance Methods
Print out some useful debugging information
Called on TTIN to inspect the state of the runner in a terminal friendly output. This provides a simple debugging if the runner gets stuck for some reason.
See: github.com/mperham/sidekiq/blob/e447dae961ebc894f12848d9f33446a07ffc67dc/bin/sidekiqload#L74
# File lib/hanami/events/cloud_pubsub/runner.rb, line 88 def debug_info <<~MSG ╔══════ BACKTRACES #{Thread.list.flat_map { |thr| ThreadInspector.new(thr).to_s }.join("\n")} ╠══════ LISTENERS #{adapter.listeners.map { |lis| "║ #{lis.format}" }.join("\n")} ║ ╠══════ GENERAL ║ ready?: #{ready?} ║ healthy?: #{healthy?} ║ threads: #{Thread.list.count} ║ threads running: #{Thread.list.select { |thread| thread.status == 'run' }.count} ╚══════ MSG end
(optional) Kill all subscribers
If a gracefully_shutdown
times out or fails, this method will be called. It is a last ditch effort to salvage resources and is used as a “damage control” mechanism.
Should we provide a mechanism to report what caused a forced shutdown?
# File lib/hanami/events/cloud_pubsub/runner.rb, line 57 def force_shutdown!; end
Will be called on SIGINT, TERM
Responsible for gracefully shutting down the runner. This may involve waiting for messages to finish processing, etc. If this method succesfully runs, there should be no potential for undefined behavior.
# File lib/hanami/events/cloud_pubsub/runner.rb, line 40 def gracefully_shutdown stop logger.info "Gracefully shutting down CloudPubsub runner: #{self}" adapter.listeners.each { |l| l.wait(@sleep_time) } adapter.flush_messages handle_on_shutdown self end
# File lib/hanami/events/cloud_pubsub/runner.rb, line 104 def handle_on_shutdown return if CloudPubsub.on_shutdown_handlers.empty? logger.info('Calling custom on_shutdown handler') CloudPubsub.on_shutdown_handlers.each do |handler| handler.call(adapter) rescue StandardError => e logger.warn("Shutdown handler failed (#{e.message})") end end
Is the runner healthy enough to keep going?
Indicated whether or not the runner is healthy, useful for determing whether or not the process should be restarted
# File lib/hanami/events/cloud_pubsub/runner.rb, line 77 def healthy? ready? end
Is the runner ready to start processing events?
Starting the runner may be asyncronous (spawning threads, etc) Instead of making `start` blocking, expose a way to probe for readiness After this check occurs, `healthy?` will be honored.
This pattern is similar to Kubernete's healthiness and readniess probes and is much more useful than only having a `healthy?` check
See: kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/
# File lib/hanami/events/cloud_pubsub/runner.rb, line 69 def ready? adapter.listeners.all?(&:started?) end
Called to start the runner (subscribes to topics/etc)
# File lib/hanami/events/cloud_pubsub/runner.rb, line 19 def start(_options = {}) CloudPubsub.finalize_settings! logger.info 'Starting CloudPubsub listeners' adapter.listeners.map(&:start) self end
Will be called on TSTP
Stop processing new events (unsubscribe from topics, etc)
# File lib/hanami/events/cloud_pubsub/runner.rb, line 29 def stop logger.info 'Stopping CloudPubsub listeners' adapter.listeners.each(&:stop) self end