class Citrus::EventBus::Subscriber

Public Class Methods

new(configuration = Configuration.new)
# File lib/citrus/event_bus/subscriber.rb, line 9
# Sets up a subscriber backed by a Poseidon partition consumer.
#
# A fresh random UUID is generated as the consumer's client id on every
# instantiation, and the in-memory message buffer starts out empty.
#
# @param configuration [Configuration] supplies +host+, +port+ and +topic+
#   here; it is retained because #call later reads its +event_serializer+.
def initialize(configuration = Configuration.new)
  @configuration = configuration

  # Positional arguments, in the order handed to the consumer.
  # NOTE(review): the last two are presumably partition index and start
  # offset per Poseidon's PartitionConsumer signature; confirm.
  client_id = SecureRandom.uuid
  host      = configuration.host
  port      = configuration.port
  topic     = configuration.topic
  @consumer = Poseidon::PartitionConsumer.new(client_id, host, port, topic, 0, :latest_offset)

  # Messages fetched in a batch but not yet returned by #call.
  @messages_cache = []
end

Public Instance Methods

call()
# File lib/citrus/event_bus/subscriber.rb, line 22
# Returns the next event, deserialized with the configured serializer.
#
# Messages are served from the in-memory buffer; when the buffer is empty
# it is refilled first via
# #simulate_blocking_io_because_poseidon_is_not_my_application, which does
# not return until at least one message has been fetched.
#
# @return [Object] whatever +event_serializer.load+ returns for the
#   message's +value+
def call
  if @messages_cache.empty?
    @messages_cache = @messages_cache + simulate_blocking_io_because_poseidon_is_not_my_application
  end

  next_message = @messages_cache.shift
  @configuration.event_serializer.load(next_message.value)
end
simulate_blocking_io_because_poseidon_is_not_my_application()
# File lib/citrus/event_bus/subscriber.rb, line 28
# Polls the consumer until a fetch yields at least one message, giving the
# caller the appearance of a blocking read.
#
# Each fetch is issued with +max_wait+ of 10_000 and +min_bytes+ of 1
# (NOTE(review): max_wait is presumably in milliseconds; confirm against
# the Poseidon documentation). Empty results are discarded and the fetch
# is retried, with no upper bound on the number of attempts.
#
# @return [Enumerable] the first fetched batch for which +any?+ is true
def simulate_blocking_io_because_poseidon_is_not_my_application
  loop do
    batch = @consumer.fetch(:max_wait => 10_000, :min_bytes => 1)
    return batch if batch.any?
  end
end