class Roby::DRoby::Logfile::Server

This is the server part of the log distribution mechanism.

It is basically a file distribution mechanism: it "listens" to the event log file and sends any new data to the clients that are connected to it.

When a client connects, the server first sends it the complete file contents read so far.

Constants

CONNECTION_INIT
CONNECTION_INIT_DONE
DATA_CHUNK_SIZE
DEFAULT_PORT
DEFAULT_SAMPLING_PERIOD

Attributes

event_file[R]

The IO object that we use to read the event file

event_file_path[R]

The path to the event file this server is listening to

pending_data[R]

A mapping from socket to data chunks representing the data that should be sent to a particular client

sampling_period[R]

The sampling period (in seconds)

server[R]

The server socket

server_io[R]

The IO we are listening on

Public Class Methods

new(event_file_path, sampling_period = DEFAULT_SAMPLING_PERIOD, io) click to toggle source
# File lib/roby/droby/logfile/server.rb, line 39
# Creates a log server that streams the given event file to connected
# clients
#
# @param event_file_path [String] path to the event log file that
#   should be followed
# @param sampling_period [Float] period (in seconds) at which the file
#   is polled for new data
# @param io the already-bound listening server socket
def initialize(event_file_path, sampling_period = DEFAULT_SAMPLING_PERIOD, io)
    @event_file_path = event_file_path
    @event_file      = File.open(event_file_path, 'r:BINARY')
    @sampling_period = sampling_period
    @server          = io
    @pending_data    = {}
end

Public Instance Methods

exec() click to toggle source
# File lib/roby/droby/logfile/server.rb, line 51
# Server main loop
#
# Waits up to {#sampling_period} seconds for incoming connections,
# queues the current log file content for each newly accepted client,
# then reads any newly appended log data and dispatches it to all
# connected clients.
#
# Never returns normally. If an exception is raised, all client
# sockets are closed before the exception is propagated.
def exec
    while true
        # Only consider sockets that actually have something queued
        sockets_with_pending_data = pending_data.find_all do |socket, chunks|
            !chunks.empty?
        end.map(&:first)
        if !sockets_with_pending_data.empty?
            Server.debug "#{sockets_with_pending_data.size} sockets have pending data"
        end

        # NOTE(review): the writable set returned by select is
        # discarded; send_pending_data below attempts a non-blocking
        # write on every socket with queued data regardless
        readable_sockets, _ =
            select([server], sockets_with_pending_data, nil, sampling_period)

        # Incoming connections
        if readable_sockets && !readable_sockets.empty?
            socket = server.accept
            socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
            socket.fcntl(Fcntl::FD_CLOEXEC, 1)

            Server.debug "new connection: #{socket}"
            if found_header?
                # Queue everything read so far, skipping the file
                # prologue (clients get the framed protocol below
                # instead)
                all_data = File.binread(event_file_path, 
                                        event_file.tell - Logfile::PROLOGUE_SIZE,
                                        Logfile::PROLOGUE_SIZE)

                Server.debug "  queueing #{all_data.size} bytes of data"
                chunks = split_in_chunks(all_data)
            else
                Server.debug "  log file is empty, not queueing any data"
                chunks = Array.new
            end
            # Bracket the initial dump with INIT/INIT_DONE markers,
            # each framed with a little-endian 32-bit size prefix
            connection_init      = ::Marshal.dump([CONNECTION_INIT, chunks.inject(0) { |s, c| s + c.size }])
            connection_init_done = ::Marshal.dump(CONNECTION_INIT_DONE)
            chunks.unshift([connection_init.size].pack('L<') + connection_init)
            chunks << [connection_init_done.size].pack('L<') + connection_init_done
            @pending_data[socket] = chunks
        end

        # Read new data
        read_new_data
        # Send data to our peers
        send_pending_data
    end
rescue Exception
    # Rescuing Exception (not StandardError) on purpose: client sockets
    # must be closed even on interrupt/exit, and the exception is
    # re-raised unchanged
    pending_data.each_key(&:close)
    raise
