module Arachni::RPC::Server::Framework::Distributor

Contains utility methods used to connect to instances and dispatchers, and to split and distribute the workload.

@author Tasos “Zapotek” Laskos <tasos.laskos@arachni-scanner.com>

Constants

MAX_CONCURRENCY

Maximum concurrency when communicating with instances.

In other words, at most MAX_CONCURRENCY instances are contacted at a time while iterating through them.
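
As an illustration of what a concurrency cap means, here is a minimal sketch using plain Ruby threads and a `Queue`. This is an assumption made for a self-contained example: Arachni itself iterates through a Reactor iterator (see `iterator_for`), not threads. The `each_with_limit` helper is hypothetical.

```ruby
# Hypothetical helper illustrating bounded concurrency with stdlib threads.
# Arachni itself uses a Reactor iterator, not threads; this is only a sketch.
def each_with_limit( items, limit )
    queue = Queue.new
    items.each { |item| queue << item }

    # Never start more workers than the limit (or than there are items).
    workers = Array.new( [limit, items.size].min ) do
        Thread.new do
            loop do
                item = begin
                    queue.pop( true ) # non-blocking; raises ThreadError when empty
                rescue ThreadError
                    break             # queue drained, this worker is done
                end
                yield item
            end
        end
    end

    workers.each( &:join )
end

peak = in_flight = 0
lock = Mutex.new

each_with_limit( (1..20).to_a, 4 ) do |n|
    lock.synchronize { in_flight += 1; peak = [peak, in_flight].max }
    sleep 0.01 # simulate an RPC round-trip
    lock.synchronize { in_flight -= 1 }
end

puts peak # at most 4
```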

Public Instance Methods

connect_to_instance( instance )

Connects to a remote Instance.

@param [Hash] instance

The hash must hold the `'url'` and the `'token'`. In subsequent calls
the `'token'` can be omitted.
# File lib/arachni/rpc/server/framework/distributor.rb, line 29
def connect_to_instance( instance )
    instance = instance.my_symbolize_keys
    @instance_connections ||= {}

    if @instance_connections[instance[:url]]
        return @instance_connections[instance[:url]]
    end

    @tokens ||= {}
    @tokens[instance[:url]] = instance[:token] if instance[:token]
    @instance_connections[instance[:url]] =
        RPC::Client::Instance.new( options, instance[:url], @tokens[instance[:url]] )
end
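
The caching behaviour above (one client per URL, with the token remembered from the first call) can be sketched without Arachni as follows. `FakeClient` and `ConnectionCache` are hypothetical stand-ins for `RPC::Client::Instance` and the including class; only the memoization logic mirrors the original.

```ruby
# Hypothetical stand-in for RPC::Client::Instance; it only records its arguments.
FakeClient = Struct.new( :url, :token )

# Minimal sketch of per-URL connection caching with token memoization.
class ConnectionCache
    def initialize
        @connections = {}
        @tokens      = {}
    end

    # `instance` is a Hash with :url and, on the first call, :token.
    def connect( instance )
        url = instance[:url]
        return @connections[url] if @connections[url]

        @tokens[url] = instance[:token] if instance[:token]
        @connections[url] = FakeClient.new( url, @tokens[url] )
    end
end

cache  = ConnectionCache.new
first  = cache.connect( url: 'localhost:7331', token: 's3cr3t' )
second = cache.connect( url: 'localhost:7331' ) # token omitted
puts first.equal?( second ) # prints true, same cached client
```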

each_slave( foreach = nil, after = nil, &block )

@param [Proc] foreach

Invoked once for each slave instance.

@param [Proc] after

Invoked after the iteration has completed.

@param [Proc] block

Used in place of `foreach` when `foreach` is not given; invoked once for each slave instance.
# File lib/arachni/rpc/server/framework/distributor.rb, line 60
def each_slave( foreach = nil, after = nil, &block )
    foreach ||= block
    wrapped_foreach = proc do |instance, iterator|
        foreach.call( connect_to_instance( instance ), iterator )
    end
    slave_iterator.each( wrapped_foreach, after )
end

iterator_for( arr )

@param [Array] arr

@return [Arachni::Reactor::Iterator]

Iterator for the provided array.
# File lib/arachni/rpc/server/framework/distributor.rb, line 78
def iterator_for( arr )
    Reactor.global.create_iterator( arr, MAX_CONCURRENCY )
end

map_slaves( foreach, after )

@param [Proc] foreach

Invoked once for each slave instance; an array is built from the returned values.

@param [Proc] after

Invoked with the resulting array.
# File lib/arachni/rpc/server/framework/distributor.rb, line 47
def map_slaves( foreach, after )
    wrap = proc do |instance, iterator|
        foreach.call( connect_to_instance( instance ), iterator )
    end
    slave_iterator.map( wrap, after )
end
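
The `foreach`/`after` contract used by `map_slaves` (each `foreach` reports its value through `iter.return`, and `after` receives the collected array) can be sketched with a synchronous iterator. `SyncMapIterator` is hypothetical; the real Reactor iterator runs asynchronously with up to MAX_CONCURRENCY calls in flight.

```ruby
# Hypothetical, synchronous stand-in for the Reactor iterator's `map`.
# Each `foreach` call must report its result through `iter.return`.
class SyncMapIterator
    Iter = Struct.new( :value ) do
        def return( v )
            self.value = v
        end
    end

    def initialize( items )
        @items = items
    end

    def map( foreach, after )
        results = @items.map do |item|
            iter = Iter.new
            foreach.call( item, iter )
            iter.value
        end
        after.call( results )
    end
end

slaves  = [{ url: 'a:1' }, { url: 'b:2' }]
foreach = proc { |slave, iter| iter.return( slave[:url].upcase ) }
after   = proc { |urls| p urls }
SyncMapIterator.new( slaves ).map( foreach, after ) # prints ["A:1", "B:2"]
```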

slave_iterator()

@return [Arachni::Reactor::Iterator]

Iterator for all slave instances.
# File lib/arachni/rpc/server/framework/distributor.rb, line 70
def slave_iterator
    iterator_for( @slaves )
end

Private Instance Methods

build_element_list( page )
# File lib/arachni/rpc/server/framework/distributor.rb, line 278
def build_element_list( page )
    filter_elements( page.elements_within_scope )
end

calculate_workload_size( maximum )

@param [Integer] maximum

Maximum allowed workload, to be returned in case the calculation (based
on the amount of {#preferred_slaves}) exceeds it.

@see preferred_slaves
@see split_page_workload
@see distribute_page_workload

# File lib/arachni/rpc/server/framework/distributor.rb, line 91
def calculate_workload_size( maximum )
    [10 * (preferred_slaves.size + 1), maximum].min
end
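
A worked instance of the formula: with 3 preferred slaves the uncapped size is 10 × (3 + 1) = 40. The standalone `workload_size_for` below is hypothetical; it takes the slave count as an argument instead of calling `preferred_slaves`.

```ruby
# Same formula as calculate_workload_size, with the slave count passed in
# explicitly (the `slave_count` parameter is an assumption for illustration).
def workload_size_for( maximum, slave_count )
    [10 * (slave_count + 1), maximum].min
end

workload_size_for( 100, 3 ) # => 40, since 10 * 4 is under the cap
workload_size_for( 30, 3 )  # => 30, capped by maximum
```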

clear_filters()
# File lib/arachni/rpc/server/framework/distributor.rb, line 302
def clear_filters
    distributed_elements.clear
    distributed_pages.clear
end

connect_to_dispatcher( url )
# File lib/arachni/rpc/server/framework/distributor.rb, line 467
def connect_to_dispatcher( url )
    @dispatcher_connections ||= {}
    @dispatcher_connections[url] ||= RPC::Client::Dispatcher.new( options, url )
end

dispatcher()
# File lib/arachni/rpc/server/framework/distributor.rb, line 472
def dispatcher
    return if !options.datastore.dispatcher_url
    connect_to_dispatcher( options.datastore.dispatcher_url )
end

distribute_page_workload( pages, &block )

@param [Array<Page>] pages

Page workload to be {#split_page_workload split} and distributed to
the master (`self`) and the {#preferred_slaves}.

@param [Block] block

Block to be called with the next page for the master (`self`), giving
us a chance to preload it for a smoother audit.
# File lib/arachni/rpc/server/framework/distributor.rb, line 101
def distribute_page_workload( pages, &block )
    workloads = split_page_workload( pages.compact )
    return if workloads.empty?

    instances = preferred_slaves

    # Grab our chunk of the pages from the last slot (which could be empty
    # if there's not enough workload), since we've got the added burden of
    # performing browser analysis while the slaves do not...
    self_workload = workloads.delete_at( instances.size ) || []

    # ... and allow us to preload the next page from it...
    block.call self_workload.pop if block_given?

    # ...and just push the rest to be audited ASAP.
    self_workload.each { |page| push_to_distributed_page_queue( page ) }

    # Assign the rest of the workload amongst the slaves.
    workloads.each.with_index do |workload, i|
        # We won't see these pages again so this is our only chance to
        # process them.
        (workload - self_workload).each do |p|
            add_to_sitemap( p )
            # Push any new resources to the audit queue.
            push_paths_from_page( p ) if crawl?
            perform_browser_analysis p
        end

        # Assign the workload to the slave.
        connect_to_instance( instances[i] ).
            framework.process_pages( workload.map(&:to_rpc_data) ) do
                # Slave got workload, remove it from the 'done' list.
                mark_slave_as_not_done instances[i][:url]
            end
    end
end
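
The slot assignment in `distribute_page_workload` can be isolated as follows. In this sketch strings stand in for `Page` objects and slave names for connection info. It leaves out the side effects of the original (sitemap updates, browser analysis, the RPC call). `assign_slots` is a hypothetical helper.

```ruby
# Hypothetical helper mirroring the slot assignment above. Strings stand in
# for Page objects and slave names for connection info.
def assign_slots( workloads, slaves )
    workloads = workloads.dup
    # The slot at index `slaves.size` (if present) belongs to the master.
    master  = (workloads.delete_at( slaves.size ) || []).dup
    preload = master.pop # handed to the block for preloading

    # Every remaining slot maps onto the slave at the same index.
    { preload: preload, master: master,
      slaves:  workloads.each_with_index.map { |w, i| [slaves[i], w] }.to_h }
end

result = assign_slots( [%w(p1 p2), %w(p3), %w(p4 p5)], %w(slave-a slave-b) )
# result[:preload] is "p5", result[:master] is ["p4"],
# result[:slaves] is { "slave-a" => ["p1", "p2"], "slave-b" => ["p3"] }
```

When there are fewer chunks than instances, the master's slot is simply missing, so it gets nothing to preload and the slaves take everything.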

distributed_elements()
# File lib/arachni/rpc/server/framework/distributor.rb, line 294
def distributed_elements
    state.rpc.distributed_elements
end

distributed_pages()
# File lib/arachni/rpc/server/framework/distributor.rb, line 298
def distributed_pages
    state.rpc.distributed_pages
end

dump_workload_to_console( workload )
# File lib/arachni/rpc/server/framework/distributor.rb, line 196
def dump_workload_to_console( workload )
    find_by_id = proc do |page, id|
        page.elements.find { |e| e.persistent_hash == id }
    end

    distributed = []
    workload.map do |page_chunks|
        c = page_chunks.map do |p|
            elements = p.audit_whitelist.to_a.map do |id|
                e = find_by_id.call( p, id )
                { id => (e.coverage_id if e) }
            end

            [p.url, elements]
        end
        distributed << Hash[c]
    end

    ap distributed
end

filter_elements( elements )
# File lib/arachni/rpc/server/framework/distributor.rb, line 282
def filter_elements( elements )
    elements.map do |e|
        next if !(e.inputs.any? || (e.respond_to?( :dom ) && e.dom && e.dom.inputs.any?))

        id = e.persistent_hash
        next if distributed_elements.include?( id )
        distributed_elements << id

        e
    end.compact.uniq
end
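
The filtering idea (skip input-less elements and anything already distributed, recording new IDs as they pass) is sketched below with a `Set` of IDs. The `Set` is an assumption: the storage behind `distributed_elements` is not shown here. The DOM-input check is omitted. `FakeElement` and `filter_new_elements` are hypothetical.

```ruby
require 'set'

# Hypothetical element stand-in: `persistent_hash` identifies an element
# across pages, `inputs` is its input hash.
FakeElement = Struct.new( :persistent_hash, :inputs )

# Skips input-less elements and already-seen IDs, recording new IDs.
def filter_new_elements( elements, seen_ids )
    elements.map do |e|
        next if e.inputs.empty?
        next if seen_ids.include?( e.persistent_hash )

        seen_ids << e.persistent_hash
        e
    end.compact
end

seen  = Set.new
first = filter_new_elements( [FakeElement.new( 1, { 'q' => '' } ), FakeElement.new( 2, {} )], seen )
again = filter_new_elements( [FakeElement.new( 1, { 'q' => '' } )], seen )
# `first` holds only element 1; `again` is empty because ID 1 was already seen.
```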

filter_elements_from_pages( pages )

@param [Array<Page>] pages

@return [Array<Element::Capabilities::Auditable>]

Flat list of all unique and previously unseen elements from the given
`pages`.
# File lib/arachni/rpc/server/framework/distributor.rb, line 274
def filter_elements_from_pages( pages )
    pages.map { |page| build_element_list( page ) }.flatten
end

has_idle_slaves?()

@return [Bool]

`true` if there are slaves that have finished their assigned workload,
`false` otherwise.
# File lib/arachni/rpc/server/framework/distributor.rb, line 227
def has_idle_slaves?
    synchronize { @done_slaves.any? }
end

initialize_slaves( &block )

Configures and initializes slave instances.

@param [Block] block

Block to be called once the slaves are ready to receive workload.
# File lib/arachni/rpc/server/framework/distributor.rb, line 366
def initialize_slaves( &block )
    slave_options = prepare_slave_options

    foreach = proc do |slave, iterator|
        slave.service.scan( slave_options ) do
            # Workload will actually be distributed later on so mark it as
            # done by default, i.e. available for work.
            mark_slave_as_done slave.url
            iterator.next
        end
    end

    each_slave( foreach, block )
end

mark_slave_as_done( url )

@param [String] url

Slave to mark as done, by RPC URL.
# File lib/arachni/rpc/server/framework/distributor.rb, line 242
def mark_slave_as_done( url )
    synchronize { @done_slaves << url }
end

mark_slave_as_not_done( url )

@param [String] url

Slave to mark as not done, by RPC URL.
# File lib/arachni/rpc/server/framework/distributor.rb, line 248
def mark_slave_as_not_done( url )
    synchronize { @done_slaves.delete url }
end
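
The done-slave bookkeeping spread across `mark_slave_as_done`, `mark_slave_as_not_done`, `slave_done?`, `has_idle_slaves?` and `slaves_done?` can be collected into one small class. This sketch assumes a `Set` guarded by a `Mutex`; the actual type of `@done_slaves` and the implementation of `synchronize` are not shown in this section. `SlaveTracker` is hypothetical.

```ruby
require 'set'

# Hypothetical tracker: a Set of "done" slave URLs guarded by a Mutex.
class SlaveTracker
    def initialize( slave_urls )
        @slave_urls  = slave_urls
        @done_slaves = Set.new
        @mutex       = Mutex.new
    end

    def mark_done( url )
        @mutex.synchronize { @done_slaves << url }
    end

    def mark_not_done( url )
        @mutex.synchronize { @done_slaves.delete url }
    end

    def done?( url )
        @mutex.synchronize { @done_slaves.include? url }
    end

    # At least one slave is idle.
    def idle?
        @mutex.synchronize { @done_slaves.any? }
    end

    # Every known slave has reported in.
    def all_done?
        @mutex.synchronize { @slave_urls.sort == @done_slaves.to_a.sort }
    end
end

tracker = SlaveTracker.new( %w(a:1 b:2) )
tracker.mark_done 'a:1'
tracker.idle?     # => true
tracker.all_done? # => false
```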

merge_statistics( stats )

@param [Array<Hash>] stats

Array of {Framework#statistics} to merge.

@return [Hash]

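Hash with the values of all passed statistics merged: HTTP counters are summed, HTTP averages are averaged across instances, and each instance's `:current_page` is collected into `:current_pages`.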
# File lib/arachni/rpc/server/framework/distributor.rb, line 409
def merge_statistics( stats )
    merged_statistics = stats.pop.dup

    return {} if !merged_statistics || merged_statistics.empty?
    return merged_statistics if stats.empty?

    merged_statistics[:current_pages] = []

    if merged_statistics[:current_page]
        merged_statistics[:current_pages] << merged_statistics[:current_page]
    end

    sum = [
        :request_count,
        :response_count,
        :time_out_count,
        :total_responses_per_second,
        :burst_response_time_sum,
        :burst_response_count,
        :burst_responses_per_second,
        :max_concurrency
    ]

    average = [
        :burst_average_response_time,
        :total_average_response_time
    ]

    integers = [:max_concurrency, :request_count, :response_count, :time_out_count,
        :burst_response_count]

    begin
        stats.each do |instats|
            (sum | average).each do |k|
                merged_statistics[:http][k] += Float( instats[:http][k] )
            end

            merged_statistics[:current_pages] << instats[:current_page] if instats[:current_page]
        end

        average.each do |k|
            merged_statistics[:http][k] /= Float( stats.size + 1 )
            merged_statistics[:http][k] = Float( sprintf( '%.2f', merged_statistics[:http][k] ) )
        end

        integers.each do |k|
            merged_statistics[:http][k] = merged_statistics[:http][k].to_i
        end
    rescue => e
        ap e
        ap e.backtrace
    end

    merged_statistics.delete :current_page

    merged_statistics
end
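
The merge strategy (sum the counters, sum and then divide the averages by the number of instances) is shown below on flat hashes. The flat layout and the shortened key lists are assumptions; the original operates on `stats[:http]`, pops from its argument, and rounds via `sprintf`, while this sketch uses `Float#round` and leaves its input untouched.

```ruby
# Simplified key lists; the original uses longer ones under stats[:http].
SUM_KEYS     = [:request_count, :response_count]
AVERAGE_KEYS = [:total_average_response_time]

def merge_http_stats( stats )
    return {} if stats.empty?

    merged = stats.first.dup
    stats.drop( 1 ).each do |s|
        (SUM_KEYS + AVERAGE_KEYS).each { |k| merged[k] += s[k] }
    end

    # Averages were summed above; divide by the number of instances.
    AVERAGE_KEYS.each { |k| merged[k] = (merged[k] / stats.size.to_f).round( 2 ) }
    merged
end

merge_http_stats( [
    { request_count: 10, response_count: 9,  total_average_response_time: 0.5 },
    { request_count: 20, response_count: 20, total_average_response_time: 0.3 }
] )
# request_count 30, response_count 29, total_average_response_time 0.4
```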

pick_dispatchers( dispatchers )

Picks the dispatchers to use based on their load-balancing score (lowest first) and the configured maximum number of slaves (`options.spawns`); if `spawns` is not positive, all dispatchers are returned.

# File lib/arachni/rpc/server/framework/distributor.rb, line 357
def pick_dispatchers( dispatchers )
    dispatchers = dispatchers.sort_by { |d| d['node']['score'] }
    options.spawns > 0 ? dispatchers[0...options.spawns] : dispatchers
end
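
The same selection can be run on plain hashes. Passing `spawns` as an argument instead of reading `options.spawns` is an assumption for a self-contained example, and `pick_by_score` is a hypothetical name.

```ruby
# Lowest score first, then truncate to `spawns` when it is positive.
def pick_by_score( dispatchers, spawns )
    sorted = dispatchers.sort_by { |d| d['node']['score'] }
    spawns > 0 ? sorted[0...spawns] : sorted
end

grid = [
    { 'url' => 'c:3', 'node' => { 'score' => 0.9 } },
    { 'url' => 'a:1', 'node' => { 'score' => 0.1 } },
    { 'url' => 'b:2', 'node' => { 'score' => 0.5 } }
]
pick_by_score( grid, 2 ).map { |d| d['url'] } # => ["a:1", "b:2"]
```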

preferred_dispatchers( &block )

@param [Block] block

Block to be passed the URLs of the Dispatchers that have distinct Pipe IDs,
i.e. that can be set up in HPG mode. The selection logic is fairly simple at this point.
# File lib/arachni/rpc/server/framework/distributor.rb, line 310
def preferred_dispatchers( &block )
    if !dispatcher
        block.call []
        return
    end

    # To keep track of the Pipe IDs we've used.
    @used_pipe_ids ||= []

    foreach = proc do |dispatcher, iter|
        connect_to_dispatcher( dispatcher['url'] ).statistics do |res|
            if !res.rpc_exception?
                iter.return( res )
            else
                iter.return( nil )
            end
        end
    end

    # Get the Dispatchers with unique Pipe IDs and pass them to the given block.
    after = proc do |reachable_dispatchers|
        pref_dispatcher_urls = []
        pick_dispatchers( reachable_dispatchers.compact ).each do |dispatcher|
            next if @used_pipe_ids.include?( dispatcher['node']['pipe_id'] )

            @used_pipe_ids       << dispatcher['node']['pipe_id']
            pref_dispatcher_urls << dispatcher['node']['url']
        end

        block.call( pref_dispatcher_urls )
    end

    # Get the info of the local dispatcher since this will be our frame of
    # reference.
    dispatcher.node.info do |info|
        # Add the Pipe ID of the local Dispatcher in order to avoid it later on.
        @used_pipe_ids << info['pipe_id']

        # Grab and process the rest of the Grid Dispatchers.
        dispatcher.node.neighbours_with_info do |dispatchers|
            iterator_for( dispatchers ).map( foreach, after )
        end
    end
end
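
The `after` step of `preferred_dispatchers` (drop unreachable Dispatchers and keep one URL per Pipe ID, never reusing the local Dispatcher's Pipe ID) is isolated below. The sketch leaves out the `pick_dispatchers` ordering. The input hashes only imitate the shape read by the original code, and `urls_with_unique_pipe_ids` is hypothetical.

```ruby
# Keeps one Dispatcher URL per Pipe ID, skipping Pipe IDs already used.
# `nil` entries stand for Dispatchers whose `statistics` call failed.
def urls_with_unique_pipe_ids( stats, used_pipe_ids )
    stats.compact.each_with_object( [] ) do |d, urls|
        pipe_id = d['node']['pipe_id']
        next if used_pipe_ids.include?( pipe_id )

        used_pipe_ids << pipe_id
        urls << d['node']['url']
    end
end

used = ['local-pipe'] # the local Dispatcher's Pipe ID is recorded first
urls_with_unique_pipe_ids( [
    { 'node' => { 'pipe_id' => 'local-pipe', 'url' => 'x:1' } },
    nil,
    { 'node' => { 'pipe_id' => 'pipe-2', 'url' => 'y:2' } },
    { 'node' => { 'pipe_id' => 'pipe-2', 'url' => 'z:3' } }
], used )
# => ["y:2"]
```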

preferred_slaves()

@note Check with {#has_idle_slaves?} first if you only want to get
slaves which are idle. This method assumes that the workload needs
somewhere to go immediately.

@return [Array<Hash>]

Connection info for the currently {#slave_done? done slaves}.
If all slaves are busy, all are returned.
# File lib/arachni/rpc/server/framework/distributor.rb, line 265
def preferred_slaves
    instances = @slaves.select { |info| slave_done? info[:url] }
    instances.any? ? instances : @slaves
end
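
The fallback rule (prefer idle slaves, otherwise return all of them) is shown standalone below. Passing the list of done URLs in place of calling `slave_done?` is an assumption, and `prefer_idle` is a hypothetical name.

```ruby
# Prefer idle slaves; if none are idle, every slave is returned.
def prefer_idle( slaves, done )
    idle = slaves.select { |info| done.include?( info[:url] ) }
    idle.any? ? idle : slaves
end

slaves = [{ url: 'a:1' }, { url: 'b:2' }]
prefer_idle( slaves, ['b:2'] ) # => only the b:2 slave
prefer_idle( slaves, [] )      # => both slaves
```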

prepare_slave_options()

@return [Hash]

Options suitable to be passed as a configuration to slaves.

Removes options that shouldn't be passed on to slaves: plugins that are not distributable, and the `dispatcher_url` and `token` datastore entries.

Finally, it sets the master's privilege token so that the slave can
report back to us.
# File lib/arachni/rpc/server/framework/distributor.rb, line 388
def prepare_slave_options
    options = @options.to_rpc_data

    # Don't let the slaves run plugins that are not meant to be distributed.
    if options['plugins']
        options['plugins'].reject! { |k, _| !@plugins[k].distributable? }
    end

    options['datastore'].delete( 'dispatcher_url' )
    options['datastore'].delete( 'token' )

    options['datastore']['master_priv_token'] = @local_token

    options
end
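
The same option filtering is applied below to a plain Hash. The `distributable` lookup table stands in for `@plugins[name].distributable?`. It and the plugin names in the test are hypothetical. The deep copy is a choice of this sketch: the original works on the fresh hash returned by `to_rpc_data`.

```ruby
# Builds slave options from a master options Hash without mutating it.
def slave_options_from( options, distributable, master_token )
    opts = Marshal.load( Marshal.dump( options ) ) # deep copy

    # Keep only plugins flagged as distributable.
    opts['plugins']&.select! { |name, _| distributable[name] }

    opts['datastore'].delete( 'dispatcher_url' )
    opts['datastore'].delete( 'token' )
    # Lets the slave authenticate when reporting back to the master.
    opts['datastore']['master_priv_token'] = master_token
    opts
end
```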

slave_done?( url )

@param [String] url

Slave RPC URL.

@return [Bool]

`true` if the slave has finished its assigned workload, `false` otherwise.
# File lib/arachni/rpc/server/framework/distributor.rb, line 236
def slave_done?( url )
    synchronize { @done_slaves.include? url }
end

slave_urls()

@return [Array<String>]

Slave RPC URLs.
# File lib/arachni/rpc/server/framework/distributor.rb, line 254
def slave_urls
    @slaves.map { |info| info[:url] }
end

slaves_done?()

@return [Bool]

`true` if all slaves have reported that they've finished their assigned
workload, `false` otherwise.
# File lib/arachni/rpc/server/framework/distributor.rb, line 220
def slaves_done?
    synchronize { slave_urls.sort == @done_slaves.to_a.sort }
end

split_page_workload( pages )

@param [Array<Page>] pages

Page workload to be split for {#distribute_page_workload distribution}
based on the amount of {#preferred_slaves}.

@return [Array<Array<Page>>]

Chunks of pages (with {Page#audit_whitelist} configured) for each
instance.

Distribution is per-element and not per-page, that is, the focus is placed
on each chunk having an equal amount of element workload. Thus, if a page
needs to be split up, it will be.

New pages that contain no unseen elements are still distributed equally,
but their elements will {Page#do_not_audit_elements not be audited}.
This is because passive checks and browser analysis still need to
see them.
# File lib/arachni/rpc/server/framework/distributor.rb, line 154
def split_page_workload( pages )
    # Split elements in chunks for each instance and setup audit restrictions
    # for the relevant pages.
    #
    # The pages should contain all their original elements to maintain their
    # integrity, with the elements which should be audited explicitly white-listed.
    workload  = []
    filter_elements_from_pages( pages ).chunk( preferred_slaves.size + 1 ).
        each_with_index do |elements, i|
            workload[i] ||= {}

            elements.each do |element|
                workload[i][element.page] ||= element.page.dup
                workload[i][element.page].update_element_audit_whitelist element

                if element.respond_to?(:dom) && element.dom
                    workload[i][element.page].update_element_audit_whitelist element.dom
                end

                distributed_pages << element.page
            end

            workload[i] = workload[i].values
        end

    missed_pages = pages.select { |page| !distributed_pages.include? page }

    # Some pages may not have any elements but they still need to be seen in
    # order to be passed to passive checks and be analyzed by the browser
    # cluster.
    missed_pages.chunk( preferred_slaves.size + 1 ).
        each_with_index do |page_chunks, i|
            workload[i] ||= []
            workload[i] |= page_chunks.each(&:do_not_audit_elements)
        end

    workload.reject!(&:empty?)

    #dump_workload_to_console( workload )
    workload
end
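
Per-element splitting can be illustrated apart from `Page` objects. The sketch splits a flat element list into `instance_count` roughly equal chunks, then groups each chunk by page to get that page's whitelist for the instance. It uses an assumed ceiling-based split, which may differ from the chunking helper Arachni uses, and it leaves out the handling of element-less pages. `WorkElement`, `split_evenly` and `split_element_workload` are hypothetical.

```ruby
# Hypothetical element stand-in: an ID plus the URL of the page it lives on.
WorkElement = Struct.new( :id, :page_url )

# Assumed even split: ceil(size / pieces) items per chunk, so there may be
# fewer chunks than pieces.
def split_evenly( items, pieces )
    return [] if items.empty?
    items.each_slice( (items.size / pieces.to_f).ceil ).to_a
end

# One Hash per instance: page URL => IDs of the elements to audit there,
# i.e. that page's audit whitelist within this chunk.
def split_element_workload( elements, instance_count )
    split_evenly( elements, instance_count ).map do |chunk|
        chunk.group_by( &:page_url ).transform_values { |els| els.map( &:id ) }
    end
end

elements = [WorkElement.new( 1, '/a' ), WorkElement.new( 2, '/a' ), WorkElement.new( 3, '/a' )]
split_element_workload( elements, 2 )
# => [{ "/a" => [1, 2] }, { "/a" => [3] }], so page /a is split across two instances
```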