class PulsarSdk::Producer::Base

Public Class Methods

new(client, opts) click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 6
def initialize(client, opts)
  @opts = opts
  @client = client
end

Public Instance Methods

close() click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 49
def close
  return if @stoped

  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::CLOSE_PRODUCER,
    close_producer: Pulsar::Proto::CommandCloseProducer.new
  )
  execute(base_cmd) unless disconnect?

  unbind_handler!

  @stoped = true

  @receipt_queue.close
end
disconnect?() click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 65
def disconnect?
  !@established
end
execute(cmd, msg = nil, timeout = nil) click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 28
def execute(cmd, msg = nil, timeout = nil)
  write(cmd, msg, false, timeout)
end
execute_async(cmd, msg = nil) click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 32
def execute_async(cmd, msg = nil)
  write(cmd, msg, true)
end
grab_cnx() click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 11
def grab_cnx
  topic = @opts.topic
  @conn = @client.connection(*@client.lookup(topic))
  @established = true

  @seq_generator = SeqGenerator.new(@conn.seq_generator)
  @producer_id = @seq_generator.new_producer_id

  @producer_name = [@opts.name, @producer_id].join('.')

  @receipt_queue = ReceiptQueue.new

  @stoped = false

  @producer_name = init_producer(topic)
end
receipt() { |receipt_| ... } click to toggle source

获取发送回执 TODO get receipt by sequence_id

# File lib/pulsar_sdk/producer/base.rb, line 38
def receipt
  receipt_ = @receipt_queue.pop.first
  return if receipt_.nil?

  if block_given?
    yield receipt_
  end

  receipt_
end

Private Instance Methods

bind_handler!() click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 116
def bind_handler!
  handler = Proc.new do |send_receipt|
    send_receipt.nil? ? (@established = false) : @receipt_queue.add(send_receipt)
  end
  @conn.producer_handlers.add(@producer_id, handler)
end
filling_message(msg) click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 110
def filling_message(msg)
  return if msg.nil?
  msg.producer_name = @producer_name
  msg
end
init_producer(topic) click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 70
def init_producer(topic)
  bind_handler!

  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::PRODUCER,
    producer: Pulsar::Proto::CommandProducer.new(
      topic: topic
    )
  )
  result = execute(base_cmd)
  result.producer_success.producer_name
end
set_seq_generator(cmd) click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 105
def set_seq_generator(cmd)
  cmd.seq_generator = @seq_generator
  cmd
end
unbind_handler!() click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 123
def unbind_handler!
  @conn.producer_handlers.delete(@producer_id)

  true
end
write(cmd, msg, *args) click to toggle source
# File lib/pulsar_sdk/producer/base.rb, line 83
def write(cmd, msg, *args)
  unless msg.nil? || msg.is_a?(PulsarSdk::Producer::Message)
    raise "msg expected a PulsarSdk::Producer::Message got #{msg.class}"
  end

  grab_cnx if disconnect?

  cmd.seq_generator = @seq_generator

  unless msg.nil?
    msg.producer_name = @producer_name
    msg.sequence_id = @seq_generator.new_sequence_id
  end

  result = @conn.request(set_seq_generator(cmd), filling_message(msg), *args)

  # increase sequence_id when success
  @seq_generator.new_sequence_id(false) unless msg.nil?

  result
end