class Hanami::Events::CloudPubsub::Runner

Responsible for starting and managing the work processes

Attributes

adapter[R]
logger[R]

Public Class Methods

new(adapter:, logger:, sleep_time: 30)
# File lib/hanami/events/cloud_pubsub/runner.rb, line 12
def initialize(adapter:, logger:, sleep_time: 30)
  @logger = logger
  @adapter = adapter
  @sleep_time = sleep_time
end

Public Instance Methods

debug_info()

Print out some useful debugging information

Called on TTIN to inspect the state of the runner in terminal-friendly output. This provides a simple debugging aid if the runner gets stuck for some reason.

See: github.com/mperham/sidekiq/blob/e447dae961ebc894f12848d9f33446a07ffc67dc/bin/sidekiqload#L74

# File lib/hanami/events/cloud_pubsub/runner.rb, line 88
def debug_info
  <<~MSG
    ╔══════ BACKTRACES
    #{Thread.list.flat_map { |thr| ThreadInspector.new(thr).to_s }.join("\n")}
    ╠══════ LISTENERS
    #{adapter.listeners.map { |lis| "║ #{lis.format}" }.join("\n")}
    ║
    ╠══════ GENERAL
    ║ ready?: #{ready?}
    ║ healthy?: #{healthy?}
    ║ threads: #{Thread.list.count}
    ║ threads running: #{Thread.list.select { |thread| thread.status == 'run' }.count}
    ╚══════
  MSG
end
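
The GENERAL section of this output counts all threads and those whose status is 'run'. A minimal sketch of that counting step on its own, using only core `Thread` methods; `thread_summary` is a hypothetical helper name and is not part of the library:

```ruby
# Hypothetical helper (not part of the library): summarises a list of threads
# the same way the GENERAL section of debug_info does.
def thread_summary(threads = Thread.list)
  {
    total: threads.count,
    # Thread#status returns the string 'run' for a thread that is currently runnable.
    running: threads.count { |thread| thread.status == 'run' }
  }
end

puts thread_summary.inspect
```

A thread blocked in `sleep` or waiting on I/O reports the status 'sleep', so a large gap between the two counts is expected when listeners are idle.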
force_shutdown!()

(optional) Kill all subscribers

If `gracefully_shutdown` times out or fails, this method will be called. It is a last-ditch effort to salvage resources and is used as a “damage control” mechanism.

Should we provide a mechanism to report what caused a forced shutdown?

# File lib/hanami/events/cloud_pubsub/runner.rb, line 57
def force_shutdown!; end
gracefully_shutdown()

Will be called on SIGINT and SIGTERM

Responsible for gracefully shutting down the runner. This may involve waiting for messages to finish processing, etc. If this method runs successfully, there should be no potential for undefined behavior.

# File lib/hanami/events/cloud_pubsub/runner.rb, line 40
def gracefully_shutdown
  stop
  logger.info "Gracefully shutting down CloudPubsub runner: #{self}"
  adapter.listeners.each { |l| l.wait(@sleep_time) }
  adapter.flush_messages
  handle_on_shutdown

  self
end
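
The relationship between `gracefully_shutdown` and `force_shutdown!` described above can be sketched as follows. This is an illustration only: `GracePeriodRunner` and `shutdown_with_fallback` are hypothetical names, the stand-in runner merely records calls, and the caller that actually orchestrates shutdown in the library may work differently.

```ruby
require 'timeout'

# Hypothetical stand-in exposing the two shutdown methods documented here.
class GracePeriodRunner
  attr_reader :calls

  def initialize(graceful_delay:)
    @graceful_delay = graceful_delay
    @calls = []
  end

  def gracefully_shutdown
    @calls << :gracefully_shutdown
    sleep @graceful_delay # simulates waiting for in-flight messages
    self
  end

  def force_shutdown!
    @calls << :force_shutdown!
  end
end

# Attempt a graceful shutdown; fall back to force_shutdown! when it
# times out or raises. Timeout::Error is a StandardError, so one rescue
# clause covers both cases.
def shutdown_with_fallback(runner, timeout:)
  Timeout.timeout(timeout) { runner.gracefully_shutdown }
  :graceful
rescue StandardError
  runner.force_shutdown!
  :forced
end

puts shutdown_with_fallback(GracePeriodRunner.new(graceful_delay: 0), timeout: 1)
```

`Timeout.timeout` is used here for brevity. It can interrupt the block at an arbitrary point, so a production caller may prefer a deadline that the shutdown code checks itself.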
handle_on_shutdown()
# File lib/hanami/events/cloud_pubsub/runner.rb, line 104
def handle_on_shutdown
  return if CloudPubsub.on_shutdown_handlers.empty?

  logger.info('Calling custom on_shutdown handler')

  CloudPubsub.on_shutdown_handlers.each do |handler|
    handler.call(adapter)
  rescue StandardError => e
    logger.warn("Shutdown handler failed (#{e.message})")
  end
end
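
Because the `rescue` sits inside the block, a failing handler is logged and the remaining handlers still run. A minimal sketch of that behaviour in isolation, assuming handlers are plain callables that receive the adapter; `run_shutdown_handlers`, the lambdas, and `:fake_adapter` are hypothetical and stand in for the library's own registry and adapter:

```ruby
require 'logger'
require 'stringio'

# Hypothetical standalone version of the loop in handle_on_shutdown.
# Requires Ruby 2.6+ for `rescue` inside a do/end block.
def run_shutdown_handlers(handlers, adapter, logger)
  return if handlers.empty?

  handlers.each do |handler|
    handler.call(adapter)
  rescue StandardError => e
    # Log and continue, so one bad handler cannot block the others.
    logger.warn("Shutdown handler failed (#{e.message})")
  end
end

log = StringIO.new
calls = []
handlers = [
  ->(adapter) { calls << [:first, adapter] },
  ->(_adapter) { raise 'boom' },
  ->(adapter) { calls << [:third, adapter] }
]

run_shutdown_handlers(handlers, :fake_adapter, Logger.new(log))
puts calls.inspect
puts log.string
```

Only `StandardError` is rescued, so exceptions such as `SystemExit` or `Interrupt` raised by a handler would still propagate.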
healthy?()

Is the runner healthy enough to keep going?

Indicates whether the runner is healthy, which is useful for determining whether the process should be restarted.

# File lib/hanami/events/cloud_pubsub/runner.rb, line 77
def healthy?
  ready?
end
ready?()

Is the runner ready to start processing events?

Starting the runner may be asynchronous (spawning threads, etc.). Instead of making `start` blocking, expose a way to probe for readiness. After this check succeeds, `healthy?` will be honored.

This pattern is similar to Kubernetes' liveness and readiness probes and is much more useful than only having a `healthy?` check.

See: kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/

# File lib/hanami/events/cloud_pubsub/runner.rb, line 69
def ready?
  adapter.listeners.all?(&:started?)
end
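
Since `start` does not block, a caller that needs the listeners to be up can poll `ready?` until a deadline. A minimal sketch under that assumption; `SlowStartRunner` and `wait_until_ready` are hypothetical names and not part of the library:

```ruby
# Hypothetical stand-in: reports ready only after a number of unsuccessful polls,
# imitating listeners that start asynchronously.
class SlowStartRunner
  def initialize(not_ready_polls:)
    @remaining = not_ready_polls
  end

  def ready?
    @remaining -= 1
    @remaining.negative?
  end
end

# Poll runner.ready? until it returns true or the timeout (in seconds) elapses.
# Returns true when the runner became ready, false on timeout.
def wait_until_ready(runner, timeout:, interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until runner.ready?
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep interval
  end
  true
end

puts wait_until_ready(SlowStartRunner.new(not_ready_polls: 3), timeout: 1, interval: 0.01)
```

The monotonic clock is used for the deadline so that wall-clock adjustments do not shorten or extend the wait.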
start(_options = {})

Called to start the runner (subscribes to topics/etc)

# File lib/hanami/events/cloud_pubsub/runner.rb, line 19
def start(_options = {})
  CloudPubsub.finalize_settings!
  logger.info 'Starting CloudPubsub listeners'
  adapter.listeners.map(&:start)
  self
end
stop()

Will be called on TSTP

Stop processing new events (unsubscribe from topics, etc)

# File lib/hanami/events/cloud_pubsub/runner.rb, line 29
def stop
  logger.info 'Stopping CloudPubsub listeners'
  adapter.listeners.each(&:stop)
  self
end
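
The method descriptions above name the signal on which each method is called: SIGINT and SIGTERM for `gracefully_shutdown`, TSTP for `stop`, and TTIN for `debug_info`. A minimal sketch of such a mapping follows. `SIGNAL_ACTIONS`, `RecordingRunner`, and `dispatch_signal` are hypothetical names, and the code that actually traps signals for the runner is not shown in this file, so it may be structured differently.

```ruby
# Signal names mapped to the runner methods the documentation associates with them.
SIGNAL_ACTIONS = {
  'INT' => :gracefully_shutdown,
  'TERM' => :gracefully_shutdown,
  'TSTP' => :stop,
  'TTIN' => :debug_info
}.freeze

# Hypothetical stand-in that records which methods were invoked.
class RecordingRunner
  attr_reader :calls

  def initialize
    @calls = []
  end

  SIGNAL_ACTIONS.values.uniq.each do |name|
    define_method(name) do
      @calls << name
      self
    end
  end
end

# Look up the action for a signal name and invoke it on the runner.
def dispatch_signal(runner, signal)
  action = SIGNAL_ACTIONS.fetch(signal) do
    raise ArgumentError, "unhandled signal: #{signal}"
  end
  runner.public_send(action)
end

runner = RecordingRunner.new
%w[TTIN TSTP TERM].each { |sig| dispatch_signal(runner, sig) }
puts runner.calls.inspect
```

In a real process, a `Signal.trap` block should do as little as possible, for example recording the signal name for the main loop to dispatch, because taking a `Mutex` (which `Logger` does) inside a trap handler raises `ThreadError`.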