class PulsarSdk::Producer::Manager
Public Class Methods
new(client, opts)
click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 6 def initialize(client, opts) @topic = opts.topic @producers = init_producer_by(client, opts) @router = opts.router end
Public Instance Methods
close()
click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 39 def close @producers.each(&:close) end
execute(cmd, msg = nil, timeout = nil)
click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 12 def execute(cmd, msg = nil, timeout = nil) raise "cmd expected a Pulsar::Proto::BaseCommand got #{cmd.class}" unless cmd.is_a?(Pulsar::Proto::BaseCommand) real_producer(msg) do |producer| producer.execute(cmd, msg, timeout) end end
execute_async(cmd, msg = nil)
click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 19 def execute_async(cmd, msg = nil) raise "cmd expected a Pulsar::Proto::BaseCommand got #{cmd.class}" unless cmd.is_a?(Pulsar::Proto::BaseCommand) real_producer(msg) do |producer| producer.execute_async(cmd, msg) end end
real_producer(msg) { |producers| ... }
click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 26 def real_producer(msg, &block) if @producers.size.zero? PulsarSdk.logger.warn(__method__){"There is no available producer for topic: 「#{@topic}」, skipping action!"} return end ensure_connection route_index = msg.nil? ? 0 : @router.route(msg.key, @producers.size) yield @producers[route_index] end
Private Instance Methods
ensure_connection()
click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 56 def ensure_connection @producers.each do |producer| next unless producer.disconnect? PulsarSdk.logger.warn('PulsarSdk::Producer::Manager#ensure_connection'){ "connection closed! reconnect now! #{producer.inspect}" } producer.grab_cnx end end
init_producer_by(client, opts)
click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 44 def init_producer_by(client, opts) opts = opts.dup topics = client.partition_topics(@topic) topics.map do |topic| opts.topic = topic PulsarSdk::Producer::Base.new(client, opts).tap do |base| base.grab_cnx end end end