class Arachni::Reactor

Reactor scheduler and resource factory.

You're probably interested in:

* Getting access to a shared and {.global globally accessible Reactor} --
    that's probably what you want.
    * Rest of the class methods can be used to manage it.
* Creating resources like:
    * Cross-thread, non-blocking {#create_queue Queues}.
    * Asynchronous, concurrent {#create_iterator Iterators}.
    * Network connections to:
        * {#connect Connect} to a server.
        * {#listen Listen} for clients.
    * Tasks to be scheduled:
        * {#schedule As soon as possible}.
        * {#on_tick On every loop iteration}.
        * {#delay After a configured delay}.
        * {#at_interval Every few seconds}.
        * {#on_shutdown During shutdown}.

@author Tasos “Zapotek” Laskos <tasos.laskos@gmail.com>

Constants

DEFAULT_OPTIONS
VERSION

Attributes

connections[R]

@return [Array<Connection>]

{#attach Attached} connections.
max_tick_interval[RW]

@return [Integer,nil]

How long to wait for each tick when no connections are available for processing.
ticks[R]

@return [Integer]

Number of ticks.

Public Class Methods

global()

@return [Reactor]

Lazy-loaded, globally accessible Reactor.
# File lib/arachni/reactor.rb, line 91
def global
    @reactor ||= Global.instance
end
jruby?()
# File lib/arachni/reactor.rb, line 118
def jruby?
    RUBY_PLATFORM == 'java'
end
new( options = {} )

@param [Hash] options

@option options [Integer,nil] :max_tick_interval (0.02)

How long to wait for each tick when no connections are available for
processing.

@option options [Integer] :select_timeout (0.02)

How long to wait for connection activity before continuing to the next
tick.
# File lib/arachni/reactor.rb, line 130
def initialize( options = {} )
    options = DEFAULT_OPTIONS.merge( options )

    @max_tick_interval = options[:max_tick_interval]
    @select_timeout    = options[:select_timeout]

    # Socket => Connection
    @connections = {}
    @stop        = false
    @ticks       = 0
    @thread      = nil
    @tasks       = Tasks.new

    @error_handlers = Tasks.new
    @shutdown_tasks = Tasks.new
    @done_signal    = ::Queue.new
end
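
The defaults-then-overrides merge used by the constructor can be sketched in isolation. The `SKETCH_DEFAULTS` hash and `resolve_options` helper below are assumptions made for illustration; the values mirror the documented defaults of 0.02 and are not the library's actual `DEFAULT_OPTIONS`.

```ruby
# Hypothetical stand-in for DEFAULT_OPTIONS, using the documented defaults.
SKETCH_DEFAULTS = {
    max_tick_interval: 0.02,
    select_timeout:    0.02
}.freeze

# Caller-supplied keys win; missing keys fall back to the defaults.
def resolve_options( options = {} )
    SKETCH_DEFAULTS.merge( options )
end

resolved = resolve_options( select_timeout: 0.5 )
```

Only the keys passed by the caller are overridden, so `resolved` keeps the default `:max_tick_interval`.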
stop()

Stops the {.global global Reactor} instance and destroys it. The next call to {.global} will return a new instance.

# File lib/arachni/reactor.rb, line 97
def stop
    return if !@reactor

    global.stop rescue Error::NotRunning

    # Admittedly not the cleanest solution, but that's the only way to
    # force a Singleton to re-initialize -- and we want the Singleton to
    # cleanly implement the pattern in a Thread-safe way.
    global.class.instance_variable_set(:@singleton__instance__, nil)

    @reactor = nil
end
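
The Singleton reset used above can be tried on its own with Ruby's standard `singleton` library. This is a minimal sketch: `SketchGlobal` is a hypothetical class, and the approach relies on the `@singleton__instance__` instance variable, an internal detail of the standard library that may change between Ruby versions.

```ruby
require 'singleton'

# Hypothetical singleton standing in for the Global reactor class.
class SketchGlobal
    include Singleton
end

first = SketchGlobal.instance

# Clearing the instance variable used by Ruby's Singleton module makes the
# next call to .instance build a fresh object.
SketchGlobal.instance_variable_set( :@singleton__instance__, nil )

second = SketchGlobal.instance
```

After the reset, `first` and `second` are different objects, while further calls to `SketchGlobal.instance` keep returning `second`.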
supports_unix_sockets?()
# File lib/arachni/reactor.rb, line 110
def supports_unix_sockets?
    return false if jruby?

    !!UNIXSocket
rescue NameError
    false
end

Public Instance Methods

at_interval( interval, &block )

@note Time accuracy cannot be guaranteed.

@param [Float] interval

Time in seconds.

@param [Block] block

Schedules a {Tasks::Periodic task} to be run every `interval` seconds.

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 454
def at_interval( interval, &block )
    fail_if_not_running
    @tasks << Tasks::Periodic.new( interval, &block )
    nil
end
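
The note about time accuracy follows from tasks only being checked once per loop iteration. The sketch below illustrates that idea with a hypothetical `SketchPeriodic` class; it is an assumption about how such a task could work, not the implementation of `Tasks::Periodic`.

```ruby
# Hypothetical periodic task: runs its block once `interval` seconds have
# elapsed since the previous run, measured on a monotonic clock.
class SketchPeriodic
    def initialize( interval, &block )
        @interval = interval
        @block    = block
        @last_run = now
    end

    # Meant to be called once per loop iteration.
    def call
        return false if now - @last_run < @interval

        @last_run = now
        @block.call
        true
    end

    private

    def now
        Process.clock_gettime( Process::CLOCK_MONOTONIC )
    end
end

runs = 0
task = SketchPeriodic.new( 0.01 ) { runs += 1 }

# Simulate loop iterations for about 0.1 seconds; the block fires only when
# the interval is due, so the exact timing depends on how often #call runs.
deadline = Process.clock_gettime( Process::CLOCK_MONOTONIC ) + 0.1
task.call while Process.clock_gettime( Process::CLOCK_MONOTONIC ) < deadline
```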
attach( connection )

@note Will call {Connection::Callbacks#on_attach}.

{Connection#attach Attaches} a connection to the {Reactor} loop.

@param [Connection] connection

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 497
def attach( connection )
    return if attached? connection

    schedule do
        connection.reactor = self
        @connections[connection.to_io] = connection
        connection.on_attach
    end
end
attached?( connection )

@return [Bool]

`true` if the connection is attached, `false` otherwise.
# File lib/arachni/reactor.rb, line 526
def attached?( connection )
    @connections.include? connection.to_io
end
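
Attaching, detaching and the `attached?` check all revolve around a Hash keyed by the connection's underlying IO object. The sketch below shows that bookkeeping with a hypothetical `SketchConnection` wrapper and a pipe in place of a socket; it leaves out the callbacks and scheduling the real methods perform.

```ruby
# Hypothetical wrapper; anything responding to #to_io can be registered.
class SketchConnection
    def initialize( io )
        @io = io
    end

    def to_io
        @io
    end
end

reader, writer = IO.pipe
connection     = SketchConnection.new( reader )
registry       = {}

attached_before = registry.include?( connection.to_io )
registry[connection.to_io] = connection
attached_after  = registry.include?( connection.to_io )

# The reverse lookup goes from a ready IO object back to its connection.
found = registry[reader]

registry.delete( connection.to_io )
[reader, writer].each(&:close)
```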
connect( *args, &block )

@note {Connection::Error Connection errors} will be passed to the `handler`'s
    {Connection::Callbacks#on_close} method as a `reason` argument.

Connects to a peer.

@overload connect( host, port, handler = Connection, *handler_options )

@param    [String]    host
@param    [Integer]   port
@param    [Connection]   handler
    Connection handler, should be a subclass of {Connection}.
@param    [Hash]   handler_options
    Options to pass to the `#initialize` method of the `handler`.

@overload connect( unix_socket, handler = Connection, *handler_options )

@param    [String]    unix_socket
    Path to the UNIX socket to connect.
@param    [Connection]   handler
    Connection handler, should be a subclass of {Connection}.
@param    [Hash]   handler_options
    Options to pass to the `#initialize` method of the `handler`.

@return [Connection]

Connected instance of `handler`.

@raise (see fail_if_not_running)
@raise (see fail_if_non_unix)

# File lib/arachni/reactor.rb, line 190
def connect( *args, &block )
    fail_if_not_running

    options = determine_connection_options( *args )

    connection = options[:handler].new( *options[:handler_options] )
    connection.reactor = self
    block.call connection if block_given?

    begin
        Connection::Error.translate do
            socket = options[:unix_socket] ?
                connect_unix( options[:unix_socket] ) : connect_tcp

            connection.configure options.merge( socket: socket, role: :client )
            attach connection
        end
    rescue Connection::Error => e
        connection.close e
    end

    connection
end
create_iterator( list, concurrency = 1 )

@return [Reactor::Iterator]

New {Reactor::Iterator} with `self` as the scheduler.

@param [#to_a] list

List to iterate.

@param [Integer] concurrency

Parallel workers to spawn.
# File lib/arachni/reactor.rb, line 154
def create_iterator( list, concurrency = 1 )
    Reactor::Iterator.new( self, list, concurrency )
end
create_queue()

@return [Reactor::Queue]

New {Reactor::Queue} with `self` as the scheduler.
# File lib/arachni/reactor.rb, line 160
def create_queue
    Reactor::Queue.new self
end
delay( time, &block )

@note Time accuracy cannot be guaranteed.

@param [Float] time

Time in seconds.

@param [Block] block

Schedules a {Tasks::Delayed task} to be run in `time` seconds.

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 468
def delay( time, &block )
    fail_if_not_running
    @tasks << Tasks::Delayed.new( time, &block )
    nil
end
detach( connection )

@note Will call {Connection::Callbacks#on_detach}.

{Connection#detach Detaches} a connection from the {Reactor} loop.

@param [Connection] connection

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 514
def detach( connection )
    return if !attached?( connection )

    schedule do
        connection.on_detach
        @connections.delete connection.to_io
        connection.reactor = nil
    end
end
in_same_thread?()

@return [Bool]

`true` if the caller is in the same {#thread} as the {#run reactor loop},
`false` otherwise.

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 485
def in_same_thread?
    fail_if_not_running
    Thread.current == thread
end
listen( *args, &block )

@note {Connection::Error Connection errors} will be passed to the `handler`'s
    {Connection::Callbacks#on_close} method as a `reason` argument.

Listens for incoming connections.

@overload listen( host, port, handler = Connection, *handler_options )

@param    [String]    host
@param    [Integer]   port
@param    [Connection]   handler
    Connection handler, should be a subclass of {Connection}.
@param    [Hash]   handler_options
    Options to pass to the `#initialize` method of the `handler`.

@raise    [Connection::Error::HostNotFound]
    If the `host` is invalid.
@raise    [Connection::Error::Permission]
    If the `port` could not be opened due to a permission error.

@overload listen( unix_socket, handler = Connection, *handler_options )

@param    [String]    unix_socket
    Path to the UNIX socket to create.
@param    [Connection]   handler
    Connection handler, should be a subclass of {Connection}.
@param    [Hash]   handler_options
    Options to pass to the `#initialize` method of the `handler`.

@raise    [Connection::Error::Permission]
    If the `unix_socket` file could not be created due to a permission error.

@return [Connection]

Listening instance of `handler`.

@raise (see fail_if_not_running)
@raise (see fail_if_non_unix)

# File lib/arachni/reactor.rb, line 248
def listen( *args, &block )
    fail_if_not_running

    options = determine_connection_options( *args )

    server_handler = proc do
        c = options[:handler].new( *options[:handler_options] )
        c.reactor = self
        block.call c if block_given?
        c
    end

    server = server_handler.call

    begin
        Connection::Error.translate do
            socket = options[:unix_socket] ?
                listen_unix( options[:unix_socket] ) :
                listen_tcp( options[:host], options[:port] )

            server.configure options.merge( socket: socket, role: :server, server_handler: server_handler )
            attach server
        end
    rescue Connection::Error => e
        server.close e
    end

    server
end
next_tick( &block )

@param [Block] block

Schedules a {Tasks::OneOff task} to be run at the next tick.

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 440
def next_tick( &block )
    fail_if_not_running
    @tasks << Tasks::OneOff.new( &block )
    nil
end
on_error( &block )

@param [Block] block

Passes exceptions raised in the Reactor {#thread} to a
{Tasks::Persistent task}.

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 392
def on_error( &block )
    fail_if_not_running
    @error_handlers << Tasks::Persistent.new( &block )
    nil
end
on_shutdown( &block )

@param [Block] block

Schedules a {Tasks::OneOff task} to be run at {#stop shutdown}.

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 430
def on_shutdown( &block )
    fail_if_not_running
    @shutdown_tasks << Tasks::OneOff.new( &block )
    nil
end
on_tick( &block )

@param [Block] block

Schedules a {Tasks::Persistent task} to be run at each tick.

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 402
def on_tick( &block )
    fail_if_not_running
    @tasks << Tasks::Persistent.new( &block )
    nil
end
run( &block )

Starts the {Reactor} loop and blocks the current {#thread} until {#stop} is called.

@param [Block] block

Block to call right before initializing the loop.

@raise (see fail_if_running)

# File lib/arachni/reactor.rb, line 298
def run( &block )
    fail_if_running

    @done_signal.clear

    @thread = Thread.current

    block.call if block_given?

    loop do
        begin
            @tasks.call
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        begin
            process_connections
        rescue => e
            @error_handlers.call( e )
        end
        break if @stop

        @ticks += 1
    end

    @tasks.clear
    close_connections

    @shutdown_tasks.call

    @ticks  = 0
    @thread = nil

    @done_signal << nil
end
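
The overall shape of the loop (run the tasks, check the stop flag, count the tick, then run shutdown tasks) can be sketched without any networking. `SketchLoop` below is a hypothetical, stripped-down class: it skips connection processing, the done signal and the tick reset, and it collects errors in an array instead of passing them to error handlers.

```ruby
# Hypothetical, stripped-down loop; not the library's implementation.
class SketchLoop
    attr_reader :ticks, :errors

    def initialize
        @tasks          = []
        @shutdown_tasks = []
        @errors         = []
        @stop           = false
        @ticks          = 0
    end

    def on_tick( &block )
        @tasks << block
    end

    def on_shutdown( &block )
        @shutdown_tasks << block
    end

    def stop
        @stop = true
    end

    def run
        loop do
            begin
                @tasks.each(&:call)
            rescue => e
                # Errors are collected instead of aborting the loop.
                @errors << e
            end
            break if @stop

            @ticks += 1
        end

        @shutdown_tasks.each(&:call)
    end
end

events  = []
reactor = SketchLoop.new

# Ask the loop to stop once three full ticks have been counted.
reactor.on_tick { reactor.stop if reactor.ticks == 3 }
reactor.on_shutdown { events << :shutdown }
reactor.run
```

Because the stop flag is checked before the tick counter is incremented, the iteration that requests the stop is not counted.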
run_block( &block )

Starts the {#run Reactor loop}, blocks the current {#thread} while the given `block` executes and then {#stop}s it.

@param [Block] block

Block to call.

@raise (see fail_if_running)

# File lib/arachni/reactor.rb, line 377
def run_block( &block )
    fail ArgumentError, 'Missing block.' if !block_given?
    fail_if_running

    run do
        block.call
        next_tick { stop }
    end
end
run_in_thread( &block )

{#run Runs} the Reactor in a thread and blocks until it is {#running?}.

@param (see run)

@return [Thread]

{Reactor#thread}

@raise (see fail_if_running)

# File lib/arachni/reactor.rb, line 344
def run_in_thread( &block )
    fail_if_running

    Thread.new do
        begin
            run(&block)
        rescue => e
            @error_handlers.call( e )
        end
    end

    sleep 0.1 while !running?

    thread
end
running?()

@return [Bool]

`true` if the {Reactor} is {#run running}, `false` otherwise.
# File lib/arachni/reactor.rb, line 280
def running?
    thread && thread.alive?
end
schedule( &block )

@param [Block] block

Schedules a task to be run as soon as possible, either immediately if
the caller is {#in_same_thread? in the same thread}, or at the
{#next_tick} otherwise.

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 414
def schedule( &block )
    fail_if_not_running

    if running? && in_same_thread?
        block.call
    else
        next_tick(&block)
    end

    nil
end
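
The "run now if on the loop's thread, otherwise defer" decision can be illustrated with plain threads. `SketchScheduler` below is a hypothetical class that uses a thread-safe `::Queue` for the deferred blocks, which is an assumption and not necessarily how the library stores its tasks.

```ruby
# Hypothetical scheduler owned by the thread that creates it.
class SketchScheduler
    def initialize
        @owner   = Thread.current
        @pending = ::Queue.new
    end

    # Runs the block right away on the owning thread, otherwise defers it.
    def schedule( &block )
        if Thread.current == @owner
            block.call
        else
            @pending << block
        end

        nil
    end

    # Called by the owning thread, e.g. once per loop iteration.
    def run_pending
        @pending.pop.call until @pending.empty?
    end
end

order     = []
scheduler = SketchScheduler.new

scheduler.schedule { order << :same_thread }
Thread.new { scheduler.schedule { order << :other_thread } }.join
order << :before_tick
scheduler.run_pending
```

The block scheduled from the second thread only runs once the owning thread drains the queue, so it lands after `:before_tick`.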
stop()

Stops the {Reactor} {#run loop} {#schedule as soon as possible}.

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 287
def stop
    schedule { @stop = true }
end
thread()

@return [Thread, nil]

Thread of the {#run loop}, `nil` if not running.
# File lib/arachni/reactor.rb, line 476
def thread
    @thread
end
wait()

Waits for the Reactor to stop {#running?}.

@raise (see fail_if_not_running)

# File lib/arachni/reactor.rb, line 363
def wait
    fail_if_not_running

    @done_signal.pop
    true
end
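
Waiting on a `::Queue` is a common way to block one thread until another signals completion, which is the role `@done_signal` plays between the loop and `wait`. A minimal sketch with a hypothetical worker thread in place of the loop:

```ruby
# Hypothetical worker standing in for the reactor loop.
done_signal = ::Queue.new
log         = []

worker = Thread.new do
    log << :loop_finished
    # Pushing any value wakes up the thread blocked in #pop.
    done_signal << nil
end

# Blocks the calling thread until the worker signals completion.
done_signal.pop
log << :wait_returned
worker.join
```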

Private Instance Methods

all_sockets()
# File lib/arachni/reactor.rb, line 715
def all_sockets
    @connections.values.map(&:socket)
end
close_connections()

Closes all client connections, both ingress and egress.

# File lib/arachni/reactor.rb, line 633
def close_connections
    @connections.values.each(&:close)
end
connect_tcp()

@return [Socket]

New TCP socket. It is not connected here; pending sockets are connected
later, from the loop.
# File lib/arachni/reactor.rb, line 608
def connect_tcp
    socket = Socket.new(
        Socket::Constants::AF_INET,
        Socket::Constants::SOCK_STREAM,
        Socket::Constants::IPPROTO_IP
    )
    socket.do_not_reverse_lookup = true
    socket
end
connect_unix( unix_socket )

@return [UNIXSocket]

Connected socket.
# File lib/arachni/reactor.rb, line 600
def connect_unix( unix_socket )
    fail_if_non_unix

    UNIXSocket.new( unix_socket )
end
connection_from_socket( socket )
# File lib/arachni/reactor.rb, line 723
def connection_from_socket( socket )
    @connections[socket.to_io]
end
connections_from_sockets( sockets )
# File lib/arachni/reactor.rb, line 719
def connections_from_sockets( sockets )
    sockets.map { |s| connection_from_socket( s ) }
end
determine_connection_options( *args )
# File lib/arachni/reactor.rb, line 577
def determine_connection_options( *args )
    options = {}
    host = port = unix_socket = nil

    if args[1].is_a? Integer
        options[:host], options[:port], options[:handler], *handler_options = *args
    else
        options[:unix_socket], options[:handler], *handler_options = *args
    end

    if !options[:unix_socket].is_a?( String ) &&
        (!options[:host].is_a?( String ) || !options[:port].is_a?( Integer ))
        fail ArgumentError,
             'Either a UNIX socket path or a host and port combination are required.'
    end

    options[:handler]       ||= Connection
    options[:handler_options] = handler_options
    options
end
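
The two accepted argument shapes (`host, port, ...` versus `unix_socket, ...`) are told apart by checking whether the second argument is an Integer. The standalone sketch below reproduces that destructuring; `split_connection_args` and the `:default_handler` placeholder are hypothetical names used only for illustration, and the validation step is omitted.

```ruby
# Hypothetical helper mirroring the argument shapes accepted by #connect
# and #listen; :default_handler is a placeholder for the default handler.
def split_connection_args( *args )
    options = {}

    if args[1].is_a?( Integer )
        options[:host], options[:port], options[:handler], *rest = *args
    else
        options[:unix_socket], options[:handler], *rest = *args
    end

    options[:handler]       ||= :default_handler
    options[:handler_options] = rest
    options
end

tcp  = split_connection_args( 'localhost', 7331, :my_handler, { tls: true } )
unix = split_connection_args( '/tmp/sketch.sock' )
```

Anything after the handler is collected into `:handler_options`, so `tcp` ends up with one option Hash and `unix` with an empty list.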
fail_if_non_unix()

@raise [Error::UNIXSocketsNotSupported]

If trying to use UNIX-domain sockets on a host OS that does not
support them.
# File lib/arachni/reactor.rb, line 547
def fail_if_non_unix
    return if self.class.supports_unix_sockets?

    fail Error::UNIXSocketsNotSupported,
         'The host OS does not support UNIX-domain sockets.'
end
fail_if_not_running()

@raise [Error::NotRunning]

If the Reactor is not {#running?}.
# File lib/arachni/reactor.rb, line 534
def fail_if_not_running
    fail Error::NotRunning, 'Reactor is not running.' if !running?
end
fail_if_running()

@raise [Error::AlreadyRunning]

If the Reactor is already {#running?}.
# File lib/arachni/reactor.rb, line 540
def fail_if_running
    fail Error::AlreadyRunning, 'Reactor is already running.' if running?
end
listen_tcp( host, port )

@return [TCPServer]

Listening server socket.
# File lib/arachni/reactor.rb, line 620
def listen_tcp( host, port )
    server = TCPServer.new( host, port )
    server.do_not_reverse_lookup = true
    server
end
listen_unix( unix_socket )

@return [UNIXServer]

Listening server socket.
# File lib/arachni/reactor.rb, line 628
def listen_unix( unix_socket )
    UNIXServer.new( unix_socket )
end
process_connections()
# File lib/arachni/reactor.rb, line 554
def process_connections
    if @connections.empty?
        sleep @max_tick_interval
        return
    end

    # Required for OSX: it connects immediately, so #select returns nothing
    # as there's no activity and OpenSSL doesn't get a chance to do its
    # handshake. Explicitly connect pending sockets, bypassing #select.
    @connections.each do |_, connection|
        connection._connect if !connection.connected?
    end

    # Get connections with available events - :read, :write, :error.
    selected = select_connections

    # Close connections that have errors.
    [selected.delete(:error)].flatten.compact.each(&:close)

    # Call the corresponding event on the connections.
    selected.each { |event, connections| connections.each(&"_#{event}".to_sym) }
end
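
The last line of the method dispatches each event name to an underscore-prefixed method by building a Symbol and passing it as a block. A minimal sketch of that dispatch, using a hypothetical `SketchConnection` class that only records which methods were called:

```ruby
# Hypothetical connection exposing one handler method per event.
class SketchConnection
    attr_reader :calls

    def initialize
        @calls = []
    end

    def _read
        @calls << :read
    end

    def _write
        @calls << :write
    end
end

connection = SketchConnection.new
selected   = { write: [connection], read: [connection] }

# "_#{event}".to_sym yields :_write or :_read, and & turns the Symbol into a
# block that calls that method on each element.
selected.each do |event, connections|
    connections.each( &"_#{event}".to_sym )
end
```

Hashes preserve insertion order, so the `:write` handlers run before the `:read` ones.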
read_sockets()

@return [Array<Socket>]

Sockets of all listening or connected connections; we want to be ready to
read at any time.
# File lib/arachni/reactor.rb, line 694
def read_sockets
    s = []
    @connections.each do |_, connection|
        next if !connection.listener? && !connection.connected?
        s << connection.socket
    end
    s
end
select_connections()

@return [Hash]

Connections grouped by their available events:

* `:read` -- Ready for reading (i.e. with data in their incoming buffer).
* `:write` -- Ready for writing (i.e. with data in their
    {Connection#has_outgoing_data? outgoing buffer}).
* `:error`
# File lib/arachni/reactor.rb, line 645
def select_connections
    readables = read_sockets

    selected_sockets =
        begin
            Connection::Error.translate do
                select(
                    readables,
                    write_sockets,
                    all_sockets,
                    @select_timeout
                )
            end
        rescue Connection::Error => e
            nil
        end

    selected_sockets ||= [[],[],[]]

    # SSL sockets maintain their own buffer whose state can't be checked by
    # Kernel.select, leading to cases where the SSL buffer isn't empty,
    # even though Kernel.select says that there's nothing to read.
    #
    # So force a read for SSL sockets to cover all our bases.
    #
    # This is apparent especially on JRuby.
    if readables.size != selected_sockets[0].size
        (readables - selected_sockets[0]).each do |socket|
            next if !socket.is_a?( OpenSSL::SSL::SSLSocket )
            selected_sockets[0] << socket
        end
    end

    if selected_sockets[0].empty? && selected_sockets[1].empty? &&
        selected_sockets[2].empty?
        return {}
    end

    {
        # Since these will be processed in order, it's better to have the
        # write ones first to flush the buffers ASAP.
        write:   connections_from_sockets( selected_sockets[1] ),
        read:    connections_from_sockets( selected_sockets[0] ),
        error:   connections_from_sockets( selected_sockets[2] )
    }
end
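
The three-array result of `select` and the `nil` returned on timeout can be observed with a pipe instead of real sockets. This is a minimal sketch using only Ruby's `IO.select`; the 0.02 second timeout simply echoes the documented default.

```ruby
reader, writer = IO.pipe

# Nothing has been written yet, so the select times out and returns nil.
idle = IO.select( [reader], nil, nil, 0.02 )

writer.write( 'ping' )
writer.flush

# select returns [readable, writable, errored] arrays of ready IO objects.
readable, writable, _errored =
    IO.select( [reader], [writer], [reader, writer], 0.02 )

# Fall back to empty sets on timeout, as the method above does.
idle ||= [[], [], []]

[reader, writer].each(&:close)
```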
write_sockets()

@return [Array<Socket>]

Sockets of connections that are not yet connected or that have
{Connection#has_outgoing_data? outgoing data}.
# File lib/arachni/reactor.rb, line 706
def write_sockets
    s = []
    @connections.each do |_, connection|
        next if connection.connected? && !connection.has_outgoing_data?
        s << connection.socket
    end
    s
end