class PulsarSdk::Client::Rpc
Public Class Methods
new(opts)
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 6 def initialize(opts) raise "opts expected a PulsarSdk::Options::Connection got #{opts.class}" unless opts.is_a?(PulsarSdk::Options::Connection) @opts = opts @cnx = ::PulsarSdk::Client::ConnectionPool.new(opts).tap {|x| x.run_checker} @producer_id = 0 @consumer_id = 0 end
Public Instance Methods
close()
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 44 def close @cnx.close end
connection(logical_addr = nil, physical_addr = nil)
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 17 def connection(logical_addr = nil, physical_addr = nil) logical_addr ||= @opts.logical_addr @cnx.fetch(logical_addr, physical_addr) end
create_producer(opts)
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 48 def create_producer(opts) raise "opts expected a PulsarSdk::Options::Producer got #{opts.class}" unless opts.is_a?(PulsarSdk::Options::Producer) # FIXME check if connection ready ::PulsarSdk::Producer.create(self, opts) end
create_reader(opts = {})
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 62 def create_reader(opts = {}) end
lookup(topic)
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 22 def lookup(topic) @lookup_service ||= ::PulsarSdk::Protocol::Lookup.new(self, @opts.logical_addr) @lookup_service.lookup(topic) end
namespace_topics(namespace)
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 27 def namespace_topics(namespace) @namespace_service ||= ::PulsarSdk::Protocol::Namespace.new(self) @namespace_service.topics(namespace) end
partition_topics(topic)
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 32 def partition_topics(topic) ::PulsarSdk::Protocol::Partitioned.new(self, topic)&.partitions || [] end
request(physical_addr, logical_addr, cmd)
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 36 def request(physical_addr, logical_addr, cmd) connection(physical_addr, logical_addr).request(cmd, nil, true) end
request_any_broker(cmd)
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 40 def request_any_broker(cmd) connection.request(cmd) end
subscribe(opts)
click to toggle source
# File lib/pulsar_sdk/client/rpc.rb, line 54 def subscribe(opts) raise "opts expected a PulsarSdk::Options::Consumer got #{opts.class}" unless opts.is_a?(PulsarSdk::Options::Consumer) # FIXME check if connection ready consumer = ::PulsarSdk::Consumer.create(self, opts) consumer end