class Genesis::Reactor
Main reactor class.
Public Class Methods
new(**kwargs)
click to toggle source
# File lib/genesis/reactor.rb, line 18
# Builds a reactor from keyword options.
#
# Keys read from +kwargs+: :threads, :protocols, :views, :debug, :handlers
# and :agents. Any key that is missing (or nil/false) falls back to the
# default written next to it below.
def initialize(**kwargs)
  # Start from empty registries before applying any options.
  reset

  # maximum concurrency - larger = longer boot and shutdown time
  @poolsize  = kwargs[:threads] || 100
  @protocols = kwargs[:protocols] || {}
  @views     = kwargs[:views] || {}
  @debug     = kwargs[:debug] || false

  handlers = kwargs[:handlers] || {}
  agents   = kwargs[:agents] || []
  register_handlers(handlers)
  register_agents(agents)
end
Public Instance Methods
run()
click to toggle source
Run the reactor - must be called from EM.run or EM.synchrony
# File lib/genesis/reactor.rb, line 36
# Sets up protocols, the thread pool and signal handlers, in that order.
#
# Returns true after the three setup steps have been invoked.
# Raises RuntimeError ('Must run from within reactor') when running? is
# false; none of the setup steps are attempted in that case.
def run
  raise 'Must run from within reactor' unless running?

  initialize_protocols
  initialize_threadpool
  initialize_sighandlers
  true
end
running?()
click to toggle source
Check if the reactor is running
# File lib/genesis/reactor.rb, line 48
# Predicate: passes through whatever EM.reactor_running? answers.
def running?
  EM.reactor_running?
end
start()
click to toggle source
Convenience wrapper to run indefinitely as daemon
# File lib/genesis/reactor.rb, line 29
# Convenience entry point: opens an EM.synchrony block and calls #run
# inside it, so the caller does not have to set up the reactor first.
def start
  EM.synchrony { run }
end
stop()
click to toggle source
Stop the reactor
# File lib/genesis/reactor.rb, line 53
# Announces the shutdown on standard output, then asks EM to stop.
def stop
  puts 'Shutting down'
  EM.stop
end
Private Instance Methods
initialize_agents()
click to toggle source
Sets up agents
# File lib/genesis/reactor.rb, line 105
# Schedules every registered agent.
#
# For each entry under @agents[protocol] a periodic EM timer is added using
# the entry's :interval. On every tick the entry's :block is handed to
# EM.defer and called with the channel stored for that protocol.
def initialize_agents
  @agents.each do |protocol, entries|
    entries.each do |entry|
      EM.add_periodic_timer(entry[:interval]) do
        # The channel is looked up at tick time, not when the timer is created.
        EM.defer { entry[:block].call(@channels[protocol]) }
      end
    end
  end
end
initialize_protocols()
click to toggle source
Initialize protocols to be handled
# File lib/genesis/reactor.rb, line 88
# Prepares every configured protocol, then starts the dependent pieces.
#
# @protocols is iterated as protocol-object => port pairs. For each one,
# a server record ({ server:, port:, start: }) and a fresh EM::Channel are
# stored under the protocol's name (protocol.protocol). Afterwards servers,
# subscribers and agents are initialized, in that order.
def initialize_protocols
  # Use the port yielded by the iteration instead of discarding it and
  # looking it up again by key.
  @protocols.each do |protocol, port|
    name = protocol.protocol
    @servers[name] = { server: protocol.server, port: port, start: protocol.start_block }
    @channels[name] = EM::Channel.new
  end

  initialize_servers
  initialize_subscribers
  initialize_agents
end
initialize_servers()
click to toggle source
Initialize servers for each protocol
# File lib/genesis/reactor.rb, line 79
# Starts the stored server for every configured protocol.
#
# Each server's #start receives the recorded port, the routes registered
# for the protocol, the keyword options views:, channel: and debug:, and
# the recorded :start entry as its block.
def initialize_servers
  @protocols.each do |protocol, _port|
    entry = @servers[protocol.protocol]
    entry[:server].start(
      entry[:port],
      @routes[protocol.protocol],
      views: @views,
      channel: @channels[protocol.protocol],
      debug: @debug,
      &entry[:start]
    )
  end
