class PulsarSdk::Producer::Message

Attributes

key[R]
message[R]
metadata[R]

Public Class Methods

new(msg, metadata = nil, key = nil) click to toggle source
# File lib/pulsar_sdk/producer/message.rb, line 8
# Builds a producer message from a payload and optional metadata.
#
# @param msg [Object] payload; anything that is not a String is JSON-encoded
#   via #json_encode!
# @param metadata [Object, nil] message metadata; a fresh
#   Pulsar::Proto::MessageMetadata is created when none is given
# @param key [Object, nil] partition key, stored through #key=
def initialize(msg, metadata = nil, key = nil)
  # TODO: validate the metadata type
  @message = msg
  @metadata = metadata || Pulsar::Proto::MessageMetadata.new

  # The payload must end up as a String.
  json_encode! unless @message.is_a?(String)

  # Keep a publish time the caller already supplied; otherwise stamp it now.
  stamped_at = @metadata.publish_time
  @metadata.publish_time =
    if stamped_at.zero?
      TimeX.now.timestamp
    else
      stamped_at
    end

  self.key = key
end

Public Instance Methods

binary_string() click to toggle source
# File lib/pulsar_sdk/producer/message.rb, line 30
# Returns the payload as a binary (ASCII-8BIT) encoded copy with the same bytes.
#
# String#b produces that copy directly, without building an intermediate
# array holding one Integer per byte. @message itself is left untouched.
#
# @return [String] binary-encoded copy of the message payload
def binary_string
  @message.b
end
key=(v, b64 = false) click to toggle source
# File lib/pulsar_sdk/producer/message.rb, line 34
# Stores the partition key on the metadata.
#
# The second argument cannot be supplied with `obj.key = value` syntax;
# it is only reachable through an explicit call such as `send(:key=, v, true)`.
#
# @param v [Object] key; stored as its #to_s representation
# @param b64 [Boolean] value written to partition_key_b64_encoded
def key=(v, b64 = false)
  partition_key = v.to_s
  @metadata.partition_key = partition_key
  @metadata.partition_key_b64_encoded = b64
end
producer_name=(v) click to toggle source
# File lib/pulsar_sdk/producer/message.rb, line 22
# Writes the producer name onto the message metadata.
#
# @param v [Object] value assigned to the metadata's producer_name
def producer_name=(v)
  meta = @metadata
  meta.producer_name = v
end
sequence_id=(v) click to toggle source
# File lib/pulsar_sdk/producer/message.rb, line 26
# Writes the sequence id onto the message metadata.
#
# @param v [Object] value assigned to the metadata's sequence_id
def sequence_id=(v)
  meta = @metadata
  meta.sequence_id = v
end

Private Instance Methods

json_encode!() click to toggle source
# File lib/pulsar_sdk/producer/message.rb, line 40
# Replaces @message with its JSON encoding and records the content type.
#
# Logs the conversion at info level, encodes with #to_json when the payload
# responds to it (JSON.dump otherwise), then appends a Content-Type
# key/value entry to the metadata properties.
def json_encode!
  log_tag = "#{self.class}::#{__method__}"
  PulsarSdk.logger.info(log_tag) { "message was 「#{@message.class}」 now encode to json!" }

  @message =
    if @message.respond_to?(:to_json)
      @message.to_json
    else
      JSON.dump(@message)
    end

  @metadata.properties << Pulsar::Proto::KeyValue.new(
    key: 'Content-Type',
    value: 'application/json; charset=utf-8'
  )
end