class EsReadModel::Stream
Public Class Methods
new(head_uri, connection, listener)
click to toggle source
# File lib/es_readmodel/stream.rb, line 11
# Builds a stream reader and immediately loads its starting page.
#
# head_uri   - URI handed to fetch_first_page to locate the starting page.
# connection - object stored for later use when fetching pages.
# listener   - object stored for later use when reporting progress.
#
# No etag is known before the first request, so it starts out as nil.
def initialize(head_uri, connection, listener)
  @current_etag = nil
  @listener = listener
  @connection = connection
  fetch_first_page(head_uri)
end
open(name, connection, listener)
click to toggle source
# File lib/es_readmodel/stream.rb, line 7
# Creates a stream reader for the stream called +name+.
#
# name       - stream name; it is interpolated as-is into "/streams/<name>".
# connection - passed through unchanged to the constructor.
# listener   - passed through unchanged to the constructor.
#
# Returns the new instance. Defined on +self+ and instantiated through +new+
# so the method does not depend on resolving the constant by name and
# subclasses get instances of themselves.
def self.open(name, connection, listener)
  new("/streams/#{name}", connection, listener)
end
Public Instance Methods
each_event(&blk)
click to toggle source
# File lib/es_readmodel/stream.rb, line 25
# Yields every event of the current page to the block, then moves on to the
# page named by the current page's newer_events_uri, repeating until the
# current page is empty.
#
# blk - block forwarded to the page's each_event.
#
# If a non-empty page has no newer_events_uri there is nothing further to
# fetch, so iteration stops instead of yielding the same page's events again
# and again. In that case the page remains the current page.
def each_event(&blk)
  until @current_page.empty?
    @current_page.each_event(&blk)
    newer_uri = @current_page.newer_events_uri
    break unless newer_uri

    fetch(newer_uri)
  end
end
wait_for_new_events()
click to toggle source
# File lib/es_readmodel/stream.rb, line 18
# Blocks until the current page is non-empty, re-fetching the current URI
# between checks. Returns immediately when the current page already has
# content.
#
# poll_interval - seconds to sleep before each re-fetch (default: 1, the
#                 previously hard-coded value).
def wait_for_new_events(poll_interval: 1)
  while @current_page.empty?
    sleep poll_interval
    fetch(@current_uri)
  end
end
Private Instance Methods
fetch(uri)
click to toggle source
# File lib/es_readmodel/stream.rb, line 65
# Requests +uri+ through the connection, passing along the etag remembered
# from the previous fetch, and makes the result the current page.
#
# uri - the URI to request; remembered as the current URI afterwards.
#
# Side effects: replaces @current_page with a Page built from the response
# body, and updates @current_uri and @current_etag (taken from the response's
# 'etag' header). Returns that etag value.
def fetch(uri)
  reply = @connection.get(uri, @current_etag)
  page = Page.new(reply.body)
  @current_page = page
  @current_uri = uri
  @current_etag = reply.headers['etag']
end
fetch_first_page(uri)
click to toggle source
# File lib/es_readmodel/stream.rb, line 34
# Loads the starting page for the stream, retrying until it succeeds.
#
# uri - the URI to fetch first.
#
# After fetching +uri+, if the resulting page reports a first_event_uri, that
# page is fetched as well and becomes the current page. Progress is reported
# to the listener as hashes tagged 'fetchFirstPage.connecting',
# 'fetchFirstPage.connected' and 'fetchFirstPage.error'.
#
# On failure the error is reported, the method sleeps for the current
# back-off (starting at 1s and doubling after every failure) and tries again.
# Only StandardError is retried: Interrupt, SystemExit and other non-standard
# exceptions propagate so the process can still be stopped while the server
# is unreachable.
#
# NOTE(review): the back-off doubles without an upper bound - consider a cap.
def fetch_first_page(uri)
  back_off = 1
  @listener.call({
    level: 'info',
    tag: 'fetchFirstPage.connecting',
    msg: "Connecting to #{uri} on #{@connection}"
  })
  loop do
    begin
      fetch(uri)
      last = @current_page.first_event_uri
      fetch(last) if last
      @listener.call({
        level: 'info',
        tag: 'fetchFirstPage.connected',
        msg: "Connected to #{uri} on #{@connection}",
        eventsWaiting: !@current_page.empty?
      })
      return
    rescue StandardError => ex
      @listener.call({
        level: 'error',
        tag: 'fetchFirstPage.error',
        msg: "#{ex.class}: #{ex.message}. Retry in #{back_off}s."
      })
      sleep back_off
      back_off *= 2
    end
  end
end