class PulsarSdk::Protocol::Frame
Constants
- CHECKSUM_SIZE
- MAGIC_NUMBER
- PREPENDED_SIZE
预留4byte存放帧长度 (reserves 4 bytes to store the frame length)
Public Class Methods
binary(*obj)
click to toggle source
# File lib/pulsar_sdk/protocol/frame.rb, line 42
# Packs each argument as a 32-bit unsigned big-endian integer ('N') and
# returns the packed pieces concatenated in argument order.
#
# Each argument is wrapped with Array() before packing; because the format
# holds a single 'N' directive, only the first element of an array argument
# ends up in the output.
#
# @param obj [Array] values to pack
# @return [String] the concatenated packed bytes
def self.binary(*obj)
  packed_words = obj.map do |value|
    Array(value).pack('N')
  end

  packed_words.join
end
crc32(bytes)
click to toggle source
# File lib/pulsar_sdk/protocol/frame.rb, line 46
# Feeds +bytes+ into a fresh Digest::CRC32c instance and returns the value
# reported by its #checksum.
#
# @param bytes [String] data to checksum
# @return the digest's checksum for +bytes+
def self.crc32(bytes)
  Digest::CRC32c.new.tap { |digest| digest << bytes }.checksum
end
decode(byte)
click to toggle source
# File lib/pulsar_sdk/protocol/frame.rb, line 38
# Delegates to Pulsar::Proto::BaseCommand.decode and returns its result.
#
# @param byte serialized command data handed straight to the decoder
# @return whatever Pulsar::Proto::BaseCommand.decode returns
def self.decode(byte)
  command_class = Pulsar::Proto::BaseCommand
  command_class.decode(byte)
end
encode(command, message = nil)
click to toggle source
# File lib/pulsar_sdk/protocol/frame.rb, line 11
# Serializes a command, optionally followed by a message, into one frame.
#
# Without a message the result is the command-only frame built by
# encode_command. With a message the layout is:
#
#   [TOTAL_SIZE] [CMD_SIZE] [CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
#
# All length fields are byte counts. The serialized command, the serialized
# metadata and the message payload are taken as binary views (String#b) and
# measured with bytesize, so a string tagged with a multibyte encoding can
# neither skew the length fields nor make the concatenation raise
# Encoding::CompatibilityError.
#
# @param command [Pulsar::Proto::BaseCommand] command to serialize
# @param message [PulsarSdk::Producer::Message, nil] message to append, if any
# @return [String] the encoded frame
# @raise [RuntimeError] if +command+ or a non-nil +message+ has the wrong class
def self.encode(command, message = nil)
  raise "command MUST be Pulsar::Proto::BaseCommand but got #{command.class}" unless command.is_a?(Pulsar::Proto::BaseCommand)

  pb_cmd = command.to_proto.b

  # Command-only frame: no message follows the command.
  return encode_command(pb_cmd) if message.nil?

  # Message-send frame.
  raise "message MUST be PulsarSdk::Producer::Message but got #{message.class}" unless message.is_a?(PulsarSdk::Producer::Message)

  pb_meta = message.metadata.to_proto.b
  meta_payload = binary(pb_meta.bytesize) + pb_meta + message.binary_string.b

  # The checksum is computed over [METADATA_SIZE] [METADATA] [PAYLOAD].
  checksum = crc32(meta_payload)

  total_size = PREPENDED_SIZE + pb_cmd.bytesize + MAGIC_NUMBER.bytesize + CHECKSUM_SIZE + meta_payload.bytesize

  binary(total_size, pb_cmd.bytesize) + pb_cmd + MAGIC_NUMBER + binary(checksum) + meta_payload
end
encode_command(pb_cmd)
click to toggle source
# File lib/pulsar_sdk/protocol/frame.rb, line 34
# Frames an already-serialized command as [TOTAL_SIZE] [CMD_SIZE] [CMD],
# where TOTAL_SIZE is the command's byte length plus PREPENDED_SIZE.
#
# The command is taken as a binary view (String#b) and measured with
# bytesize, so the length fields stay correct even when the string is tagged
# with a multibyte encoding, and appending it to the packed header cannot
# raise Encoding::CompatibilityError.
#
# @param pb_cmd [String] serialized command bytes
# @return [String] the encoded command frame
def self.encode_command(pb_cmd)
  raw_cmd = pb_cmd.b
  cmd_size = raw_cmd.bytesize

  binary(cmd_size + PREPENDED_SIZE, cmd_size) + raw_cmd
end