class ActiveEvent::ReplayServer
Attributes
options[RW]
Public Class Methods
start(options, id)
click to toggle source
# File lib/active_event/replay_server.rb, line 8
# Stores +options+ on the shared instance and starts a replay from +id+.
#
# @param options the value assigned to the instance's +options+ accessor
# @param id the event id passed on to the instance-level #start
def self.start(options, id)
  server = instance
  server.options = options
  server.start(id)
end
update(id)
click to toggle source
# File lib/active_event/replay_server.rb, line 13
# Appends +id+ to the shared instance's queue; #new_id? later pops it.
#
# @param id the id to enqueue
def self.update(id)
  instance.queue.push(id)
end
Public Instance Methods
event_channel()
click to toggle source
# File lib/active_event/replay_server.rb, line 84
# Returns the channel created on #event_connection, creating it on first
# use and reusing the memoized one afterwards.
def event_channel
  @event_channel ||= begin
    connection = event_connection
    connection.create_channel
  end
end
event_connection()
click to toggle source
# File lib/active_event/replay_server.rb, line 80
# Returns the memoized Bunny connection, building it on first use from the
# URI components stored under options[:event_connection].
# The connection is only constructed here; #start is what calls +start+ on it.
def event_connection
  @event_server ||= begin
    amqp_uri = URI::Generic.build(options[:event_connection])
    Bunny.new(amqp_uri.to_s)
  end
end
new_id?()
click to toggle source
# File lib/active_event/replay_server.rb, line 52
# Consumes at most one pending id from the queue and reports whether it
# moved the replay position backwards.
#
# When the popped id is lower than @last_id, @last_id is set to it and
# true is returned. In every other case (empty queue, or an id that is not
# lower) the method returns false; a popped id is discarded either way.
def new_id?
  return false if queue.empty?

  candidate = queue.pop
  return false unless candidate < @last_id

  @last_id = candidate
  true
end
next_event()
click to toggle source
# File lib/active_event/replay_server.rb, line 70
# Removes and returns the first buffered event, recording its id in
# @last_id.
#
# Returns nil and leaves @last_id untouched when no events are buffered
# (@events is unset or empty). Previously this case raised NoMethodError
# from calling +id+ on nil.
def next_event
  return nil if @events.nil? || @events.empty?

  event = @events.shift
  @last_id = event.id
  event
end
next_event?()
click to toggle source
# File lib/active_event/replay_server.rb, line 76
# Reports whether at least one event is buffered in @events.
#
# Returns false (instead of raising NoMethodError) when @events has not
# been assigned yet, which is the case until #start_republishing has run.
def next_event?
  !(@events.nil? || @events.empty?)
end
queue()
click to toggle source
# File lib/active_event/replay_server.rb, line 28 def queue @queue ||= Queue.new end
republish(event)
click to toggle source
# File lib/active_event/replay_server.rb, line 63
# Publishes +event+ on the resend exchange and logs it at debug level.
#
# The message body is event.data serialized with +to_json+; event.event is
# sent as the message type. The headers carry the event's id and
# created_at plus replayed: true.
#
# @param event an object responding to +event+, +data+, +id+ and +created_at+
def republish(event)
  event_type = event.event
  payload = event.data.to_json
  resend_exchange.publish(
    payload,
    type: event_type,
    headers: { id: event.id, created_at: event.created_at, replayed: true }
  )
  LOGGER.debug "Republished #{event_type} with #{payload}"
end
republish_events()
click to toggle source
# File lib/active_event/replay_server.rb, line 44
# Republishes the buffered events one at a time, yielding to other threads
# after each.
#
# Stops early, leaving the remaining events buffered, as soon as #new_id?
# reports a lower id; otherwise runs until the buffer is empty.
def republish_events
  # next_event? is checked before new_id?, so no id is popped from the
  # queue once the buffer has drained.
  while next_event? && !new_id?
    republish(next_event)
    Thread.pass
  end
end
resend_exchange()
click to toggle source
# File lib/active_event/replay_server.rb, line 88
# Returns the memoized fanout exchange used for replayed messages. On
# first use it is declared on #event_channel under the name
# "resend_" followed by options[:event_exchange].
def resend_exchange
  @resend_exchange ||= begin
    channel = event_channel
    exchange_name = "resend_#{options[:event_exchange]}"
    channel.fanout(exchange_name)
  end
end
send_done_message()
click to toggle source
# File lib/active_event/replay_server.rb, line 32
# Publishes the literal message 'replay_done' on the resend exchange.
def send_done_message
  exchange = resend_exchange
  exchange.publish('replay_done')
end
start(id)
click to toggle source
# File lib/active_event/replay_server.rb, line 17
# Runs a full replay: starts the event connection, sets the replay
# position to +id+, republishes events, then sends the done message.
#
# Any StandardError is logged (message, then backtrace) and re-raised.
#
# @param id the id stored in @last_id before republishing begins
def start(id)
  event_connection.start
  @last_id = id
  start_republishing
  send_done_message
rescue => error
  LOGGER.error error.message
  LOGGER.error error.backtrace.join("\n")
  raise
end
start_republishing()
click to toggle source
# File lib/active_event/replay_server.rb, line 36
# Repeatedly loads the events after @last_id into @events and republishes
# them, until a load comes back empty.
#
# @last_id changes between rounds (via #next_event and #new_id?), so each
# round queries from the current replay position.
def start_republishing
  loop do
    batch = EventRepository.after_id(@last_id).to_a
    @events = batch
    break if batch.empty?

    republish_events
  end
end