module BinaryMessageParser
Converts binary WebSocket messages into a hash
Public Class Methods
parse(data)
click to toggle source
data should be a raw string
# File lib/signalfx/signalflow/binary.rb, line 9 def parse(data) # See https://developers.signalfx.com/v2/reference#section-binary-encoding-of-websocket-messages version, message_type, flags, _, channel, payload = data.unpack("CCb8CZ16a*") compressed = flags[0] == "1" is_json = flags[1] == "1" if version != 1 raise "Unsupported SignalFlow version #{version}" end if compressed payload = Zlib::Inflate.new(16+Zlib::MAX_WBITS).inflate(payload) end raise "Unknown binary message type #{message_type}" if !is_json && message_type != 5 message = is_json ? JSON.parse(payload, {:symbolize_names => true}) : parse_data_payload(payload) message.merge({:channel => channel}) end
parse_data_payload(payload)
click to toggle source
# File lib/signalfx/signalflow/binary.rb, line 33 def parse_data_payload(payload) # See https://developers.signalfx.com/v2/reference#section-binary-encoding-used-for-the-websocket timestamp, element_count, tuples_raw = payload.unpack("Q>L>a*") data_hash = (0..element_count-1).map do |i| type, tsid, value_raw = tuples_raw[i*17..i*17+16].unpack("CQ>a8") value = case type when 1 # long value_raw.unpack("q>") when 2 # double value_raw.unpack("G") when 3 # int (32 bit) value_raw.unpack("l>") end [ Base64.urlsafe_encode64([tsid].pack("Q>")).gsub("=", ""), value[0], ] end.to_h { :type => "data", :logicalTimestampMs => timestamp, :logicalTimestamp => Time.at(timestamp / 1000.0), :data => data_hash, } end
Private Instance Methods
parse(data)
click to toggle source
data should be a raw string
# File lib/signalfx/signalflow/binary.rb, line 9 def parse(data) # See https://developers.signalfx.com/v2/reference#section-binary-encoding-of-websocket-messages version, message_type, flags, _, channel, payload = data.unpack("CCb8CZ16a*") compressed = flags[0] == "1" is_json = flags[1] == "1" if version != 1 raise "Unsupported SignalFlow version #{version}" end if compressed payload = Zlib::Inflate.new(16+Zlib::MAX_WBITS).inflate(payload) end raise "Unknown binary message type #{message_type}" if !is_json && message_type != 5 message = is_json ? JSON.parse(payload, {:symbolize_names => true}) : parse_data_payload(payload) message.merge({:channel => channel}) end
parse_data_payload(payload)
click to toggle source
# File lib/signalfx/signalflow/binary.rb, line 33 def parse_data_payload(payload) # See https://developers.signalfx.com/v2/reference#section-binary-encoding-used-for-the-websocket timestamp, element_count, tuples_raw = payload.unpack("Q>L>a*") data_hash = (0..element_count-1).map do |i| type, tsid, value_raw = tuples_raw[i*17..i*17+16].unpack("CQ>a8") value = case type when 1 # long value_raw.unpack("q>") when 2 # double value_raw.unpack("G") when 3 # int (32 bit) value_raw.unpack("l>") end [ Base64.urlsafe_encode64([tsid].pack("Q>")).gsub("=", ""), value[0], ] end.to_h { :type => "data", :logicalTimestampMs => timestamp, :logicalTimestamp => Time.at(timestamp / 1000.0), :data => data_hash, } end