module Estore::Commands::Subscription
Public Class Methods
new(connection, stream, options = {})
click to toggle source
Calls superclass method
# File lib/estore/commands/subscription.rb, line 4
#
# Sets up the subscription state and starts a background worker thread
# that repeatedly runs #worker_loop.
#
# connection - forwarded unchanged to the superclass initializer.
# stream     - stored in @stream; #start sends it as event_stream_id.
# options    - Hash; :resolve_link_tos (default true) is stored and later
#              sent by #start.
def initialize(connection, stream, options = {})
  super(connection)

  @stream = stream
  @resolve_link_tos = options.fetch(:resolve_link_tos, true)
  @has_finished = false

  # The queue must exist before the worker thread starts popping from it.
  @worker_queue = Queue.new
  @worker = Thread.new do
    loop { worker_loop }
  end
end
Public Instance Methods
call()
click to toggle source
# File lib/estore/commands/subscription.rb, line 13
#
# Entry point of the command: delegates to #start and returns its result.
def call
  start
end
close()
click to toggle source
# File lib/estore/commands/subscription.rb, line 29
#
# Writes an 'UnsubscribeFromStream' message and then calls remove!.
#
# NOTE(review): the worker thread created in #initialize is not stopped
# here — confirm whether that is intentional.
def close
  unsubscribe = UnsubscribeFromStream.new
  write('UnsubscribeFromStream', unsubscribe)
  remove!
end
enqueue(events)
click to toggle source
# File lib/estore/commands/subscription.rb, line 38
#
# Hands a batch of events to the worker queue and records the
# original_event_number of the last event in @position.
#
# events - a single event, an Array of events, or nil. The value is
#          normalised with Kernel#Array.
#
# Returns the worker queue after pushing, or nil when there was nothing
# to enqueue.
def enqueue(events)
  events = Array(events)
  # An empty batch has no last event; without this guard `events.last` is
  # nil and reading its event number raises NoMethodError.
  return if events.empty?

  @position = events.last.original_event_number
  @worker_queue << events
end
on_event(&block)
click to toggle source
# File lib/estore/commands/subscription.rb, line 34
#
# Registers the block that #worker_loop calls with each event. Calling it
# again replaces the previous handler; calling it without a block stores
# nil, which makes #start raise.
#
# Returns the stored block (or nil).
def on_event(&block)
  @handler = block
end
start()
click to toggle source
# File lib/estore/commands/subscription.rb, line 17
#
# Subscribes to the stream: calls register! and then writes a
# 'SubscribeToStream' message carrying @stream and @resolve_link_tos.
#
# Raises RuntimeError when no handler has been set via #on_event.
def start
  if @handler.nil? || @handler == false
    raise 'Subscription block not defined'
  end

  # The message is built before register!, matching the original order.
  subscribe = SubscribeToStream.new(
    event_stream_id: @stream,
    resolve_link_tos: @resolve_link_tos
  )

  register!
  write('SubscribeToStream', subscribe)
end
worker_loop()
click to toggle source
# File lib/estore/commands/subscription.rb, line 44
#
# Runs one worker iteration: blocks until a batch is available on
# @worker_queue, then passes each event of that batch to @handler.
#
# A handler failure is reported on $stderr and processing continues with
# the next event of the same batch, so one bad event no longer silently
# drops the events queued behind it. Any other StandardError (for example
# from popping or iterating the batch) is reported the same way and
# swallowed, so the loop around this method keeps running.
def worker_loop
  batch = @worker_queue.pop
  batch.each do |event|
    begin
      @handler.call(event)
    rescue => e
      # Isolate the failure to this single event.
      $stderr.puts(e.message, e.backtrace)
    end
  end
rescue => e
  $stderr.puts(e.message, e.backtrace)
end