class Estore::Commands::CatchUpSubscription
Public Class Methods
new(connection, stream, from, options = {})
click to toggle source
Calls superclass method
Estore::Commands::Subscription::new
# File lib/estore/commands/subscriptions/catch_up.rb, line 10 def initialize(connection, stream, from, options = {}) super(connection, stream, options) @from = from @batch = options[:batch_size] @mutex = Mutex.new @while_catching_up = [] @caught_up = false end
Public Instance Methods
event_appeared(response)
click to toggle source
# File lib/estore/commands/subscriptions/catch_up.rb, line 42 def event_appeared(response) unless @caught_up @mutex.synchronize do @while_catching_up << response.event unless @caught_up end end enqueue response.event if @caught_up end
start()
click to toggle source
Calls superclass method
Estore::Commands::Subscription#start
# File lib/estore/commands/subscriptions/catch_up.rb, line 19 def start super # TODO: Think about doing something more clever? read = ReadForward.new(@connection, @stream, @from, @batch) do |events| enqueue events unless events.empty? end read.call.sync switch_to_live end
switch_to_live()
click to toggle source
# File lib/estore/commands/subscriptions/catch_up.rb, line 31 def switch_to_live @mutex.synchronize do unprocessed_events.each { |event| enqueue event } @caught_up = true end end
unprocessed_events()
click to toggle source
# File lib/estore/commands/subscriptions/catch_up.rb, line 38 def unprocessed_events @while_catching_up.find_all { |event| event.original_event_number > @position } end