class PulsarSdk::Protocol::Structure

Constants

CHECKSUM_LEN
MAGIC_NUMBER
MAGIC_NUMBER
CHECKSUM
METADATA_SIZE
METADATA
PAYLOAD
MAGIC_NUMBER_LEN
METADATA_SIZE_LEN

Public Class Methods

new(buff) click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 12
# Wraps a raw frame buffer and positions the read cursor at its start.
#
# @param buff the buffer the read helpers slice from
def initialize(buff)
  @buff = buff
  # Passing nil explicitly asks rewind for a full reset of the cursor.
  rewind(nil)
end

Public Instance Methods

decode() click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 17
# Decodes the buffer into a PulsarSdk::Protocol::Message.
#
# The frame may start with a magic number followed by a checksum. When the
# magic number is absent, the bytes just read are handed back to the cursor
# and the metadata is read from that position instead.
#
# If any metadata property marks the payload as JSON (key matching
# /Content-Type/i and value matching /json/i), a non-empty payload is parsed
# once with JSON.parse; otherwise the payload is passed through untouched.
#
# @return [PulsarSdk::Protocol::Message] message populated from the metadata
#   and the (possibly JSON-decoded) payload
def decode
  message = PulsarSdk::Protocol::Message.new

  if read_magic_number == MAGIC_NUMBER
    # TODO: the checksum should probably be verified to guard against
    # corrupted messages; for now it is only skipped over.
    read_checksum
  else
    # No magic number: give the bytes back so the metadata starts here.
    rewind(MAGIC_NUMBER_LEN)
  end

  metadata = read_metadata
  msg = read_remaining

  # NOTE: when the producer is also the Ruby SDK, the payload can be restored
  # up front based on Content-Type. Complex types are still plain strings
  # inside the parsed structure, so callers need to take care.
  json_encoded = metadata.properties.any? do |prop|
    (prop.key.to_s =~ /Content-Type/i) && (prop.value.to_s =~ /json/i)
  end

  # Parse at most once, and never attempt to parse a missing payload
  # (read_remaining yields nil when nothing is left in the buffer).
  if json_encoded && !msg.nil?
    PulsarSdk.logger.info("#{self.class}::#{__method__}"){"Found json encode remark, parse JSON mesaage!"}
    msg = JSON.parse(msg)
  end

  message.assign_attributes(
    publish_time: metadata.publish_time,
    event_time: metadata.event_time,
    partition_key: metadata.partition_key,
    properties: metadata.properties,
    payload: msg
  )

  message
end
read_checksum() click to toggle source

Reads the CRC32 checksum bytes.

# File lib/pulsar_sdk/protocol/structure.rb, line 66
# Consumes the checksum field from the buffer.
#
# @return the CHECKSUM_LEN-sized slice exactly as `read` returns it
def read_checksum
  # No unpack directive: the caller gets the slice as-is.
  read(CHECKSUM_LEN, nil)
end
read_magic_number() click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 61
# Consumes the magic-number field from the buffer.
#
# @return the MAGIC_NUMBER_LEN-sized slice exactly as `read` returns it
def read_magic_number
  # No unpack directive: the caller gets the slice as-is.
  read(MAGIC_NUMBER_LEN, nil)
end
read_metadata() click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 70
# Reads the length-prefixed metadata section and decodes it.
#
# The length prefix occupies METADATA_SIZE_LEN units and is unpacked with the
# 'N' directive; that many units are then read and handed to
# Pulsar::Proto::MessageMetadata.decode.
#
# @return whatever Pulsar::Proto::MessageMetadata.decode returns
def read_metadata
  size = read(METADATA_SIZE_LEN, 'N')
  Pulsar::Proto::MessageMetadata.decode(read(size))
end
read_remaining() click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 76
# Reads everything between the cursor and the end of the buffer.
#
# @return the remaining slice, or nil when the cursor is already at or past
#   the end of the buffer
def read_remaining
  left = @buff.size - @readed
  read(left) if left > 0
end
rewind(x = nil) click to toggle source

Rewinds the read cursor by the given number of bytes, which makes it easier to handle non-contiguous segments.

# File lib/pulsar_sdk/protocol/structure.rb, line 55
# Moves the read cursor backwards, which helps when handling segments that
# are not contiguous.
#
# @param x [Integer, nil] how far to step back; nil resets the cursor to 0
# @return [Integer] the new cursor position
def rewind(x = nil)
  @readed = x.nil? ? 0 : @readed - x
end

Private Instance Methods

read(size, unpack = nil) click to toggle source
# File lib/pulsar_sdk/protocol/structure.rb, line 83
# Takes +size+ elements from the buffer at the current cursor position and
# advances the cursor by +size+.
#
# @param size [Integer] how much to read; expected to be non-negative
# @param unpack [String, nil] optional String#unpack directive; when given,
#   the first unpacked value is returned instead of the raw slice
# @return the raw slice when +unpack+ is nil, the first unpacked value
#   otherwise, or nil when the cursor is beyond the end of the buffer
def read(size, unpack = nil)
  # Slice with (start, length) rather than a range: the range form
  # `@readed..(@readed + size - 1)` degenerates to `0..-1` for a zero-length
  # read at offset 0, which would return the entire buffer.
  bytes = @buff[@readed, size]
  @readed += size

  return bytes if unpack.nil? || bytes.nil?

  bytes.unpack(unpack).first
end