class PulsarSdk::Client::Connection

Constants

CLIENT_NAME
PROTOCOL_VER

Attributes

consumer_handlers[R]
producer_handlers[R]
response_container[R]
seq_generator[R]

Public Class Methods

establish(opts) click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 36
# Builds a connection from +opts+ and starts it right away.
#
# opts - handed unchanged to +new+.
#
# Returns the new connection. The result of +start+ is not inspected, so the
# returned connection is not guaranteed to be ready.
def self.establish(opts)
  connection = new(opts)
  connection.start
  # TODO check connection ready
  connection
end
new(opts) click to toggle source

opts: a PulsarSdk::Options::Connection instance.

# File lib/pulsar_sdk/client/connection.rb, line 17
# Prepares an unconnected connection object; no socket is opened here.
#
# opts - connection options, kept as @conn_options.
def initialize(opts)
  @conn_options = opts
  @socket = nil # no socket yet

  # Per-connection collaborators, each starting out empty/fresh.
  @state = Status.new
  @seq_generator = SeqGenerator.new
  @response_container = ResponseContainer.new
  @producer_handlers = ProducerHandler.new
  @consumer_handlers = ConsumerHandler.new
end

Public Instance Methods

active_status() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 69
# Returns a two-element array: [last ping time, last received time], both
# read from the connection state.
def active_status
  last_ping = @state.last_ping_at
  last_received = @state.last_received_at
  [last_ping, last_received]
end
close() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 44
# Shuts the connection down.
#
# Marks the state as closed, invokes every registered consumer and producer
# handler with no arguments, waits up to 2 seconds for the listener thread
# (@pong) to finish and kills it if waiting fails, then closes the socket.
#
# The socket is closed in +ensure+ so it is released even when a handler or
# the thread join raises. @socket is still nil when the connection was never
# opened, so the close is nil-safe; otherwise the NoMethodError raised from
# +ensure+ would mask the original error.
#
# NOTE(review): when this is called from the listener thread itself, joining
# the current thread raises ThreadError, which is rescued below and makes the
# thread kill itself - confirm this is the intended shutdown path.
def close
  @state.closed!
  consumer_handlers.each { |_k, v| v.call }
  producer_handlers.each { |_k, v| v.call }

  begin
    Timeout.timeout(2) { @pong&.join }
  rescue StandardError
    # Join timed out or failed: stop the listener forcibly.
    @pong&.kill
  end
  @pong&.join
ensure
  @socket&.close
end
closed?() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 54
# Reports whether the connection state is marked closed.
def closed?
  state = @state
  state.closed?
end
ping() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 58
# Sends a PING command without waiting for a reply, then records the ping on
# the connection state.
#
# Returns whatever @state.ping! returns.
def ping
  ping_type = Pulsar::Proto::BaseCommand::Type::PING
  ping_cmd = Pulsar::Proto::BaseCommand.new(
    type: ping_type,
    ping: Pulsar::Proto::CommandPing.new
  )

  # Third argument true => asynchronous send.
  request(ping_cmd, nil, true)

  @state.ping!
end
request(cmd, msg = nil, async = false, timeout = nil) click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 73
# Encodes +cmd+ (and optional +msg+) into a frame and writes it to the socket.
#
# cmd     - command to send; gets this connection's sequence generator unless
#           it already has one, and is asked to fill in its ids.
# msg     - optional message passed to the frame encoder.
# async   - when true, return right after the write.
# timeout - passed to the response container when waiting for a reply.
#
# Returns true for async sends and for commands without a request id;
# otherwise the value the response container yields for the request id.
# Raises RuntimeError when the connection is closed.
def request(cmd, msg = nil, async = false, timeout = nil)
  raise 'connection was closed!' if closed?

  cmd.seq_generator ||= @seq_generator
  # NOTE try to auto set *_id
  cmd.handle_ids

  write(PulsarSdk::Protocol::Frame.encode(cmd, msg))
  return true if async

  request_id = cmd.get_request_id
  return true unless request_id

  @response_container.delete(request_id, timeout)
end
start() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 30
# Runs the start-up sequence: connect, handshake, then start the listener.
# The steps short-circuit, so a falsy step skips the remaining ones and the
# state is marked closed.
def start
  return if connect && do_hand_shake && listen

  @state.closed!
