class PulsarSdk::Consumer::Base
Attributes
consumer_id[R]
topic[R]
Public Class Methods
new(client, message_tracker, opts)
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 6 def initialize(client, message_tracker, opts) @opts = opts @topic = @opts.topic @message_tracker = message_tracker @client = client end
Public Instance Methods
close()
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 65 def close base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::CLOSE_CONSUMER, close_consumer: Pulsar::Proto::CommandCloseConsumer.new( consumer_id: @consumer_id ) ) execute(base_cmd) unless disconnect? remove_handler! end
disconnect?()
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 77 def disconnect? !@established end
execute(cmd)
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 81 def execute(cmd) write(cmd) end
execute_async(cmd)
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 85 def execute_async(cmd) write(cmd, nil, true) end
flow()
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 35 def flow base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::FLOW, flow: Pulsar::Proto::CommandFlow.new( messagePermits: @prefetch ) ) execute(base_cmd) @capacity += @prefetch end
flow_if_need()
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 60 def flow_if_need return if @capacity > 0 && [@prefetch / 2, 1].max.ceil < (@capacity - @fetched) flow end
grab_cnx()
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 13 def grab_cnx @prefetch = @opts.prefetch @fetched = 0 @capacity = 0 @conn = @client.connection(*@client.lookup(@topic)) @established = true @seq_generator = SeqGenerator.new(@conn.seq_generator) @consumer_id = @seq_generator.new_consumer_id @consumer_name = @opts.name @subscription_name = @opts.subscription_name result = init_consumer @consumer_name = result.consumerName unless result.nil? end
increase_fetched(n = 1)
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 31 def increase_fetched(n = 1) @fetched += n end
subscription()
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 48 def subscription @subscription_name end
unsubscribe()
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 52 def unsubscribe base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::UNSUBSCRIBE, unsubscribe: Pulsar::Proto::CommandUnsubscribe.new ) execute_async(base_cmd) end
Private Instance Methods
bind_handler!()
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 97 def bind_handler! handler = Proc.new do |cmd, meta_and_payload| cmd.nil? ? (@established = false) : @message_tracker.receive(cmd, meta_and_payload) end @conn.consumer_handlers.add(@consumer_id, handler) end
init_consumer()
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 110 def init_consumer bind_handler! base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::SUBSCRIBE, subscribe: Pulsar::Proto::CommandSubscribe.new( topic: @opts.topic, subscription: @opts.subscription_name, subType: @opts.subscription_type, consumer_name: @consumer_name, replicate_subscription_state: @opts.replicate_subscription_state, read_compacted: @opts.read_compacted ) ) result = execute(base_cmd) @message_tracker.add_consumer(self) result.consumerStatsResponse end
remove_handler!()
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 104 def remove_handler! @conn.consumer_handlers.delete(@consumer_id) true end
write(cmd, *args)
click to toggle source
# File lib/pulsar_sdk/consumer/base.rb, line 90 def write(cmd, *args) grab_cnx if disconnect? cmd.seq_generator = @seq_generator @conn.request(cmd, *args) end