end
found_header?() click to toggle source
# File lib/roby/droby/logfile/server.rb, line 47
# Whether the event file's prologue has already been read and validated
# (set by {#read_new_data} once a complete prologue was seen)
#
# @return [Boolean]
def found_header?
    # Normalize to a real boolean: predicates should not leak nil when
    # the flag was never set
    !!@found_header
end
read_new_data() click to toggle source

Reads new data from the underlying file and queues it to dispatch for our clients

# File lib/roby/droby/logfile/server.rb, line 118
# Reads new data from the underlying file and queues it to dispatch
# for our clients
#
# The file prologue is validated once and stripped from the dispatched
# stream. Until a complete prologue could be read, the file position is
# rewound so that the next call re-reads it from the beginning.
def read_new_data
    new_data = event_file.read
    return if new_data.empty?

    if !found_header?
        if new_data.size >= Logfile::PROLOGUE_SIZE
            # This will read and validate the prologue
            Logfile.read_prologue(StringIO.new(new_data))
            # Dispatch only what follows the prologue
            new_data = new_data[Logfile::PROLOGUE_SIZE..-1]
            @found_header = true
        else
            # Go back to the beginning of the file so that, next
            # time, we read the complete prologue again
            event_file.rewind
            return
        end
    end

    # Split the data in chunks of DATA_CHUNK_SIZE, and add the
    # chunks in the pending_data hash
    new_chunks = split_in_chunks(new_data)
    pending_data.each_value do |chunks|
        chunks.concat(new_chunks)
    end
end
send_pending_data() click to toggle source

Tries to send all pending data to the connected clients

# File lib/roby/droby/logfile/server.rb, line 148
# Tries to send all pending data to the connected clients
#
# Queued chunks are coalesced up to DATA_CHUNK_SIZE bytes per
# write_nonblock call. Sockets whose write fails with anything but
# EAGAIN are closed and removed from {#pending_data}; partially
# written buffers are requeued at the front of the socket's queue.
def send_pending_data
    needs_looping = true
    while needs_looping
        needs_looping = false
        # delete_if semantics: a truthy block value drops the socket
        # from pending_data; bare `next` (nil) keeps it
        pending_data.delete_if do |socket, chunks|
            if chunks.empty?
                # nothing left to send for this socket
                next
            end

            # Coalesce small chunks into a single buffer, staying
            # under DATA_CHUNK_SIZE
            buffer = chunks.shift
            while !chunks.empty? && (buffer.size + chunks[0].size < DATA_CHUNK_SIZE)
                buffer.concat(chunks.shift)
            end
            Server.debug "sending #{buffer.size} bytes to #{socket}"


            begin
                written = socket.write_nonblock(buffer)
            rescue Errno::EAGAIN
                # Kernel send buffer is full: requeue the whole buffer
                # and try again on a later iteration
                Server.debug "cannot send: send buffer full"
                chunks.unshift(buffer)
                next
            rescue Exception => e
                # Any other error is treated as a disconnection
                Server.warn "disconnecting from #{socket}: #{e.message}"
                e.backtrace.each do |line|
                    Server.warn "  #{line}"
                end
                socket.close
                next(true)
            end

            remaining = buffer.size - written
            if remaining == 0
                Server.debug "wrote complete chunk of #{written} bytes to #{socket}"
                # Loop if we wrote the complete chunk and there
                # is still stuff to write for this socket
                needs_looping = !chunks.empty?
            else
                Server.debug "wrote partial chunk #{written} bytes instead of #{buffer.size} bytes to #{socket}"
                # Requeue the unwritten tail at the front of the queue
                chunks.unshift(buffer[written, remaining])
            end
            false
        end
    end
end
split_in_chunks(data) click to toggle source

Splits the data block in data in blocks of size DATA_CHUNK_SIZE

# File lib/roby/droby/logfile/server.rb, line 99
# Splits +data+ into consecutive blocks of at most DATA_CHUNK_SIZE
# bytes
#
# @param data [String] the data to split
# @return [Array<String>] the chunks, in file order; empty if +data+
#   is empty
def split_in_chunks(data)
    (0...data.size).step(DATA_CHUNK_SIZE).map do |offset|
        data[offset, DATA_CHUNK_SIZE]
    end
end