class Protobuf::Rpc::Connectors::Zmq

Constants

CLIENT_RETRIES

Class Constants

RequestTimeout
ZmqEagainError
ZmqRecoverableError

Public Class Methods

ping_port_responses() click to toggle source

Class Methods

# File lib/protobuf/rpc/connectors/zmq.rb, line 27
# Class-level cache of ping-port probe results, created on first access and
# reused afterwards. host_alive? reads and writes it keyed by host.
#
# @return [::ThreadSafe::Cache]
def self.ping_port_responses
  return @ping_port_responses if @ping_port_responses

  @ping_port_responses = ::ThreadSafe::Cache.new
end
zmq_context() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 31
# Returns the ZMQ context for the current process, building it on first use.
# Contexts are stored in a hash keyed by Process.pid, so a lookup made under a
# different pid creates (and then reuses) a separate context.
#
# @return [ZMQ::Context]
def self.zmq_context
  @zmq_contexts ||= Hash.new { |contexts, pid| contexts[pid] = ZMQ::Context.new }

  current_pid = Process.pid
  @zmq_contexts[current_pid]
end

Public Instance Methods

log_signature() click to toggle source

Instance methods

# File lib/protobuf/rpc/connectors/zmq.rb, line 42
# Memoized prefix used when signing log messages; embeds the receiver's class.
#
# @return [String]
def log_signature
  @_log_signature ||= begin
    owner = self.class
    "[client-#{owner}]"
  end
end
send_request() click to toggle source

Start the request/response cycle. We implement the Lazy Pirate req/reply reliability pattern as laid out in the ZMQ Guide, Chapter 4.

@see zguide.zeromq.org/php:chapter4#Client-side-Reliability-Lazy-Pirate-Pattern

# File lib/protobuf/rpc/connectors/zmq.rb, line 51
# Entry point for a request: set up the connection, then run the
# retrying send/receive cycle unless setup left the connector in error.
#
# @return the result of send_request_with_lazy_pirate, or nil when error? is true
def send_request
  setup_connection
  return if error?

  send_request_with_lazy_pirate
end

Private Instance Methods

check_available_rcv_timeout() click to toggle source

Private Instance methods

# File lib/protobuf/rpc/connectors/zmq.rb, line 61
# Receive timeout applied to the availability check in
# socket_to_available_server. Read once from
# PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT (converted with to_i) and memoized;
# defaults to 200 when the variable is not set.
#
# @return [Integer]
def check_available_rcv_timeout
  return @check_available_rcv_timeout if @check_available_rcv_timeout

  env_key = "PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT"
  @check_available_rcv_timeout = ENV.key?(env_key) ? ENV[env_key].to_i : 200 # ms
end
check_available_snd_timeout() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 72
# Send timeout applied to the availability check in
# socket_to_available_server. Read once from
# PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT (converted with to_i) and memoized;
# defaults to 200 when the variable is not set.
#
# @return [Integer]
def check_available_snd_timeout
  return @check_available_snd_timeout if @check_available_snd_timeout

  env_key = "PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT"
  @check_available_snd_timeout = ENV.key?(env_key) ? ENV[env_key].to_i : 200 # ms
end
close_connection() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 83
# Intentionally a no-op: send_request_with_timeout closes its socket in an
# ensure block, so there is nothing left to close here.
def close_connection
  # The socket is automatically closed after every request.
end
create_socket() click to toggle source

Create a socket connected to a server that can handle the current service. LINGER is set to 0 so that the socket can be closed immediately in the event of a timeout.

# File lib/protobuf/rpc/connectors/zmq.rb, line 90
# Builds a connected REQ socket, retrying until one with a live underlying
# handle is obtained.
#
# Each pass creates a socket from the context, resolves a server URI, sets
# LINGER to 0, connects, and (when first_alive_load_balance? is true) runs the
# availability check, which may return nil or a closed socket.
#
# NOTE(review): there is no upper bound on the number of passes; if every
# pass yields nil/closed this never returns — confirm that is intended.
#
# @return the connected socket
def create_socket
  loop do
    candidate = zmq_context.socket(::ZMQ::REQ)

    if candidate # Make sure the context builds the socket
      uri = lookup_server_uri
      candidate.setsockopt(::ZMQ::LINGER, 0)
      zmq_error_check(candidate.connect(uri), :socket_connect)
      candidate = socket_to_available_server(candidate) if first_alive_load_balance?
    end

    # A nil candidate or one whose inner handle is nil means "try again".
    return candidate unless candidate.try(:socket).nil?
  end
