class Lumberjack::Beats::Parser
Constants
- FRAME_COMPRESSED
- FRAME_DATA
- FRAME_JSON_DATA
- FRAME_WINDOW
- PROTOCOL_VERSION_1
- PROTOCOL_VERSION_2
Public Class Methods
new()
click to toggle source
# File lib/lumberjack/beats/server.rb, line 224
# Build a parser with an empty binary read buffer and put the state machine
# in the :header state, which waits for 2 bytes.
def initialize
  @buffer = "".force_encoding("BINARY")
  @buffer_offset = 0
  transition(:header, 2)
end
Public Instance Methods
compressed_lead(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 374
# Decode the compressed frame's length prefix (one 32-bit big-endian unsigned
# integer) and wait for that many bytes of compressed payload.
#
# The block is accepted for dispatch uniformity but is not used here.
def compressed_lead(&block)
  payload_length, = get.unpack("N")
  transition(:compressed_payload, payload_length)
end
compressed_payload(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 379
# Inflate the compressed payload and parse its contents with a fresh parser,
# forwarding every frame it produces to the caller's block.
#
# This parser is moved back to :header before the nested parse runs. The
# nested parser is a local object, so any bytes it buffers without completing
# a frame are dropped when this method returns.
#
# NOTE(review): the inflated size is not bounded here; confirm callers only
# feed data from trusted peers or add a limit.
def compressed_payload(&block)
  inflated = Zlib::Inflate.inflate(get)
  transition(:header, 2)

  # Parse the uncompressed payload.
  self.class.new.feed(inflated, &block)
end
data_field_key(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 349
# Store the bytes of the current field's key, then wait for the 4-byte length
# of the value that follows it.
#
# The block is accepted for dispatch uniformity but is not used here.
def data_field_key(&block)
  @key = get
  transition(:data_field_value_len, 4)
end
data_field_key_len(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 344
# Decode the key length (one 32-bit big-endian unsigned integer) and wait for
# that many bytes of key data.
#
# The block is accepted for dispatch uniformity but is not used here.
def data_field_key_len(&block)
  key_length, = get.unpack("N")
  transition(:data_field_key, key_length)
end
data_field_value() { |:data, sequence, data| ... }
click to toggle source
# File lib/lumberjack/beats/server.rb, line 358
# Store the value for the key read just before it and count the pair as done.
#
# While pairs remain, go back to reading the next key length. After the last
# pair, yield the completed map and return to the :header state.
#
# @yield [:data, sequence, data] the frame's sequence number and field map.
def data_field_value(&block)
  @value = get
  @data[@key] = @value
  @data_count -= 1

  # More pairs to read: loop back for the next key.
  return transition(:data_field_key_len, 4) if @data_count > 0

  # emit the whole map now that we found the end of the data fields list.
  yield :data, @sequence, @data
  transition(:header, 2)
end
data_field_value_len(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 354
# Decode the value length (one 32-bit big-endian unsigned integer) and wait
# for that many bytes of value data.
#
# The block is accepted for dispatch uniformity but is not used here.
def data_field_value_len(&block)
  value_length, = get.unpack("N")
  transition(:data_field_value, value_length)
end
data_lead(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 338
# Decode the data frame lead: two 32-bit big-endian unsigned integers, the
# sequence number and the number of key/value pairs that follow.
#
# A frame that declares zero pairs carries no field bytes, so its (empty) map
# is emitted right away and the parser returns to :header. Without this, the
# first bytes of the next frame would be misread as a key length.
#
# @yield [:data, sequence, data] only when the frame declares zero pairs.
def data_lead(&block)
  @sequence, @data_count = get.unpack("NN")
  @data = {}

  if @data_count > 0
    transition(:data_field_key_len, 4)
  else
    yield :data, @sequence, @data
    transition(:header, 2)
  end
end
feed(data, &block)
click to toggle source
Feed data to this parser.
Parsed frames are delivered by yielding to the given block as each one is completed; the method itself always returns nil.
@param [String] data the string data to feed into the parser. @return [nil] always nil; results are passed to the block.
# File lib/lumberjack/beats/server.rb, line 247
# Append +data+ to the internal buffer, then run the handler for the current
# state repeatedly while the buffer holds at least as many bytes as that
# state needs.
#
# Results are not returned: the state handlers yield them to +block+.
#
# @param [String] data the string data to feed into the parser.
# @return [nil] always.
def feed(data, &block)
  # Lengths in the protocol are byte counts, so the buffer must stay binary.
  # Appending a string in another encoding could switch the buffer's encoding
  # or raise Encoding::CompatibilityError, so tag a copy as BINARY first.
  unless data.encoding == Encoding::BINARY
    data = data.dup.force_encoding(Encoding::BINARY)
  end
  @buffer << data

  # @state names the handler method; each handler consumes @need bytes and
  # transitions to the next state.
  send(@state, &block) while have?(@need)

  nil
end
get(length=nil)
click to toggle source
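Get the next 'length' bytes from the buffer and advance the read offset. When 'length' is omitted, the number of bytes needed by the current state is used.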
# File lib/lumberjack/beats/server.rb, line 274
# Take the next +length+ bytes from the buffer and advance the read offset.
#
# Slicing is byte-based, so the result matches the protocol's byte lengths
# even if the buffer is not tagged as binary.
#
# @param [Integer, nil] length number of bytes to take; defaults to @need.
# @return [String] the bytes taken.
def get(length=nil)
  length = @need if length.nil?
  data = @buffer.byteslice(@buffer_offset, length)
  @buffer_offset += length

  # Once more than 16 KiB has been consumed, drop the consumed prefix so the
  # buffer does not grow without bound.
  if @buffer_offset > 16384
    @buffer = @buffer.byteslice(@buffer_offset..-1)
    @buffer_offset = 0
  end

  data
end
handle_version(version) { |:version, version| ... }
click to toggle source
# File lib/lumberjack/beats/server.rb, line 309
# Report the protocol version of the current frame to the caller's block.
#
# @param version the version byte read from the frame header.
# @yield [:version, version] when the version is supported.
# @raise [UnsupportedProtocol] when the version is not supported.
def handle_version(version, &block)
  unless supported_protocol?(version)
    raise UnsupportedProtocol, "unsupported protocol #{version}"
  end

  yield :version, version
end
have?(length)
click to toggle source
Do we have at least 'length' bytes in the buffer?
# File lib/lumberjack/beats/server.rb, line 269
# Do we have at least +length+ unread bytes in the buffer?
#
# Uses the byte size rather than the character count so the answer is right
# even if the buffer is not tagged as binary.
#
# @param [Integer] length number of bytes required.
# @return [Boolean]
def have?(length)
  length <= (@buffer.bytesize - @buffer_offset)
end
header(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 294
# Read the frame header: the first byte is the protocol version and the
# second is the frame type. The version is reported through handle_version,
# then the frame type selects the next state and how many bytes it needs.
#
# @raise [RuntimeError] when the frame type is not one of the known types.
def header(&block)
  raw = get
  version = raw.getbyte(0) || PROTOCOL_VERSION_1
  frame_type = raw.getbyte(1)

  handle_version(version, &block)

  next_state, next_length =
    case frame_type
    when FRAME_WINDOW then [:window_size, 4]
    when FRAME_DATA then [:data_lead, 8]
    when FRAME_JSON_DATA then [:json_data_lead, 8]
    when FRAME_COMPRESSED then [:compressed_lead, 4]
    else raise "Unknown frame type: `#{frame_type}`"
    end

  transition(next_state, next_length)
end
json_data_lead(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 327
# Decode the JSON frame lead: two 32-bit big-endian unsigned integers, the
# sequence number and the payload size. Remember the sequence number and wait
# for the payload.
#
# The block is accepted for dispatch uniformity but is not used here.
def json_data_lead(&block)
  sequence, payload_size = get.unpack("NN")
  @sequence = sequence
  transition(:json_data_payload, payload_size)
end
json_data_payload() { |:json, sequence, load| ... }
click to toggle source
# File lib/lumberjack/beats/server.rb, line 332
# Decode the JSON payload with the loader exposed by Lumberjack::Beats.json,
# yield the result, and return to the :header state.
#
# @yield [:json, sequence, document] the frame's sequence number and the
#   decoded payload.
def json_data_payload(&block)
  document = Lumberjack::Beats.json.load(get)
  yield :json, @sequence, document
  transition(:header, 2)
end
need(length)
click to toggle source
Set the minimum number of bytes we need in the buffer for the next read.
# File lib/lumberjack/beats/server.rb, line 286
# Record how many bytes must be available in the buffer before the next read.
#
# @param [Integer] length the number of bytes required.
# @return the value that was stored.
def need(length)
  @need = length
end
supported_protocol?(version)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 317
# Is +version+ one of the protocol versions this parser accepts?
#
# @param version the version value read from a frame header.
# @return [Boolean]
def supported_protocol?(version)
  [PROTOCOL_VERSION_2, PROTOCOL_VERSION_1].include?(version)
end
transition(state, next_length)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 231
# Move the state machine to +state+ and record how many bytes that state
# needs before its handler can run.
#
# @param [Symbol] state name of the handler method for the next state.
# @param [Integer] next_length bytes required by that handler.
def transition(state, next_length)
  # TODO(sissel): Assert this self.respond_to?(state)
  # TODO(sissel): Assert state is in STATES
  # TODO(sissel): Assert next_length is a number
  @state = state
  need(next_length)
end
window_size() { |:window_size, window_size| ... }
click to toggle source
# File lib/lumberjack/beats/server.rb, line 321
# Decode the window size (one 32-bit big-endian unsigned integer), return to
# the :header state, and then report the value to the caller's block.
#
# @yield [:window_size, window_size] the decoded window size.
def window_size(&block)
  @window_size, = get.unpack("N")
  transition(:header, 2)
  yield :window_size, @window_size
end