class RFlow::Master
The master/watchdog process for RFlow
. Mostly exists to receive SIGCHLD
from subprocesses so it can kill them all with SIGQUIT
and get restarted.
Attributes
brokers[R]
The {Broker}s being managed by the {Master}. @return [Array<Broker>]
shards[R]
The {Shard}s being managed by the {Master}. @return [Array<Shard>]
Public Class Methods
new(config)
click to toggle source
Calls superclass method
RFlow::DaemonProcess::new
# File lib/rflow/master.rb, line 17 def initialize(config) super(config['rflow.application_name'], 'Master', pid_file_path: config['rflow.pid_file_path']) @shards = config.shards.map {|config| Shard.new(config) } RFlow.logger.context_width = @shards.flat_map(&:workers).map(&:name).map(&:length).max @brokers = config.connections.flat_map(&:brokers).map {|config| Broker.build(config) } end
Public Instance Methods
run_process()
click to toggle source
Override that starts EventMachine and waits until it gets stopped. @return [void]
# File lib/rflow/master.rb, line 44 def run_process EM.run do # TODO: Monitor the workers end end
spawn_subprocesses()
click to toggle source
Override of {spawn_subprocesses} that actually spawns them, then calls {Shard#run!} on each. @return [void]
# File lib/rflow/master.rb, line 27 def spawn_subprocesses RFlow.logger.debug "Running #{brokers.count} brokers" if brokers.count > 0 brokers.each(&:spawn!) RFlow.logger.debug "#{brokers.count} brokers started: #{brokers.map { |w| "#{w.name} (#{w.pid})" }.join(', ')}" if brokers.count > 0 shards.each(&:run!) end
subprocesses()
click to toggle source
Override of {subprocesses} that includes the {Broker}s and every {Shard::Worker} of every {Shard}. @return [Array<ChildProcess>]
# File lib/rflow/master.rb, line 38 def subprocesses brokers + shards.flat_map(&:workers) end