class ActiveEvent::EventSourceServer

Attributes

mutex[RW]
options[W]
status[RW]

Public Class Methods

fail_on_projection_error(projection) click to toggle source
# File lib/active_event/event_source_server.rb, line 20
def fail_on_projection_error(projection)
  instance.fail_on_projection_error(projection)
end
new() click to toggle source
# File lib/active_event/event_source_server.rb, line 85
def initialize
  self.mutex = Mutex.new
  self.status = Hash.new { |h, k| h[k] = Status.new }
  event_connection.start
  event_queue.subscribe do |_delivery_info, _properties, body|
    process_projection JSON.parse(body).symbolize_keys!
  end
end
wait_for_event_projection(event_id, projection, options = {}) click to toggle source
# File lib/active_event/event_source_server.rb, line 16
def wait_for_event_projection(event_id, projection, options = {})
  instance.wait_for_event_projection(event_id, projection, options)
end

Public Instance Methods

fail_on_projection_error(projection) click to toggle source
# File lib/active_event/event_source_server.rb, line 34
def fail_on_projection_error(projection)
  mutex.synchronize do
    projection_status = status[projection]
    projection_status.fail_on_error
  end
end
wait_for_event_projection(event_id, projection, options = {}) click to toggle source
# File lib/active_event/event_source_server.rb, line 25
def wait_for_event_projection(event_id, projection, options = {})
  mutex.synchronize do
    projection_status = status[projection]
    projection_status.fail_on_error # projection will not continue if error occurred
    projection_status.waiter(event_id).wait(mutex, options[:timeout])
    projection_status.fail_on_error
  end
end

Private Instance Methods

config_file() click to toggle source
# File lib/active_event/event_source_server.rb, line 135
def config_file
  File.expand_path('config/disco.yml', Rails.root)
end
default_options() click to toggle source
# File lib/active_event/event_source_server.rb, line 118
def default_options
  {
    event_connection: {
      scheme: 'amqp',
      userinfo: nil,
      host: '127.0.0.1',
      port: 9797,
    },
    event_exchange: 'events',
  }
end
env() click to toggle source
# File lib/active_event/event_source_server.rb, line 143
def env
  @env = ENV['PROJECTION_ENV'] || ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development'
end
event_channel() click to toggle source
# File lib/active_event/event_source_server.rb, line 110
def event_channel
  @event_channel ||= event_connection.create_channel
end
event_connection() click to toggle source
# File lib/active_event/event_source_server.rb, line 106
def event_connection
  @event_server ||= Bunny.new URI::Generic.build(options[:event_connection]).to_s
end
event_exchange() click to toggle source
# File lib/active_event/event_source_server.rb, line 114
def event_exchange
  @@event_exchange ||= event_channel.fanout "server_side_#{options[:event_exchange]}"
end
event_queue() click to toggle source
# File lib/active_event/event_source_server.rb, line 102
def event_queue
  @event_queue ||= event_channel.queue('', auto_delete: true).bind(event_exchange)
end
options() click to toggle source
# File lib/active_event/event_source_server.rb, line 139
def options
  @options ||= parse_options(ARGV)
end
parse_options(_args) click to toggle source
# File lib/active_event/event_source_server.rb, line 130
def parse_options(_args)
  options = default_options
  options.merge! YAML.load_file(config_file)[env].deep_symbolize_keys! unless config_file.blank?
end
process_projection(data) click to toggle source
# File lib/active_event/event_source_server.rb, line 94
def process_projection(data)
  mutex.synchronize do
    projection_status = status[data[:projection]]
    projection_status.set_error data[:error], data[:backtrace]
    projection_status.event = data[:event]
  end
end