class Protobuf::Rpc::Connectors::Zmq

Constants

CLIENT_RETRIES

Class Constants

RequestTimeout
ZmqEagainError
ZmqRecoverableError

Public Class Methods

ping_port_responses() click to toggle source

Class Methods

# File lib/protobuf/rpc/connectors/zmq.rb, line 27
# Class-level cache of ping results, created on first access and reused
# afterwards.
#
# @return [::ThreadSafe::Cache]
def self.ping_port_responses
  return @ping_port_responses if @ping_port_responses

  @ping_port_responses = ::ThreadSafe::Cache.new
end
zmq_context(reload = false) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 31
# Returns the ZMQ context for the current process id, building it the first
# time that pid is looked up.
#
# @param reload [Boolean] when truthy, throw away every cached context first
# @return [ZMQ::Context]
def self.zmq_context(reload = false)
  if reload || @zmq_contexts.nil?
    @zmq_contexts = Hash.new { |contexts, pid| contexts[pid] = ZMQ::Context.new }
  end

  current_pid = Process.pid
  @zmq_contexts[current_pid]
end

Public Instance Methods

log_signature() click to toggle source

Instance methods

# File lib/protobuf/rpc/connectors/zmq.rb, line 43
# Memoized tag of the form "[client-<class>]" built from this object's class.
#
# @return [String]
def log_signature
  return @_log_signature if @_log_signature

  @_log_signature = "[client-#{self.class}]"
end
send_request() click to toggle source

Start the request/response cycle. We implement the Lazy Pirate req/reply reliability pattern as laid out in the ZMQ Guide, Chapter 4.

@see zguide.zeromq.org/php:chapter4#Client-side-Reliability-Lazy-Pirate-Pattern

# File lib/protobuf/rpc/connectors/zmq.rb, line 52
# Entry point of the request/response cycle: sets up the connection and, as
# long as no error was flagged, sends the request with Lazy Pirate retries.
#
# @return [Object, nil] result of send_request_with_lazy_pirate, or nil when
#   error? is true after setup
def send_request
  setup_connection
  return if error?

  send_request_with_lazy_pirate
end

Private Instance Methods

check_available_rcv_timeout() click to toggle source

Private Instance methods

# File lib/protobuf/rpc/connectors/zmq.rb, line 62
# Memoized receive timeout, in milliseconds, taken from
# PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT when that variable is set and
# defaulting to 200 otherwise.
#
# @return [Integer]
def check_available_rcv_timeout
  @check_available_rcv_timeout ||=
    if ENV.key?("PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT")
      ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT"].to_i
    else
      200 # ms
    end
end
check_available_snd_timeout() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 73
# Memoized send timeout, in milliseconds, taken from
# PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT when that variable is set and
# defaulting to 200 otherwise.
#
# @return [Integer]
def check_available_snd_timeout
  @check_available_snd_timeout ||=
    if ENV.key?("PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT")
      ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT"].to_i
    else
      200 # ms
    end
end
close_connection() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 84
# Deliberately empty: there is no connection-level cleanup to perform here.
#
# @return [nil]
def close_connection
  # The socket is automatically closed after every request.
end
create_socket() click to toggle source

Create a socket connected to a server that can handle the current service. LINGER is set to 0 so that the socket can be closed immediately in the event of a timeout.

# File lib/protobuf/rpc/connectors/zmq.rb, line 91
# Builds a REQ socket connected to a server for the current service. The
# attempt is repeated while the result has no underlying socket handle and
# fewer than socket_creation_attempts tries have been made. LINGER is set to 0
# on every socket that gets connected.
#
# @return [Object] the connected socket
# @raise [RequestTimeout] when no usable socket was produced
def create_socket
  socket = nil
  attempts_made = 0

  loop do
    attempts_made += 1
    socket = zmq_context.socket(::ZMQ::REQ)

    if socket # the context may fail to hand back a socket
      uri = lookup_server_uri
      socket.setsockopt(::ZMQ::LINGER, 0)
      connect_status = socket.connect(uri)
      zmq_error_check(connect_status, :socket_connect)
      socket = socket_to_available_server(socket) if first_alive_load_balance?
    end

    break unless socket.try(:socket).nil? && attempts_made < socket_creation_attempts
  end

  unusable = socket.try(:socket).nil?
  raise RequestTimeout, "Cannot create new ZMQ client socket" if unusable

  socket
end
error?() click to toggle source

Method to determine error state, must be used with Connector API.

