class PulsarSdk::Client::Connection
Constants
- CLIENT_NAME
- PROTOCOL_VER
Attributes
consumer_handlers[R]
producer_handlers[R]
response_container[R]
seq_generator[R]
Public Class Methods
establish(opts)
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 36
# Builds a connection from +opts+, starts it, and returns it.
#
# The result of +start+ is not inspected, so the connection is returned
# whether or not it came up successfully.
def self.establish(opts)
  connection = new(opts)
  connection.start
  # TODO check connection ready
  connection
end
new(opts)
click to toggle source
opts: a PulsarSdk::Options::Connection instance
# File lib/pulsar_sdk/client/connection.rb, line 17
# Prepares a connection object; no socket is opened here.
#
# opts:: PulsarSdk::Options::Connection
def initialize(opts)
  @conn_options = opts
  @socket       = nil # assigned later by #connect

  @state              = Status.new
  @seq_generator      = SeqGenerator.new
  @consumer_handlers  = ConsumerHandler.new
  @producer_handlers  = ProducerHandler.new
  @response_container = ResponseContainer.new
end
Public Instance Methods
active_status()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 69
# Returns a two-element array: [@state.last_ping_at, @state.last_received_at].
def active_status
  last_ping     = @state.last_ping_at
  last_received = @state.last_received_at
  [last_ping, last_received]
end
close()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 44
# Shuts the connection down:
#
# 1. marks the state as closed,
# 2. invokes every registered consumer and producer handler with no arguments,
# 3. waits up to two seconds for the listener thread (@pong) to finish and
#    kills it if that fails,
# 4. closes the socket.
#
# Safe to call when no socket was ever created (@socket is nil until #connect
# has run) and when the listener thread was never started.
def close
  @state.closed!

  consumer_handlers.each { |_id, handler| handler.call }
  producer_handlers.each { |_id, handler| handler.call }

  Timeout.timeout(2) { @pong&.join }
rescue
  # Reached on a join timeout, a raising handler, or any other StandardError.
  # NOTE(review): when #close runs on the listener thread itself, Thread#join
  # raises ThreadError, so this branch kills the current thread — confirm that
  # is the intended way to stop the listener.
  @pong&.kill
  @pong&.join
ensure
  # @socket may still be nil if the connection was never established.
  @socket&.close
end
closed?()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 54
# Reports whether the connection state is closed (delegates to @state).
def closed?
  @state.closed?
end
ping()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 58
# Sends a PING command as an async request, then calls @state.ping!.
#
# Returns whatever @state.ping! returns.
def ping
  ping_command = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::PING,
    ping: Pulsar::Proto::CommandPing.new
  )

  # Third argument (async = true): do not wait for a response.
  request(ping_command, nil, true)
  @state.ping!
end
request(cmd, msg = nil, async = false, timeout = nil)
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 73
# Encodes +cmd+ (with optional +msg+) into a frame and writes it to the socket.
#
# cmd::     command object; receives @seq_generator if it has none, and has
#           +handle_ids+ called on it before encoding
# msg::     optional message passed to the frame encoder
# async::   when truthy, return right after writing
# timeout:: forwarded to @response_container.delete
#
# Returns true for async requests and for commands without a request id;
# otherwise returns the result of @response_container.delete(request_id, timeout).
# Raises RuntimeError when the connection is closed.
def request(cmd, msg = nil, async = false, timeout = nil)
  raise 'connection was closed!' if closed?

  cmd.seq_generator ||= @seq_generator
  # NOTE try to auto set *_id
  cmd.handle_ids

  write(PulsarSdk::Protocol::Frame.encode(cmd, msg))
  return true if async

  request_id = cmd.get_request_id
  return true unless request_id

  @response_container.delete(request_id, timeout)
end
start()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 30
# Runs connect, do_hand_shake and listen in that order, stopping at the first
# one that returns a falsy value; in that case the state is marked closed.
def start
  started = connect && do_hand_shake && listen
  @state.closed! unless started