end

Private Instance Methods

connect() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 126
# Opens the TCP connection to the broker unless a live socket already exists.
#
# Returns true when connected (or already connected), false when the socket
# did not become writable within @conn_options.connection_timeout.
# Errors raised while configuring or connecting the socket propagate to the
# caller; the half-open socket is closed first so its descriptor is not
# leaked.
def connect
  return true if @socket && !closed?

  @socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
  # Drop any reader memoized by #reader for a previous socket, so reads are
  # not attempted on the old one.
  @reader = nil

  begin
    @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    @socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true)

    host_port = @conn_options.port_and_host_from(:logical_addr)
    sockaddr = Socket.sockaddr_in(*host_port)

    begin
      # Initiate the socket connection in the background. If it doesn't fail
      # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
      # indicating the connection is in progress.
      @socket.connect_nonblock(sockaddr)
    rescue IO::WaitWritable
      # IO.select will block until the socket is writable or the timeout
      # is exceeded, whichever comes first.
      unless IO.select(nil, [@socket], nil, @conn_options.connection_timeout)
        # IO.select returns nil when the socket is not ready before timeout
        # seconds have elapsed
        @socket.close
        return false
      end

      begin
        # Verify there is now a good connection.
        @socket.connect_nonblock(sockaddr)
      rescue Errno::EISCONN
        # The socket is connected, we're good!
      end
    end
  rescue StandardError
    # Connection failed (e.g. refused or unreachable): release the socket
    # before re-raising the original error.
    @socket.close
    raise
  end

  @state.tcp_connected!

  true
end
do_hand_shake() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 164
# Sends the CONNECT command carrying the client name, protocol version and
# the configured proxy-to-broker URL, then marks the state as ready.
#
# Always returns true. The result of the request is not inspected.
def do_hand_shake
  connect_cmd = Pulsar::Proto::CommandConnect.new(
    client_version: CLIENT_NAME,
    protocol_version: PROTOCOL_VER,
    proxy_to_broker_url: @conn_options.proxy_to_broker_url
  )
  handshake = Pulsar::Proto::BaseCommand.new(
    type: Pulsar::Proto::BaseCommand::Type::CONNECT,
    connect: connect_cmd
  )

  request(handshake)

  @state.ready!
  true
end
handle_base_command(cmd, payload) click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 189
# Dispatches one command received from the broker.
#
# cmd     - the decoded base command; its type is probed through the
#           typeof_*? predicates.
# payload - metadata and payload that accompany message commands.
#
# Returns true for every recognised command type.
# Raises RuntimeError, after closing the connection, for an unknown type.
def handle_base_command(cmd, payload)
  PulsarSdk.logger.debug(__method__){cmd.type} unless cmd.typeof_ping?

  case
  # Replies to earlier requests: hand them to the response container.
  # get_schema_response is routed here too; it previously matched an empty
  # branch (Ruby `when` does not fall through) and was silently dropped.
  when cmd.typeof_success?,
       cmd.typeof_producer_success?,
       cmd.typeof_lookup_response?,
       cmd.typeof_get_last_message_id_response?,
       cmd.typeof_consumer_stats_response?,
       cmd.typeof_get_topics_of_namespace_response?,
       cmd.typeof_get_schema_response?,
       cmd.typeof_partitioned_metadata_response?
    handle_response(cmd)

  when cmd.typeof_connected?
    PulsarSdk.logger.info(__method__){"#{cmd.type}: #{cmd.connected}"}

  when cmd.typeof_error?
    # NOTE(review): errors are only logged; nothing is added to the response
    # container for them here - confirm that is intended.
    PulsarSdk.logger.error(__method__){"#{cmd.error}: #{cmd.message}"}

  when cmd.typeof_close_producer?
    producer_id = cmd.close_producer.producer_id
    producer_handlers.find(producer_id)&.call

  when cmd.typeof_close_consumer?
    consumer_id = cmd.close_consumer.consumer_id
    consumer_handlers.find(consumer_id)&.call

  when cmd.typeof_message?
    handle_message(cmd, payload)

  when cmd.typeof_send_receipt?
    handle_send_receipt(cmd)

  when cmd.typeof_ping?
    handle_ping

  when cmd.typeof_reached_end_of_topic?,
       cmd.typeof_active_consumer_change?,
       cmd.typeof_pong?
    # Recognised but deliberately ignored.
    # TODO notify consumer no more message (reached_end_of_topic)

  else
    close
    raise "Received invalid command type: #{cmd.type}"
  end

  true