end
error?() click to toggle source

Determines whether the connector is in an error state. Must be used with the Connector API.

# File lib/protobuf/rpc/connectors/zmq.rb, line 107
# Whether an error has been recorded on this connector.
#
# @return [Boolean] true when @error is truthy, false otherwise
def error?
  @error ? true : false
end
host_alive?(host) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 111
# Decides whether +host+ should be treated as alive.
#
# Always true when the ping port is disabled. Otherwise a cached probe result
# is reused while it is no older than host_alive_check_interval seconds; after
# that the ping port is probed again and the class-level cache is refreshed.
#
# @param host the host whose ping port is checked
# @return [Boolean]
def host_alive?(host)
  return true unless ping_port_enabled?

  cached = self.class.ping_port_responses[host]
  if cached && (Time.now.to_i - cached[:at]) <= host_alive_check_interval
    return cached[:ping_port_open]
  end

  is_open = ping_port_open?(host)
  fresh_entry = { :at => Time.now.to_i, :ping_port_open => is_open }
  self.class.ping_port_responses[host] = fresh_entry
  is_open
end
host_alive_check_interval() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 128
# Maximum age, in seconds, of a cached ping result that host_alive? will reuse.
# Taken from PB_ZMQ_CLIENT_HOST_ALIVE_CHECK_INTERVAL (to_i), never less than 1.
# Memoized after the first call.
#
# @return [Integer]
def host_alive_check_interval
  @host_alive_check_interval ||= begin
    configured = ENV["PB_ZMQ_CLIENT_HOST_ALIVE_CHECK_INTERVAL"].to_i
    configured > 1 ? configured : 1
  end
end
lookup_server_uri() click to toggle source

Look up a server URI for the requested service in the service directory. If the directory yields no live listing (for example, because it is not running), fall back to the host and port given in the options.

# File lib/protobuf/rpc/connectors/zmq.rb, line 136
# Resolves the tcp:// URI of a server for the current service.
#
# On each attempt the first live listing from the service directory wins;
# if there is none, the host/port from the options are used when that host is
# alive. The chosen [port, host] pair is recorded on @stats.server. Between
# failed attempts the method sleeps for 10ms.
#
# @return [String] "tcp://host:port"
# @raise [RuntimeError] when no live host is found in server_lookup_attempts tries
def lookup_server_uri
  server_lookup_attempts.times do
    listing = service_directory.all_listings_for(service).find do |candidate|
      host_alive?(candidate.try(:address))
    end

    host, port =
      if listing
        [listing.try(:address), listing.try(:port)]
      else
        [options[:host], options[:port]]
      end

    # A directory listing was already checked above; only the fallback
    # host from the options still needs a liveness check.
    if listing || host_alive?(host)
      @stats.server = [port, host]
      return "tcp://#{host}:#{port}"
    end

    sleep(1.0 / 100.0)
  end

  raise "Host not found for service #{service}"
end
ping_port_open?(host) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 163
# Probes +host+ by opening a TCP connection to the ping port.
#
# Any StandardError raised while connecting or configuring the socket is
# treated as "port not open". The probe socket is always closed afterwards,
# and an IOError from that close is ignored.
#
# @param host the host to probe
# @return [Boolean] true when the connection succeeded, false otherwise
def ping_port_open?(host)
  probe = nil

  begin
    probe = TCPSocket.new(host, ping_port.to_i)
    probe.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
    probe.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_LINGER, [1, 0].pack('ii'))
    true
  rescue
    false
  ensure
    begin
      probe.close if probe
    rescue IOError
      nil
    end
  end
