class ActiveProjection::EventClient

Attributes

current[RW]
delay_queue[RW]
options[RW]
running[RW]

Public Class Methods

start(options) click to toggle source
# File lib/active_projection/event_client.rb, line 8
# Class-level entry point: configures the object returned by +instance+
# with +options+ and then starts it.
#
# Returns whatever the instance-level #start returns.
def self.start(options)
  client = instance.configure(options)
  client.start
end

Public Instance Methods

configure(options) click to toggle source
# File lib/active_projection/event_client.rb, line 12
# Stores +options+ on the client so the connection helpers can read them.
#
# Raises RuntimeError when the client is currently running.
# Returns +self+ so calls can be chained (e.g. <tt>configure(opts).start</tt>).
def configure(options)
  raise 'Unsupported! Cannot configure running client' if running

  tap { self.options = options }
end
start() click to toggle source
# File lib/active_projection/event_client.rb, line 18
# Runs the client: prepares connections, syncs projections, subscribes to
# the live and replay queues, requests missing events and finally joins the
# event channel's work pool. All of this happens inside #run_once.
#
# An Interrupt is logged and swallowed. Any other StandardError is logged
# (message, then backtrace) and re-raised to the caller.
def start
  run_once do
    prepare
    sync_projections
    listen_for_events
    listen_for_replayed_events
    request_missing_events
    workers = event_channel.work_pool
    workers.join
  end
rescue Interrupt
  LOGGER.info('Catching Interrupt')
rescue StandardError => error
  LOGGER.error(error.message)
  LOGGER.error(error.backtrace.join("\n"))
  raise
end

Private Instance Methods

event_channel() click to toggle source
# File lib/active_projection/event_client.rb, line 151
# Returns the channel used for all exchanges and queues of this client.
# It is created from #event_connection on first use and cached afterwards.
def event_channel
  return @event_channel if @event_channel

  @event_channel = event_connection.create_channel
end
event_connection() click to toggle source
# File lib/active_projection/event_client.rb, line 147
# Returns the Bunny connection built from <tt>options[:event_connection]</tt>.
# The option value is turned into a URL string via URI::Generic.build.
# The connection object is cached in +@event_server+ (note the name differs
# from the method); this method does not start it.
def event_connection
  @event_server ||= begin
    connection_url = URI::Generic.build(options[:event_connection]).to_s
    Bunny.new(connection_url)
  end
end
event_exchange() click to toggle source
# File lib/active_projection/event_client.rb, line 155
# Returns the fanout exchange named by <tt>options[:event_exchange]</tt>,
# obtained from #event_channel on first use and cached afterwards.
def event_exchange
  return @event_exchange if @event_exchange

  channel = event_channel
  @event_exchange = channel.fanout(options[:event_exchange])
end
event_queue() click to toggle source
# File lib/active_projection/event_client.rb, line 143
# Returns the queue that receives live events: a queue with an empty name
# and <tt>auto_delete: true</tt>, bound to #event_exchange. Cached after the
# first call; the cached value is whatever +bind+ returns.
def event_queue
  @event_queue ||= begin
    queue = event_channel.queue('', auto_delete: true)
    queue.bind(event_exchange)
  end
end
event_received(properties, body) click to toggle source
# File lib/active_projection/event_client.rb, line 118
# Handles one incoming message.
#
# properties - message metadata; +type+ and +headers+ are read from it.
# body       - JSON string carrying the event attributes.
#
# Triggers the code reloader, logs the message, symbolizes the header and
# payload keys, builds the event via ActiveEvent::EventType.create_instance
# and passes it to #process_event.
def event_received(properties, body)
  RELOADER.execute_if_updated
  LOGGER.debug("Received #{properties.type} with #{body}")
  headers = properties.headers.deep_symbolize_keys!
  payload = JSON.parse(body).deep_symbolize_keys!
  event = ActiveEvent::EventType.create_instance(properties.type, payload)
  process_event(headers, event)