end
handle_message(cmd, payload) click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 258
# Delivers an incoming message command to the handler registered for its
# consumer id.
#
# Logs a warning and returns nil when the command carries no consumer id or
# no handler is registered for it; otherwise returns the handler's result.
def handle_message(cmd, payload)
  consumer_id = cmd.get_consumer_id
  # Only look the handler up once we know there is an id to look up.
  handler = consumer_handlers.find(consumer_id) unless consumer_id.nil?

  if consumer_id.nil?
    ::PulsarSdk.logger.warn(__method__){"can not get consumer id from cmd: #{cmd.inspect}"}
    nil
  elsif handler.nil?
    ::PulsarSdk.logger.warn(__method__){"can not get consumer_handler from cmd: #{cmd.inspect}"}
    nil
  else
    handler.call(cmd, payload)
  end
end
handle_ping() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 280
# Answers a PING by sending a PONG command without waiting for a reply.
#
# Returns the result of the request call.
def handle_ping
  pong_type = Pulsar::Proto::BaseCommand::Type::PONG
  pong_cmd = Pulsar::Proto::BaseCommand.new(
    type: pong_type,
    pong: Pulsar::Proto::CommandPong.new
  )

  # Third argument true => asynchronous send.
  request(pong_cmd, nil, true)
end
handle_response(cmd) click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 252
# Stores a response command in the response container under its request id.
# Commands without a request id are ignored (returns nil).
def handle_response(cmd)
  request_id = cmd.get_request_id
  @response_container.add(request_id, cmd) unless request_id.nil?
end
handle_send_receipt(cmd) click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 272
# Passes a send receipt to the handler registered for its producer id.
# Returns nil when no handler is registered, else the handler's result.
def handle_send_receipt(cmd)
  receipt = cmd.send_receipt
  producer_handlers.find(receipt.producer_id)&.call(receipt)
end
listen() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 106
# Starts the background listener thread (kept in @pong) and returns true.
#
# Until the connection is closed the thread reads from the connection while
# the state is ready, and otherwise waits on the state. A read timeout is
# ignored; any other StandardError is logged and closes the connection.
def listen
  @pong = Thread.new do
    until closed?
      begin
        if @state.ready?
          read_from_connection
        else
          @state.wait
        end
      rescue Errno::ETIMEDOUT
        # read timeout, do nothing
      rescue => e
        PulsarSdk.logger.error("reader error") {e}
        close
      end
    end
  end
  @pong.abort_on_exception = false

  true
end
read_from_connection() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 180
# Reads one command (plus its metadata/payload) from the reader. When a
# command was read, records the receipt on the state and dispatches it.
#
# Returns nil when nothing was read, else the dispatch result.
def read_from_connection
  command, meta_and_payload = reader.read_fully

  unless command.nil?
    @state.received!
    handle_base_command(command, meta_and_payload)
  end
end
reader() click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 93
# Returns the protocol reader for the current socket, building it on first
# use and reusing the same instance afterwards.
def reader
  return @reader if @reader

  @reader = PulsarSdk::Protocol::Reader.new(@socket)
end
write(bytes) click to toggle source
# File lib/pulsar_sdk/client/connection.rb, line 97
# Writes all of +bytes+ to the socket.
#
# write_nonblock may accept only part of the buffer, so writing continues
# from the first unwritten byte until everything has been sent. When the
# socket is not writable, waits for it (up to
# @conn_options.operation_timeout per wait) and tries again.
#
# NOTE(review): as before, an expired wait is not treated as an error; the
# write is simply retried - confirm whether a timeout should raise instead.
#
# Returns the total number of bytes written.
def write(bytes)
  payload = bytes.to_s
  total = payload.bytesize
  written = 0

  while written < total
    begin
      written += @socket.write_nonblock(payload.byteslice(written, total - written))
    rescue IO::WaitWritable
      IO.select(nil, [@socket], nil, @conn_options.operation_timeout)
      retry
    end
  end

  written
end