class PulsarSdk::Consumer::Manager
Public Class Methods
new(client, opts)
click to toggle source
# File lib/pulsar_sdk/consumer/manager.rb, line 6 def initialize(client, opts) raise "client expected a PulsarSdk::Client::Rpc got #{client.class}" unless client.is_a?(PulsarSdk::Client::Rpc) raise "opts expected a PulsarSdk::Options::Consumer got #{opts.class}" unless opts.is_a?(PulsarSdk::Options::Consumer) @topic = opts.topic @listen_wait = opts.listen_wait @message_tracker = ::PulsarSdk::Consumer::MessageTracker.new(opts.redelivery_delay) @consumers = init_consumer_by(client, opts) @stoped = false end
Public Instance Methods
close()
click to toggle source
# File lib/pulsar_sdk/consumer/manager.rb, line 67 def close PulsarSdk.logger.debug(__method__){"current @stoped #{@stoped} close now!"} return if @stoped @consumers.each(&:close) @stoped = true @message_tracker.close end
flow()
click to toggle source
NOTE some topic maybe have large permits if there is no message
# File lib/pulsar_sdk/consumer/manager.rb, line 22 def flow ensure_connection @consumers.each(&:flow_if_need) end
listen(autoack = false) { |cmd, msg| ... }
click to toggle source
# File lib/pulsar_sdk/consumer/manager.rb, line 44 def listen(autoack = false) raise 'listen require passing a block!!' if !block_given? ensure_connection loop do return if @stoped flow cmd, msg = receive(@listen_wait) return if msg.nil? result = yield cmd, msg if autoack && result == false msg.nack next end msg.ack if autoack end end
receive(timeout = nil)
click to toggle source
if timeout is nil wait until get message
# File lib/pulsar_sdk/consumer/manager.rb, line 39 def receive(timeout = nil) ensure_connection @message_tracker.shift(timeout) end
subscription()
click to toggle source
NOTE all consumers has same name
# File lib/pulsar_sdk/consumer/manager.rb, line 28 def subscription ensure_connection @consumers.find(&:subscription) end
unsubscribe()
click to toggle source
# File lib/pulsar_sdk/consumer/manager.rb, line 33 def unsubscribe ensure_connection @consumers.each(&:unsubscribe) end
Private Instance Methods
ensure_connection()
click to toggle source
# File lib/pulsar_sdk/consumer/manager.rb, line 119 def ensure_connection @consumers.each do |consumer| next unless consumer.disconnect? consumer.grab_cnx end end
init_consumer_by(client, opts)
click to toggle source
# File lib/pulsar_sdk/consumer/manager.rb, line 77 def init_consumer_by(client, opts) topics = [] case when !opts.topic.nil? PulsarSdk.logger.debug("#{__method__}:single topic"){opts.topic} topics << ::PulsarSdk::Protocol::Topic.parse(opts.topic).to_s when !Array(opts.topics).size.zero? PulsarSdk.logger.debug("#{__method__}:multiple topics"){opts.topics} opts.topics.each do |topic| topics << ::PulsarSdk::Protocol::Topic.parse(topic).to_s end when !opts.topics_pattern.nil? PulsarSdk.logger.debug("#{__method__}:pattern topic"){opts.topics_pattern} tn = ::PulsarSdk::Protocol::Topic.parse(opts.topics_pattern) pattern = Regexp.compile(tn.topic == '*' ? '^*' : tn.topic) client.namespace_topics(tn.namespace).each do |topic| topics << topic if pattern.match(topic) end else raise 'You must provide one topic by 「topic」or「topics」or「topics_pattern」' end PulsarSdk.logger.debug("#{__method__}:topics to initialize"){topics} topics.flat_map do |topic| partition_topics = client.partition_topics(topic) partition_topics.map do |x| opts_ = opts.dup opts_.topic = x PulsarSdk::Consumer::Base.new(client, @message_tracker, opts_).tap do |consumer| consumer.grab_cnx end end end end