class Lumberjack::Beats::Parser

Constants

FRAME_COMPRESSED
FRAME_DATA
FRAME_JSON_DATA
FRAME_WINDOW
PROTOCOL_VERSION_1
PROTOCOL_VERSION_2

Public Class Methods

new() click to toggle source
# File lib/lumberjack/beats/server.rb, line 224
# Set up an empty binary receive buffer with the read cursor at its start,
# and put the state machine in the :header state, which needs 2 bytes.
def initialize
  @buffer = String.new.force_encoding("BINARY")
  @buffer_offset = 0
  transition(:header, 2)
end

Public Instance Methods

compressed_lead(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 374
# State handler for the lead of a compressed frame.
#
# Decodes the pending bytes as one 32-bit big-endian unsigned integer and
# asks for that many bytes in the :compressed_payload state. The block is
# accepted for uniformity with the other handlers but is not used here.
def compressed_lead(&block)
  payload_length, = get.unpack("N")
  transition(:compressed_payload, payload_length)
end
compressed_payload(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 379
# State handler for the body of a compressed frame.
#
# Inflates the pending bytes with zlib, returns this parser to the :header
# state, and then runs the inflated bytes through a brand-new parser of the
# same class, handing it the caller's block so the frames it finds are
# reported to the same place.
def compressed_payload(&block)
  inflated = Zlib::Inflate.inflate(get)
  transition(:header, 2)

  # The inflated bytes are parsed by a separate, fresh parser instance.
  self.class.new.feed(inflated, &block)
end
data_field_key(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 349
# State handler for a data field's key.
#
# Stores the pending bytes in @key, then moves to :data_field_value_len,
# which needs 4 bytes. The block is accepted but not used here.
def data_field_key(&block)
  @key = get
  transition(:data_field_value_len, 4)
end
data_field_key_len(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 344
# State handler for a data field's key length.
#
# Decodes the pending bytes as one 32-bit big-endian unsigned integer and
# asks for that many bytes in the :data_field_key state. The block is
# accepted but not used here.
def data_field_key_len(&block)
  transition(:data_field_key, get.unpack("N")[0])
end
data_field_value() { |:data, sequence, data| ... } click to toggle source
# File lib/lumberjack/beats/server.rb, line 358
# State handler for a data field's value.
#
# Stores the pending bytes in @value, records them in @data under the
# current @key, and decrements the count of fields still expected.
#
# @yield [:data, sequence, data] once the last expected field was stored
def data_field_value(&block)
  @value = get

  @data_count -= 1
  @data[@key] = @value

  # More fields are expected: go read the next key length.
  return transition(:data_field_key_len, 4) if @data_count > 0

  # That was the last field, so hand over the whole map and start over
  # with the next frame header.
  yield :data, @sequence, @data
  transition(:header, 2)
end
data_field_value_len(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 354
# State handler for a data field's value length.
#
# Decodes the pending bytes as one 32-bit big-endian unsigned integer and
# asks for that many bytes in the :data_field_value state. The block is
# accepted but not used here.
def data_field_value_len(&block)
  value_length, = get.unpack("N")
  transition(:data_field_value, value_length)
end
data_lead(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 338
# State handler for the lead of a key/value data frame.
#
# Decodes the pending bytes as two 32-bit big-endian unsigned integers,
# stored in @sequence and @data_count, resets @data to an empty hash, and
# moves on to :data_field_key_len, which needs 4 bytes. The block is
# accepted but not used here.
def data_lead(&block)
  lead = get.unpack("NN")
  @sequence = lead[0]
  @data_count = lead[1]
  @data = Hash.new
  transition(:data_field_key_len, 4)
end
feed(data, &block) click to toggle source

Feed data to this parser.

The data is appended to the internal buffer, and the handler for the current state runs for as long as the buffer holds the number of bytes that state needs. Parsed results (such as :version, :window_size, :data and :json events) are passed to the given block as they are produced.

@param [String] data the string data to feed into the parser. @return [nil] always nil; results are delivered through the block.

# File lib/lumberjack/beats/server.rb, line 247
# Feed data to this parser.
#
# The data is appended to the internal buffer, then the handler named by
# the current state is invoked for as long as the buffer holds at least
# the number of bytes that state asked for. Handlers report what they
# parse by yielding to the given block.
#
# @param data [String] bytes to append to the buffer
# @return [nil] always nil; results are delivered through the block
def feed(data, &block)
  # All lengths in this parser are byte counts, but the buffer is measured
  # with String#size and sliced with String#[]. Appending a non-binary
  # string that contains multibyte characters would either switch the
  # buffer to that encoding (so sizes are counted in characters) or raise
  # Encoding::CompatibilityError. Work on a binary copy instead; the
  # caller's string is left untouched.
  unless data.encoding == Encoding::BINARY
    data = data.dup.force_encoding(Encoding::BINARY)
  end
  @buffer << data

  # Handlers are looked up by state name; each one consumes its bytes and
  # selects the next state and the byte count it needs.
  send(@state, &block) while have?(@need)
  nil
end
get(length=nil) click to toggle source

Get the next 'length' bytes from the buffer; when no length is given, the number of bytes the current state needs is used.

# File lib/lumberjack/beats/server.rb, line 274
# Take the next chunk out of the buffer and advance the read cursor.
#
# @param length [Integer, nil] how much to take; when nil, the amount
#   last recorded in @need is used
# @return [String] the slice of the buffer starting at the read cursor
def get(length=nil)
  length = length.nil? ? @need : length

  start = @buffer_offset
  finish = start + length
  chunk = @buffer[start...finish]
  @buffer_offset = finish

  # Once the cursor has moved past 16384, drop everything already consumed
  # and restart the cursor at the beginning of what is left.
  if @buffer_offset > 16384
    @buffer = @buffer[@buffer_offset..-1]
    @buffer_offset = 0
  end

  chunk
end
handle_version(version) { |:version, version| ... } click to toggle source
# File lib/lumberjack/beats/server.rb, line 309
# Report the protocol version of a frame, rejecting unsupported ones.
#
# @param version the version value read from a frame header
# @yield [:version, version] when supported_protocol? accepts the version
# @raise [UnsupportedProtocol] when supported_protocol? rejects the version
def handle_version(version, &block)
  unless supported_protocol?(version)
    raise UnsupportedProtocol, "unsupported protocol #{version}"
  end

  yield :version, version
end
have?(length) click to toggle source

Do we have at least 'length' bytes in the buffer?

# File lib/lumberjack/beats/server.rb, line 269
# Tell whether the unread part of the buffer holds at least `length`.
#
# @param length [Integer] the amount required
# @return [Boolean] true when enough unread data is buffered
def have?(length)
  unread = @buffer.size - @buffer_offset
  length <= unread
end
header(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 294
# State handler for a frame header.
#
# Reads the pending bytes as a version byte followed by a frame type byte.
# A missing version byte falls back to PROTOCOL_VERSION_1. The version is
# passed to handle_version along with the block, and the frame type then
# selects the next state and the number of bytes it needs.
#
# @raise [RuntimeError] when the frame type matches none of the known ones
def header(&block)
  version, frame_type = get.bytes.first(2)

  handle_version(version || PROTOCOL_VERSION_1, &block)

  next_state, next_length =
    case frame_type
    when FRAME_WINDOW then [:window_size, 4]
    when FRAME_DATA then [:data_lead, 8]
    when FRAME_JSON_DATA then [:json_data_lead, 8]
    when FRAME_COMPRESSED then [:compressed_lead, 4]
    else raise "Unknown frame type: `#{frame_type}`"
    end

  transition(next_state, next_length)
end
json_data_lead(&block) click to toggle source
# File lib/lumberjack/beats/server.rb, line 327
# State handler for the lead of a JSON data frame.
#
# Decodes the pending bytes as two 32-bit big-endian unsigned integers:
# the first is stored in @sequence, the second is the number of bytes to
# ask for in the :json_data_payload state. The block is accepted but not
# used here.
def json_data_lead(&block)
  lead = get.unpack("NN")
  @sequence = lead[0]
  transition(:json_data_payload, lead[1])
end
json_data_payload() { |:json, sequence, load| ... } click to toggle source
# File lib/lumberjack/beats/server.rb, line 332
# State handler for the body of a JSON data frame.
#
# Passes the pending bytes to the `load` method of whatever
# Lumberjack::Beats::json returns, yields the result together with the
# current sequence number, and then goes back to the :header state.
#
# @yield [:json, sequence, loaded] the loaded payload for this frame
def json_data_payload(&block)
  loaded = Lumberjack::Beats::json.load(get)
  yield :json, @sequence, loaded
  transition(:header, 2)
end
need(length) click to toggle source

Set the minimum number of bytes we need in the buffer for the next read.

# File lib/lumberjack/beats/server.rb, line 286
# Record, in @need, how much buffered data the next read requires.
#
# @param length the amount required before the next read
# @return the value that was stored
def need(length)
  @need = length
end
supported_protocol?(version) click to toggle source
# File lib/lumberjack/beats/server.rb, line 317
# Tell whether a protocol version is one this parser accepts.
#
# @param version the version value to check
# @return [Boolean] true when the version equals PROTOCOL_VERSION_2 or
#   PROTOCOL_VERSION_1
def supported_protocol?(version)
  [PROTOCOL_VERSION_2, PROTOCOL_VERSION_1].include?(version)
end
transition(state, next_length) click to toggle source
# File lib/lumberjack/beats/server.rb, line 231
# Switch the state machine to a new state.
#
# Stores the state name in @state and hands next_length to need, which
# records how much buffered data that state requires before it runs.
#
# @param state the name of the next state; feed invokes the method with
#   this name
# @param next_length the amount of data the next state requires
# @return whatever need returns
def transition(state, next_length)
  @state = state
  #puts :transition => state
  # TODO(sissel): Assert this self.respond_to?(state)
  # TODO(sissel): Assert state is in STATES
  # TODO(sissel): Assert next_length is a number
  need(next_length)
end
window_size() { |:window_size, window_size| ... } click to toggle source
# File lib/lumberjack/beats/server.rb, line 321
# State handler for a window size frame.
#
# Decodes the pending bytes as one 32-bit big-endian unsigned integer,
# keeps it in @window_size, returns to the :header state, and only then
# reports the value to the block.
#
# @yield [:window_size, window_size] the decoded window size
def window_size(&block)
  @window_size, = get.unpack("N")
  transition(:header, 2)
  yield :window_size, @window_size
end