# File lib/protobuf/rpc/connectors/zmq.rb, line 112
# Connector API predicate: true once @error holds a truthy value.
#
# @return [Boolean]
def error?
  @error ? true : false
end
host_alive?(host) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 116
# Decides whether a host should be considered reachable.
#
# Always true when the ping port is disabled. Otherwise a cached answer is
# reused while it is no older than host_alive_check_interval seconds; a stale
# or missing entry triggers a new ping whose result is stored in the
# class-level ping_port_responses cache.
#
# @param host [Object] host identifier used as the cache key and ping target
# @return [Object] true, or the (possibly cached) ping_port_open? result
def host_alive?(host)
  return true unless ping_port_enabled?

  cached = self.class.ping_port_responses[host]
  fresh = cached && (Time.now.to_i - cached[:at]) <= host_alive_check_interval
  return cached[:ping_port_open] if fresh

  alive = ping_port_open?(host)
  self.class.ping_port_responses[host] = { :at => Time.now.to_i, :ping_port_open => alive }
  alive
end
host_alive_check_interval() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 133
# Memoized value of PB_ZMQ_CLIENT_HOST_ALIVE_CHECK_INTERVAL as an integer,
# never lower than 1 (an unset variable therefore yields 1).
#
# @return [Integer]
def host_alive_check_interval
  return @host_alive_check_interval if @host_alive_check_interval

  configured = ENV["PB_ZMQ_CLIENT_HOST_ALIVE_CHECK_INTERVAL"].to_i
  @host_alive_check_interval = configured > 1 ? configured : 1
end
lookup_server_uri() click to toggle source

Lookup a server uri for the requested service in the service directory. If the service directory is not running, default to the host and port in the options

# File lib/protobuf/rpc/connectors/zmq.rb, line 141
# Resolves the tcp:// URI of a server for the requested service.
#
# Each of the server_lookup_attempts rounds first looks for a listing in the
# service directory whose address passes host_alive?, then falls back to the
# host/port from options if that host is alive. The chosen endpoint is also
# recorded on @stats. A short sleep separates rounds.
#
# @return [String] URI of the form "tcp://host:port"
# @raise [RuntimeError] when no round produced a live host
def lookup_server_uri
  server_lookup_attempts.times do
    listing = service_directory.all_listings_for(service).find do |candidate|
      host_alive?(candidate.try(:address))
    end

    endpoint =
      if listing
        [listing.try(:address), listing.try(:port)]
      else
        fallback_host = options[:host]
        fallback_port = options[:port]
        [fallback_host, fallback_port] if host_alive?(fallback_host)
      end

    if endpoint
      host, port = endpoint
      @stats.server = [port, host]
      return "tcp://#{host}:#{port}"
    end

    sleep(1.0 / 100.0)
  end

  raise "Host not found for service #{service}"
end
ping_port_open?(host) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 168
# Pings the given host on the configured ping port (coerced to an integer).
#
# @param host [Object] host handed to Ping.new
# @return [Object] whatever Ping#online? reports
def ping_port_open?(host)
  port = ping_port.to_i
  probe = Ping.new(host, port)
  probe.online?
end
rcv_timeout() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 172
# Memoized receive timeout in milliseconds. Precedence: options[:timeout],
# then the PB_ZMQ_CLIENT_RCV_TIMEOUT environment variable, then 300_000.
#
# @return [Object] the option value as given, or an Integer
def rcv_timeout
  @rcv_timeout ||=
    if options[:timeout]
      options[:timeout]
    elsif ENV.key?("PB_ZMQ_CLIENT_RCV_TIMEOUT")
      ENV["PB_ZMQ_CLIENT_RCV_TIMEOUT"].to_i
    else
      300_000 # 300 seconds
    end
end
send_request_with_lazy_pirate() click to toggle source

Trying a number of times, attempt to get a response from the server. If we haven't received a legitimate response in the CLIENT_RETRIES number of retries, fail the request.

# File lib/protobuf/rpc/connectors/zmq.rb, line 189
# Sends the request and parses the response, starting over after each
# RequestTimeout until CLIENT_RETRIES attempts have been made. When the last
# attempt also times out, the failure is reported as :RPC_FAILED.
#
# @return [Object] result of parse_response, or of failure(...) on exhaustion
def send_request_with_lazy_pirate
  attempt = 0

  begin
    send_request_with_timeout(attempt += 1)
    parse_response
  rescue RequestTimeout
    if attempt >= CLIENT_RETRIES
      failure(:RPC_FAILED, "The server repeatedly failed to respond within #{timeout} seconds")
    else
      retry
    end
  end
