class Citrus::EventBus::Subscriber
Public Class Methods
new(configuration = Configuration.new)
click to toggle source
# File lib/citrus/event_bus/subscriber.rb, line 9 def initialize(configuration = Configuration.new) @configuration = configuration @consumer = Poseidon::PartitionConsumer.new( SecureRandom.uuid, configuration.host, configuration.port, configuration.topic, 0, :latest_offset ) @messages_cache = [] end
Public Instance Methods
call()
click to toggle source
# File lib/citrus/event_bus/subscriber.rb, line 22 def call @messages_cache += simulate_blocking_io_because_poseidon_is_not_my_application if @messages_cache.empty? message = @messages_cache.shift @configuration.event_serializer.load(message.value) end
simulate_blocking_io_because_poseidon_is_not_my_application()
click to toggle source
# File lib/citrus/event_bus/subscriber.rb, line 28 def simulate_blocking_io_because_poseidon_is_not_my_application loop do messages = @consumer.fetch(:max_wait => 10_000, :min_bytes => 1) break(messages) if messages.any? end end