module Estore::Commands::Subscription

Public Class Methods

new(connection, stream, options = {})
Calls superclass method
# File lib/estore/commands/subscription.rb, line 4
# Sets up a subscription on +stream+ and spawns its background worker.
#
# connection - forwarded unchanged to the superclass initializer.
# stream     - stored in @stream.
# options    - Hash; only :resolve_link_tos is read here (defaults to true).
#
# NOTE(review): the worker thread loops forever and nothing in this
# method arranges for it to stop - confirm it is shut down elsewhere.
def initialize(connection, stream, options = {})
  super(connection)
  @stream = stream
  @resolve_link_tos = options.fetch(:resolve_link_tos, true)
  @has_finished = false
  # Batches are pushed onto this queue and popped by worker_loop.
  @worker_queue = Queue.new
  @worker = Thread.new do
    loop { worker_loop }
  end
end

Public Instance Methods

call()
# File lib/estore/commands/subscription.rb, line 13
# Runs the command by delegating to #start; the result of #start is
# returned as-is, and any error it raises propagates to the caller.
def call
  start
end
close()
# File lib/estore/commands/subscription.rb, line 29
# Ends the subscription: writes an 'UnsubscribeFromStream' message and
# then calls remove!.
def close
  unsubscribe_request = UnsubscribeFromStream.new
  write('UnsubscribeFromStream', unsubscribe_request)
  remove!
end
enqueue(events)
# File lib/estore/commands/subscription.rb, line 38
# Records the number of the newest event in @position and pushes the
# batch onto @worker_queue.
#
# events - a single event, an Array of events, or nil; normalised with
#          Array(). Each event must respond to #original_event_number.
#
# An empty batch (nil or []) is ignored: it has no last event to take a
# position from, so @position and the queue are left untouched instead
# of raising NoMethodError on nil.
def enqueue(events)
  batch = Array(events)
  return if batch.empty?

  @position = batch.last.original_event_number
  @worker_queue << batch
end
on_event(&block)
# File lib/estore/commands/subscription.rb, line 34
# Stores the given block in @handler, replacing any previous one.
# Calling it without a block stores nil.
def on_event(&handler)
  @handler = handler
end
start()
# File lib/estore/commands/subscription.rb, line 17
# Registers this command and writes a 'SubscribeToStream' message built
# from @stream and @resolve_link_tos.
#
# Raises RuntimeError ('Subscription block not defined') when no handler
# has been set, before anything is registered or written.
def start
  unless @handler
    raise 'Subscription block not defined'
  end

  # The message is built before register! so the original order of
  # construct -> register -> write is kept.
  subscribe_request = SubscribeToStream.new(
    event_stream_id: @stream,
    resolve_link_tos: @resolve_link_tos
  )

  register!
  write('SubscribeToStream', subscribe_request)
end
worker_loop()
# File lib/estore/commands/subscription.rb, line 44
# Processes one batch: pops it from @worker_queue (blocking until one is
# available) and passes each event to @handler in order.
#
# Any StandardError is caught so the caller can keep looping; its message
# and backtrace are printed with puts. An error aborts the rest of the
# current batch - later events in it are not delivered.
def worker_loop
  batch = @worker_queue.pop
  batch.each do |event|
    @handler.call(event)
  end
rescue StandardError => error
  puts error.message
  puts error.backtrace
end