class ActiveProjection::EventClient
Attributes
current[RW]
delay_queue[RW]
options[RW]
running[RW]
Public Class Methods
start(options)
click to toggle source
# File lib/active_projection/event_client.rb, line 8
# Class-level entry point: hands +options+ to +configure+ on the object
# returned by +instance+, then calls +start+ on the result.
# NOTE(review): +instance+ is not defined in this view — presumably supplied
# by Ruby's Singleton module; confirm.
def self.start(options)
  configured_client = instance.configure(options)
  configured_client.start
end
Public Instance Methods
configure(options)
click to toggle source
# File lib/active_projection/event_client.rb, line 12
# Stores +options+ on the client so later calls can read them.
#
# Raises RuntimeError when the +running+ flag is set, because a client that
# is already running must not be reconfigured.
# Returns +self+ so that calls can be chained.
def configure(options)
  if running
    raise 'Unsupported! Cannot configure running client'
  end

  self.options = options
  self
end
start()
click to toggle source
# File lib/active_projection/event_client.rb, line 18
# Runs the client inside +run_once+: prepares state and connections, syncs
# projections, subscribes to the live and replay queues, requests missing
# events, and finally joins the event channel's work pool.
#
# An Interrupt is logged and swallowed. Any other StandardError is logged
# (message and backtrace) and then re-raised to the caller.
def start
  run_once do
    prepare
    sync_projections
    listen_for_events
    listen_for_replayed_events
    request_missing_events
    # NOTE(review): presumably blocks until the channel's workers stop — confirm.
    event_channel.work_pool.join
  end
rescue Interrupt
  LOGGER.info('Catching Interrupt')
rescue StandardError => error
  LOGGER.error(error.message)
  LOGGER.error(error.backtrace.join("\n"))
  raise
end
Private Instance Methods
event_channel()
click to toggle source
# File lib/active_projection/event_client.rb, line 151
# Returns the channel created on +event_connection+, creating it on first
# use and memoizing it in @event_channel.
def event_channel
  @event_channel ||= begin
    connection = event_connection
    connection.create_channel
  end
end
event_connection()
click to toggle source
# File lib/active_projection/event_client.rb, line 147
# Returns the Bunny connection object, built on first use from the URI
# components in options[:event_connection]. The connection is only
# constructed here; +start+ is called on it elsewhere.
#
# Note: the memo lives in @event_server, not @event_connection.
def event_connection
  @event_server ||= begin
    broker_uri = URI::Generic.build(options[:event_connection]).to_s
    Bunny.new(broker_uri)
  end
end
event_exchange()
click to toggle source
# File lib/active_projection/event_client.rb, line 155
# Returns the exchange obtained from the channel's +fanout+ call, named by
# options[:event_exchange]; memoized in @event_exchange.
def event_exchange
  @event_exchange ||= begin
    exchange_name = options[:event_exchange]
    event_channel.fanout(exchange_name)
  end
end
event_queue()
click to toggle source
# File lib/active_projection/event_client.rb, line 143
# Declares a queue with an empty name and auto_delete: true on the event
# channel, binds it to +event_exchange+, and memoizes whatever +bind+
# returns in @event_queue.
def event_queue
  @event_queue ||= begin
    unnamed_queue = event_channel.queue('', auto_delete: true)
    unnamed_queue.bind(event_exchange)
  end
end
event_received(properties, body)
click to toggle source
# File lib/active_projection/event_client.rb, line 118
# Handles one incoming message: triggers the reloader check, logs the
# message, builds an event object from the message type and the JSON body,
# and passes it with the symbolized headers to +process_event+.
#
# properties - message metadata; must respond to +type+ and +headers+.
# body       - JSON text of the event payload.
def event_received(properties, body)
  RELOADER.execute_if_updated
  LOGGER.debug("Received #{properties.type} with #{body}")

  # deep_symbolize_keys! mutates the headers hash in place.
  headers = properties.headers.deep_symbolize_keys!
  event_type = properties.type
  attributes = JSON.parse(body).deep_symbolize_keys!
  event = ActiveEvent::EventType.create_instance(event_type, attributes)

  process_event(headers, event)
