class RFlow::Shard::Worker
An actual child process under the {Shard}, which coordinates a set of identical {Worker}s.
Attributes
index[R]
The index of this worker within its {Shard} (for example, in a set of 3 {Worker}s, the indices would be 0, 1, and 2). @return [Integer]
shard[R]
A reference to the {Shard} governing this {Worker}. @return [Shard]
Public Class Methods
new(shard, index = 1)
Calls superclass method
RFlow::ChildProcess::new
# File lib/rflow/shard.rb, line 23
def initialize(shard, index = 1)
  super("#{shard.name}-#{index}", 'Worker')
  @shard = shard
  @index = index

  # build at initialize time to fail fast
  @components = shard.config.components.map {|config| Component.build(self, config) }
end
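The "build at initialize time to fail fast" comment can be illustrated with a minimal, self-contained sketch. The names `FakeBuilder` and `FakeWorker` are hypothetical stand-ins, not part of RFlow; the point is only that eager construction surfaces a bad configuration when the object is created rather than later, when the process is already running.

```ruby
# Hypothetical builder (not RFlow's Component.build): raises for unknown types.
class FakeBuilder
  KNOWN = %w[source sink].freeze

  def self.build(type)
    raise ArgumentError, "unknown component type '#{type}'" unless KNOWN.include?(type)
    type.to_sym
  end
end

# Hypothetical worker that builds all of its components in the constructor.
class FakeWorker
  attr_reader :components

  def initialize(types)
    # Built eagerly, so a bad entry raises here instead of at run time.
    @components = types.map { |type| FakeBuilder.build(type) }
  end
end

begin
  FakeWorker.new(%w[source bogus])
rescue ArgumentError => e
  puts "Rejected at construction: #{e.message}"
end
```

With a valid list such as `%w[source sink]`, the constructor returns normally and `components` holds one entry per configured type.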
Public Instance Methods
run_process()
Configure, connect, and actually start running RFlow components. @return [void]
# File lib/rflow/shard.rb, line 34
def run_process
  EM.run do
    begin
      # TODO: Monitor the master

      configure_components!
      connect_components!
      # TODO: need to do proper node synchronization for ZMQ to remove sleep
      sleep 1
      run_components!
    rescue Exception => e
      RFlow.logger.error "Error in worker, shutting down: #{e.class.name}: #{e.message}, because: #{e.backtrace.inspect}"
      exit! 1
    end
  end

  RFlow.logger.info 'Shutting down worker after EM stopped'
end
shutdown!(signal)
Shut down the {Worker}. Shuts down each component and kills EventMachine. @return [void]
Calls superclass method
RFlow::ChildProcess#shutdown!
# File lib/rflow/shard.rb, line 86
def shutdown!(signal)
  RFlow.logger.debug 'Shutting down components'
  @components.each do |component|
    RFlow.logger.debug "Shutting down component '#{component.name}' (#{component.uuid})"
    component.shutdown!
  end
  EM.stop_event_loop
  super
end
Protected Instance Methods
configure_components!()
# File lib/rflow/shard.rb, line 53
def configure_components!
  RFlow.logger.debug 'Configuring components'
  @components.zip(shard.config.components.map(&:options)).each do |(component, config)|
    RFlow.logger.debug "Configuring component '#{component.name}' (#{component.uuid})"
    component.configure! config
  end
end
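The listing above pairs each built component with its options by position using `Array#zip`. A minimal sketch of that pairing follows, with a hypothetical helper `pair_with_options` and made-up component names and option hashes. It assumes, as in the listing, that both arrays derive from the same ordered configuration list, so equal positions refer to the same component.

```ruby
# Hypothetical helper: pairs each component with the options at the same index.
# Array#zip pads with nil when the second array is shorter than the first.
def pair_with_options(components, options)
  components.zip(options)
end

components = %w[generator printer]      # hypothetical component names
options    = [{ 'interval' => 1 }, {}]  # hypothetical option hashes

pair_with_options(components, options).each do |(component, config)|
  puts "Configuring #{component} with #{config.inspect}"
end
```

Because the pairing is purely positional, it stays correct only as long as the component list and the options list keep the same order and length.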
connect_components!()
Connects all inputs before any outputs, so that connection types requiring a 'server' to be established before a 'client' can connect have a chance to get ready.
# File lib/rflow/shard.rb, line 63
def connect_components!
  RFlow.logger.debug 'Connecting components'
  @components.each do |component|
    RFlow.logger.debug "Connecting inputs for component '#{component.name}' (#{component.uuid})"
    component.connect_inputs!
  end
  @components.each do |component|
    RFlow.logger.debug "Connecting outputs for component '#{component.name}' (#{component.uuid})"
    component.connect_outputs!
  end
end
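The ordering guarantee of the two-pass connect can be seen in a small, self-contained sketch. `FakeComponent` and `connect_in_two_passes` are hypothetical names used only for illustration; the fake component records the order of calls instead of opening real connections.

```ruby
# Hypothetical stand-in for an RFlow component; it only records call order.
class FakeComponent
  attr_reader :name

  def initialize(name, log)
    @name = name
    @log = log
  end

  def connect_inputs!
    @log << [:inputs, name]
  end

  def connect_outputs!
    @log << [:outputs, name]
  end
end

# Two passes: every input (the 'server' side) is connected before
# any output (the 'client' side) attempts to connect.
def connect_in_two_passes(components)
  components.each(&:connect_inputs!)
  components.each(&:connect_outputs!)
end

log = []
components = %w[producer consumer].map { |n| FakeComponent.new(n, log) }
connect_in_two_passes(components)
log.each { |phase, name| puts "#{phase}: #{name}" }
```

A single loop that called `connect_inputs!` and `connect_outputs!` per component would let the first component's outputs connect before later components had set up their inputs, which is the situation the two-pass order avoids.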
run_components!()
# File lib/rflow/shard.rb, line 75
def run_components!
  RFlow.logger.debug 'Running components'
  @components.each do |component|
    RFlow.logger.debug "Running component '#{component.name}' (#{component.uuid})"
    component.run!
  end
end