module Deimos
Parent module.
Class to consume messages coming from a Kafka topic. Note: According to the docs, instances of your handler will be created for every incoming message/batch. This class should be lightweight.
Class to consume messages. Can be used with integration testing frameworks. Assumes that you have a topic with only one partition.
Generates a new consumer.
Constants
- VERSION
Public Class Methods
Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.
@param producer_classes [Array<Class>|Class]
# File lib/deimos/producer.rb, line 15
def disable_producers(*producer_classes, &block)
  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    return
  end

  if Thread.current[:frk_disable_all_producers] # nested disable block
    yield
    return
  end

  begin
    Thread.current[:frk_disable_all_producers] = true
    yield
  ensure
    Thread.current[:frk_disable_all_producers] = false
  end
end
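The nesting behavior in the source above can be easier to follow in isolation. The following is a minimal sketch of the same flag-on-`Thread.current` pattern, not Deimos itself: the `ProducerGate` module, its `disable` and `disabled?` methods, and the `:example_disable_all_producers` key are all hypothetical names introduced for illustration.

```ruby
# Hypothetical stand-in for the producer gate; this is not the Deimos API.
module ProducerGate
  FLAG = :example_disable_all_producers # assumed key name for this sketch

  # Runs the block with the flag set for the calling thread only.
  def self.disable
    # Nested call: an outer block already owns the flag, so just yield
    # and leave the reset to that outer block.
    return yield if Thread.current[FLAG]

    begin
      Thread.current[FLAG] = true
      yield
    ensure
      # Reset the flag even if the block raises.
      Thread.current[FLAG] = false
    end
  end

  def self.disabled?
    Thread.current[FLAG] == true
  end
end

states = []
seen_in_other_thread = nil

ProducerGate.disable do
  states << ProducerGate.disabled?                          # outer block
  ProducerGate.disable { states << ProducerGate.disabled? } # nested block
  states << ProducerGate.disabled?                          # after nested block
  # A separate thread has its own Thread.current storage.
  seen_in_other_thread = Thread.new { ProducerGate.disabled? }.value
end
states << ProducerGate.disabled?                            # after outer block
```

In this sketch the nested call returns early without touching the flag, so the flag stays set until the outermost block finishes. Because the flag lives on `Thread.current`, a block running in one thread does not disable anything in another.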
@param schema [String|Symbol]
@param namespace [String]
@return [Deimos::SchemaBackends::Base]
# File lib/deimos.rb, line 58
def schema_backend(schema:, namespace:)
  schema_backend_class.new(schema: schema, namespace: namespace)
end
@return [Class < Deimos::SchemaBackends::Base]
# File lib/deimos.rb, line 47
def schema_backend_class
  backend = Deimos.config.schema.backend.to_s

  require "deimos/schema_backends/#{backend}"

  "Deimos::SchemaBackends::#{backend.classify}".constantize
end
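The two methods above turn a configured backend name into a class and then instantiate it with a schema and namespace. A rough sketch of that lookup in plain Ruby follows. It does not use ActiveSupport, so `classify` and `constantize` are approximated with `split`/`capitalize` and `const_get` (no singularization), and it skips the `require` step. The `ExampleBackends` module, its classes, and `example_backend_class` are hypothetical names, not part of Deimos.

```ruby
# Hypothetical backend namespace standing in for Deimos::SchemaBackends.
module ExampleBackends
  class Base
    attr_reader :schema, :namespace

    def initialize(schema:, namespace:)
      @schema = schema
      @namespace = namespace
    end
  end

  class AvroLocal < Base; end
  class Mock < Base; end
end

# Maps a snake_case backend name to a class inside ExampleBackends.
# Approximation only: ActiveSupport's classify also singularizes.
def example_backend_class(backend)
  class_name = backend.to_s.split('_').map(&:capitalize).join
  ExampleBackends.const_get(class_name)
end

klass = example_backend_class(:avro_local)
backend = klass.new(schema: 'MySchema', namespace: 'com.example')
```

Resolving the class by name keeps the configuration value a plain symbol or string. An unknown name fails at lookup time with a `NameError` in this sketch.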
Start the DB producers to send Kafka messages.
@param thread_count [Integer] the number of threads to start.
# File lib/deimos.rb, line 64
def start_db_backend!(thread_count: 1)
  if self.config.producers.backend != :db
    raise('Publish backend is not set to :db, exiting')
  end

  if thread_count.nil? || thread_count.zero?
    raise('Thread count is not given or set to zero, exiting')
  end

  producers = (1..thread_count).map do
    Deimos::Utils::DbProducer.
      new(self.config.db_producer.logger || self.config.logger)
  end
  executor = Sigurd::Executor.new(producers,
                                  sleep_seconds: 5,
                                  logger: self.config.logger)
  signal_handler = Sigurd::SignalHandler.new(executor)
  signal_handler.run!
end
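The guard clauses and the per-thread producer construction above can be sketched without Deimos or Sigurd. In the example below, `ExampleWorker` and `build_workers` are hypothetical names, the backend is passed as an argument rather than read from configuration, and the executor and signal handling are left out entirely.

```ruby
# Hypothetical worker standing in for Deimos::Utils::DbProducer.
ExampleWorker = Struct.new(:logger)

# Mirrors the two checks from the source, then builds one worker per thread.
def build_workers(backend:, thread_count: 1, logger: nil)
  raise 'Publish backend is not set to :db, exiting' if backend != :db

  if thread_count.nil? || thread_count.zero?
    raise 'Thread count is not given or set to zero, exiting'
  end

  (1..thread_count).map { ExampleWorker.new(logger) }
end

workers = build_workers(backend: :db, thread_count: 3)

# A non-:db backend is rejected before any worker is created.
error = begin
  build_workers(backend: :kafka)
  nil
rescue RuntimeError => e
  e.message
end
```

Note that in this sketch a negative `thread_count` passes both checks and yields an empty list, because `(1..-1)` is an empty range.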