class Akasha::AsyncEventRouter
Event
router working that can run in the background, providing eventual consistency. Can use the same EventListeners as the synchronous EventRouter
.
Constants
- DEFAULT_CHECKPOINT_STRATEGY
- DEFAULT_PAGE_SIZE
- DEFAULT_POLL_SECONDS
- DEFAULT_PROJECTION_STREAM
- STREAM_NAME_SEP
Public Instance Methods
connect!(repository, projection_name: nil, checkpoint_strategy: DEFAULT_CHECKPOINT_STRATEGY, page_size: DEFAULT_PAGE_SIZE, poll: DEFAULT_POLL_SECONDS)
click to toggle source
# File lib/akasha/async_event_router.rb, line 14 def connect!(repository, projection_name: nil, checkpoint_strategy: DEFAULT_CHECKPOINT_STRATEGY, page_size: DEFAULT_PAGE_SIZE, poll: DEFAULT_POLL_SECONDS) projection_name = projection_name(repository) if projection_name.nil? repository.merge_all_by_event(into: projection_name, only: registered_event_names) projection_stream = repository.store.streams[projection_name] checkpoint = checkpoint_strategy.is_a?(Class) ? checkpoint_strategy.new(projection_stream) : checkpoint_strategy Thread.new do run_forever(projection_stream, checkpoint, page_size, poll) end end
Private Instance Methods
projection_name(repository)
click to toggle source
# File lib/akasha/async_event_router.rb, line 50 def projection_name(repository) parts = [] parts << repository.namespace unless repository.namespace.nil? parts << DEFAULT_PROJECTION_STREAM parts.join(STREAM_NAME_SEP) end
run_forever(projection_stream, checkpoint, page_size, poll)
click to toggle source
TODO: Make it stoppable.
# File lib/akasha/async_event_router.rb, line 30 def run_forever(projection_stream, checkpoint, page_size, poll) position = checkpoint.latest loop do projection_stream.read_events(position, page_size, poll: poll) do |events| begin events.each do |event| route(event.name, event.metadata[:aggregate_id], **event.data) position = checkpoint.ack(position) end rescue RuntimeError => e puts e # TODO: Decide on a strategy. position = checkpoint.ack(position) end end end rescue RutimeError => e puts e # TODO: Decide on a strategy. raise end