class PulsarSdk::Protocol::Lookup

Constants

MAX_LOOKUP_TIMES

Public Class Methods

new(client, service_url) click to toggle source
# File lib/pulsar_sdk/protocol/lookup.rb, line 6
def initialize(client, service_url)
  @client = client
  @service_url = service_url
end

Public Instance Methods

lookup(topic) click to toggle source

output

[logical_addr, physical_addr]
# File lib/pulsar_sdk/protocol/lookup.rb, line 13
def lookup(topic)
  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::LOOKUP,
    lookupTopic: Pulsar::Proto::CommandLookupTopic.new(
      topic: topic,
      authoritative: false
    )
  )
  resp = @client.request_any_broker(base_cmd).lookupTopicResponse

  # 最多查找这么多次
  MAX_LOOKUP_TIMES.times do
    case Pulsar::Proto::CommandLookupTopicResponse::LookupType.resolve(resp.response)
    when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Failed
      PulsarSdk.logger.error(__method__){"Failed to lookup topic 「#{topic}」, #{resp.error}"}
      break
    when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Redirect
      logical_addr, physical_addr = extract_addr(resp)
      base_cmd = Pulsar::Proto::BaseCommand.new(
        type: Pulsar::Proto::BaseCommand::Type::LOOKUP,
        lookupTopic: Pulsar::Proto::CommandLookupTopic.new(
          topic: topic,
          authoritative: resp.authoritative
        )
      )
      # NOTE 从连接池拿
      resp = @client.request(logical_addr, physical_addr, base_cmd).lookupTopicResponse
    when Pulsar::Proto::CommandLookupTopicResponse::LookupType::Connect
      return extract_addr(resp)
    end
  end
end

Private Instance Methods

extract_addr(resp) click to toggle source
# File lib/pulsar_sdk/protocol/lookup.rb, line 47
def extract_addr(resp)
  logical_addr = resp.brokerServiceUrl
  physical_addr = resp.proxy_through_service_url ? @service_url : logical_addr

  [logical_addr, physical_addr]
end