class Roby::Interface::DRobyChannel
A wrapper on top of raw IO
that uses droby marshalling to communicate
Attributes
@return [#read_nonblock,#write] the channel that allows us to communicate to clients
@return [DRoby::Marshal] an object used to marshal or unmarshal
objects to/from the connection
The maximum byte count that the channel can hold on the write side until it bails out
Public Class Methods
# File lib/roby/interface/droby_channel.rb, line 17 def initialize(io, client, marshaller: DRoby::Marshal.new(auto_create_plans: true), max_write_buffer_size: 25*1024**2) @io = io @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) @client = client @incoming = if client? WebSocket::Frame::Incoming::Client.new(type: :binary) else WebSocket::Frame::Incoming::Server.new(type: :binary) end @marshaller = marshaller @max_write_buffer_size = max_write_buffer_size @read_buffer = String.new @write_buffer = String.new @write_thread = nil end
Public Instance Methods
# File lib/roby/interface/droby_channel.rb, line 43 def close io.close end
# File lib/roby/interface/droby_channel.rb, line 47 def closed? io.closed? end
# File lib/roby/interface/droby_channel.rb, line 51 def eof? io.eof? end
# File lib/roby/interface/droby_channel.rb, line 55 def flush io.flush end
Push queued data
The write I/O is buffered. This method pushes data stored within the internal buffer and/or appends new data to it.
@return [Boolean] true if there is still data left in the buffe,
false otherwise
# File lib/roby/interface/droby_channel.rb, line 147 def push_write_data(new_bytes = nil) @write_thread ||= Thread.current if @write_thread != Thread.current raise InternalError, "cross-thread access to droby channel: from #{@write_thread} to #{Thread.current}" end @write_buffer.concat(new_bytes) if new_bytes written_bytes = io.syswrite(@write_buffer) @write_buffer = @write_buffer[written_bytes..-1] !@write_buffer.empty? rescue Errno::EWOULDBLOCK, Errno::EAGAIN if @write_buffer.size > max_write_buffer_size raise ComError, "droby_channel reached an internal buffer size of #{@write_buffer.size}, which is bigger than the limit of #{max_write_buffer_size}, bailing out" end rescue SystemCallError, IOError, EOFError raise ComError, "broken communication channel" rescue RuntimeError => e # Workaround what seems to be a Ruby bug ... if e.message =~ /can.t modify frozen IOError/ raise ComError, "broken communication channel" else raise end end
Read one packet from {#io} and unmarshal it
@return [Object,nil] returns the unmarshalled object, or nil if no
full object can be found in the data received so far
# File lib/roby/interface/droby_channel.rb, line 73 def read_packet(timeout = 0) @read_thread ||= Thread.current if @read_thread != Thread.current raise InternalError, "cross-thread access to droby channel: from #{@read_thread} to #{Thread.current}" end deadline = Time.now + timeout if timeout remaining_time = timeout if packet = @incoming.next return unmarshal_packet(packet) end while true if IO.select([io], [], [], remaining_time) begin if io.sysread(1024 ** 2, @read_buffer) @incoming << @read_buffer end rescue Errno::EWOULDBLOCK, Errno::EAGAIN end end if packet = @incoming.next return unmarshal_packet(packet) end if deadline remaining_time = deadline - Time.now return if remaining_time < 0 end end rescue SystemCallError, EOFError, IOError raise ComError, "closed communication" end
Wait until there is something to read on the channel
@param [Numeric,nil] timeout a timeout after which the method
will return. Use nil for no timeout
@return [Boolean] falsy if the timeout was reached, true
otherwise
# File lib/roby/interface/droby_channel.rb, line 65 def read_wait(timeout: nil) !!IO.select([io], [], [], timeout) end
# File lib/roby/interface/droby_channel.rb, line 135 def reset_thread_guard @write_thread = nil @read_thread = nil end
# File lib/roby/interface/droby_channel.rb, line 39 def to_io io.to_io end
# File lib/roby/interface/droby_channel.rb, line 110 def unmarshal_packet(packet) unmarshalled = begin Marshal.load(packet.to_s) rescue TypeError => e raise ProtocolError, "failed to unmarshal received packet: #{e.message}" end marshaller.local_object(unmarshalled) end
# File lib/roby/interface/droby_channel.rb, line 35 def write_buffer_size @write_buffer.size end
Write one ruby object (usually an array) as a marshalled packet and send it to {#io}
@param [Object] object the object to be sent @return [void]
# File lib/roby/interface/droby_channel.rb, line 123 def write_packet(object) marshalled = Marshal.dump(marshaller.dump(object)) packet = if client? WebSocket::Frame::Outgoing::Client.new(data: marshalled, type: :binary) else WebSocket::Frame::Outgoing::Server.new(data: marshalled, type: :binary) end push_write_data(packet.to_s) end