end
flush_delay_queue() click to toggle source
# File lib/active_projection/event_client.rb, line 113
# Processes every buffered <tt>[properties, body]</tt> pair in arrival order
# through #event_received, then replaces the buffer with a fresh empty array.
def flush_delay_queue
  delay_queue.each do |properties, body|
    event_received(properties, body)
  end
  self.delay_queue = []
end
init_database_connection() click to toggle source
# File lib/active_projection/event_client.rb, line 56
# Connects ActiveRecord using the settings in
# <tt>options[:projection_database]</tt>.
def init_database_connection
  database_settings = options[:projection_database]
  ActiveRecord::Base.establish_connection(database_settings)
end
listen_for_events() click to toggle source
# File lib/active_projection/event_client.rb, line 70
# Subscribes to #event_queue. While the client is not yet +current+ each
# message is buffered in +delay_queue+ as <tt>[properties, body]</tt>;
# once it is, messages go straight to #event_received.
def listen_for_events
  event_queue.subscribe do |_delivery_info, properties, body|
    next event_received(properties, body) if current

    delay_queue << [properties, body]
  end
end
listen_for_replayed_events() click to toggle source
# File lib/active_projection/event_client.rb, line 80
# Subscribes to #replay_queue. A message whose body is exactly
# 'replay_done' triggers #replay_done; every other message is handled by
# #event_received.
def listen_for_replayed_events
  replay_queue.subscribe do |_delivery_info, properties, body|
    case body
    when 'replay_done' then replay_done
    else event_received(properties, body)
    end
  end
end
prepare() click to toggle source
# File lib/active_projection/event_client.rb, line 50
# Resets the delay buffer, connects the projection database and starts the
# event connection, in that order. Returns the result of starting the
# connection.
def prepare
  self.delay_queue = []
  init_database_connection
  connection = event_connection
  connection.start
end
process_event(headers, event) click to toggle source
# File lib/active_projection/event_client.rb, line 126
# Applies +event+ to every projection of this worker whose +evaluate+
# accepts +headers+.
#
# Each projection is invoked inside its own ActiveRecord transaction. After
# a successful invoke a notification is sent for <tt>headers[:id]</tt>; if
# anything in that step raises a StandardError, a notification carrying the
# error is sent instead and processing continues with the next projection.
def process_event(headers, event)
  interested = server_projections.select { |candidate| candidate.evaluate(headers) }
  interested.each do |projection|
    begin
      ActiveRecord::Base.transaction { projection.invoke(event, headers) }
      send_projection_notification(headers[:id], projection)
    rescue StandardError => error
      send_projection_notification(headers[:id], projection, error)
    end
  end
end
replay_done() click to toggle source
# File lib/active_projection/event_client.rb, line 104
# Called when the replay stream signals completion. Logs any projections
# the repository still reports as broken, unbinds the replay queue from the
# resend exchange, marks the client as +current+ and flushes the events
# buffered in the meantime.
def replay_done
  LOGGER.debug('All replayed events received')
  still_broken = CachedProjectionRepository.all_broken
  unless still_broken.empty?
    LOGGER.error("These projections are still broken: #{still_broken.join(', ')}")
  end
  replay_queue.unbind(resend_exchange)
  self.current = true
  flush_delay_queue
end
replay_queue() click to toggle source
# File lib/active_projection/event_client.rb, line 139
# Returns the queue that receives replayed events: a queue with an empty
# name and <tt>auto_delete: true</tt>, bound to #resend_exchange. Cached
# after the first call; the cached value is whatever +bind+ returns.
def replay_queue
  @replay_queue ||= begin
    queue = event_channel.queue('', auto_delete: true)
    queue.bind(resend_exchange)
  end
