class Arachni::RPC::Server::Dispatcher

Dispatches RPC Instances on demand, providing a centralized environment for multiple clients and allowing for extensive process monitoring.

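The process goes something like this:

* On initialization, the Dispatcher populates the Instance pool.
* A client issues a `dispatch` call.
* The Dispatcher pops an Instance from the pool, asynchronously replenishes the pool and hands the Instance info (URL, auth token, etc.) to the client.
* The client connects to the Instance using that info.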

Once the client has finished using the RPC Instance, it must shut it down; otherwise the system will be eaten away by zombie RPC Instance processes.

@author Tasos “Zapotek” Laskos <tasos.laskos@arachni-scanner.com>

Constants

SERVICE_NAMESPACE

Public Class Methods

new( options = Options.instance ) click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 46
# Sets up the Dispatcher: prepares the RPC server, spawns the initial pool of
# Instances and, once every Instance has come on-line, registers the services
# and the grid node and starts serving.
#
# @param [Options] options
#   Configuration to use; it is also updated in place with defaults and with
#   this Dispatcher's URL.
def initialize( options = Options.instance )
    @options = options

    # Fall back to the RPC bind address / default snapshot directory when no
    # explicit value has been configured.
    @options.dispatcher.external_address ||= @options.rpc.server_address
    @options.snapshot.save_path          ||= @options.paths.snapshots

    @server = Base.new( @options )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    # Address under which this Dispatcher is advertised.
    @url = "#{@options.dispatcher.external_address}:#{@options.rpc.server_port}"

    # let the instances in the pool know who to ask for routing instructions
    # when we're in grid mode.
    @options.datastore.dispatcher_url = @url

    # From here on output is rerouted to the Dispatcher's log file.
    prep_logging

    print_status 'Starting the RPC Server...'

    @server.add_handler( 'dispatcher', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @jobs          = []   # info hashes of the Instances that have been dispatched
    @consumed_pids = []   # PIDs of every Instance process spawned so far
    @pool          = Reactor.global.create_queue

    print_status "Populating the pool with #{@options.dispatcher.pool_size}  Instances."
    if @options.dispatcher.pool_size > 0
        # `false` bypasses the one-at-a-time guard so that all Instances are
        # spawned right away.
        @options.dispatcher.pool_size.times { add_instance_to_pool( false ) }
    end

    print_status 'Waiting for Instances to come on-line.'

    # Check up on the pool and start the server once it has been filled.
    # NOTE(review): the interval is presumably in seconds -- confirm against
    # the Reactor API.
    # NOTE(review): the sizes are compared for equality, so a negative
    # configured pool size would presumably never match and the server would
    # never start -- confirm that the option is validated elsewhere.
    Reactor.global.at_interval( 0.1 ) do |task|
        print_debug "Instances: #{@pool.size}/#{@options.dispatcher.pool_size}"
        next if @options.dispatcher.pool_size != @pool.size
        task.done

        print_status 'Instances are on-line.'

        # Expose every loaded service component as an RPC handler under its
        # own name.
        _services.each do |name, service|
            @server.add_handler( name, service.new( @options, self ) )
        end

        # `@node` is only assigned here, so the methods which read it can
        # only work after the pool has been filled.
        @node = Node.new( @options, @logfile )
        @server.add_handler( 'node', @node )

        run
    end
end

Private Class Methods

_services() click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 272
# Lazily builds and memoizes the manager of the Dispatcher's service
# components.
#
# @return [Component::Manager]
#   Manager for the components found under `Options.paths.services`, with
#   `load_all` already called on it.
def self._services
    unless instance_variable_defined?( :@services ) && @services
        # Assign before loading, so the manager stays cached even if loading
        # is interrupted.
        @services = Component::Manager.new( Options.paths.services, SERVICE_NAMESPACE )
        @services.load_all
    end

    @services
end

Public Instance Methods

alive?() click to toggle source

@return [TrueClass]

true
# File lib/arachni/rpc/server/dispatcher.rb, line 111
# Liveness check; delegates to the underlying RPC server.
#
# @return [Boolean] Whatever `@server.alive?` reports.
def alive?
    @server.alive?
end
dispatch( owner = 'unknown', helpers = {}, load_balance = true, &block ) click to toggle source

Dispatches an {Instance} from the pool.

@param [String] owner

An owner to assign to the {Instance}.

@param [Hash] helpers

Hash of helper data to be added to the job.

@param [Boolean] load_balance

Return an {Instance} from the least burdened {Dispatcher} (when in Grid mode)
or from this one directly?

@return [Hash, false, nil]

Depending on availability:

* `Hash`: Includes URL, owner, clock info and proc info.
* `false`: Pool is currently empty, check back again in a few seconds.
* `nil`: The {Dispatcher} was configured with a pool-size of `0`.
# File lib/arachni/rpc/server/dispatcher.rb, line 155
# Hands an {Instance} from the pool over to the caller through the given block.
#
# @param [String] owner
#   Owner to assign to the {Instance}.
# @param [Hash] helpers
#   Helper data to be stored with the job.
# @param [Boolean] load_balance
#   When `true` and this Dispatcher is a Grid member, the request is forwarded
#   to the least burdened Dispatcher instead of being served locally.
# @param [Block] block
#   Called with:
#
#   * `Hash`: Instance info, when an Instance was available.
#   * `false`: When the pool is currently empty.
#   * `nil`: When the configured pool size is `0` (or less).
def dispatch( owner = 'unknown', helpers = {}, load_balance = true, &block )
    if load_balance && @node.grid_member?
        # Forward to the preferred peer; `false` stops it from load-balancing
        # the request yet again.
        preferred do |peer_url|
            connect_to_peer( peer_url ).dispatch( owner, helpers, false, &block )
        end
        return
    end

    if @options.dispatcher.pool_size <= 0
        block.call nil
        return
    end

    if @pool.empty?
        block.call false
    else
        @pool.pop do |info|
            info.merge!(
                'owner'     => owner.to_s,
                'starttime' => Time.now.to_s,
                'helpers'   => helpers
            )

            print_status "Instance dispatched -- PID: #{info['pid']} - Port: #{info['port']} - Owner: #{info['owner']}"

            @jobs << info
            block.call info
        end
    end

    # Replenish the pool, whether or not an Instance could be handed out.
    Reactor.global.schedule { add_instance_to_pool }
end
finished_jobs() click to toggle source

@return [Array<Hash>]

Returns info for all finished jobs.

@see jobs

# File lib/arachni/rpc/server/dispatcher.rb, line 226
# Lists the jobs whose Instance is no longer alive.
#
# @return [Array<Hash>] Entries of {#jobs} whose `'alive'` value is falsy.
def finished_jobs
    jobs.reject { |info| info['alive'] }
end
job( pid ) click to toggle source

Returns proc info for a given pid

@param [Fixnum] pid

@return [Hash]

# File lib/arachni/rpc/server/dispatcher.rb, line 192
# Returns proc info for the job with the given PID.
#
# @param [Integer] pid
#   PID of a dispatched Instance.
#
# @return [Hash, nil]
#   Copy of the stored job info, extended with `'currtime'`, `'age'`,
#   `'runtime'` and `'alive'`; `nil` when no job has that PID.
def job( pid )
    # `find` yields `nil` when nothing matches; iterating with `each` and
    # falling through would return the whole `@jobs` array instead.
    stored = @jobs.find { |j| j['pid'] == pid }
    return if !stored

    # Work on a copy so that the stored job info is left untouched.
    cjob     = stored.dup
    currtime = Time.now

    cjob['currtime'] = currtime.to_s
    cjob['age']      = currtime - Time.parse( cjob['birthdate'] )
    cjob['runtime']  = currtime - Time.parse( cjob['starttime'] )
    cjob['alive']    = Arachni::Processes::Manager.alive?( pid )

    cjob
end
jobs() click to toggle source

@return [Array<Hash>]

Returns info for all jobs.
# File lib/arachni/rpc/server/dispatcher.rb, line 210
# Looks up the proc info of every tracked job via {#job}; `compact` drops any
# lookup that came back as `nil`.
#
# @return [Array<Hash>] Info for all jobs.
def jobs
    @jobs.map { |cjob| job( cjob['pid'] ) }.compact
end
log() click to toggle source

@return [String]

Contents of the log file
# File lib/arachni/rpc/server/dispatcher.rb, line 261
# Reads the file at the location returned by `prep_logging`.
#
# @return [String] Contents of the log file.
def log
    IO.read prep_logging
end
pid() click to toggle source

@private

# File lib/arachni/rpc/server/dispatcher.rb, line 266
# @private
#
# @return [Integer] PID of the current process, as reported by `Process.pid`.
def pid
    Process.pid
end
preferred( &block ) click to toggle source

@return [String]

URL of the least burdened Dispatcher. If not a grid member it will
return this Dispatcher's URL.
# File lib/arachni/rpc/server/dispatcher.rb, line 118
# Determines the least burdened Dispatcher and passes its URL to the block.
#
# When this Dispatcher is not a Grid member its own URL is passed straight
# away; otherwise every neighbour is asked for its workload score and the URL
# with the lowest score (this Dispatcher included) wins.
#
# @param [Block] block
#   Called with the URL (`String`) of the preferred Dispatcher.
def preferred( &block )
    if !@node.grid_member?
        block.call @url
        return
    end

    # Asks one neighbour for its score; neighbours whose reply is missing or
    # is an RPC exception are reported as `nil`.
    each = proc do |neighbour, iter|
        connect_to_peer( neighbour ).workload_score do |score|
            iter.return( (!score || score.rpc_exception?) ? nil : [neighbour, score] )
        end
    end

    after = proc do |nodes|
        # Drop the neighbours which could not be scored and let this
        # Dispatcher compete as well.
        nodes.compact!
        nodes << [@url, workload_score]

        # Lower is better; `min_by` picks the winner without sorting the
        # whole list.
        block.call nodes.min_by { |_, score| score }.first
    end

    Reactor.global.create_iterator( @node.neighbours ).map( each, after )
end
running_jobs() click to toggle source

@return [Array<Hash>]

Returns info for all running jobs.

@see jobs

# File lib/arachni/rpc/server/dispatcher.rb, line 218
# Lists the jobs whose Instance is still alive.
#
# @return [Array<Hash>] Entries of {#jobs} whose `'alive'` value is truthy.
def running_jobs
    jobs.select { |info| info['alive'] }
end
services() click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 105
# @return [Array]
#   Keys of the loaded service components (presumably the service names).
def services
    _services.keys
end
statistics() click to toggle source

@return [Hash]

Returns server stats regarding the jobs and pool.
# File lib/arachni/rpc/server/dispatcher.rb, line 243
# Gathers statistics about the jobs, the pool and the grid node.
#
# @return [Hash]
#   Running and finished jobs, configured and current pool size, PIDs of all
#   spawned Instances, paths of the `.afs` snapshots, the node info (with this
#   Dispatcher's workload score stored under `'score'`) and the neighbours.
def statistics
    stats = {
        'running_jobs'   => running_jobs,
        'finished_jobs'  => finished_jobs,
        'init_pool_size' => @options.dispatcher.pool_size,
        'curr_pool_size' => @pool.size,
        'consumed_pids'  => @consumed_pids,
        'snapshots'      => Dir.glob( "#{@options.snapshot.save_path}*.afs" ),
        'node'           => @node.info,
        'neighbours'     => @node.neighbours
    }

    # The score is stored directly in the hash returned by `@node.info`.
    stats['node']['score'] = workload_score

    stats
end
workload_score() click to toggle source

@return [Float]

Workload score for this Dispatcher, calculated using the number
of {#running_jobs} and the configured node weight.

Lower is better.
# File lib/arachni/rpc/server/dispatcher.rb, line 235
# Computes this Dispatcher's workload score: the number of running jobs plus
# one, multiplied by the node weight when one is set.
#
# @return [Float] Workload score; lower is better.
def workload_score
    base   = (running_jobs.size + 1).to_f
    weight = @node.info['weight']

    weight ? base * weight.to_f : base
end

Private Instance Methods

_services() click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 281
# Instance-level shortcut to the class-level `_services`.
def _services
    self.class._services
end
add_instance_to_pool( one_at_a_time = true ) click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 328
# Spawns a new Instance process and pushes its info to the pool once the
# Instance has booted.
#
# @param [Boolean] one_at_a_time
#   When `true`, nothing is done if a previously spawned Instance has not
#   finished booting yet.
def add_instance_to_pool( one_at_a_time = true )
    return if one_at_a_time && @operation_in_progress
    @operation_in_progress = true

    owner          = 'dispatcher'
    instance_port  = Utilities.available_port
    instance_token = Utilities.generate_token

    instance_pid = Processes::Manager.spawn(
        :instance, port: instance_port, token: instance_token
    )
    Process.detach( instance_pid )
    @consumed_pids << instance_pid

    print_status "Instance added to pool -- PID: #{instance_pid} - Port: #{instance_port} - Owner: #{owner}"

    # The readiness check goes through the RPC server address, while the URL
    # stored with the Instance info uses the external address.
    boot_url = "#{@options.rpc.server_address}:#{instance_port}"

    # Wait until the Instance has booted before adding it to the pool.
    Client::Instance.when_ready( boot_url, instance_token ) do
        @operation_in_progress = false

        @pool << {
            'token'     => instance_token,
            'pid'       => instance_pid,
            'port'      => instance_port,
            'url'       => "#{@options.dispatcher.external_address}:#{instance_port}",
            'owner'     => owner,
            'birthdate' => Time.now.to_s
        }
    end
end
connect_to_peer( url ) click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 364
# Returns the RPC client for the Dispatcher at `url`, creating it on first use
# and caching it per URL in `@rpc_clients`.
#
# @param [String] url URL of the peer Dispatcher.
#
# @return [Client::Dispatcher]
def connect_to_peer( url )
    @rpc_clients ||= {}
    @rpc_clients[url] ||= Client::Dispatcher.new( @options, url )
end
kill( pid ) click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 319
# Sends `KILL` to the given process up to 10 times in a row.
#
# @param [Integer] pid PID of the process to kill.
#
# @return [Boolean]
#   `true` as soon as the process is reported as non-existent
#   (`Errno::ESRCH`), `false` if all 10 signals could be delivered.
def kill( pid )
    10.times { Process.kill( 'KILL', pid ) }
    false
rescue Errno::ESRCH
    true
end
prep_logging() click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 358
# Reroutes output to this Dispatcher's log file, named after the PID and the
# RPC server port. The result of `reroute_to_file` is memoized in `@logfile`,
# so the rerouting only happens on the first call.
#
# @return [Object]
#   Whatever `reroute_to_file` returned -- presumably the path of the log
#   file, as {#log} passes it to `IO.read`.
def prep_logging
    # reroute all output to a logfile
    @logfile ||= reroute_to_file( @options.paths.logs +
        "/Dispatcher - #{Process.pid}-#{@options.rpc.server_port}.log" )
end
run() click to toggle source

Starts the dispatcher’s server

# File lib/arachni/rpc/server/dispatcher.rb, line 292
# Starts the Dispatcher's server.
#
# Reactor errors are logged together with their backtrace. If the server
# cannot be started, the failure is logged, every spawned Instance is killed
# and the process exits with status `1`.
def run
    begin
        Reactor.global.on_error do |_, error|
            print_error "Arachni::Reactor: #{error}"
            error.backtrace.each { |line| print_error "Arachni::Reactor: #{line}" }
        end

        print_status 'Ready'
        @server.start
    rescue => failure
        print_exception failure

        $stderr.puts "Could not start server, for details see: #{@logfile}"

        # If the server fails to start kill the pool Instances
        # to prevent zombie processes.
        @consumed_pids.each { |instance_pid| kill instance_pid }
        exit 1
    end
end
shutdown() click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 314
# Logs the shutdown and stops the global reactor.
def shutdown
    print_status 'Shutting down...'
    Arachni::Reactor.global.stop
end
struct_to_h( struct ) click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 369
# Converts a struct-like object into a Hash with String keys.
#
# @param [#each_pair, nil] struct
#   Object whose key/value pairs should be converted.
#
# @return [Hash]
#   Pairs of `struct` with the keys converted to Strings and Integer values
#   converted to Strings; an empty Hash when `struct` is `nil` or `false`.
def struct_to_h( struct )
    hash = {}
    return hash if !struct

    struct.each_pair do |k, v|
        # `Integer` covers what used to be `Fixnum` and `Bignum`; those
        # constants were deprecated in Ruby 2.4 and removed in 3.2.
        v = v.to_s if v.is_a?( Integer )
        hash[k.to_s] = v
    end

    hash
end
trap_interrupts( &block ) click to toggle source
# File lib/arachni/rpc/server/dispatcher.rb, line 285
# Installs the given block as the handler of the `QUIT` and `INT` signals,
# skipping any signal which is not listed by `Signal.list`.
#
# @param [Block] block
#   Handler to install; when omitted an empty handler is installed instead.
def trap_interrupts( &block )
    %w(QUIT INT).each do |signal|
        next unless Signal.list.key?( signal )

        handler = block || Proc.new {}
        trap( signal, &handler )
    end
end