class Genesis::Reactor

Main reactor class

Public Class Methods

new(**kwargs) click to toggle source
# File lib/genesis/reactor.rb, line 18
# Builds a reactor from keyword options. Every option is optional and falls
# back to its default when it is missing, nil or false.
#
# Keys read from +kwargs+:
#   :threads   - kept in @poolsize, default 100
#   :protocols - kept in @protocols, default {}
#   :views     - kept in @views, default {}
#   :debug     - kept in @debug, default false
#   :handlers  - handed to register_handlers, default {}
#   :agents    - handed to register_agents, default []
def initialize(**kwargs)
  reset
  threads, protocols, views, debug, handlers, agents =
    kwargs.values_at(:threads, :protocols, :views, :debug, :handlers, :agents)

  # Maximum concurrency - a larger pool means longer boot and shutdown time.
  @poolsize = threads || 100
  @protocols = protocols || {}
  @views = views || {}
  @debug = debug || false

  register_handlers(handlers || {})
  register_agents(agents || [])
end

Public Instance Methods

run() click to toggle source

Run the reactor - must be called from EM.run or EM.synchrony

# File lib/genesis/reactor.rb, line 36
# Boots the reactor: protocols first, then the thread pool, then the signal
# handlers.
#
# Returns true once all three set-up steps have been called.
# Fails with 'Must run from within reactor' when running? is false; in that
# case none of the set-up steps is called.
def run
  fail 'Must run from within reactor' unless running?

  initialize_protocols
  initialize_threadpool
  initialize_sighandlers
  true
end
running?() click to toggle source

Check if the reactor is running

# File lib/genesis/reactor.rb, line 48
# Reports whether the EM reactor is running; the answer is whatever
# EM.reactor_running? returns.
def running?
  EM.reactor_running?
end
start() click to toggle source

Convenience wrapper to run indefinitely as daemon

# File lib/genesis/reactor.rb, line 29
# Convenience entry point: opens an EM.synchrony block and calls run inside it.
def start
  EM.synchrony { run }
end
stop() click to toggle source

Stop the reactor

# File lib/genesis/reactor.rb, line 53
# Writes 'Shutting down' to standard output, then asks EM to stop.
# The return value is whatever EM.stop returns.
def stop
  $stdout.puts 'Shutting down'
  EM.stop
end

Private Instance Methods

initialize_agents() click to toggle source

Sets up agents

# File lib/genesis/reactor.rb, line 105
# Schedules every registered agent.
#
# For each entry stored in @agents a periodic timer is created from the
# entry's :interval. On every tick the entry's :block is handed to EM.defer
# and called with the channel stored for that protocol in @channels (nil when
# there is none). The channel is looked up at tick time, not at set-up time.
def initialize_agents
  @agents.each_pair do |protocol, entries|
    entries.each do |entry|
      EM.add_periodic_timer(entry[:interval]) do
        EM.defer { entry[:block].call(@channels[protocol]) }
      end
    end
  end
end
initialize_protocols() click to toggle source

Initialize protocols to be handled

# File lib/genesis/reactor.rb, line 88
# Prepares every configured protocol and then brings the reactor pieces up.
#
# @protocols maps protocol objects to ports. For each one, an entry holding
# the protocol's server, its port and its start block is stored in @servers
# under the protocol's name, and a new EM::Channel is stored in @channels
# under the same name. Afterwards servers, subscribers and agents are
# initialised, in that order.
def initialize_protocols
  @protocols.each do |protocol, port|
    server = protocol.server
    name = protocol.protocol
    @servers[name] = { server: server, port: port, start: protocol.start_block }
    @channels[name] = EM::Channel.new
  end

  initialize_servers
  initialize_subscribers
  initialize_agents
end
initialize_servers() click to toggle source

Initialize servers for each protocol

# File lib/genesis/reactor.rb, line 79
# Starts the server recorded in @servers for every protocol in @protocols.
#
# Each server's start is called with its port and the routes registered for
# the protocol (nil when @routes has none), plus the keyword options views:,
# channel: and debug:. The recorded start block, if any, is passed through as
# the block argument.
def initialize_servers
  @protocols.each_key do |protocol|
    name = protocol.protocol
    entry = @servers[name]
    entry[:server].start(
      entry[:port],
      @routes[name],
      views: @views,
      channel: @channels[name],
      debug: @debug,
      &entry[:start]
    )
  end
