module RxIO::IOFilters::MsgSize
Message-Size I/O Filter: Splits a byte stream into messages using a _4-byte unsigned big-endian integer_ size field. The field prefixes every message and gives the length in bytes of the payload that follows; the 4 bytes of the field itself are not counted.
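To make the wire format concrete, here is a minimal sketch that builds one framed message by hand. It uses only Ruby core methods (`Array#pack`, `String#unpack1`); the variable names are illustrative and not part of RxIO.

```ruby
# Payload as a binary string (illustrative data, not from the library).
payload = 'hello'.b

# 'N' packs a 32-bit unsigned integer in network (big-endian) byte order.
frame = [payload.bytesize].pack('N') + payload

size_field = frame.bytes.first(4)   # raw bytes of the size prefix
decoded    = frame.unpack1('N')     # payload length read back from the prefix
```

For a 5-byte payload the frame is 9 bytes long: 4 bytes of size field followed by the payload.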
Public Class Methods
extended(base)
Inject Dependencies into Extending Module
@param [Module] base
    # File lib/rxio/io_filters/msg_size.rb, line 22
    def self.extended base
      base.extend RxIO::HandlerBase
    end
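The `extended` hook is a standard Ruby mechanism: it runs whenever another module or object calls `extend` with this module. The sketch below shows the pattern in isolation. `DemoHandlerBase`, `DemoFilter` and `DemoService` are hypothetical stand-ins, and the `buffer_input` body is an assumption made for illustration, not the actual behavior of `RxIO::HandlerBase`.

```ruby
# Hypothetical stand-in for a handler base that provides buffering.
module DemoHandlerBase
  def buffer_input(endpoint, chunk)
    endpoint[:ibuf] << chunk
  end
end

# Hypothetical filter that pulls in its dependency when it is extended.
module DemoFilter
  def self.extended(base)
    base.extend DemoHandlerBase
  end
end

# Extending the filter triggers the hook, so the service module also
# gains the methods of DemoHandlerBase.
module DemoService
  extend DemoFilter
end

endpoint = { ibuf: String.new }
DemoService.buffer_input(endpoint, 'abc')
```

With this pattern a service only needs to extend the filter; the helper methods the filter relies on arrive automatically.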
Public Instance Methods
filter_input(endpoint, chunk)
Filter Input: Buffers data chunks sent by the endpoint and extracts messages from them, according to the size field present at the beginning of each message.
@param [Hash] endpoint
@param [String] chunk
    # File lib/rxio/io_filters/msg_size.rb, line 30
    def filter_input endpoint, chunk

      # Buffer the incoming chunk
      buffer_input endpoint, chunk

      # Loop over Messages
      while true

        # Check Buffer Size (can we at least determine the next message size?)
        break unless endpoint[:ibuf].bytesize >= 4

        # Acquire Message Size
        size = endpoint[:ibuf][0, 4].unpack('N')[0]

        # Check Buffer Size again (is the complete message present in the buffer?)
        break unless endpoint[:ibuf].bytesize >= 4 + size

        # Slice out Message from Input Buffer
        m = endpoint[:ibuf].slice!(0, 4 + size)[4, size]

        # Register Message
        # endpoint[:msgs] << Base64.decode64(m)
        endpoint[:msgs] << m
      end
    end
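The extraction loop can be exercised without RxIO. The following is a minimal sketch under stated assumptions: `extract_messages` is a hypothetical helper that mirrors the loop above, and the buffer is kept as a binary string so that character-based slicing and `bytesize` agree. It feeds two framed messages in chunks that deliberately cut through a size field and a payload.

```ruby
# Hypothetical standalone version of the loop in filter_input.
def extract_messages(ibuf, msgs)
  loop do
    # Need at least the 4-byte size field.
    break unless ibuf.bytesize >= 4
    size = ibuf.byteslice(0, 4).unpack1('N')

    # Need the complete message (prefix + payload) before slicing.
    break unless ibuf.bytesize >= 4 + size
    frame = ibuf.slice!(0, 4 + size)
    msgs << frame.byteslice(4, size)
  end
  msgs
end

ibuf = String.new(encoding: Encoding::BINARY)
msgs = []

# Two framed messages, delivered in three unevenly sized chunks.
stream = ([2].pack('N') + 'hi' + [5].pack('N') + 'world').b
[stream[0, 3], stream[3, 5], stream[8..-1]].each do |chunk|
  ibuf << chunk
  extract_messages(ibuf, msgs)
end
```

Incomplete data simply stays in the buffer until a later chunk completes it, which is why the loop checks the buffer size twice per iteration.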
send_msg(endpoint, msg)
Send Message: Buffers a message to be sent to the endpoint, after prefixing it with a size field.
@param [Hash] endpoint
@param [String] msg
    # File lib/rxio/io_filters/msg_size.rb, line 60
    def send_msg endpoint, msg
      # msg = Base64.encode64 msg
      msg = msg.to_s
      write endpoint, [msg.bytesize].pack('N').force_encoding(msg.encoding), msg
    end
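As a rough illustration of the sending side, the sketch below builds the same prefix and payload that `send_msg` passes to `write`, then joins them into one string. `frame_msg` is a hypothetical helper; how RxIO's `write` actually combines its arguments is not shown in this page, so the concatenation here is an assumption.

```ruby
# Hypothetical helper: returns the bytes that would go on the wire.
def frame_msg(msg)
  msg = msg.to_s
  # The prefix counts bytes, not characters, and is tagged with the
  # message's encoding so the two strings can be joined safely.
  prefix = [msg.bytesize].pack('N').force_encoding(msg.encoding)
  prefix + msg
end

text  = "h\u00E9llo"            # 5 characters, 6 bytes in UTF-8
frame = frame_msg(text)

size    = frame.unpack1('N')         # length read back from the prefix
payload = frame.byteslice(4, size)   # payload recovered from the frame
```

Using `bytesize` rather than `length` matters for multibyte text: the receiver counts bytes in its buffer, so a character count would truncate the message.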