class ActiveEvent::EventSourceServer
Attributes
mutex[RW]
options[W]
status[RW]
Public Class Methods
fail_on_projection_error(projection)
click to toggle source
# File lib/active_event/event_source_server.rb, line 20 def fail_on_projection_error(projection) instance.fail_on_projection_error(projection) end
new()
click to toggle source
# File lib/active_event/event_source_server.rb, line 85 def initialize self.mutex = Mutex.new self.status = Hash.new { |h, k| h[k] = Status.new } event_connection.start event_queue.subscribe do |_delivery_info, _properties, body| process_projection JSON.parse(body).symbolize_keys! end end
wait_for_event_projection(event_id, projection, options = {})
click to toggle source
# File lib/active_event/event_source_server.rb, line 16 def wait_for_event_projection(event_id, projection, options = {}) instance.wait_for_event_projection(event_id, projection, options) end
Public Instance Methods
fail_on_projection_error(projection)
click to toggle source
# File lib/active_event/event_source_server.rb, line 34 def fail_on_projection_error(projection) mutex.synchronize do projection_status = status[projection] projection_status.fail_on_error end end
wait_for_event_projection(event_id, projection, options = {})
click to toggle source
# File lib/active_event/event_source_server.rb, line 25 def wait_for_event_projection(event_id, projection, options = {}) mutex.synchronize do projection_status = status[projection] projection_status.fail_on_error # projection will not continue if error occurred projection_status.waiter(event_id).wait(mutex, options[:timeout]) projection_status.fail_on_error end end
Private Instance Methods
config_file()
click to toggle source
# File lib/active_event/event_source_server.rb, line 135 def config_file File.expand_path('config/disco.yml', Rails.root) end
default_options()
click to toggle source
# File lib/active_event/event_source_server.rb, line 118 def default_options { event_connection: { scheme: 'amqp', userinfo: nil, host: '127.0.0.1', port: 9797, }, event_exchange: 'events', } end
env()
click to toggle source
# File lib/active_event/event_source_server.rb, line 143 def env @env = ENV['PROJECTION_ENV'] || ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development' end
event_channel()
click to toggle source
# File lib/active_event/event_source_server.rb, line 110 def event_channel @event_channel ||= event_connection.create_channel end
event_connection()
click to toggle source
# File lib/active_event/event_source_server.rb, line 106 def event_connection @event_server ||= Bunny.new URI::Generic.build(options[:event_connection]).to_s end
event_exchange()
click to toggle source
# File lib/active_event/event_source_server.rb, line 114 def event_exchange @@event_exchange ||= event_channel.fanout "server_side_#{options[:event_exchange]}" end
event_queue()
click to toggle source
# File lib/active_event/event_source_server.rb, line 102 def event_queue @event_queue ||= event_channel.queue('', auto_delete: true).bind(event_exchange) end
options()
click to toggle source
# File lib/active_event/event_source_server.rb, line 139 def options @options ||= parse_options(ARGV) end
parse_options(_args)
click to toggle source
# File lib/active_event/event_source_server.rb, line 130 def parse_options(_args) options = default_options options.merge! YAML.load_file(config_file)[env].deep_symbolize_keys! unless config_file.blank? end
process_projection(data)
click to toggle source
# File lib/active_event/event_source_server.rb, line 94 def process_projection(data) mutex.synchronize do projection_status = status[data[:projection]] projection_status.set_error data[:error], data[:backtrace] projection_status.event = data[:event] end end