class PulsarSdk::Producer::Base
Public Class Methods
new(client, opts)
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 6
# Remembers the client and the producer options. Nothing else happens here;
# the connection itself is set up later by grab_cnx.
#
# @param client the client object later asked for lookup/connection
# @param opts   producer options (grab_cnx reads #topic and #name from it)
def initialize(client, opts)
  @client, @opts = client, opts
end
Public Instance Methods
close()
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 49
# Closes this producer. A CLOSE_PRODUCER command is sent only while the
# producer is still connected; local cleanup (unbinding the receipt handler,
# marking the producer stopped, closing the receipt queue) always runs, even
# when sending the command raises, so a failed request cannot leave the
# handler bound or the queue open. Calling close on an already-stopped
# producer is a no-op.
#
# @return [nil] when the producer was already stopped
# @return [Object, nil] otherwise the value returned by the receipt queue's
#   close, or nil when no queue was ever created
# @raise whatever execute raises; cleanup has already run at that point
def close
  return if @stoped

  queue_result = nil
  begin
    # Build the command only when it will actually be sent.
    unless disconnect?
      base_cmd = Pulsar::Proto::BaseCommand.new(
        type: Pulsar::Proto::BaseCommand::Type::CLOSE_PRODUCER,
        close_producer: Pulsar::Proto::CommandCloseProducer.new
      )
      execute(base_cmd)
    end
  ensure
    # @conn and @receipt_queue are only assigned by grab_cnx, so they are
    # nil for a producer that never connected.
    unbind_handler! unless @conn.nil?
    @stoped = true
    queue_result = @receipt_queue.close unless @receipt_queue.nil?
  end
  queue_result
end
disconnect?()
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 65
# Tells whether the producer currently lacks an established connection.
#
# @return [Boolean] true while @established is nil or false
def disconnect?
  @established ? false : true
end
execute(cmd, msg = nil, timeout = nil)
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 28
# Sends a command (and optional message) through write with the third
# argument fixed to false, i.e. the non-async counterpart of execute_async.
#
# @param cmd     the command to send
# @param msg     optional message, nil by default
# @param timeout optional timeout handed through to write, nil by default
# @return whatever write returns
def execute(cmd, msg = nil, timeout = nil)
  async = false
  write(cmd, msg, async, timeout)
end
execute_async(cmd, msg = nil)
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 32
# Sends a command (and optional message) through write with the third
# argument fixed to true, i.e. the async counterpart of execute. No timeout
# is passed along.
#
# @param cmd the command to send
# @param msg optional message, nil by default
# @return whatever write returns
def execute_async(cmd, msg = nil)
  async = true
  write(cmd, msg, async)
end
grab_cnx()
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 11 def grab_cnx topic = @opts.topic @conn = @client.connection(*@client.lookup(topic)) @established = true @seq_generator = SeqGenerator.new(@conn.seq_generator) @producer_id = @seq_generator.new_producer_id @producer_name = [@opts.name, @producer_id].join('.') @receipt_queue = ReceiptQueue.new @stoped = false @producer_name = init_producer(topic) end
receipt() { |receipt_| ... }
click to toggle source
Fetches a send receipt (获取发送回执). TODO: get receipt by sequence_id.
# File lib/pulsar_sdk/producer/base.rb, line 38
# Pops the next entry from the receipt queue and returns its first element
# (the send receipt). When a block is given, the receipt is yielded to it
# before being returned.
#
# Returns nil instead of raising NoMethodError when the queue's pop itself
# yields nil, as well as when the popped entry carries no receipt.
#
# @yieldparam receipt_ the receipt that was popped
# @return the receipt, or nil when none is available
def receipt
  popped = @receipt_queue.pop
  return if popped.nil?

  receipt_ = popped.first
  return if receipt_.nil?

  yield receipt_ if block_given?
  receipt_
end
Private Instance Methods
bind_handler!()
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 116
# Registers this producer's receipt callback on the connection under
# @producer_id. The callback treats a nil argument as a signal to clear
# @established; any other value is added to the receipt queue.
#
# @return whatever the connection's producer_handlers.add returns
def bind_handler!
  on_receipt = proc do |send_receipt|
    if send_receipt.nil?
      @established = false
    else
      @receipt_queue.add(send_receipt)
    end
  end

  registry = @conn.producer_handlers
  registry.add(@producer_id, on_receipt)
end
filling_message(msg)
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 110
# Stamps the current producer name onto a message.
#
# @param msg the message to fill in, or nil
# @return the same message object, or nil when msg is nil
def filling_message(msg)
  unless msg.nil?
    msg.producer_name = @producer_name
    msg
  end
end
init_producer(topic)
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 70
# Binds the receipt handler, then sends a PRODUCER command for the topic and
# reads the producer name out of the producer_success part of the reply.
#
# @param topic the topic placed in the CommandProducer
# @return the producer_name reported in the reply's producer_success
def init_producer(topic)
  bind_handler!

  producer_cmd = Pulsar::Proto::CommandProducer.new(topic: topic)
  request = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::PRODUCER,
    producer: producer_cmd
  )

  execute(request).producer_success.producer_name
end
set_seq_generator(cmd)
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 105
# Attaches this producer's sequence generator to a command.
#
# @param cmd the command to update
# @return the same command object
def set_seq_generator(cmd)
  cmd.tap { |command| command.seq_generator = @seq_generator }
end
unbind_handler!()
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 123
# Removes this producer's entry from the connection's producer handlers.
#
# @return [true] always
def unbind_handler!
  handlers = @conn.producer_handlers
  handlers.delete(@producer_id)
  true
end
write(cmd, msg, *args)
click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 83
# Sends a command, optionally with a message, over the connection.
# (Re)connects first via grab_cnx when disconnect? is true.
#
# The sequence generator is attached to the command by set_seq_generator and
# the producer name is stamped on the message by filling_message; these
# helpers are the only place those fields are assigned (the previous inline
# duplicates of both assignments were removed).
#
# @param cmd  the command to send
# @param msg  [PulsarSdk::Producer::Message, nil] optional message
# @param args extra arguments passed straight through to the connection's
#   request (execute / execute_async pass an async flag and a timeout)
# @return whatever the connection's request returns
# @raise [RuntimeError] when msg is neither nil nor a
#   PulsarSdk::Producer::Message
def write(cmd, msg, *args)
  unless msg.nil? || msg.is_a?(PulsarSdk::Producer::Message)
    raise "msg expected a PulsarSdk::Producer::Message got #{msg.class}"
  end

  grab_cnx if disconnect?

  msg.sequence_id = @seq_generator.new_sequence_id unless msg.nil?

  result = @conn.request(set_seq_generator(cmd), filling_message(msg), *args)

  # increase sequence_id when success
  @seq_generator.new_sequence_id(false) unless msg.nil?

  result
end