class PulsarSdk::Client::Rpc

Public Class Methods

new(opts) click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 6
def initialize(opts)
  raise "opts expected a PulsarSdk::Options::Connection got #{opts.class}" unless opts.is_a?(PulsarSdk::Options::Connection)

  @opts = opts

  @cnx = ::PulsarSdk::Client::ConnectionPool.new(opts).tap {|x| x.run_checker}

  @producer_id = 0
  @consumer_id = 0
end

Public Instance Methods

close() click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 44
def close
  @cnx.close
end
connection(logical_addr = nil, physical_addr = nil) click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 17
def connection(logical_addr = nil, physical_addr = nil)
  logical_addr ||= @opts.logical_addr
  @cnx.fetch(logical_addr, physical_addr)
end
create_producer(opts) click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 48
def create_producer(opts)
  raise "opts expected a PulsarSdk::Options::Producer got #{opts.class}" unless opts.is_a?(PulsarSdk::Options::Producer)
  # FIXME check if connection ready
  ::PulsarSdk::Producer.create(self, opts)
end
create_reader(opts = {}) click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 62
def create_reader(opts = {})

end
lookup(topic) click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 22
def lookup(topic)
  @lookup_service ||= ::PulsarSdk::Protocol::Lookup.new(self, @opts.logical_addr)
  @lookup_service.lookup(topic)
end
namespace_topics(namespace) click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 27
def namespace_topics(namespace)
  @namespace_service ||= ::PulsarSdk::Protocol::Namespace.new(self)
  @namespace_service.topics(namespace)
end
partition_topics(topic) click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 32
def partition_topics(topic)
  ::PulsarSdk::Protocol::Partitioned.new(self, topic)&.partitions || []
end
request(physical_addr, logical_addr, cmd) click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 36
def request(physical_addr, logical_addr, cmd)
  connection(physical_addr, logical_addr).request(cmd, nil, true)
end
request_any_broker(cmd) click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 40
def request_any_broker(cmd)
  connection.request(cmd)
end
subscribe(opts) click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 54
def subscribe(opts)
  raise "opts expected a PulsarSdk::Options::Consumer got #{opts.class}" unless opts.is_a?(PulsarSdk::Options::Consumer)
  # FIXME check if connection ready
  consumer = ::PulsarSdk::Consumer.create(self, opts)

  consumer
end