class RFlow::Master

The master/watchdog process for RFlow. It exists mainly to receive SIGCHLD when a subprocess exits, so that it can stop all of its subprocesses with SIGQUIT and be restarted.
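
The watchdog idea described here can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not RFlow's implementation: `run_watchdog` is a hypothetical helper name, the children are dummy processes that just sleep, and the handler uses a self-pipe so that the signal handler itself does almost no work.

```ruby
# Hypothetical sketch (not RFlow code): a parent that waits for SIGCHLD,
# then sends SIGQUIT to every child and reaps them all.
def run_watchdog(child_count)
  reader, writer = IO.pipe
  # Self-pipe: the handler only writes one byte; the main flow reacts to it.
  previous = Signal.trap('CHLD') { writer.write_nonblock('.') }

  pids = Array.new(child_count) do |i|
    fork do
      Signal.trap('CHLD', 'DEFAULT')
      Signal.trap('QUIT') { exit!(0) }  # exit when told to quit
      sleep(i.zero? ? 0.2 : 30)         # the first child exits early by itself
      exit!(0)                          # skip at_exit hooks inherited from the parent
    end
  end

  reader.read(1)                        # block until some child has exited
  pids.each do |pid|
    begin
      Process.kill('QUIT', pid)
    rescue Errno::ESRCH
      # the child has already been reaped; nothing to signal
    end
  end
  pids.map { |pid| Process.wait2(pid).last } # reap every child
ensure
  Signal.trap('CHLD', previous || 'DEFAULT')
  reader&.close
  writer&.close
end

statuses = run_watchdog(3)
puts "reaped #{statuses.size} children"
```

The sketch stops after reaping the children; the restart step is left out because the source does not say what performs it.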

Attributes

brokers[R]

The {Broker}s being managed by the {Master}. @return [Array<Broker>]

shards[R]

The {Shard}s being managed by the {Master}. @return [Array<Shard>]

Public Class Methods

new(config)
Calls superclass method RFlow::DaemonProcess::new
# File lib/rflow/master.rb, line 17
def initialize(config)
  super(config['rflow.application_name'], 'Master', pid_file_path: config['rflow.pid_file_path'])
  @shards = config.shards.map {|config| Shard.new(config) }
  RFlow.logger.context_width = @shards.flat_map(&:workers).map(&:name).map(&:length).max
  @brokers = config.connections.flat_map(&:brokers).map {|config| Broker.build(config) }
end
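
The `context_width` line in the constructor takes the length of the longest worker name across all shards. A small sketch with stand-in structs shows the computation in isolation; `ContextWidthSketch`, its `Worker` and `Shard` structs, and the worker names are hypothetical and only mimic the `workers` and `name` readers used above.

```ruby
# Hypothetical stand-ins for RFlow's shard and worker objects.
module ContextWidthSketch
  Worker = Struct.new(:name)
  Shard  = Struct.new(:workers)

  # Length of the longest worker name across all shards (nil if there are none).
  def self.context_width(shards)
    shards.flat_map(&:workers).map(&:name).map(&:length).max
  end
end

shards = [
  ContextWidthSketch::Shard.new([ContextWidthSketch::Worker.new('ingest-1'),
                                 ContextWidthSketch::Worker.new('ingest-2')]),
  ContextWidthSketch::Shard.new([ContextWidthSketch::Worker.new('transform-1')])
]
puts ContextWidthSketch.context_width(shards) # prints 11
```

Note that `max` on an empty array returns `nil`, so a configuration with no workers would pass `nil` as the width in this sketch.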

Public Instance Methods

run_process()

Override that starts the EventMachine reactor and blocks until it is stopped. @return [void]

# File lib/rflow/master.rb, line 44
def run_process
  EM.run do
    # TODO: Monitor the workers
  end
end

spawn_subprocesses()

Override of {spawn_subprocesses} that spawns each {Broker}, then calls {Shard#run!} on each {Shard}. @return [void]

# File lib/rflow/master.rb, line 27
def spawn_subprocesses
  RFlow.logger.debug "Running #{brokers.count} brokers" if brokers.count > 0
  brokers.each(&:spawn!)
  RFlow.logger.debug "#{brokers.count} brokers started: #{brokers.map { |w| "#{w.name} (#{w.pid})" }.join(', ')}" if brokers.count > 0

  shards.each(&:run!)
end
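
The ordering in this method (all brokers first, then all shards) can be shown with recording stand-ins. This is only a sketch: `SpawnOrderSketch`, `FakeBroker`, `FakeShard`, and `spawn_all` are hypothetical names, and the fakes record calls instead of forking anything.

```ruby
# Hypothetical stand-ins that record the order in which they are started.
module SpawnOrderSketch
  FakeBroker = Struct.new(:name, :log) do
    def spawn!
      log << "spawn #{name}"
    end
  end

  FakeShard = Struct.new(:name, :log) do
    def run!
      log << "run #{name}"
    end
  end

  # Mirrors the shape of spawn_subprocesses: brokers first, then shards.
  def self.spawn_all(brokers, shards)
    brokers.each(&:spawn!)
    shards.each(&:run!)
  end
end

log = []
brokers = [SpawnOrderSketch::FakeBroker.new('broker-a', log)]
shards  = [SpawnOrderSketch::FakeShard.new('shard-1', log),
           SpawnOrderSketch::FakeShard.new('shard-2', log)]
SpawnOrderSketch.spawn_all(brokers, shards)
puts log.join(', ') # prints "spawn broker-a, run shard-1, run shard-2"
```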

subprocesses()

Override of {subprocesses} that includes the {Broker}s and every {Shard::Worker} of every {Shard}. @return [Array<ChildProcess>]

# File lib/rflow/master.rb, line 38
def subprocesses
  brokers + shards.flat_map(&:workers)
end
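
As a rough illustration of the aggregation above, the following sketch flattens the workers of several shards and appends them to the brokers. `SubprocessListSketch` and its structs are hypothetical stand-ins, not RFlow classes; only the `brokers + shards.flat_map(&:workers)` expression comes from the source.

```ruby
# Hypothetical stand-ins for brokers, shards, and workers.
module SubprocessListSketch
  Process = Struct.new(:name)
  Shard   = Struct.new(:workers)

  # One flat list: brokers first, then each shard's workers in shard order.
  def self.subprocesses(brokers, shards)
    brokers + shards.flat_map(&:workers)
  end
end

brokers = [SubprocessListSketch::Process.new('broker-a')]
shards  = [
  SubprocessListSketch::Shard.new([SubprocessListSketch::Process.new('w1'),
                                   SubprocessListSketch::Process.new('w2')]),
  SubprocessListSketch::Shard.new([SubprocessListSketch::Process.new('w3')])
]
puts SubprocessListSketch.subprocesses(brokers, shards).map(&:name).join(', ')
# prints "broker-a, w1, w2, w3"
```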