end
rcv_timeout() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 179
# Receive timeout handed to ::ZMQ::RCVTIMEO for the real request.
# Precedence: options[:timeout], then PB_ZMQ_CLIENT_RCV_TIMEOUT (to_i),
# then 300_000. The result is memoized.
#
# NOTE(review): options[:timeout] is used as-is while the default is in
# milliseconds — confirm callers supply milliseconds.
#
# @return [Integer]
def rcv_timeout
  @rcv_timeout ||=
    if options[:timeout]
      options[:timeout]
    elsif ENV.key?("PB_ZMQ_CLIENT_RCV_TIMEOUT")
      ENV["PB_ZMQ_CLIENT_RCV_TIMEOUT"].to_i
    else
      300_000 # 300 seconds
    end
end
send_request_with_lazy_pirate() click to toggle source

Trying a number of times, attempt to get a response from the server. If we haven’t received a legitimate response in the CLIENT_RETRIES number of retries, fail the request.

# File lib/protobuf/rpc/connectors/zmq.rb, line 196
# Sends the request, retrying on RequestTimeout until CLIENT_RETRIES attempts
# have been made. A successful attempt is followed by parse_response; once the
# retries are exhausted the request is failed with :RPC_FAILED.
#
# @return the result of parse_response, or of failure when retries run out
def send_request_with_lazy_pirate
  attempts_made = 0

  begin
    attempts_made += 1
    send_request_with_timeout(attempts_made)
    parse_response
  rescue RequestTimeout
    if attempts_made >= CLIENT_RETRIES
      failure(:RPC_FAILED, "The server repeatedly failed to respond within #{timeout} seconds")
    else
      retry
    end
  end
end
send_request_with_timeout(attempt = 0) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 209
# Performs one send/receive round trip on a freshly created socket.
#
# The request bytes come from @request_data and the reply is written into
# @response_data. An EAGAIN failure on either the send or the receive is
# converted into RequestTimeout so the caller can retry. The socket is closed
# in the ensure block whether or not the round trip succeeded.
#
# @param attempt [Integer] attempt number, used only in log messages
# @raise [RequestTimeout] when the send or receive fails with EAGAIN
def send_request_with_timeout(attempt = 0)
  socket = create_socket
  # Apply the per-request timeouts to this socket.
  socket.setsockopt(::ZMQ::RCVTIMEO, rcv_timeout)
  socket.setsockopt(::ZMQ::SNDTIMEO, snd_timeout)

  logger.debug { sign_message("Sending Request (attempt #{attempt}, #{socket})") }
  zmq_eagain_error_check(socket.send_string(@request_data), :socket_send_string)
  logger.debug { sign_message("Waiting #{rcv_timeout}ms for response (attempt #{attempt}, #{socket})") }
  # recv_string fills the fresh string assigned to @response_data.
  zmq_eagain_error_check(socket.recv_string(@response_data = ""), :socket_recv_string)
  logger.debug { sign_message("Response received (attempt #{attempt}, #{socket})") }
rescue ZmqEagainError
  logger.debug { sign_message("Timed out waiting for response (attempt #{attempt}, #{socket})") }
  raise RequestTimeout
ensure
  # socket is nil here if create_socket itself raised.
  # NOTE(review): a failure raised by zmq_error_check below would replace any
  # exception already in flight (including RequestTimeout) — confirm acceptable.
  logger.debug { sign_message("Closing Socket")  }
  zmq_error_check(socket.close, :socket_close) if socket
  logger.debug { sign_message("Socket closed")  }
end
server_lookup_attempts() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 228
# Number of passes lookup_server_uri makes before giving up. Taken from
# PB_ZMQ_CLIENT_SERVER_LOOKUP_ATTEMPTS (to_i), never less than 5. Memoized.
#
# @return [Integer]
def server_lookup_attempts
  @server_lookup_attempts ||= begin
    configured = ENV["PB_ZMQ_CLIENT_SERVER_LOOKUP_ATTEMPTS"].to_i
    configured > 5 ? configured : 5
  end
end
service() click to toggle source

The service we’re attempting to connect to

# File lib/protobuf/rpc/connectors/zmq.rb, line 234
# The service being requested, read from the :service entry of the options.
def service
  options[:service]
end
service_directory() click to toggle source

Alias for ::Protobuf::Rpc::ServiceDirectory.instance

