class Karafka::Server

Karafka consuming server class

Constants

FORCEFUL_EXIT_CODE

System exit code used when the process is terminated forcefully

SUPERVISION_CHECK_FACTOR

This factor allows us to calculate how many times we have to sleep before a forceful shutdown

SUPERVISION_SLEEP

How long (in seconds) we sleep between checks on shutting down consumers
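
The relationship between the two supervision constants can be sketched as below. The concrete values (a 0.1 second sleep and a factor equal to its reciprocal) are assumptions made for illustration, since this page does not list them, and `supervision_checks` is a hypothetical helper that mirrors the `(timeout * SUPERVISION_CHECK_FACTOR).to_i` expression used in `stop_supervised`.

```ruby
# Assumed values for illustration only; check lib/karafka/server.rb for the real ones.
SUPERVISION_SLEEP = 0.1
SUPERVISION_CHECK_FACTOR = (1 / SUPERVISION_SLEEP)

# Hypothetical helper: how many sleep/check iterations fit into a shutdown timeout
# (timeout given in seconds).
def supervision_checks(timeout)
  (timeout * SUPERVISION_CHECK_FACTOR).to_i
end
```

Under these assumptions, a 60 second shutdown timeout allows 600 checks, one every 0.1 seconds, before the forceful shutdown path is taken.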

Attributes

consumer_groups[W]

Writer for list of consumer groups that we want to consume in our current process context

consumer_threads[RW]

Set of consuming threads. Each consumer thread contains a single consumer

Public Class Methods

consumer_groups()

@return [Array<String>] array with names of consumer groups that should be consumed in a current server context

# File lib/karafka/server.rb, line 35
def consumer_groups
  # If not specified, a server will listen on all the topics
  @consumer_groups ||= Karafka::App.consumer_groups.map(&:name).freeze
end
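
The interplay between the `consumer_groups` writer and the memoized reader can be illustrated with a small stand-in. `ServerSketch` and `all_group_names` are hypothetical names used only for this sketch; in Karafka the default comes from `Karafka::App.consumer_groups.map(&:name)`.

```ruby
# Hypothetical stand-in for the reader/writer pair; not the Karafka class itself.
class ServerSketch
  class << self
    # Lets the caller restrict which consumer groups this process handles
    attr_writer :consumer_groups

    # Falls back to every known group when nothing was assigned
    def consumer_groups
      @consumer_groups ||= all_group_names.freeze
    end

    private

    # Placeholder for the application's full list of consumer group names
    def all_group_names
      %w[group_a group_b]
    end
  end
end
```

Because of `||=`, a value assigned through the writer before the first read takes precedence; otherwise the frozen default list is computed once and reused.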

run()

Method which runs app

# File lib/karafka/server.rb, line 26
def run
  process.on_sigint { stop_supervised }
  process.on_sigquit { stop_supervised }
  process.on_sigterm { stop_supervised }
  run_supervised
end
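
The `on_sigint` / `on_sigquit` / `on_sigterm` calls register callbacks that fire when the matching signal arrives. A minimal sketch of such a wrapper follows; `SignalCallbacks` is a hypothetical class written for illustration and is not the actual `Karafka::Process` implementation. It relies only on Ruby's standard `Signal.trap`.

```ruby
# Hypothetical signal wrapper, loosely modelled on the interface used by Server.run.
class SignalCallbacks
  SIGNALS = %w[INT QUIT TERM].freeze

  def initialize
    # One list of callbacks per handled signal
    @callbacks = SIGNALS.map { |name| [name, []] }.to_h
  end

  # Defines on_sigint, on_sigquit and on_sigterm, each storing the given block
  SIGNALS.each do |name|
    define_method(:"on_sig#{name.downcase}") do |&block|
      @callbacks.fetch(name) << block
    end
  end

  # Installs the actual OS-level traps
  def supervise
    SIGNALS.each { |name| Signal.trap(name) { notice(name) } }
  end

  # Runs the callbacks in a separate thread, because trap handlers
  # are restricted in what they may do (for example, locking a Mutex)
  def notice(name)
    Thread.new { @callbacks.fetch(name).each(&:call) }
  end
end
```

Registering the same block for all three signals, as `run` does with `stop_supervised`, means an interrupt, quit, or terminate request all lead to the same shutdown path.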

Private Class Methods

process()

@return [Karafka::Process] process wrapper instance used to catch system signal calls

# File lib/karafka/server.rb, line 43
def process
  Karafka::App.config.internal.process
end

run_supervised()

Starts Karafka with supervision.

@note We don't need to sleep because Karafka::Fetcher blocks and waits for its loop to finish (which won't happen until we explicitly want to stop)

# File lib/karafka/server.rb, line 50
def run_supervised
  process.supervise
  Karafka::App.run!
  Karafka::App.config.internal.fetcher.call
end

stop_supervised()

Stops Karafka with supervision (as long as there is a shutdown timeout). If consumers do not stop within the given time frame, it forces them to exit.

# File lib/karafka/server.rb, line 58
def stop_supervised
  Karafka::App.stop!

  # Temporary patch until https://github.com/dry-rb/dry-configurable/issues/93 is fixed
  timeout = Thread.new { Karafka::App.config.shutdown_timeout }.join.value

  # We check from time to time (for the timeout period) if all the threads finished
  # their work and if so, we can just return and normal shutdown process will take place
  (timeout * SUPERVISION_CHECK_FACTOR).to_i.times do
    if consumer_threads.count(&:alive?).zero?
      Thread.new { Karafka.monitor.instrument('app.stopped') }.join
      return
    end

    sleep SUPERVISION_SLEEP
  end

  raise Errors::ForcefulShutdownError
rescue Errors::ForcefulShutdownError => e
  Thread.new { Karafka.monitor.instrument('app.stopping.error', error: e) }.join
  # We're done waiting, let's kill them!
  consumer_threads.each(&:terminate)

  # exit! is not within the instrumentation as it would not trigger due to exit
  Kernel.exit! FORCEFUL_EXIT_CODE
end
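
The polling pattern used by `stop_supervised` can be shown in isolation. The sketch below is a simplified, hypothetical helper (`stop_with_deadline` and its parameters are not part of Karafka): it checks at a fixed interval whether all threads have finished, and terminates them if the deadline passes. Unlike the real method, it returns a symbol instead of instrumenting events and calling `Kernel.exit!`.

```ruby
# Hypothetical helper illustrating a graceful-then-forceful thread shutdown.
# timeout and sleep_interval are in seconds.
def stop_with_deadline(threads, timeout:, sleep_interval: 0.1)
  checks = (timeout / sleep_interval).to_i

  checks.times do
    # All workers finished on their own, so a normal shutdown can proceed
    return :graceful if threads.none?(&:alive?)

    sleep sleep_interval
  end

  # Deadline reached: stop waiting and terminate whatever is still running
  threads.each(&:terminate)
  :forced
end
```

Polling at short intervals, rather than sleeping for the whole timeout, lets the process exit as soon as the last consumer thread finishes instead of always waiting out the full shutdown period.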