end
flush_delay_queue()
click to toggle source
# File lib/active_projection/event_client.rb, line 113
# Replays every [properties, body] pair held in +delay_queue+ through
# +event_received+, in order, then replaces the queue with a fresh empty
# array.
def flush_delay_queue
  delay_queue.each do |queued_properties, queued_body|
    event_received(queued_properties, queued_body)
  end
  self.delay_queue = []
end
init_database_connection()
click to toggle source
# File lib/active_projection/event_client.rb, line 56
# Establishes the ActiveRecord connection using the settings stored under
# options[:projection_database].
def init_database_connection
  database_settings = options[:projection_database]
  ActiveRecord::Base.establish_connection(database_settings)
end
listen_for_events()
click to toggle source
# File lib/active_projection/event_client.rb, line 70
# Subscribes to +event_queue+. While +current+ is truthy each message is
# processed immediately; otherwise the [properties, body] pair is appended
# to +delay_queue+ for a later +flush_delay_queue+.
def listen_for_events
  event_queue.subscribe do |_delivery_info, properties, body|
    next event_received(properties, body) if current

    delay_queue << [properties, body]
  end
end
listen_for_replayed_events()
click to toggle source
# File lib/active_projection/event_client.rb, line 80
# Subscribes to +replay_queue+. A body equal to the literal 'replay_done'
# marks the end of the replay and triggers +replay_done+; every other
# message is processed as a regular event.
def listen_for_replayed_events
  replay_queue.subscribe do |_delivery_info, properties, body|
    case body
    when 'replay_done' then replay_done
    else event_received(properties, body)
    end
  end
end
prepare()
click to toggle source
# File lib/active_projection/event_client.rb, line 50
# Resets +delay_queue+ to an empty array, establishes the database
# connection, and starts the event connection.
def prepare
  self.delay_queue = []
  init_database_connection
  connection = event_connection
  connection.start
end
process_event(headers, event)
click to toggle source
# File lib/active_projection/event_client.rb, line 126
# Dispatches +event+ to every projection in +server_projections+ whose
# +evaluate+ accepts +headers+.
#
# Each projection is invoked inside its own ActiveRecord transaction. A
# success notification is sent afterwards; if the invocation (or that
# notification) raises a StandardError, a notification carrying the error
# is sent instead and processing moves on to the next projection.
def process_event(headers, event)
  # All projections are filtered first, before any of them is invoked.
  interested = server_projections.select { |candidate| candidate.evaluate(headers) }

  interested.each do |projection|
    begin
      ActiveRecord::Base.transaction { projection.invoke(event, headers) }
      send_projection_notification(headers[:id], projection)
    rescue StandardError => failure
      send_projection_notification(headers[:id], projection, failure)
    end
  end
end
replay_done()
click to toggle source
# File lib/active_projection/event_client.rb, line 104
# Finishes the replay phase: logs completion, reports any projections the
# repository still lists as broken, unbinds the replay queue from the
# resend exchange, marks the client as +current+, and flushes the events
# that were delayed in the meantime.
def replay_done
  LOGGER.debug('All replayed events received')

  still_broken = CachedProjectionRepository.all_broken
  unless still_broken.empty?
    LOGGER.error("These projections are still broken: #{still_broken.join(', ')}")
  end

  replay_queue.unbind(resend_exchange)
  self.current = true
  flush_delay_queue
end
replay_queue()
click to toggle source
# File lib/active_projection/event_client.rb, line 139
# Declares a queue with an empty name and auto_delete: true on the event
# channel, binds it to +resend_exchange+, and memoizes whatever +bind+
# returns in @replay_queue.
def replay_queue
  @replay_queue ||= begin
    unnamed_queue = event_channel.queue('', auto_delete: true)
    unnamed_queue.bind(resend_exchange)
  end
