module RxIO::IOFilters::MsgSize

Message-Size I/O Filter: Splits messages according to a _4-byte unsigned big-endian integer_ size field, which prefixes every message and gives the length of the message body in bytes (the size field itself is not counted).
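
As a rough illustration of the wire format described above, the following sketch builds one framed message by hand and reads the size field back. It uses only core `Array#pack` and `String#unpack`; the variable names are illustrative and not part of RxIO.

```ruby
# Frame a payload: 4-byte unsigned big-endian length, then the payload bytes.
payload = 'hello'
frame = [payload.bytesize].pack('N') + payload.b

# Read the size field back from the first 4 bytes, then take the body.
size = frame[0, 4].unpack('N')[0]
body = frame[4, size]
```

The `'N'` directive is the 32-bit unsigned, network (big-endian) byte order format, which matches the size field used by this filter.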

Public Class Methods

extended(base)

Inject Dependencies into Extending Module: extends the given module with RxIO::HandlerBase.

@param [Module] base

# File lib/rxio/io_filters/msg_size.rb, line 22
def self.extended base
        base.extend RxIO::HandlerBase
end

Public Instance Methods

filter_input(endpoint, chunk)

Filter Input: Buffers data chunks sent by the endpoint and extracts messages from them, according to the size field present at the beginning of each message.

@param [Hash] endpoint
@param [String] chunk

# File lib/rxio/io_filters/msg_size.rb, line 30
def filter_input endpoint, chunk

        # Buffer the incoming chunk
        buffer_input endpoint, chunk

        # Loop over Messages
        while true

                # Check Buffer Size (can we at least determine the next message size?)
                break unless endpoint[:ibuf].bytesize >= 4

                # Acquire Message Size
                size = endpoint[:ibuf][0, 4].unpack('N')[0]

                # Check Buffer Size again (is the complete message present in the buffer?)
                break unless endpoint[:ibuf].bytesize >= 4 + size

                # Slice out Message from Input Buffer
                m = endpoint[:ibuf].slice!(0, 4 + size)[4, size]

                # Register Message
                # endpoint[:msgs] << Base64.decode64(m)
                endpoint[:msgs] << m
        end
end
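
The extraction loop above can be tried in isolation with a minimal sketch like the one below. `extract_messages` is a hypothetical helper, not part of RxIO: it works on a plain String instead of `endpoint[:ibuf]` and returns the messages instead of appending them to `endpoint[:msgs]`. It also assumes the buffer is binary-encoded, so that `[0, 4]` indexes bytes rather than characters.

```ruby
# Hypothetical helper: pull every complete message out of a binary buffer.
# Consumed bytes are removed from buf; any partial message stays in place.
def extract_messages(buf)
  msgs = []
  loop do
    # Need at least the 4-byte size field
    break unless buf.bytesize >= 4
    size = buf[0, 4].unpack('N')[0]

    # Need the complete message body as well
    break unless buf.bytesize >= 4 + size
    msgs << buf.slice!(0, 4 + size)[4, size]
  end
  msgs
end

# Two framed messages, delivered in two arbitrary chunks
stream = [3].pack('N') + 'abc'.b + [2].pack('N') + 'hi'.b
buf = String.new(encoding: Encoding::BINARY)

buf << stream[0, 5]             # size field plus one byte of the body
first = extract_messages(buf)   # nothing complete yet

buf << stream[5..-1]            # remainder of the stream
second = extract_messages(buf)  # both messages are now complete
```

Because chunk boundaries need not line up with message boundaries, the first call returns nothing and leaves the partial data buffered; the second call returns both messages.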

send_msg(endpoint, msg)

Send Message: Buffers a message to be sent to the endpoint, after prefixing it with a size field.

@param [Hash] endpoint
@param [String] msg

# File lib/rxio/io_filters/msg_size.rb, line 60
def send_msg endpoint, msg
        # msg = Base64.encode64 msg
        msg = msg.to_s
        write endpoint, [msg.bytesize].pack('N').force_encoding(msg.encoding), msg
end
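
To show what the size prefix looks like for a message containing multi-byte characters, here is a small sketch. `frame_msg` is a hypothetical stand-in that returns the framed bytes rather than calling `write` on an endpoint, and it forces everything to binary instead of matching the message's encoding.

```ruby
# Hypothetical stand-in for the send path: returns the bytes that would be
# buffered for the endpoint (size prefix followed by the message body).
def frame_msg(msg)
  msg = msg.to_s
  [msg.bytesize].pack('N') + msg.b
end

framed = frame_msg("h\u00E9llo")         # the accented 'e' takes two bytes in UTF-8
prefix = framed[0, 4].unpack('N')[0]     # size field as an Integer
```

The prefix is computed with `bytesize`, not `length`, so the receiver gets the number of bytes on the wire even when the message has fewer characters than bytes.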