class IORequest::Client

Connection client.

General scheme:

Client 1                 Client 2
   |                        |
 (        Authorization       )  See `Authorizer` class. Error in authorization should close
   |                        |    connection
   |                        |
 [    Data transition loop    ]  Loop runs until someone sends 0 sized data. Then everyone
   |                        |    should close connection. Any R/W errors should also finish the
   |                        |    loop
   |                        |
   |-> uint(2 bytes)      ->|    Specifies size of following JSON string
   |-> Message as JSON    ->|    Message itself. It should contain its `type`, `id` and some
   |                        |    data hash
   |                        |
   |               (Message handling) See `Handler` class
   |                        |
   |<- uint(2 bytes)      <-|
   |<- Message as JSON    <-|

Attributes

authorizer[R]

Public Class Methods

new(authorizer: Authorizer.empty) click to toggle source

Initialize new client.

# File lib/io_request/client.rb, line 32
# Initialize new client. The connection is not started here; see `open`.
#
# @param authorizer [Authorizer] object consulted by `authorization` when the
#   connection is opened. Defaults to `Authorizer.empty`.
def initialize(authorizer: Authorizer.empty)
  @authorizer = authorizer
  @open = false

  # Locks guarding the read side and the write side of the connection.
  @mutex_r = Mutex.new
  @mutex_w = Mutex.new

  # Received responses (filled by `handle_response`, drained by
  # `wait_for_response`), plus the lock and condition variable used to hand
  # them over between threads.
  @responses_access_mutex = Mutex.new
  @responses_access_cv = ConditionVariable.new
  @responses = {}
end

Public Instance Methods

close() click to toggle source

Close connection.

# File lib/io_request/client.rb, line 66
# Close connection.
#
# Runs the shared shutdown routine (`close_internal`) and then calls
# `join_threads`.
# NOTE(review): `join_threads` is defined outside this view; presumably it
# waits for the threads started through `in_thread` - confirm.
def close
  close_internal

  join_threads
end
on_close(&block) click to toggle source

Code to execute after connection is closed.

# File lib/io_request/client.rb, line 81
# Store the block to execute after the connection is closed. It is invoked,
# without arguments, at the end of `close_internal`.
#
# @return [Proc, nil] the stored block (nil when no block was given).
def on_close(&block)
  IORequest.logger.debug(prog_name) { 'Saved on_close block' }
  @on_close = block
end
on_request(&block) click to toggle source

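Save the block used to answer incoming requests. @yieldparam [Hash] data of the incoming request. @yieldreturn [Hash] data to send back in the response.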

# File lib/io_request/client.rb, line 74
# Store the block used to answer incoming requests. `handle_request` calls it
# with the data of the received message and sends its result back as the
# response data.
#
# @yieldparam [Hash] data of the incoming request.
# @yieldreturn [Hash] data for the response.
# @return [Proc, nil] the stored block (nil when no block was given).
def on_request(&block)
  IORequest.logger.debug(prog_name) { 'Saved on_request block' }
  @on_request = block
end
Also aliased as: respond
open(read: nil, write: nil, read_write: nil) click to toggle source

Start new client connection. @param read [IO] object to read from. @param write [IO] object to write to. @param read_write [IO] read-write object (replaces `read` and `write` arguments). @return [Object] data from {Authorizer}

# File lib/io_request/client.rb, line 49
# Start new client connection: remember the IO objects, run authorization and,
# once it succeeds, mark the client as open and start the data transition loop
# in a separate thread.
#
# @param read [IO] object to read from.
# @param write [IO] object to write to.
# @param read_write [IO] read-write object (takes precedence over `read` and
#   `write`).
# @return [Object] data returned by `authorization`.
# @raise [AuthorizationFailureError] propagated from `authorization`; the
#   client is not marked as open in that case.
def open(read: nil, write: nil, read_write: nil)
  @io_r, @io_w = read_write ? [read_write, read_write] : [read, write]

  IORequest.logger.debug(prog_name) { 'Starting connection' }

  # The block below only runs when authorization did not raise.
  authorization.tap do
    @open = true
    @data_transition_thread = in_thread(name: 'connection') { data_transition_loop }
  end
end
open?() click to toggle source
# File lib/io_request/client.rb, line 61
# @return [Boolean] current value of the open flag: set to true by `open`
#   after successful authorization and reset to false by `close_internal`.
def open?
  @open
end
request(data, timeout: nil, &callback) click to toggle source

If a callback block is provided, the request will be sent asynchronously. @param data [Hash] @param timeout [Integer] timeout in seconds.

# File lib/io_request/client.rb, line 89
# Send a request and obtain the data of its response.
#
# Without a block the call blocks until the response arrives and returns its
# data. With a block the exchange runs in a separate thread (via `in_thread`),
# the block receives the response data, and nil is returned immediately.
#
# @param data [Hash] request data.
# @param timeout [Integer, nil] timeout in seconds, passed on to
#   `send_request_and_wait_for_response`.
# @return [Object, nil] response data for the blocking form, nil otherwise.
def request(data, timeout: nil, &callback)
  message = Message.new(data, type: :request)

  # Blocking form: no callback supplied.
  return send_request_and_wait_for_response(message, timeout).data unless callback

  # Asynchronous form: hand the response data to the callback from a thread.
  in_thread(callback, name: 'requesting') do |on_response|
    on_response.call(send_request_and_wait_for_response(message, timeout).data)
  end
  nil
end
respond(&block)
Alias for: on_request

Private Instance Methods

authorization() click to toggle source
# File lib/io_request/client.rb, line 130
# Run the authorizer over the connection while holding both the read and the
# write lock.
#
# @return [Object] the authorizer's `data` after a successful authorization.
# @raise [AuthorizationFailureError] when the authorizer returns a falsy value.
def authorization
  authorized, auth_data = @mutex_r.synchronize do
    @mutex_w.synchronize do
      IORequest.logger.debug(prog_name) { 'Authorizing new client' }
      # `data` is read after `authorize`, whatever its outcome.
      [@authorizer.authorize(@io_r, @io_w), @authorizer.data]
    end
  end
  raise AuthorizationFailureError unless authorized

  IORequest.logger.debug(prog_name) { "New client authorized with data #{auth_data}" }
  auth_data
end
close_internal() click to toggle source
# File lib/io_request/client.rb, line 107
# Shared shutdown routine used by both `close` and `data_transition_loop`.
#
# It notifies the other side with a zero-sized message, closes the IO objects
# and resets the connection state. Because it is reached from both callers
# during a single shutdown (after `close` the loop's read fails and the loop
# calls this method again), the `on_close` callback is only fired when the
# client was actually open on entry, i.e. once per open -> closed transition.
# The cleanup steps themselves are safe to repeat and still run on every call.
def close_internal
  # Capture and clear the flag first so a second, later call sees `false`.
  was_open = @open
  @open = false

  IORequest.logger.debug(prog_name) { 'Closing connection' }
  send_zero_size_request
  close_io
  @data_transition_thread = nil

  return unless was_open

  callback = @on_close if defined?(@on_close)
  callback&.call
end
close_io() click to toggle source
# File lib/io_request/client.rb, line 116
# Close the read and the write IO objects (when present). A failure to close
# one of them is logged and does not prevent closing the other.
def close_io
  [['read', @io_r], ['write', @io_w]].each do |direction, io|
    begin
      io&.close
    rescue StandardError => e
      IORequest.logger.debug "Failed to close #{direction} IO: #{e}"
    end
  end
  IORequest.logger.debug(prog_name) { 'Closed IO streams' }
end
data_transition_iteration() click to toggle source
# File lib/io_request/client.rb, line 160
# One step of the data transition loop: read a single message under the read
# lock and dispatch it. Requests are handled in a separate thread started via
# `in_thread`; any other message is treated as a response and handled inline.
# Errors raised while reading propagate to the caller.
def data_transition_iteration
  incoming = @mutex_r.synchronize { Message.read_from(@io_r) }
  IORequest.logger.debug(prog_name) { "Received message: #{incoming}" }

  return handle_response(incoming) unless incoming.request?

  in_thread(name: 'responding') { handle_request(incoming) }
end
data_transition_loop() click to toggle source
# File lib/io_request/client.rb, line 146
# Keep processing incoming messages until the other side announces the end of
# the connection with a zero-sized message or any other error occurs, then run
# the shutdown routine.
def data_transition_loop
  IORequest.logger.debug(prog_name) { 'Starting data transition loop' }
  loop do
    begin
      data_transition_iteration
    rescue ZeroSizeMessageError
      # Regular shutdown requested by the peer.
      IORequest.logger.debug(prog_name) { 'Connection was closed from the other side' }
      break
    rescue StandardError => e
      # Anything else also ends the loop.
      IORequest.logger.debug(prog_name) { "Data transition unknown error: #{e}" }
      break
    end
  end
  close_internal
end
handle_request(message) click to toggle source
# File lib/io_request/client.rb, line 170
# Build and send the response for an incoming request.
#
# The response data is whatever the `on_request` block returns for the request
# data. When no block is stored - either `on_request` was never called or it
# was called without a block, which leaves `@on_request` set to nil - an empty
# hash is sent instead.
#
# @param message [Message] received request; its `data` is given to the
#   handler and its `id` becomes the `to` of the response.
def handle_request(message)
  handler = @on_request if defined?(@on_request)
  data = handler ? handler.call(message.data) : {}
  response = Message.new(data, type: :response, to: message.id)
  send_response(response)
end
handle_response(message) click to toggle source
# File lib/io_request/client.rb, line 177
# Store a received response under the string form of the request id it
# answers (`message.to`) and wake up every thread waiting for a response.
def handle_response(message)
  request_key = message.to.to_s
  @responses_access_mutex.synchronize do
    @responses.store(request_key, message)
    @responses_access_cv.broadcast
  end
end
send_request_and_wait_for_response(request, timeout) click to toggle source
# File lib/io_request/client.rb, line 204
# Write the request to the write IO while holding the write lock, then block
# until the matching response is available.
#
# @param request [Message] message to send.
# @param timeout [Integer, nil] passed through to `wait_for_response`.
# @return [Message] whatever `wait_for_response` returns.
def send_request_and_wait_for_response(request, timeout)
  log = IORequest.logger
  @mutex_w.synchronize do
    log.debug(prog_name) { "Sending message: #{request}" }
    request.write_to(@io_w)
  end

  # The write lock is released before waiting.
  wait_for_response(request, timeout)
end
send_response(response) click to toggle source
# File lib/io_request/client.rb, line 184
# Write a response message to the write IO while holding the write lock.
#
# Write failures are logged and swallowed so that a connection that went away
# does not raise out of the thread that was answering the request. Besides
# `IOError` (e.g. closed stream) this covers `SystemCallError`, which is what a
# write to a broken pipe or reset socket raises (`Errno::EPIPE`,
# `Errno::ECONNRESET`).
#
# @param response [Message] message to send.
def send_response(response)
  @mutex_w.synchronize do
    IORequest.logger.debug(prog_name) { "Sending response: #{response}" }
    begin
      response.write_to(@io_w)
    rescue IOError, SystemCallError => e
      IORequest.logger.debug(prog_name) { "Failed to write response message: #{e}" }
    end
  end
end
send_zero_size_request() click to toggle source
# File lib/io_request/client.rb, line 195
# Tell the other side that the connection is being closed by writing a size
# header of zero (a 16-bit unsigned integer, packed with the 'S' directive).
# This is best effort: any error is logged and swallowed.
def send_zero_size_request
  zero_header = [0].pack('S')
  @mutex_w.synchronize do
    IORequest.logger.debug(prog_name) { 'Sending zero size message' }
    @io_w.write(zero_header)
  end
rescue StandardError => e
  IORequest.logger.debug(prog_name) { "Failed to send zero-sized message(#{e})" }
end
wait_for_response(request, timeout) click to toggle source
# File lib/io_request/client.rb, line 212
# Block until the response to `request` has been stored by `handle_response`.
#
# The responses hash is checked *before* every wait, so a response that was
# stored before this method started waiting is returned immediately instead of
# after a missed wake-up. A response that is available is always returned, even
# if the deadline passes at the same moment. While waiting, the thread wakes up
# at least once per second (or earlier, when the remaining timeout is shorter)
# to re-check.
#
# @param request [Message] request whose `id` identifies the response.
# @param timeout [Numeric, nil] maximum number of seconds to wait; nil waits
#   without a limit.
# @return [Message] the response, removed from the pending responses.
# @raise [RequestTimeoutError] when no response arrived within `timeout`.
def wait_for_response(request, timeout)
  IORequest.logger.debug(prog_name) { "Waiting for response for #{request}" }
  response_key = request.id.to_s
  # Monotonic clock: unaffected by wall-clock adjustments.
  deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)

  @responses_access_mutex.synchronize do
    loop do
      response = @responses.delete(response_key)
      if response
        IORequest.logger.debug(prog_name) { "Found response: #{response}" }
        break response
      end

      wait_time = 1
      if deadline
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise RequestTimeoutError if remaining <= 0

        wait_time = [remaining, wait_time].min
      end
      # `wait` releases the mutex while sleeping and re-acquires it on return.
      @responses_access_cv.wait(@responses_access_mutex, wait_time)
    end
  end
end