module Deimos

Parent module.

Class to consume messages coming from a Kafka topic. Note: According to the docs, instances of your handler will be created for every incoming message/batch, so this class should be lightweight.

Class to consume messages. Can be used with integration testing frameworks. Assumes that you have a topic with only one partition.

Generates a new consumer.

Constants

VERSION

Public Class Methods

disable_producers(*producer_classes) { || ... }

Run a block without allowing any messages to be produced to Kafka. Optionally pass a list of producer classes to limit the disabling to those classes.

@param producer_classes [Array<Class>|Class]

# File lib/deimos/producer.rb, line 15
def disable_producers(*producer_classes, &block)
  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    return
  end

  if Thread.current[:frk_disable_all_producers] # nested disable block
    yield
    return
  end

  begin
    Thread.current[:frk_disable_all_producers] = true
    yield
  ensure
    Thread.current[:frk_disable_all_producers] = false
  end
end
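The guard logic above can be illustrated with a minimal standalone sketch. It is not Deimos code: the module name `ProducerGuard`, the flag key, and the `disabled?` helper are hypothetical, introduced only to show how a flag stored on `Thread.current` lets nested disable blocks behave correctly.

```ruby
# Standalone sketch of the flag-on-Thread.current pattern; not Deimos itself.
module ProducerGuard # hypothetical name
  FLAG = :sketch_disable_all_producers # hypothetical key

  # Runs the block with the flag set. A nested call only yields, so it
  # never resets the flag that the outer call is responsible for.
  def self.disable
    return yield if Thread.current[FLAG] # nested disable block

    begin
      Thread.current[FLAG] = true
      yield
    ensure
      Thread.current[FLAG] = false # always cleared, even if the block raises
    end
  end

  # Hypothetical helper: reports whether the flag is currently set.
  def self.disabled?
    Thread.current[FLAG] == true
  end
end

states = []
ProducerGuard.disable do
  states << ProducerGuard.disabled?                           # inside outer block
  ProducerGuard.disable { states << ProducerGuard.disabled? } # inside nested block
  states << ProducerGuard.disabled?                           # after nested block
end
states << ProducerGuard.disabled?                             # after outer block
# states is now [true, true, true, false]
```

The early return for the nested case matters: if the inner call also ran the `ensure` branch, it would clear the flag while the outer block was still executing.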
schema_backend(schema:, namespace:)

@param schema [String|Symbol]
@param namespace [String]
@return [Deimos::SchemaBackends::Base]

# File lib/deimos.rb, line 58
def schema_backend(schema:, namespace:)
  schema_backend_class.new(schema: schema, namespace: namespace)
end
schema_backend_class()

@return [Class < Deimos::SchemaBackends::Base]

# File lib/deimos.rb, line 47
def schema_backend_class
  backend = Deimos.config.schema.backend.to_s

  require "deimos/schema_backends/#{backend}"

  "Deimos::SchemaBackends::#{backend.classify}".constantize
end
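The method above maps a configured backend name to a class through ActiveSupport's `classify` and `constantize`. The following plain-Ruby sketch shows the same idea of name-to-class resolution, under stated assumptions: the `SketchBackends` namespace, its classes, and the `camelize_name` and `backend_class` helpers are hypothetical, the `require` step is omitted, and the camel-casing is only an approximation of `classify` for simple snake_case names (it does not singularize).

```ruby
# Standalone sketch of resolving a backend name to a class; not Deimos itself.
module SketchBackends # hypothetical namespace
  class Base; end
  class AvroLocal < Base; end # illustrative backend names
  class Mock < Base; end
end

# Approximates String#classify for simple snake_case names.
def camelize_name(name)
  name.to_s.split('_').map(&:capitalize).join
end

# Looks the constant up inside SketchBackends only (inherit = false),
# so an unknown name raises NameError instead of resolving elsewhere.
def backend_class(name)
  SketchBackends.const_get(camelize_name(name), false)
end

klass = backend_class(:avro_local)
instance = klass.new
```

Restricting the lookup to one namespace is a design choice of this sketch; it keeps a misconfigured name from silently resolving to an unrelated top-level constant.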
start_db_backend!(thread_count: 1)

Start the DB producers to send Kafka messages.

@param thread_count [Integer] the number of threads to start.

# File lib/deimos.rb, line 64
def start_db_backend!(thread_count: 1)
  if self.config.producers.backend != :db
    raise('Publish backend is not set to :db, exiting')
  end

  if thread_count.nil? || thread_count.zero?
    raise('Thread count is not given or set to zero, exiting')
  end

  producers = (1..thread_count).map do
    Deimos::Utils::DbProducer.
      new(self.config.db_producer.logger || self.config.logger)
  end
  executor = Sigurd::Executor.new(producers,
                                  sleep_seconds: 5,
                                  logger: self.config.logger)
  signal_handler = Sigurd::SignalHandler.new(executor)
  signal_handler.run!
end
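The argument checks in the method above can be exercised in isolation with a small sketch. It involves no Kafka, database, or Sigurd executor: `build_workers` is a hypothetical helper, and the strings it returns are stand-ins for the `Deimos::Utils::DbProducer` instances the real method creates.

```ruby
# Standalone sketch of the guard clauses above; not Deimos itself.
def build_workers(backend:, thread_count: 1)
  raise 'Publish backend is not set to :db, exiting' if backend != :db

  if thread_count.nil? || thread_count.zero?
    raise 'Thread count is not given or set to zero, exiting'
  end

  # One stand-in worker per requested thread.
  (1..thread_count).map { |i| "worker-#{i}" }
end

workers = build_workers(backend: :db, thread_count: 2)

begin
  build_workers(backend: :db, thread_count: 0)
rescue RuntimeError => e
  zero_error = e.message
end
```

One property worth noting from the sketch: a negative `thread_count` passes both checks, and because `(1..thread_count)` is then an empty range, no workers are created. The same range expression appears in the listing above, so callers may want to validate the value before passing it in.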