# File lib/protobuf/rpc/connectors/zmq.rb, line 239
# Shorthand for the ::Protobuf::Rpc::ServiceDirectory singleton, which
# lookup_server_uri queries for listings.
def service_directory
  ::Protobuf::Rpc::ServiceDirectory.instance
end
snd_timeout() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 243
# Send timeout handed to ::ZMQ::SNDTIMEO for the real request.
# Precedence: options[:timeout], then PB_ZMQ_CLIENT_SND_TIMEOUT (to_i),
# then 300_000. The result is memoized.
#
# NOTE(review): options[:timeout] is used as-is while the default is in
# milliseconds — confirm callers supply milliseconds.
#
# @return [Integer]
def snd_timeout
  @snd_timeout ||=
    if options[:timeout]
      options[:timeout]
    elsif ENV.key?("PB_ZMQ_CLIENT_SND_TIMEOUT")
      ENV["PB_ZMQ_CLIENT_SND_TIMEOUT"].to_i
    else
      300_000 # 300 seconds
    end
end
socket_to_available_server(socket) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 256
# Asks the server behind +socket+ whether it can take work right now.
#
# The check is sent with the short check_available_* timeouts. Outcomes:
# * the server answers with anything other than NO_WORKERS_AVAILABLE: the
#   timeouts are reset to -1 and the open socket is returned;
# * the server answers NO_WORKERS_AVAILABLE: the socket is closed and the
#   closed socket is returned (create_socket detects the closed handle and
#   tries again);
# * any step fails with ZmqRecoverableError: the socket is closed and nil is
#   returned.
#
# Closing in the error path matters because create_socket retries in a loop;
# previously each failed check abandoned an open socket. Timeouts are no
# longer reset on a socket that has just been closed.
#
# @param socket the connected REQ socket to check
# @return the socket (open or closed, see above), or nil on a recoverable error
def socket_to_available_server(socket)
  check_available_response = ""
  socket.setsockopt(::ZMQ::RCVTIMEO, check_available_rcv_timeout)
  socket.setsockopt(::ZMQ::SNDTIMEO, check_available_snd_timeout)
  zmq_recoverable_error_check(socket.send_string(::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE), :socket_send_string)
  zmq_recoverable_error_check(socket.recv_string(check_available_response), :socket_recv_string)

  if check_available_response == ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE
    zmq_recoverable_error_check(socket.close, :socket_close)
    return socket # already closed; nothing more to configure
  end

  socket.setsockopt(::ZMQ::RCVTIMEO, -1)
  socket.setsockopt(::ZMQ::SNDTIMEO, -1)
  socket
rescue ZmqRecoverableError
  # Couldn't make a connection and need to try again; release this socket
  # first so repeated retries do not accumulate open sockets.
  socket.close
  nil
end
zmq_context() click to toggle source

Return the ZMQ context to use for this process. This delegates to the class-level zmq_context, which creates the context on first use and keeps one context per process ID.

# File lib/protobuf/rpc/connectors/zmq.rb, line 278
# Instance-level shortcut to the class-level zmq_context.
def zmq_context
  self.class.zmq_context
end
zmq_eagain_error_check(return_code, source) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 282
        def zmq_eagain_error_check(return_code, source)
          return if ::ZMQ::Util.resultcode_ok?(return_code || -1)

          if ::ZMQ::Util.errno == ::ZMQ::EAGAIN
            fail ZmqEagainError, <<-ERROR
            Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".

            #{caller(1).join($INPUT_RECORD_SEPARATOR)}
            ERROR
          else
            fail <<-ERROR
            Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".

            #{caller(1).join($INPUT_RECORD_SEPARATOR)}
            ERROR
          end
        end
zmq_error_check(return_code, source) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 300
        def zmq_error_check(return_code, source)
          return if ::ZMQ::Util.resultcode_ok?(return_code || -1)

          fail <<-ERROR
          Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".

          #{caller(1).join($INPUT_RECORD_SEPARATOR)}
          ERROR
        end
zmq_recoverable_error_check(return_code, source) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 310
        def zmq_recoverable_error_check(return_code, source)
          return if ::ZMQ::Util.resultcode_ok?(return_code || -1)

          fail ZmqRecoverableError, <<-ERROR
          Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".

          #{caller(1).join($INPUT_RECORD_SEPARATOR)}
          ERROR
        end