end
request_missing_events()
click to toggle source
# File lib/active_projection/event_client.rb, line 90
# Sends a resend request for the smallest id in
# CachedProjectionRepository.last_ids, or for 0 when +min+ yields nil.
def request_missing_events
  lowest_known_id = CachedProjectionRepository.last_ids.min
  send_request_for(lowest_known_id || 0)
end
resend_exchange()
click to toggle source
# File lib/active_projection/event_client.rb, line 159
# Returns the exchange obtained from the channel's +fanout+ call, named
# "resend_" followed by options[:event_exchange]; memoized in
# @resend_exchange.
def resend_exchange
  @resend_exchange ||= begin
    exchange_name = "resend_#{options[:event_exchange]}"
    event_channel.fanout(exchange_name)
  end
end
resend_request_exchange()
click to toggle source
# File lib/active_projection/event_client.rb, line 163
# Returns the exchange obtained from the channel's +direct+ call, named
# "resend_request_" followed by options[:event_exchange]; memoized in
# @resend_request_exchange.
def resend_request_exchange
  @resend_request_exchange ||= begin
    exchange_name = "resend_request_#{options[:event_exchange]}"
    event_channel.direct(exchange_name)
  end
end
run_once() { || ... }
click to toggle source
# File lib/active_projection/event_client.rb, line 42
# Yields to the given block while the +running+ flag is set, and clears the
# flag afterwards even if the block raises.
#
# Raises RuntimeError, without yielding, when the client is already running.
# The guard is evaluated before the begin/ensure section on purpose: a
# rejected call must leave the +running+ flag of the active run untouched.
# Returns the value of the block.
def run_once
  fail 'Unsupported! Cannot start a running client' if running

  begin
    self.running = true
    yield
  ensure
    self.running = false
  end
end
send_projection_notification(event_id, projection, error = nil)
click to toggle source
# File lib/active_projection/event_client.rb, line 94
# Publishes a JSON status message for one projection run on
# +server_side_events_exchange+.
#
# event_id   - value stored under the :event key.
# projection - object whose class name is stored under :projection.
# error      - optional exception; when given, an :error string of the form
#              "<class name>: <message>" and the :backtrace are added.
def send_projection_notification(event_id, projection, error = nil)
  notification = { event: event_id, projection: projection.class.name }

  if error
    notification[:error] = "#{error.class.name}: #{error.message}"
    notification[:backtrace] = error.backtrace
  end

  server_side_events_exchange.publish(notification.to_json)
end
send_request_for(id)
click to toggle source
# File lib/active_projection/event_client.rb, line 100
# Publishes +id+ (converted with +to_s+) on +resend_request_exchange+ with
# the routing key 'resend_request'.
def send_request_for(id)
  request_body = id.to_s
  resend_request_exchange.publish(request_body, routing_key: 'resend_request')
end
server_projections()
click to toggle source
# File lib/active_projection/event_client.rb, line 66
# Returns the subset of registered projections handled here: the first
# WORKER_NUMBER entries are dropped, the remainder is cut into slices of
# WORKER_COUNT, and the first entry of each slice is kept. The result is
# memoized in @server_projections.
def server_projections
  @server_projections ||= begin
    remaining = ProjectionTypeRegistry.projections.drop(WORKER_NUMBER)
    remaining.each_slice(WORKER_COUNT).map { |slice| slice.first }
  end
end
server_side_events_exchange()
click to toggle source
# File lib/active_projection/event_client.rb, line 167
# Returns the exchange obtained from the channel's +fanout+ call, named
# "server_side_" followed by options[:event_exchange]; memoized in
# @server_side_events_exchange.
def server_side_events_exchange
  @server_side_events_exchange ||= begin
    exchange_name = "server_side_#{options[:event_exchange]}"
    event_channel.fanout(exchange_name)
  end
end
sync_projections()
click to toggle source
# File lib/active_projection/event_client.rb, line 60
# Calls CachedProjectionRepository.ensure_solid with the class name of
# every projection in +server_projections+.
def sync_projections
  server_projections.each do |projection|
    projection_name = projection.class.name
    CachedProjectionRepository.ensure_solid(projection_name)
  end
end