class PulsarSdk::Protocol::Structure
Constants
- CHECKSUM_LEN
- MAGIC_NUMBER
MAGIC_NUMBER
- CHECKSUM
- METADATA_SIZE
- METADATA
- PAYLOAD
- MAGIC_NUMBER_LEN
- METADATA_SIZE_LEN
Public Class Methods
new(buff)
click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 12
# Wraps a raw buffer for sequential decoding and places the read cursor at
# the beginning of it.
#
# @param buff the buffer to decode; the reader methods slice it with #[] and
#   measure it with #size
def initialize(buff)
  @buff = buff
  # rewind with no argument resets the cursor (@readed) to 0.
  rewind
end
Public Instance Methods
decode()
click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 17
# Decodes the wrapped buffer into a PulsarSdk::Protocol::Message.
#
# The buffer is read in order: an optional magic number followed by a
# checksum, then the size-prefixed metadata, then the payload (whatever
# bytes remain). When the buffer does not start with MAGIC_NUMBER the bytes
# just consumed are handed back and metadata is read from that position.
#
# If any metadata property has a key matching /Content-Type/i and a value
# matching /json/i, the payload is parsed as JSON exactly once. A missing
# (nil) payload is left as nil rather than being handed to the parser.
#
# @return [PulsarSdk::Protocol::Message] message populated from the metadata
#   and payload
# @raise [JSON::ParserError] when the payload is flagged as JSON but is not
#   valid JSON
def decode
  message = PulsarSdk::Protocol::Message.new

  if read_magic_number == MAGIC_NUMBER
    # TODO: the checksum should probably be verified to guard against
    # corrupted messages; for now it is only skipped over.
    read_checksum
  else
    # No magic number: un-read those bytes, they belong to the metadata size.
    rewind(MAGIC_NUMBER_LEN)
  end
  metadata = read_metadata

  msg = read_remaining

  # NOTE: when the producer is also the Ruby SDK, the payload can be restored
  # up front based on Content-Type. Complex types still come back as strings,
  # so callers need to take care with them.
  json_encoded = metadata.properties.any? do |x|
    x.key.to_s.match?(/Content-Type/i) && x.value.to_s.match?(/json/i)
  end

  # Parse once, and only when there is a payload: parsing inside the property
  # loop re-parsed an already decoded object when several properties matched.
  if json_encoded && !msg.nil?
    PulsarSdk.logger.info("#{self.class}::#{__method__}"){"Found json encode remark, parse JSON mesaage!"}
    msg = JSON.parse(msg)
  end

  message.assign_attributes(
    publish_time: metadata.publish_time,
    event_time: metadata.event_time,
    partition_key: metadata.partition_key,
    properties: metadata.properties,
    payload: msg
  )

  message
end
read_checksum()
click to toggle source
crc32
# File lib/pulsar_sdk/protocol/structure.rb, line 66
# Reads the checksum field (crc32): the next CHECKSUM_LEN bytes at the
# cursor, advancing the cursor past them.
#
# @return the raw checksum bytes as returned by #read
def read_checksum
  read(CHECKSUM_LEN)
end
read_magic_number()
click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 61
# Reads the next MAGIC_NUMBER_LEN bytes at the cursor, advancing the cursor
# past them. Callers compare the result with MAGIC_NUMBER.
#
# @return the raw bytes as returned by #read
def read_magic_number
  read(MAGIC_NUMBER_LEN)
end
read_metadata()
click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 70
# Reads the size-prefixed metadata section and decodes it.
#
# The section starts with a METADATA_SIZE_LEN-byte length, unpacked with the
# 'N' directive (32-bit unsigned, big-endian), followed by that many bytes of
# encoded metadata.
#
# @return the result of Pulsar::Proto::MessageMetadata.decode on those bytes
def read_metadata
  size = read(METADATA_SIZE_LEN, 'N')
  Pulsar::Proto::MessageMetadata.decode(read(size))
end
read_remaining()
click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 76
# Reads everything between the cursor and the end of the buffer.
#
# @return the remaining bytes, or nil when nothing is left to read
def read_remaining
  remaining = @buff.size - @readed
  read(remaining) if remaining > 0
end
rewind(x = nil)
click to toggle source
回退若干字节,方便处理非连续段
# File lib/pulsar_sdk/protocol/structure.rb, line 55
# Moves the read cursor backwards so that bytes can be read again, which
# makes it easier to handle non-contiguous segments.
#
# @param x [Integer, nil] number of bytes to step back; nil resets the
#   cursor to the start of the buffer
# @return [Integer] the new cursor position
def rewind(x = nil)
  return @readed = 0 if x.nil?

  # Never go below 0: a negative cursor would make the buffer slice in #read
  # count from the end of the buffer and silently return the wrong bytes.
  @readed = [@readed - x, 0].max
end
Private Instance Methods
read(size, unpack = nil)
click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 83
# Reads +size+ units from the buffer at the cursor and advances the cursor
# by +size+, whether or not that much data was available.
#
# @param size [Integer] how much to read
# @param unpack [String, nil] optional String#unpack directive; when given,
#   the first unpacked value is returned instead of the raw slice
# @return the raw slice, the first unpacked value, or nil when the cursor is
#   already past the end of the buffer
def read(size, unpack = nil)
  start = @readed
  chunk = @buff[start..(start + size - 1)]
  @readed = start + size

  if unpack.nil? || chunk.nil?
    chunk
  else
    chunk.unpack(unpack).first
  end
end