end
send_request_with_timeout(attempt = 0) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 202
# Performs one send/receive round trip on a freshly created socket.
#
# The receive and send timeouts are applied to the socket before sending. An
# EAGAIN failure from either call is converted into RequestTimeout. The socket,
# if one was created, is closed on every exit path.
#
# @param attempt [Integer] attempt number, used only in debug log lines
# @raise [RequestTimeout] when sending or receiving timed out
def send_request_with_timeout(attempt = 0)
  socket = create_socket
  socket.setsockopt(::ZMQ::RCVTIMEO, rcv_timeout)
  socket.setsockopt(::ZMQ::SNDTIMEO, snd_timeout)

  logger.debug { sign_message("Sending Request (attempt #{attempt}, #{socket})") }
  send_status = socket.send_string(@request_data)
  zmq_eagain_error_check(send_status, :socket_send_string)

  logger.debug { sign_message("Waiting #{rcv_timeout}ms for response (attempt #{attempt}, #{socket})") }
  # The response buffer is reset to a new string that the socket fills in.
  recv_status = socket.recv_string(@response_data = "")
  zmq_eagain_error_check(recv_status, :socket_recv_string)

  logger.debug { sign_message("Response received (attempt #{attempt}, #{socket})") }
rescue ZmqEagainError
  logger.debug { sign_message("Timed out waiting for response (attempt #{attempt}, #{socket})") }
  raise RequestTimeout
ensure
  logger.debug { sign_message("Closing Socket") }
  if socket
    close_status = socket.close
    zmq_error_check(close_status, :socket_close)
  end
  logger.debug { sign_message("Socket closed") }
end
server_lookup_attempts() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 221
# Memoized value of PB_ZMQ_CLIENT_SERVER_LOOKUP_ATTEMPTS as an integer, never
# lower than 5 (an unset variable therefore yields 5).
#
# @return [Integer]
def server_lookup_attempts
  return @server_lookup_attempts if @server_lookup_attempts

  requested = ENV["PB_ZMQ_CLIENT_SERVER_LOOKUP_ATTEMPTS"].to_i
  @server_lookup_attempts = requested > 5 ? requested : 5
end
service() click to toggle source

The service we're attempting to connect to

# File lib/protobuf/rpc/connectors/zmq.rb, line 227
# Reads the :service entry from the connector options.
#
# @return [Object] the value stored under options[:service]
def service
  options[:service]
end
service_directory() click to toggle source

Alias for ::Protobuf::Rpc::ServiceDirectory.instance

# File lib/protobuf/rpc/connectors/zmq.rb, line 232
# Shortcut for the service directory exposed by ::Protobuf::Rpc.
#
# @return [Object] whatever ::Protobuf::Rpc.service_directory returns
def service_directory
  ::Protobuf::Rpc.service_directory
end
snd_timeout() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 236
# Memoized send timeout in milliseconds. Precedence: options[:timeout], then
# the PB_ZMQ_CLIENT_SND_TIMEOUT environment variable, then 300_000.
#
# @return [Object] the option value as given, or an Integer
def snd_timeout
  @snd_timeout ||=
    if options[:timeout]
      options[:timeout]
    elsif ENV.key?("PB_ZMQ_CLIENT_SND_TIMEOUT")
      ENV["PB_ZMQ_CLIENT_SND_TIMEOUT"].to_i
    else
      300_000 # 300 seconds
    end
end
socket_creation_attempts() click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 249
# Memoized number of socket creation attempts: the integer value of
# PB_ZMQ_CLIENT_SOCKET_CREATION_ATTEMPTS, or 5 when the variable is unset.
#
# @return [Integer]
def socket_creation_attempts
  return @socket_creation_attempts if @socket_creation_attempts

  @socket_creation_attempts = ENV.fetch("PB_ZMQ_CLIENT_SOCKET_CREATION_ATTEMPTS", 5).to_i
