class PulsarSdk::Protocol::Partitioned

Public Class Methods

new(client, topic) click to toggle source
# File lib/pulsar_sdk/protocol/partitioned.rb, line 4
def initialize(client, topic)
  @client = client
  @tn = ::PulsarSdk::Protocol::Topic.parse(topic)
end

Public Instance Methods

partitioned?() click to toggle source

当前topic是否是分区topic

# File lib/pulsar_sdk/protocol/partitioned.rb, line 27
def partitioned?
  topic_metadata.partitions > 0
end
partitions() click to toggle source
# File lib/pulsar_sdk/protocol/partitioned.rb, line 9
def partitions
  pmr = topic_metadata

  if !success_response?(pmr)
    PulsarSdk.logger.error(__method__){"Get topic partitioned metadata failed, #{pmr.error}: #{pmr.message}"}
    return []
  end

  return [@tn.to_s] if pmr.partitions.zero?

  tn = @tn.dup
  (0..pmr.partitions).map do |i|
    tn.partition = i
    tn.to_s
  end
end

Private Instance Methods

success_response?(pmr) click to toggle source
# File lib/pulsar_sdk/protocol/partitioned.rb, line 32
def success_response?(pmr)
  result = false
  Pulsar::Proto::CommandPartitionedTopicMetadataResponse::LookupType.tap do |x|
    result = x.resolve(pmr.response) == x.const_get(:Success)
  end

  result
end
topic_metadata() click to toggle source
# File lib/pulsar_sdk/protocol/partitioned.rb, line 41
def topic_metadata
  return @topic_metadata_ unless @topic_metadata_.nil?

  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::PARTITIONED_METADATA,
    partitionMetadata: Pulsar::Proto::CommandPartitionedTopicMetadata.new(
      topic: @tn.to_s
    )
  )
  @topic_metadata_ = @client.request_any_broker(base_cmd).partitionMetadataResponse
end