end
Private Instance Methods
connect()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 126
# Opens the TCP socket to the address given by
# @conn_options.port_and_host_from(:logical_addr).
#
# Returns true once connected (or immediately when a socket already exists and
# the connection is not closed), and false when the attempt does not complete
# within @conn_options.connection_timeout seconds.
#
# Raises SystemCallError (for example Errno::ECONNREFUSED) when the connection
# attempt fails outright; the socket is closed before the error propagates so
# the file descriptor is not leaked.
def connect
  return true if @socket && !closed?

  # Resolve the address before allocating the socket so a resolution failure
  # cannot leave an open socket behind.
  host_port = @conn_options.port_and_host_from(:logical_addr)
  sockaddr = Socket.sockaddr_in(*host_port)

  @socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
  # #reader memoizes a reader around the socket it was first built with; drop
  # it so the next call wraps the socket created above.
  @reader = nil
  @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  @socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true)

  begin
    begin
      # Initiate the socket connection in the background. If it doesn't fail
      # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
      # indicating the connection is in progress.
      @socket.connect_nonblock(sockaddr)
    rescue IO::WaitWritable
      # IO.select will block until the socket is writable or the timeout
      # is exceeded, whichever comes first.
      unless IO.select(nil, [@socket], nil, @conn_options.connection_timeout)
        # IO.select returns nil when the socket is not ready before timeout
        # seconds have elapsed
        @socket.close
        return false
      end

      begin
        # Verify there is now a good connection.
        @socket.connect_nonblock(sockaddr)
      rescue Errno::EISCONN
        # The socket is connected, we're good!
      end
    end
  rescue SystemCallError
    # The attempt failed (refused, unreachable, ...): release the descriptor
    # and let the caller see the original error.
    @socket.close
    raise
  end

  @state.tcp_connected!
  true
end
do_hand_shake()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 164
# Sends the CONNECT command carrying the client name, protocol version and the
# configured proxy_to_broker_url, then calls @state.ready!.
#
# Always returns true (errors from #request propagate).
def do_hand_shake
  connect_payload = Pulsar::Proto::CommandConnect.new(
    client_version: CLIENT_NAME,
    protocol_version: PROTOCOL_VER,
    proxy_to_broker_url: @conn_options.proxy_to_broker_url
  )
  handshake = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::CONNECT,
    connect: connect_payload
  )

  request(handshake)
  @state.ready!
  true
end
handle_base_command(cmd, payload)
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 189
# Dispatches one received command according to its type.
#
# cmd::     received command; must answer the typeof_*? predicates
# payload:: data that accompanied the command; only forwarded for message
#           commands
#
# Returns true. For an unrecognised command type the connection is closed and
# a RuntimeError is raised.
def handle_base_command(cmd, payload)
  PulsarSdk.logger.debug(__method__) { cmd.type } unless cmd.typeof_ping?

  case
  when cmd.typeof_success?,
       cmd.typeof_producer_success?,
       cmd.typeof_lookup_response?,
       cmd.typeof_get_last_message_id_response?,
       cmd.typeof_consumer_stats_response?,
       cmd.typeof_get_topics_of_namespace_response?,
       cmd.typeof_get_schema_response?,
       cmd.typeof_partitioned_metadata_response?
    # Replies to requests: hand them to handle_response so they are stored
    # under their request id. get_schema_response used to be dropped here,
    # unlike every other *_response type.
    handle_response(cmd)
  when cmd.typeof_connected?
    PulsarSdk.logger.info(__method__) { "#{cmd.type}: #{cmd.connected}" }
  when cmd.typeof_reached_end_of_topic?
    # TODO notify consumer no more message
  when cmd.typeof_error?
    PulsarSdk.logger.error(__method__) { "#{cmd.error}: #{cmd.message}" }
  when cmd.typeof_close_producer?
    producer_handlers.find(cmd.close_producer.producer_id)&.call
  when cmd.typeof_close_consumer?
    consumer_handlers.find(cmd.close_consumer.consumer_id)&.call
  when cmd.typeof_active_consumer_change?
    # nothing to do
  when cmd.typeof_message?
    handle_message(cmd, payload)
  when cmd.typeof_send_receipt?
    handle_send_receipt(cmd)
  when cmd.typeof_ping?
    handle_ping
  when cmd.typeof_pong?
    # nothing to do
  else
    close
    raise "Received invalid command type: #{cmd.type}"
  end

  true
