class PulsarSdk::Consumer::Base

Attributes

consumer_id[R]
topic[R]

Public Class Methods

new(client, message_tracker, opts) click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 6
def initialize(client, message_tracker, opts)
  @opts = opts
  @topic = @opts.topic
  @message_tracker = message_tracker
  @client = client
end

Public Instance Methods

close() click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 65
def close
  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::CLOSE_CONSUMER,
    close_consumer: Pulsar::Proto::CommandCloseConsumer.new(
      consumer_id: @consumer_id
    )
  )
  execute(base_cmd) unless disconnect?

  remove_handler!
end
disconnect?() click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 77
def disconnect?
  !@established
end
execute(cmd) click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 81
def execute(cmd)
  write(cmd)
end
execute_async(cmd) click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 85
def execute_async(cmd)
  write(cmd, nil, true)
end
flow() click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 35
def flow
  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::FLOW,
    flow: Pulsar::Proto::CommandFlow.new(
      messagePermits: @prefetch
    )
  )

  execute(base_cmd)

  @capacity += @prefetch
end
flow_if_need() click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 60
def flow_if_need
  return if @capacity > 0 && [@prefetch / 2, 1].max.ceil < (@capacity - @fetched)
  flow
end
grab_cnx() click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 13
def grab_cnx
  @prefetch = @opts.prefetch
  @fetched = 0
  @capacity = 0

  @conn = @client.connection(*@client.lookup(@topic))
  @established = true

  @seq_generator = SeqGenerator.new(@conn.seq_generator)

  @consumer_id = @seq_generator.new_consumer_id
  @consumer_name = @opts.name
  @subscription_name = @opts.subscription_name

  result = init_consumer
  @consumer_name = result.consumerName unless result.nil?
end
increase_fetched(n = 1) click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 31
def increase_fetched(n = 1)
  @fetched += n
end
subscription() click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 48
def subscription
  @subscription_name
end
unsubscribe() click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 52
def unsubscribe
  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::UNSUBSCRIBE,
    unsubscribe: Pulsar::Proto::CommandUnsubscribe.new
  )
  execute_async(base_cmd)
end

Private Instance Methods

bind_handler!() click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 97
def bind_handler!
  handler = Proc.new do |cmd, meta_and_payload|
    cmd.nil? ? (@established = false) : @message_tracker.receive(cmd, meta_and_payload)
  end
  @conn.consumer_handlers.add(@consumer_id, handler)
end
init_consumer() click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 110
def init_consumer
  bind_handler!

  base_cmd = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::SUBSCRIBE,
    subscribe: Pulsar::Proto::CommandSubscribe.new(
      topic: @opts.topic,
      subscription: @opts.subscription_name,
      subType: @opts.subscription_type,
      consumer_name: @consumer_name,
      replicate_subscription_state: @opts.replicate_subscription_state,
      read_compacted: @opts.read_compacted
    )
  )
  result = execute(base_cmd)

  @message_tracker.add_consumer(self)

  result.consumerStatsResponse
end
remove_handler!() click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 104
def remove_handler!
  @conn.consumer_handlers.delete(@consumer_id)

  true
end
write(cmd, *args) click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 90
def write(cmd, *args)
  grab_cnx if disconnect?
  cmd.seq_generator = @seq_generator

  @conn.request(cmd, *args)
end