end
socket_to_available_server(socket) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 253
# Asks the connected server whether it has a worker available, using the
# short check_available_* timeouts for the exchange.
#
# Outcomes:
# * server answered with anything other than NO_WORKERS_AVAILABLE: the socket
#   is returned with RCVTIMEO/SNDTIMEO reset to -1;
# * server answered NO_WORKERS_AVAILABLE: the socket is closed and returned as
#   is. create_socket recognises this case because the underlying handle of
#   the returned socket is nil, so options are no longer touched after closing;
# * a ZMQ call failed: the socket is closed and nil is returned.
#
# @param socket [Object] connected REQ socket
# @return [Object, nil] the socket, or nil when the exchange failed
def socket_to_available_server(socket)
  check_available_response = ""
  socket.setsockopt(::ZMQ::RCVTIMEO, check_available_rcv_timeout)
  socket.setsockopt(::ZMQ::SNDTIMEO, check_available_snd_timeout)
  zmq_recoverable_error_check(socket.send_string(::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE), :socket_send_string)
  zmq_recoverable_error_check(socket.recv_string(check_available_response), :socket_recv_string)

  if check_available_response == ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE
    zmq_recoverable_error_check(socket.close, :socket_close)
    return socket
  end

  socket.setsockopt(::ZMQ::RCVTIMEO, -1)
  socket.setsockopt(::ZMQ::SNDTIMEO, -1)
  socket
rescue ZmqRecoverableError
  # The caller drops its only reference when nil comes back, so release the
  # socket here instead of leaving it open until it is garbage collected.
  # NOTE(review): assumes close is safe to call on an already-closed socket
  # (the failure may have come from close itself) - confirm for the ZMQ binding.
  socket.close
  nil # couldn't make a connection and need to try again
end
zmq_context(reload = false) click to toggle source

Return the ZMQ Context to use for this process by delegating to the class-level zmq_context, which creates the context the first time the current process asks for it. Pass reload = true to discard the cached contexts and build a new one.

# File lib/protobuf/rpc/connectors/zmq.rb, line 275
# Instance-level convenience wrapper around the class method of the same name.
#
# @param reload [Boolean] forwarded unchanged to self.class.zmq_context
# @return [Object] whatever the class-level zmq_context returns
def zmq_context(reload = false)
  self.class.zmq_context(reload)
end
zmq_eagain_error_check(return_code, source) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 279
        # Raises unless the ZMQ call that produced return_code succeeded,
        # singling out EAGAIN so callers can treat it as a timeout.
        #
        # @param return_code [Integer, nil] result of the ZMQ call; nil counts as failure
        # @param source [Symbol] label of the call, included in the error message
        # @raise [ZmqEagainError] when ZMQ reports EAGAIN
        # @raise [RuntimeError] for every other failure
        def zmq_eagain_error_check(return_code, source)
          return if ::ZMQ::Util.resultcode_ok?(return_code || -1)

          error_class = ::ZMQ::Util.errno == ::ZMQ::EAGAIN ? ZmqEagainError : ::RuntimeError

          # Frames are joined with a literal newline. $INPUT_RECORD_SEPARATOR only
          # has a value once the English library is loaded; when it is nil the
          # frames would be concatenated with no separator at all.
          fail error_class, <<-ERROR
            Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".

            #{caller(1).join("\n")}
          ERROR
        end
zmq_error_check(return_code, source) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 297
        # Raises unless the ZMQ call that produced return_code succeeded.
        #
        # @param return_code [Integer, nil] result of the ZMQ call; nil counts as failure
        # @param source [Symbol] label of the call, included in the error message
        # @raise [RuntimeError] carrying the ZMQ error string and the caller's backtrace
        def zmq_error_check(return_code, source)
          return if ::ZMQ::Util.resultcode_ok?(return_code || -1)

          # Frames are joined with a literal newline. $INPUT_RECORD_SEPARATOR only
          # has a value once the English library is loaded; when it is nil the
          # frames would be concatenated with no separator at all.
          fail <<-ERROR
          Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".

          #{caller(1).join("\n")}
          ERROR
        end
zmq_recoverable_error_check(return_code, source) click to toggle source
# File lib/protobuf/rpc/connectors/zmq.rb, line 307
        # Raises a recoverable error unless the ZMQ call that produced return_code
        # succeeded.
        #
        # @param return_code [Integer, nil] result of the ZMQ call; nil counts as failure
        # @param source [Symbol] label of the call, included in the error message
        # @raise [ZmqRecoverableError] carrying the ZMQ error string and the caller's backtrace
        def zmq_recoverable_error_check(return_code, source)
          return if ::ZMQ::Util.resultcode_ok?(return_code || -1)

          # Frames are joined with a literal newline. $INPUT_RECORD_SEPARATOR only
          # has a value once the English library is loaded; when it is nil the
          # frames would be concatenated with no separator at all.
          fail ZmqRecoverableError, <<-ERROR
          Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".

          #{caller(1).join("\n")}
          ERROR
        end