end
handle_message(cmd, payload)
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 258
# Hands a message command and its payload to the consumer handler registered
# for the command's consumer id.
#
# Logs a warning and returns nil when the command carries no consumer id or
# when no handler is registered for it; otherwise returns the handler's result.
def handle_message(cmd, payload)
  consumer_id = cmd.get_consumer_id
  if consumer_id.nil?
    ::PulsarSdk.logger.warn(__method__) { "can not get consumer id from cmd: #{cmd.inspect}" }
    return
  end

  handler = consumer_handlers.find(consumer_id)
  return handler.call(cmd, payload) unless handler.nil?

  ::PulsarSdk.logger.warn(__method__) { "can not get consumer_handler from cmd: #{cmd.inspect}" }
  nil
end
handle_ping()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 280
# Answers a received PING by sending a PONG command as an async request.
#
# Returns the result of #request.
def handle_ping
  pong_command = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::PONG,
    pong: Pulsar::Proto::CommandPong.new
  )

  # Third argument (async = true): do not wait for a response.
  request(pong_command, nil, true)
end
handle_response(cmd)
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 252
# Stores +cmd+ in the response container under its request id.
#
# Commands without a request id are ignored (returns nil).
def handle_response(cmd)
  request_id = cmd.get_request_id
  @response_container.add(request_id, cmd) unless request_id.nil?
end
handle_send_receipt(cmd)
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 272
# Passes the send receipt carried by +cmd+ to the producer handler registered
# for the receipt's producer id.
#
# Returns nil when no handler is registered, otherwise the handler's result.
def handle_send_receipt(cmd)
  receipt = cmd.send_receipt
  producer_handlers.find(receipt.producer_id)&.call(receipt)
end
listen()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 106
# Starts the listener thread (stored in @pong) and returns true.
#
# Until the connection is closed, the thread reads from the connection while
# the state is ready and otherwise calls @state.wait. Errno::ETIMEDOUT is
# ignored; any other StandardError is logged and the connection is closed.
def listen
  @pong = Thread.new do
    loop do
      break if closed?

      begin
        if @state.ready?
          read_from_connection
        else
          @state.wait
        end
      rescue Errno::ETIMEDOUT
        # read timeout, do nothing
      rescue => e
        PulsarSdk.logger.error("reader error") { e }
        close
      end
    end
  end

  @pong.abort_on_exception = false
  true
end
read_from_connection()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 180
# Reads one command from the reader and dispatches it.
#
# Returns nil without touching the state when the reader yields no command;
# otherwise calls @state.received! and returns the result of
# handle_base_command.
def read_from_connection
  command, meta_and_payload = reader.read_fully
  return if command.nil?

  @state.received!
  handle_base_command(command, meta_and_payload)
end
reader()
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 93
# Returns the protocol reader for @socket, creating and caching it on first use.
def reader
  return @reader if @reader

  @reader = PulsarSdk::Protocol::Reader.new(@socket)
end
write(bytes)
click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 97
# Writes all of +bytes+ to the socket without blocking the whole process.
#
# write_nonblock may accept only part of the data, so the remainder is written
# in further calls until nothing is left; stopping after the first call would
# put a truncated frame on the wire. Whenever the socket is not writable, waits
# up to @conn_options.operation_timeout seconds for it and then tries again.
#
# bytes:: data to send; must respond to bytesize and byteslice (presumably the
#         String produced by the frame encoder — confirm with the caller)
#
# Returns the total number of bytes written.
def write(bytes)
  total = bytes.bytesize
  written = 0

  while written < total
    # Avoid copying when nothing has been written yet.
    chunk = written.zero? ? bytes : bytes.byteslice(written, total - written)
    begin
      written += @socket.write_nonblock(chunk)
    rescue IO::WaitWritable
      IO.select(nil, [@socket], nil, @conn_options.operation_timeout)
      retry
    end
  end

  written
end