class Roby::DRoby::Logfile::Server
This is the server part of the log distribution mechanism
It is basically a file distribution mechanism: it “listens” to the event log file and sends new data to the clients that are connected to it.
When a client connects, it will send the complete file
Constants
- CONNECTION_INIT
- CONNECTION_INIT_DONE
- DATA_CHUNK_SIZE
- DEFAULT_PORT
- DEFAULT_SAMPLING_PERIOD
Attributes
event_file[R]
The IO
object that we use to read the event file
event_file_path[R]
The path to the event file this server is listening to
pending_data[R]
A mapping from socket to data chunks representing the data that should be sent to a particular client
sampling_period[R]
The sampling period (in seconds)
server[R]
The server socket
server_io[R]
The IO
we are listening on
Public Class Methods
new(event_file_path, sampling_period = DEFAULT_SAMPLING_PERIOD, io)
click to toggle source
# File lib/roby/droby/logfile/server.rb, line 39 def initialize(event_file_path, sampling_period = DEFAULT_SAMPLING_PERIOD, io) @server = io @pending_data = Hash.new @sampling_period = sampling_period @event_file_path = event_file_path @event_file = File.open(event_file_path, 'r:BINARY') end
Public Instance Methods
exec()
click to toggle source
# File lib/roby/droby/logfile/server.rb, line 51 def exec while true sockets_with_pending_data = pending_data.find_all do |socket, chunks| !chunks.empty? end.map(&:first) if !sockets_with_pending_data.empty? Server.debug "#{sockets_with_pending_data.size} sockets have pending data" end readable_sockets, _ = select([server], sockets_with_pending_data, nil, sampling_period) # Incoming connections if readable_sockets && !readable_sockets.empty? socket = server.accept socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true) socket.fcntl(Fcntl::FD_CLOEXEC, 1) Server.debug "new connection: #{socket}" if found_header? all_data = File.binread(event_file_path, event_file.tell - Logfile::PROLOGUE_SIZE, Logfile::PROLOGUE_SIZE) Server.debug " queueing #{all_data.size} bytes of data" chunks = split_in_chunks(all_data) else Server.debug " log file is empty, not queueing any data" chunks = Array.new end connection_init = ::Marshal.dump([CONNECTION_INIT, chunks.inject(0) { |s, c| s + c.size }]) connection_init_done = ::Marshal.dump(CONNECTION_INIT_DONE) chunks.unshift([connection_init.size].pack('L<') + connection_init) chunks << [connection_init_done.size].pack('L<') + connection_init_done @pending_data[socket] = chunks end # Read new data read_new_data # Send data to our peers send_pending_data end rescue Exception pending_data.each_key(&:close) raise end
found_header?()
click to toggle source
# File lib/roby/droby/logfile/server.rb, line 47 def found_header? @found_header end
read_new_data()
click to toggle source
Reads new data from the underlying file and queues it to dispatch for our clients
# File lib/roby/droby/logfile/server.rb, line 118 def read_new_data new_data = event_file.read return if new_data.empty? if !found_header? if new_data.size >= Logfile::PROLOGUE_SIZE # This will read and validate the prologue Logfile.read_prologue(StringIO.new(new_data)) new_data = new_data[Logfile::PROLOGUE_SIZE..-1] @found_header = true else # Go back to the beginning of the file so that, next # time, we read the complete prologue again event_file.rewind return end end # Split the data in chunks of DATA_CHUNK_SIZE, and add the # chunks in the pending_data hash new_chunks = split_in_chunks(new_data) pending_data.each_value do |chunks| chunks.concat(new_chunks) end end
send_pending_data()
click to toggle source
Tries to send all pending data to the connected clients
# File lib/roby/droby/logfile/server.rb, line 148 def send_pending_data needs_looping = true while needs_looping needs_looping = false pending_data.delete_if do |socket, chunks| if chunks.empty? # nothing left to send for this socket next end buffer = chunks.shift while !chunks.empty? && (buffer.size + chunks[0].size < DATA_CHUNK_SIZE) buffer.concat(chunks.shift) end Server.debug "sending #{buffer.size} bytes to #{socket}" begin written = socket.write_nonblock(buffer) rescue Errno::EAGAIN Server.debug "cannot send: send buffer full" chunks.unshift(buffer) next rescue Exception => e Server.warn "disconnecting from #{socket}: #{e.message}" e.backtrace.each do |line| Server.warn " #{line}" end socket.close next(true) end remaining = buffer.size - written if remaining == 0 Server.debug "wrote complete chunk of #{written} bytes to #{socket}" # Loop if we wrote the complete chunk and there # is still stuff to write for this socket needs_looping = !chunks.empty? else Server.debug "wrote partial chunk #{written} bytes instead of #{buffer.size} bytes to #{socket}" chunks.unshift(buffer[written, remaining]) end false end end end
split_in_chunks(data)
click to toggle source
Splits the data block in data
in blocks of size DATA_CHUNK_SIZE
# File lib/roby/droby/logfile/server.rb, line 99 def split_in_chunks(data) result = [] index = 0 while index != data.size remaining = (data.size - index) if remaining > DATA_CHUNK_SIZE result << data[index, DATA_CHUNK_SIZE] index += DATA_CHUNK_SIZE else result << data[index, remaining] index = data.size end end result end