class ActiveEvent::ReplayServer

Attributes

options[RW]

Public Class Methods

start(options, id) click to toggle source
# File lib/active_event/replay_server.rb, line 8
# Class-level entry point: stores +options+ on the object returned by
# +instance+ and then delegates to its #start with +id+.
#
# options - value assigned to the instance's +options+ accessor.
# id      - value forwarded unchanged to the instance-level #start.
#
# Returns whatever the instance-level #start returns.
def self.start(options, id)
  instance.options = options
  instance.start(id)
end
update(id) click to toggle source
# File lib/active_event/replay_server.rb, line 13
# Class-level entry point: appends +id+ to the queue of the object returned
# by +instance+. The queued ids are consumed later by #new_id?.
#
# id - value to enqueue.
#
# Returns the result of the push (the queue itself for a Queue).
def self.update(id)
  instance.queue.push(id)
end

Public Instance Methods

event_channel() click to toggle source
# File lib/active_event/replay_server.rb, line 84
# Returns the channel used for publishing, creating it from
# #event_connection on first use and caching it in @event_channel.
def event_channel
  return @event_channel if @event_channel

  @event_channel = event_connection.create_channel
end
event_connection() click to toggle source
# File lib/active_event/replay_server.rb, line 80
# Returns the Bunny connection object, building it on first use.
#
# The connection string is produced by URI::Generic.build from
# options[:event_connection]. The object is cached in @event_server; this
# method does not call +start+ on it.
def event_connection
  return @event_server if @event_server

  connection_url = URI::Generic.build(options[:event_connection]).to_s
  @event_server = Bunny.new(connection_url)
end
new_id?() click to toggle source
# File lib/active_event/replay_server.rb, line 52
# Checks whether an earlier replay position has been requested.
#
# Takes at most one id off the queue per call. When that id is smaller than
# @last_id, @last_id is moved back to it and true is returned. An id that is
# not smaller is dropped. Returns false when the queue is empty or the id
# was dropped.
def new_id?
  return false if queue.empty?

  candidate = queue.pop
  return false unless candidate < @last_id

  @last_id = candidate
  true
end
next_event() click to toggle source
# File lib/active_event/replay_server.rb, line 70
# Removes and returns the first element of @events, recording its +id+ in
# @last_id. Callers are expected to check #next_event? first: with an empty
# @events the shifted value is nil and the +id+ call raises NoMethodError.
def next_event
  @events.shift.tap { |head| @last_id = head.id }
end
next_event?() click to toggle source
# File lib/active_event/replay_server.rb, line 76
# Returns true while @events still holds at least one element.
def next_event?
  !@events.empty?
end
queue() click to toggle source
# File lib/active_event/replay_server.rb, line 28
# Returns the Queue holding requested ids, creating it on first access and
# caching it in @queue.
def queue
  return @queue if @queue

  @queue = Queue.new
end
republish(event) click to toggle source
# File lib/active_event/replay_server.rb, line 63
# Publishes a single stored event on #resend_exchange and logs it at debug
# level.
#
# event - object responding to +event+ (used as the message type), +data+
#         (serialized with to_json as the body), +id+ and +created_at+
#         (both sent as headers).
#
# The headers also carry replayed: true.
# Returns the result of the LOGGER.debug call.
def republish(event)
  event_type = event.event
  payload = event.data.to_json
  metadata = { id: event.id, created_at: event.created_at, replayed: true }

  resend_exchange.publish(payload, type: event_type, headers: metadata)
  LOGGER.debug "Republished #{event_type} with #{payload}"
end
republish_events() click to toggle source
# File lib/active_event/replay_server.rb, line 44
# Republishes the events currently buffered in @events, one at a time.
#
# Before each event, #new_id? is consulted; when it reports true the method
# stops immediately and leaves the remaining events unpublished. After each
# publish the thread yields via Thread.pass.
#
# Returns nil.
def republish_events
  while next_event? && !new_id?
    republish(next_event)
    Thread.pass
  end
end
resend_exchange() click to toggle source
# File lib/active_event/replay_server.rb, line 88
# Returns the fanout exchange that replayed events are published to.
#
# It is declared on #event_channel on first use, named "resend_" followed by
# options[:event_exchange], and cached in @resend_exchange.
def resend_exchange
  return @resend_exchange if @resend_exchange

  exchange_name = "resend_#{options[:event_exchange]}"
  @resend_exchange = event_channel.fanout(exchange_name)
end
send_done_message() click to toggle source
# File lib/active_event/replay_server.rb, line 32
# Publishes the literal body 'replay_done' on #resend_exchange, with no
# extra publish options. Returns the result of that publish call.
def send_done_message
  done_marker = 'replay_done'
  resend_exchange.publish(done_marker)
end
start(id) click to toggle source
# File lib/active_event/replay_server.rb, line 17
# Runs one complete replay.
#
# Starts the connection, sets @last_id to +id+, republishes via
# #start_republishing, and finally sends the done message.
#
# id - starting value for @last_id; #start_republishing fetches events
#      after it.
#
# Returns the result of #send_done_message.
# Any StandardError is logged (message, then backtrace) and re-raised.
def start(id)
  event_connection.start
  @last_id = id
  start_republishing
  send_done_message
rescue StandardError => error
  LOGGER.error(error.message)
  LOGGER.error(error.backtrace.join("\n"))
  raise error
end
start_republishing() click to toggle source
# File lib/active_event/replay_server.rb, line 36
# Repeatedly loads the events after @last_id into @events and republishes
# them, until a fetch comes back empty.
#
# @last_id changes between rounds: #next_event advances it and #new_id? may
# move it back, so each fetch resumes from the current position.
#
# NOTE(review): ids still sitting in the queue when a fetch comes back empty
# are not looked at here — confirm that is intended.
#
# Returns nil.
def start_republishing
  loop do
    @events = EventRepository.after_id(@last_id).to_a
    break if @events.empty?

    republish_events
  end
end