end
initialize_sighandlers() click to toggle source

Initialize signal handlers to cleanly shutdown

# File lib/genesis/reactor.rb, line 71
# Installs a handler for the INT signal that calls stop and then exit.
# Returns whatever the trap call returns.
def initialize_sighandlers
  Signal.trap(:INT) do
    stop
    exit
  end
end
initialize_subscribers() click to toggle source

Set up subscriptions to channels

# File lib/genesis/reactor.rb, line 116
# Connects registered subscribers to their channels.
#
# For every type in @subscribers that has a channel in @channels, each
# subscriber is subscribed to that channel; incoming messages are handed to
# the subscriber through EM.defer. Types without a channel are skipped.
def initialize_subscribers
  @subscribers.each do |type, callbacks|
    channel = @channels[type]
    next unless channel

    callbacks.each do |callback|
      channel.subscribe { |message| EM.defer { callback.call(message) } }
    end
  end
end
initialize_threadpool() click to toggle source

Sets the initial size of the threadpool

# File lib/genesis/reactor.rb, line 100
# Assigns @poolsize (the :threads option, 100 by default) to
# EM.threadpool_size.
def initialize_threadpool
  EM.threadpool_size = @poolsize
end
register_agent(protocol, method) click to toggle source

Registers an agent for a given protocol

# File lib/genesis/reactor.rb, line 165
# Appends +method+ to the list kept for +protocol+ in @agents, creating the
# list on first use. Returns that list.
def register_agent(protocol, method)
  (@agents[protocol] ||= []) << method
end
register_agents(agents) click to toggle source

Registers all agents

# File lib/genesis/reactor.rb, line 143
# Registers every entry exposed by each element of +agents+.
#
# Each element is asked for its #agents collection; every item in it is
# passed to register_agent together with the element's #protocol. Elements
# whose #agents is nil (or false) contribute nothing.
def register_agents(agents)
  agents.each do |agent|
    entries = agent.agents
    next unless entries

    entries.each { |data| register_agent(agent.protocol, data) }
  end
end
register_handlers(handlers) click to toggle source

Registers all handlers

# File lib/genesis/reactor.rb, line 130
# Registers the routes and subscribers exposed by each element of +handlers+.
#
# Every (match, data) pair from a handler's #routes is passed to
# register_route using data[:verb], data[:opts] and data[:block]. Every item
# in its #subscribers is passed to register_subscriber using the item's
# :block. A nil #routes or #subscribers is treated as empty. Routes are
# processed before subscribers for each handler.
def register_handlers(handlers)
  handlers.each do |handler|
    routes = handler.routes
    if routes
      routes.each do |match, route|
        register_route(handler.protocol, route[:verb], match, route[:opts], route[:block])
      end
    end

    listeners = handler.subscribers
    next unless listeners

    listeners.each { |listener| register_subscriber(handler.protocol, listener[:block]) }
  end
end
register_route(protocol, verb, match, args, block) click to toggle source

Registers a route for a given protocol

# File lib/genesis/reactor.rb, line 152
# Stores a route in @routes, keyed by protocol, then verb, then match.
# Intermediate tables are created on demand; an existing entry for the same
# protocol/verb/match is overwritten.
#
# Returns the stored entry, a hash with :block and :args.
def register_route(protocol, verb, match, args, block)
  by_verb = (@routes[protocol] ||= {})
  by_match = (by_verb[verb] ||= {})
  by_match[match] = { block: block, args: args }
end
register_subscriber(protocol, block) click to toggle source

Registers a subscriber for a given protocol

# File lib/genesis/reactor.rb, line 159
# Appends +block+ to the list kept for +protocol+ in @subscribers, creating
# the list on first use. Returns that list.
def register_subscriber(protocol, block)
  (@subscribers[protocol] ||= []) << block
end
reset() click to toggle source

Reset / initialize instance variables

# File lib/genesis/reactor.rb, line 61
# Points @protocols, @servers, @routes, @subscribers, @agents and @channels
# at fresh empty hashes (one separate hash each), discarding whatever they
# held before. Returns the new @channels hash.
def reset
  %i[@protocols @servers @routes @subscribers @agents @channels].each do |ivar|
    instance_variable_set(ivar, {})
  end
  @channels
end