end
initialize_sighandlers()
click to toggle source
Initialize signal handlers to cleanly shutdown
# File lib/genesis/reactor.rb, line 71
# Installs an INT signal handler that calls #stop and then exits the
# process.
def initialize_sighandlers
  Signal.trap(:INT) do
    stop
    exit
  end
end
initialize_subscribers()
click to toggle source
Set up subscriptions to channels
# File lib/genesis/reactor.rb, line 116
# Attaches registered subscribers to their channels.
#
# Subscriber lists whose key has no entry in @channels are skipped. For the
# rest, every subscriber gets its own subscription; each incoming message
# is passed to the subscriber through EM.defer.
def initialize_subscribers
  @subscribers.each do |type, callbacks|
    channel = @channels[type]
    next unless channel

    callbacks.each do |callback|
      channel.subscribe do |message|
        EM.defer { callback.call(message) }
      end
    end
  end
end
initialize_threadpool()
click to toggle source
Sets the initial size of the threadpool
# File lib/genesis/reactor.rb, line 100
# Applies the configured pool size (@poolsize) to EM.threadpool_size.
def initialize_threadpool
  EM.threadpool_size = @poolsize
end
register_agent(protocol, method)
click to toggle source
Registers an agent for a given protocol
# File lib/genesis/reactor.rb, line 165
# Appends one agent entry to the list kept for +protocol+, creating the
# list on first use. Returns that list.
def register_agent(protocol, method)
  (@agents[protocol] ||= []) << method
end
register_agents(agents)
click to toggle source
Registers all agents
# File lib/genesis/reactor.rb, line 143
# Registers every entry exposed by each object in +agents+.
#
# Each object is asked for #agents (a nil/false answer is treated as
# empty) and every entry is registered under the object's #protocol.
def register_agents(agents)
  agents.each do |agent|
    entries = agent.agents || {}
    entries.each { |entry| register_agent(agent.protocol, entry) }
  end
end
register_handlers(handlers)
click to toggle source
Registers all handlers
# File lib/genesis/reactor.rb, line 130
# Registers the routes and subscribers exposed by each handler.
#
# For a handler, #routes is walked as match => data pairs and each pair is
# passed to register_route with data's :verb, :opts and :block. Then each
# entry of #subscribers has its :block passed to register_subscriber.
# A nil/false #routes or #subscribers is treated as empty.
def register_handlers(handlers)
  handlers.each do |handler|
    routes = handler.routes || {}
    routes.each do |match, route|
      register_route(handler.protocol, route[:verb], match, route[:opts], route[:block])
    end

    subscribers = handler.subscribers || []
    subscribers.each do |subscriber|
      register_subscriber(handler.protocol, subscriber[:block])
    end
  end
end
register_route(protocol, verb, match, args, block)
click to toggle source
Registers a route for a given protocol
# File lib/genesis/reactor.rb, line 152
# Stores a route under @routes[protocol][verb][match], creating the
# intermediate hashes as needed. An existing entry for the same
# protocol/verb/match is replaced.
#
# Returns the stored { block:, args: } hash.
def register_route(protocol, verb, match, args, block)
  by_verb = (@routes[protocol] ||= {})
  by_match = (by_verb[verb] ||= {})
  by_match[match] = { block: block, args: args }
end
register_subscriber(protocol, block)
click to toggle source
Registers a subscriber for a given protocol
# File lib/genesis/reactor.rb, line 159
# Appends +block+ to the subscriber list kept for +protocol+, creating the
# list on first use. Returns that list.
def register_subscriber(protocol, block)
  (@subscribers[protocol] ||= []) << block
end
reset()
click to toggle source
Reset / initialize instance variables
# File lib/genesis/reactor.rb, line 61
# Replaces every registry with a fresh, empty hash.
def reset
  @protocols   = {} # protocol object => port
  @servers     = {} # protocol name => { server:, port:, start: }
  @routes      = {} # protocol => verb => match => { block:, args: }
  @subscribers = {} # protocol => list of subscriber blocks
  @agents      = {} # protocol => list of agent entries
  @channels    = {} # protocol name => channel
end