end
request_missing_events() click to toggle source
# File lib/active_projection/event_client.rb, line 90
# Requests a resend starting from the smallest id in
# CachedProjectionRepository.last_ids, or from 0 when +min+ yields nil
# (for example, an empty list).
def request_missing_events
  oldest_id = CachedProjectionRepository.last_ids.min || 0
  send_request_for(oldest_id)
end
resend_exchange() click to toggle source
# File lib/active_projection/event_client.rb, line 159
# Returns the fanout exchange named "resend_<event_exchange>", obtained from
# #event_channel on first use and cached afterwards.
def resend_exchange
  return @resend_exchange if @resend_exchange

  channel = event_channel
  @resend_exchange = channel.fanout("resend_#{options[:event_exchange]}")
end
resend_request_exchange() click to toggle source
# File lib/active_projection/event_client.rb, line 163
# Returns the direct exchange named "resend_request_<event_exchange>",
# obtained from #event_channel on first use and cached afterwards.
def resend_request_exchange
  return @resend_request_exchange if @resend_request_exchange

  channel = event_channel
  @resend_request_exchange = channel.direct("resend_request_#{options[:event_exchange]}")
end
run_once() { || ... } click to toggle source
# File lib/active_projection/event_client.rb, line 42
# Runs the given block while the client is flagged as running.
#
# Raises RuntimeError when the client is already running; in that case the
# +running+ flag is left untouched. Otherwise the flag is set for the
# duration of the block and cleared afterwards, even if the block raises.
#
# Returns the value of the block.
def run_once
  fail 'Unsupported! Connot start a running client' if running

  # The ensure is scoped to the section that actually set the flag. With a
  # method-level ensure, a rejected second call would reset +running+ and
  # make an active client look stopped.
  begin
    self.running = true
    yield
  ensure
    self.running = false
  end
end
send_projection_notification(event_id, projection, error = nil) click to toggle source
# File lib/active_projection/event_client.rb, line 94
# Publishes a JSON notification about one projection run on the
# server-side events exchange.
#
# event_id   - id of the processed event.
# projection - projection instance; its class name is reported.
# error      - optional exception; when given, "Class: message" and the
#              backtrace are added to the payload.
def send_projection_notification(event_id, projection, error = nil)
  message = { event: event_id, projection: projection.class.name }
  if error
    message[:error] = "#{error.class.name}: #{error.message}"
    message[:backtrace] = error.backtrace
  end
  server_side_events_exchange.publish(message.to_json)
end
send_request_for(id) click to toggle source
# File lib/active_projection/event_client.rb, line 100
# Publishes +id+ (as a string) on the resend-request exchange with the
# routing key 'resend_request'.
def send_request_for(id)
  exchange = resend_request_exchange
  exchange.publish(id.to_s, routing_key: 'resend_request')
end
server_projections() click to toggle source
# File lib/active_projection/event_client.rb, line 66
# Returns the projections handled by this worker: starting at index
# WORKER_NUMBER, every WORKER_COUNT-th entry of
# ProjectionTypeRegistry.projections. Computed once and cached.
def server_projections
  @server_projections ||= begin
    from_this_worker = ProjectionTypeRegistry.projections.drop(WORKER_NUMBER)
    from_this_worker.each_slice(WORKER_COUNT).map(&:first)
  end
end
server_side_events_exchange() click to toggle source
# File lib/active_projection/event_client.rb, line 167
# Returns the fanout exchange named "server_side_<event_exchange>", obtained
# from #event_channel on first use and cached afterwards.
def server_side_events_exchange
  return @server_side_events_exchange if @server_side_events_exchange

  channel = event_channel
  @server_side_events_exchange = channel.fanout("server_side_#{options[:event_exchange]}")
end
sync_projections() click to toggle source
# File lib/active_projection/event_client.rb, line 60
# Calls CachedProjectionRepository.ensure_solid with the class name of each
# projection handled by this worker.
def sync_projections
  server_projections.each { |projection| CachedProjectionRepository.ensure_solid(projection.class.name) }
end