class PulsarSdk::Producer::Manager

Public Class Methods

new(client, opts) click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 6
# Builds a manager for the topic described by +opts+.
#
# client - passed through to init_producer_by to create the producers.
# opts   - options object; its +topic+ and +router+ readers are used here.
def initialize(client, opts)
  @topic = opts.topic
  @router = opts.router
  # init_producer_by reads @topic, so the topic has to be assigned first.
  @producers = init_producer_by(client, opts)
end

Public Instance Methods

close() click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 39
# Closes every managed producer, in list order.
#
# Returns the producer list (the result of +each+).
def close
  @producers.each { |producer| producer.close }
end
execute(cmd, msg = nil, timeout = nil) click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 12
# Runs +cmd+ on the producer selected for +msg+.
#
# cmd     - must be a Pulsar::Proto::BaseCommand.
# msg     - optional message, used by real_producer to pick the producer.
# timeout - optional value forwarded unchanged to the producer.
#
# Returns whatever real_producer returns.
# Raises RuntimeError when +cmd+ is not a Pulsar::Proto::BaseCommand.
def execute(cmd, msg = nil, timeout = nil)
  unless cmd.is_a?(Pulsar::Proto::BaseCommand)
    raise "cmd expected a Pulsar::Proto::BaseCommand got #{cmd.class}"
  end

  real_producer(msg) { |producer| producer.execute(cmd, msg, timeout) }
end
execute_async(cmd, msg = nil) click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 19
# Runs +cmd+ through +execute_async+ on the producer selected for +msg+.
#
# cmd - must be a Pulsar::Proto::BaseCommand.
# msg - optional message, used by real_producer to pick the producer.
#
# Returns whatever real_producer returns.
# Raises RuntimeError when +cmd+ is not a Pulsar::Proto::BaseCommand.
def execute_async(cmd, msg = nil)
  unless cmd.is_a?(Pulsar::Proto::BaseCommand)
    raise "cmd expected a Pulsar::Proto::BaseCommand got #{cmd.class}"
  end

  real_producer(msg) { |producer| producer.execute_async(cmd, msg) }
end
real_producer(msg) { |producers| ... } click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 26
# Picks the producer responsible for +msg+ and yields it to the caller's block.
#
# msg - message whose +key+ is handed to the router; when nil, the first
#       producer is used.
#
# Returns the block's result, or nil (after logging a warning) when there
# are no producers at all.
def real_producer(msg, &block)
  if @producers.empty?
    PulsarSdk.logger.warn(__method__) do
      "There is no available producer for topic: 「#{@topic}」, skipping action!"
    end
    return
  end

  # Reconnect any dropped producers before one of them is handed out.
  ensure_connection

  route_index =
    if msg.nil?
      0
    else
      @router.route(msg.key, @producers.size)
    end

  yield @producers[route_index]
end

Private Instance Methods

ensure_connection() click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 56
# Walks the producer list and, for each producer reporting +disconnect?+,
# logs a warning and calls +grab_cnx+ on it.
#
# Returns the producer list (the result of +each+).
def ensure_connection
  @producers.each do |candidate|
    if candidate.disconnect?
      PulsarSdk.logger.warn('PulsarSdk::Producer::Manager#ensure_connection') do
        "connection closed! reconnect now! #{candidate.inspect}"
      end
      candidate.grab_cnx
    end
  end
end
init_producer_by(client, opts) click to toggle source
# File lib/pulsar_sdk/producer/manager.rb, line 44
# Creates one connected producer per partition topic of @topic.
#
# client - must respond to +partition_topics+; also handed to each producer.
# opts   - options template; it is never modified. Every producer receives
#          its own copy with +topic+ set to its partition topic.
#
# Returns the list of PulsarSdk::Producer::Base instances, each of which has
# had +grab_cnx+ called on it.
def init_producer_by(client, opts)
  client.partition_topics(@topic).map do |partition_topic|
    # Copy per partition: a single shared copy would be re-pointed at the next
    # topic on every iteration, so a producer holding on to its options would
    # end up seeing the last partition's topic.
    producer_opts = opts.dup
    producer_opts.topic = partition_topic

    PulsarSdk::Producer::Base.new(client, producer_opts).tap(&:grab_cnx)
  end
end