class Lumberjack::Beats::Parser
Constants
- FRAME_COMPRESSED
- FRAME_DATA
- FRAME_JSON_DATA
- FRAME_WINDOW
- PROTOCOL_VERSION_1
- PROTOCOL_VERSION_2
- SUPPORTED_PROTOCOLS
Public Class Methods
new()
click to toggle source
# File lib/lumberjack/beats/server.rb, line 144
# Build a parser with an empty binary-encoded read buffer and start the
# state machine in the :header state, which needs 2 bytes before it can run.
def initialize
  @buffer = "".force_encoding("BINARY")
  @buffer_offset = 0
  transition(:header, 2)
end
Public Instance Methods
compressed_lead(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 294
# Read the 32-bit big-endian length that prefixes a compressed frame and
# wait for that many bytes of compressed payload.
def compressed_lead(&block)
  payload_size, = get.unpack("N")
  transition(:compressed_payload, payload_size)
end
compressed_payload(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 299
# Inflate a compressed frame and parse the frames it contains.
#
# The inflated bytes are fed to a fresh parser instance which yields its
# events to the same block; this parser goes back to waiting for a header.
def compressed_payload(&block)
  inflated = Zlib::Inflate.inflate(get)
  transition(:header, 2)

  # Parse the uncompressed payload with its own, independent parser.
  self.class.new.feed(inflated, &block)
end
data_field_key(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 269
# Store the key of the current data field, then wait for the 4-byte length
# of its value.
def data_field_key(&block)
  @key = get

  transition(:data_field_value_len, 4)
end
data_field_key_len(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 264
# Read the 32-bit big-endian key length and wait for that many key bytes.
def data_field_key_len(&block)
  transition(:data_field_key, get.unpack("N").first)
end
data_field_value() { |:data, sequence, data| ... }
click to toggle source
# File lib/lumberjack/beats/server.rb, line 278
# Store the value of the current data field under the previously read key.
#
# While fields remain, go on to the next key length. After the last field,
# yield the completed map as (:data, sequence, data) and return to the
# header state.
def data_field_value(&block)
  @value = get
  @data_count -= 1
  @data[@key] = @value

  # More key/value pairs to read for this frame.
  return transition(:data_field_key_len, 4) if @data_count > 0

  # End of the data fields list: emit the whole map.
  yield :data, @sequence, @data
  transition(:header, 2)
end
data_field_value_len(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 274
# Read the 32-bit big-endian value length and wait for that many value bytes.
def data_field_value_len(&block)
  value_len = get.unpack("N").first
  transition(:data_field_value, value_len)
end
data_lead(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 258
# Read the lead of a data frame: two 32-bit big-endian integers holding the
# sequence number and the number of key/value pairs that follow.
#
# Yields (:data, sequence, {}) when the frame declares zero pairs.
def data_lead(&block)
  @sequence, @data_count = get.unpack("NN")
  @data = {}

  if @data_count == 0
    # A frame with no pairs has no field bytes to wait for. Emit the empty
    # event and go straight back to the header state; otherwise the next
    # frame's header would be misread as a key length.
    yield :data, @sequence, @data
    transition(:header, 2)
  else
    transition(:data_field_key_len, 4)
  end
end
feed(data, &block)
click to toggle source
Feed data to this parser.
The data is appended to an internal buffer and every complete frame in it is parsed. Parsed events are delivered by yielding to the given block (:version, :window_size, :data and :json events); incomplete frames stay buffered until more data is fed.
@param [String] data the raw bytes to feed into the parser. @return [nil] always; results are yielded to the block.
# File lib/lumberjack/beats/server.rb, line 167
# Append data to the buffer and run the state machine for as long as the
# buffer holds the number of bytes the current state needs.
#
# Each state handler is invoked by name via `send` and receives the block,
# to which parsed events are yielded. Always returns nil.
def feed(data, &block)
  @buffer << data
  send(@state, &block) while have?(@need)
  nil
end
get(length=nil)
click to toggle source
Get the next 'length' bytes from the buffer (defaults to the number of bytes the current state needs).
# File lib/lumberjack/beats/server.rb, line 194
# Consume and return the next `length` bytes of the buffer.
#
# length - number of bytes to read; defaults to @need when nil.
#
# Returns the bytes read as a String. Slicing is byte-based (byteslice) so
# the result matches the byte lengths used by the protocol even if the
# buffer is not tagged as binary.
def get(length=nil)
  length = @need if length.nil?
  data = @buffer.byteslice(@buffer_offset, length)
  @buffer_offset += length

  # Once more than 16384 bytes have been consumed, drop the consumed prefix
  # so the buffer does not keep growing.
  if @buffer_offset > 16384
    @buffer = @buffer.byteslice(@buffer_offset..-1)
    @buffer_offset = 0
  end

  data
end
handle_version(version) { |:version, version| ... }
click to toggle source
# File lib/lumberjack/beats/server.rb, line 229
# Announce the protocol version of the current frame.
#
# Yields (:version, version) when the version is supported; raises a
# RuntimeError otherwise.
def handle_version(version, &block)
  raise "unsupported protocol #{version}" unless supported_protocol?(version)

  yield :version, version
end
have?(length)
click to toggle source
Do we have at least 'length' bytes in the buffer?
# File lib/lumberjack/beats/server.rb, line 189
# Do we have at least `length` unread bytes in the buffer?
#
# Counts bytes (bytesize) rather than characters, so the answer agrees with
# the byte lengths carried by the protocol even if the buffer is not tagged
# as binary.
def have?(length)
  length <= @buffer.bytesize - @buffer_offset
end
header(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 214
# Parse a 2-byte frame header (protocol version, frame type), report the
# version through handle_version, then move to the state that reads the
# body of that frame type.
#
# Raises a RuntimeError for an unknown frame type.
def header(&block)
  version, frame_type = get.bytes.first(2)
  version ||= PROTOCOL_VERSION_1
  handle_version(version, &block)

  case frame_type
  when FRAME_WINDOW
    transition(:window_size, 4)
  when FRAME_DATA
    transition(:data_lead, 8)
  when FRAME_JSON_DATA
    transition(:json_data_lead, 8)
  when FRAME_COMPRESSED
    transition(:compressed_lead, 4)
  else
    raise "Unknown frame type: `#{frame_type}`"
  end
end
json_data_lead(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 247
# Read the lead of a JSON data frame: two 32-bit big-endian integers holding
# the sequence number and the payload size, then wait for the payload.
def json_data_lead(&block)
  lead = get.unpack("NN")
  @sequence = lead[0]
  transition(:json_data_payload, lead[1])
end
json_data_payload() { |:json, sequence, load| ... }
click to toggle source
# File lib/lumberjack/beats/server.rb, line 252
# Decode a JSON frame payload with Lumberjack::Beats.json and yield it as
# (:json, sequence, decoded payload), then return to the header state.
def json_data_payload(&block)
  event = Lumberjack::Beats.json.load(get)
  yield :json, @sequence, event
  transition(:header, 2)
end
need(length)
click to toggle source
Set the minimum number of bytes we need in the buffer for the next read.
# File lib/lumberjack/beats/server.rb, line 206
# Record how many bytes must be buffered before the next read.
#
# Returns the length that was set.
def need(length)
  @need = length
end
supported_protocol?(version)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 237
# Is `version` one of the entries in SUPPORTED_PROTOCOLS?
def supported_protocol?(version)
  SUPPORTED_PROTOCOLS.include? version
end
transition(state, next_length)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 151
# Switch the state machine to `state` and record that `next_length` bytes
# are needed before that state's handler can run.
#
# TODO(sissel): Assert this self.respond_to?(state)
# TODO(sissel): Assert state is in STATES
# TODO(sissel): Assert next_length is a number
def transition(state, next_length)
  @state = state
  need(next_length)
end
window_size() { |:window_size, window_size| ... }
click to toggle source
# File lib/lumberjack/beats/server.rb, line 241
# Read the 32-bit big-endian window size, return to the header state, then
# yield (:window_size, window_size).
def window_size(&block)
  @window_size, = get.unpack("N")
  transition(:header, 2)
  yield :window_size, @window_size
end