class RFlow::Shard::Worker

An actual child process under the {Shard}, which coordinates a set of identical {Worker}s.

Attributes

index[R]

The index of this worker within its set (for example, in a set of 3 {Worker}s, one has index 0, one has index 1, and one has index 2). @return [Integer]

shard[R]

A reference to the {Shard} governing this {Worker}. @return [Shard]

Public Class Methods

new(shard, index = 1) click to toggle source
Calls superclass method RFlow::ChildProcess::new
# File lib/rflow/shard.rb, line 23
# Set up a worker for the given shard.
#
# The superclass constructor receives "<shard.name>-<index>" and the
# string 'Worker'.
#
# @param shard [Shard] the shard this worker belongs to
# @param index [Integer] position of this worker within the shard's set
def initialize(shard, index = 1)
  process_name = "#{shard.name}-#{index}"
  super(process_name, 'Worker')
  @shard, @index = shard, index

  # Components are built here rather than lazily so that a build problem
  # surfaces as soon as the worker object is created.
  @components = shard.config.components.map do |component_config|
    Component.build(self, component_config)
  end
end

Public Instance Methods

run_process() click to toggle source

Configure, connect, and actually start running RFlow components. @return [void]

# File lib/rflow/shard.rb, line 34
# Configure, connect and start the components inside an EventMachine
# run block, then log once the block passed to EM.run has returned.
#
# Any exception raised during startup is logged and the process is
# terminated via exit! with status 1.
#
# @return [void]
def run_process
  EM.run do
    begin
      # TODO: Monitor the master
      configure_components!
      connect_components!
      # TODO: need to do proper node synchronization for ZMQ to remove sleep
      sleep 1
      run_components!
    rescue Exception => failure
      # NOTE(review): rescuing Exception (not just StandardError) also
      # catches things like SystemExit; presumably intentional so that
      # every startup failure is logged before the hard exit - confirm.
      RFlow.logger.error(
        "Error in worker, shutting down: #{failure.class.name}: #{failure.message}, " \
        "because: #{failure.backtrace.inspect}"
      )
      exit! 1
    end
  end

  RFlow.logger.info 'Shutting down worker after EM stopped'
end
shutdown!(signal) click to toggle source

Shut down the {Worker}. Shuts down each component and kills EventMachine. @return [void]

Calls superclass method RFlow::ChildProcess#shutdown!
# File lib/rflow/shard.rb, line 86
# Shut down every component, stop the EventMachine event loop, and then
# hand over to the superclass implementation.
#
# @param signal the value forwarded unchanged to the superclass shutdown!
# @return whatever the superclass shutdown! returns
def shutdown!(signal)
  RFlow.logger.debug 'Shutting down components'
  @components.each do |member|
    RFlow.logger.debug(
      "Shutting down component '#{member.name}' (#{member.uuid})"
    )
    member.shutdown!
  end

  # The loop is stopped only after all components have been told to shut down.
  EM.stop_event_loop
  super(signal)
end

Protected Instance Methods

configure_components!() click to toggle source
# File lib/rflow/shard.rb, line 53
# Configure each built component with the options of the shard's
# component config at the same position.
#
# The options list is collected in full before the first component is
# configured.
def configure_components!
  RFlow.logger.debug 'Configuring components'
  option_sets = shard.config.components.map(&:options)
  pairings = @components.zip(option_sets)
  pairings.each do |component, options|
    RFlow.logger.debug "Configuring component '#{component.name}' (#{component.uuid})"
    component.configure! options
  end
end
connect_components!() click to toggle source

Connect all inputs before all outputs, so that connection types which require a 'server' to be established before a 'client' can connect have a chance to get themselves ready.

# File lib/rflow/shard.rb, line 63
# Connect the components in two passes: every component's inputs first,
# then every component's outputs. No output is connected until all
# inputs have been connected.
def connect_components!
  RFlow.logger.debug 'Connecting components'

  # Hash iteration follows insertion order, so the inputs pass always
  # completes before the outputs pass begins.
  passes = { 'inputs' => :connect_inputs!, 'outputs' => :connect_outputs! }
  passes.each do |direction, connector|
    @components.each do |component|
      RFlow.logger.debug "Connecting #{direction} for component '#{component.name}' (#{component.uuid})"
      component.public_send(connector)
    end
  end

  @components
end
run_components!() click to toggle source
# File lib/rflow/shard.rb, line 75
# Start every component in the order they are held, logging each one
# just before its run! is invoked.
def run_components!
  RFlow.logger.debug 'Running components'
  @components.each do |runnable|
    RFlow.logger.debug(
      "Running component '#{runnable.name}' (#{runnable.uuid})"
    )
    runnable.run!
  end
end