module Karafka::Testing::RSpec::Helpers

RSpec helpers module that needs to be included

Public Class Methods

included(base)

Adds all the needed extra functionality to the RSpec example group.

@param base [Class] RSpec example group we want to extend

# File lib/karafka/testing/rspec/helpers.rb, line 15
def included(base)
  # This is an internal buffer for keeping "to be sent" messages before
  # we run the consume
  base.let(:_karafka_raw_data) { [] }
  # Clear the messages buffer after each spec, so nothing will leak
  # in between them
  base.after { _karafka_raw_data.clear }
end
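
The hook pattern above can be tried outside RSpec with a small stand-in. This is only a sketch: `FakeGroup` and `BufferHelpers` are hypothetical names, and `FakeGroup` merely mimics the `let` and `after` class methods of an RSpec example group by recording the blocks it receives.

```ruby
# Hypothetical stand-in for an RSpec example group: it only records
# the definitions that a helper module registers on it.
class FakeGroup
  class << self
    def lets
      @lets ||= {}
    end

    def after_hooks
      @after_hooks ||= []
    end

    # Mimics RSpec's `let` by storing a named block
    def let(name, &block)
      lets[name] = block
    end

    # Mimics RSpec's `after` by storing a teardown block
    def after(&block)
      after_hooks << block
    end
  end
end

# Hypothetical helper module that follows the same pattern as the helpers above
module BufferHelpers
  # Ruby calls this hook each time the module is included somewhere
  def self.included(base)
    base.let(:_buffer) { [] }
    base.after { _buffer.clear }
  end
end

FakeGroup.include(BufferHelpers)
```

After the `include` call, `FakeGroup.lets` holds the `:_buffer` definition and `FakeGroup.after_hooks` holds one teardown block, which is the same registration the real helpers perform on an example group.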

Public Instance Methods

karafka_consumer_for(requested_topic)

Creates a consumer instance for the given topic.

@param requested_topic [String, Symbol] name of the topic for which we want to create a consumer instance

@return [Object] described_class instance

@raise [Karafka::Testing::Errors::TopicNotFoundError] raised when we're unable to find the topic that was requested

@example Creates a MyConsumer consumer instance with settings for `my_requested_topic`

RSpec.describe MyConsumer do
  subject(:consumer) { karafka_consumer_for(:my_requested_topic) }
end

# File lib/karafka/testing/rspec/helpers.rb, line 36
def karafka_consumer_for(requested_topic)
  selected_topic = nil

  # @note Remove in 2.0. This won't work without the global state
  ::Karafka::App.consumer_groups.each do |consumer_group|
    consumer_group.topics.each do |topic|
      selected_topic = topic if topic.name == requested_topic.to_s
    end
  end

  raise Karafka::Testing::Errors::TopicNotFoundError, requested_topic unless selected_topic

  described_class.new(selected_topic)
end
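
The lookup in `karafka_consumer_for` can be illustrated without a running Karafka app. The sketch below assumes hypothetical `Topic` and `ConsumerGroup` structs, a local `TopicNotFoundError` class, and a `find_topic` helper in place of the real routing objects. Like the nested loop above, it keeps the last topic whose name matches.

```ruby
# Hypothetical stand-ins for Karafka's routing objects
Topic = Struct.new(:name)
ConsumerGroup = Struct.new(:topics)

# Local error class used only by this sketch
class TopicNotFoundError < StandardError; end

# Finds a topic by name across all consumer groups.
# Accepts a String or a Symbol, as the helper above does.
def find_topic(consumer_groups, requested_topic)
  topic = consumer_groups
          .flat_map(&:topics)
          .reverse
          .find { |candidate| candidate.name == requested_topic.to_s }

  raise TopicNotFoundError, requested_topic.to_s unless topic

  topic
end

groups = [
  ConsumerGroup.new([Topic.new('orders'), Topic.new('payments')]),
  ConsumerGroup.new([Topic.new('my_requested_topic')])
]

found = find_topic(groups, :my_requested_topic)
```

Calling `find_topic(groups, :missing)` raises `TopicNotFoundError`, mirroring how the helper fails when the requested topic is not defined in the routing.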

publish_for_karafka(raw_payload, opts = {})

Adds a new Karafka params instance with the given payload and options to an internal buffer that is used to simulate message delivery to the consumer.

@param raw_payload [String] anything you want to send

@param opts [Hash] additional options with which you want to overwrite the message defaults (key, offset, etc.)

@example Send a JSON message to the consumer

before do
  publish_for_karafka({ 'hello' => 'world' }.to_json)
end

@example Send a JSON message to the consumer and simulate that it comes from partition 6

before do
  publish_for_karafka({ 'hello' => 'world' }.to_json, partition: 6)
end

# File lib/karafka/testing/rspec/helpers.rb, line 67
def publish_for_karafka(raw_payload, opts = {})
  metadata = Karafka::Params::Metadata.new(
    **metadata_defaults.merge(opts)
  ).freeze

  _karafka_raw_data << Karafka::Params::Params.new(raw_payload, metadata)
  subject.params_batch = Karafka::Params::ParamsBatch.new(_karafka_raw_data)
end

Private Instance Methods

metadata_defaults()

@return [Hash] message default options

# File lib/karafka/testing/rspec/helpers.rb, line 79
def metadata_defaults
  {
    deserializer: subject.topic.deserializer,
    create_time: Time.now,
    headers: {},
    is_control_record: false,
    key: nil,
    offset: 0,
    partition: 0,
    receive_time: Time.now,
    topic: subject.topic.name
  }
end
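
Since `publish_for_karafka` merges the given options into these defaults, the override behavior comes down to plain `Hash#merge`. The sketch below uses a hypothetical, shortened `defaults` hash with only a few of the keys listed above. It also shows why symbol keys are the safe choice for overrides.

```ruby
# Hypothetical defaults that mirror a few keys returned by metadata_defaults
defaults = { key: nil, offset: 0, partition: 0 }

# Hash#merge returns a new hash in which the given options win on key clashes
overridden = defaults.merge(partition: 6, key: 'user-1')

# A string key does not clash with the symbol key, so the default stays in
# place and the hash ends up with two separate "partition" entries
mixed = defaults.merge('partition' => 6)
```

Here `overridden[:partition]` is `6` while `overridden[:offset]` keeps its default of `0`. In `mixed`, `mixed[:partition]` is still `0`, because the string key was added next to the symbol key rather than replacing it.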