class Estore::Commands::CatchUpSubscription

Public Class Methods

new(connection, stream, from, options = {}) click to toggle source
Calls superclass method Estore::Commands::Subscription::new
# File lib/estore/commands/subscriptions/catch_up.rb, line 10
def initialize(connection, stream, from, options = {})
  super(connection, stream, options)
  @from = from
  @batch = options[:batch_size]
  @mutex = Mutex.new
  @while_catching_up = []
  @caught_up = false
end

Public Instance Methods

event_appeared(response) click to toggle source
# File lib/estore/commands/subscriptions/catch_up.rb, line 42
def event_appeared(response)
  unless @caught_up
    @mutex.synchronize do
      @while_catching_up << response.event unless @caught_up
    end
  end

  enqueue response.event if @caught_up
end
start() click to toggle source
Calls superclass method Estore::Commands::Subscription#start
# File lib/estore/commands/subscriptions/catch_up.rb, line 19
def start
  super

  # TODO: Think about doing something more clever?
  read = ReadForward.new(@connection, @stream, @from, @batch) do |events|
    enqueue events unless events.empty?
  end

  read.call.sync
  switch_to_live
end
switch_to_live() click to toggle source
# File lib/estore/commands/subscriptions/catch_up.rb, line 31
def switch_to_live
  @mutex.synchronize do
    unprocessed_events.each { |event| enqueue event }
    @caught_up = true
  end
end
unprocessed_events() click to toggle source
# File lib/estore/commands/subscriptions/catch_up.rb, line 38
def unprocessed_events
  @while_catching_up.find_all { |event| event.original_event_number > @position }
end