class IORequest::Client
Connection client.
General scheme:
Client 1 Client 2 | | ( Authorization ) See `Authorizer` class. Error in authorization should close | | connection | | [ Data transition loop ] Loop runs until someone sends 0 sized data. Then everyone | | should close connection. Any R/W errors should also finish the | | loop | | |-> uint(2 bytes) ->| Specifies size of following JSON string |-> Mesage as JSON ->| Message itself. It should contain its `type`, `id` and some | | data hash | | | (Message handling) See `Handler` class | | |<- uint(2 bytes) <-| |<- Mesage as JSON <-|
Attributes
Public Class Methods
new(authorizer: Authorizer.empty)
click to toggle source
Initialize new client.
# File lib/io_request/client.rb, line 32 def initialize(authorizer: Authorizer.empty) @open = false @authorizer = authorizer @mutex_r = Mutex.new @mutex_w = Mutex.new @responses = {} @responses_access_mutex = Mutex.new @responses_access_cv = ConditionVariable.new end
Public Instance Methods
close()
click to toggle source
Close connection.
# File lib/io_request/client.rb, line 66 def close close_internal join_threads end
on_close(&block)
click to toggle source
Code to execute after connection is closed.
# File lib/io_request/client.rb, line 81 def on_close(&block) IORequest.logger.debug(prog_name) { 'Saved on_close block' } @on_close = block end
on_request(&block)
click to toggle source
@yieldparam [Hash] @yieldreturn [Hash]
# File lib/io_request/client.rb, line 74 def on_request(&block) IORequest.logger.debug(prog_name) { 'Saved on_request block' } @on_request = block end
Also aliased as: respond
open(read: nil, write: nil, read_write: nil)
click to toggle source
Start new client connection. @param r [IO] object to read from. @param w [IO] object to write to. @param rw [IO] read-write object (replaces `r` and `w` arguments). @return [Object] data from {Authorizer}
# File lib/io_request/client.rb, line 49 def open(read: nil, write: nil, read_write: nil) @io_r = read_write || read @io_w = read_write || write IORequest.logger.debug(prog_name) { 'Starting connection' } auth_data = authorization @open = true @data_transition_thread = in_thread(name: 'connection') { data_transition_loop } auth_data end
open?()
click to toggle source
# File lib/io_request/client.rb, line 61 def open? @open end
request(data, timeout: nil, &callback)
click to toggle source
If callback block is provided, request will be sent asynchroniously. @param data [Hash] @param timeout [Integer] timeout in seconds.
# File lib/io_request/client.rb, line 89 def request(data, timeout: nil, &callback) message = Message.new(data, type: :request) if block_given? # Async execution of request in_thread(callback, name: 'requesting') do |cb| cb.call(send_request_and_wait_for_response(message, timeout).data) end nil else send_request_and_wait_for_response(message, timeout).data end end
Private Instance Methods
close_internal()
click to toggle source
# File lib/io_request/client.rb, line 107 def close_internal IORequest.logger.debug(prog_name) { 'Closing connection' } send_zero_size_request close_io @data_transition_thread = nil @open = false @on_close&.call if defined?(@on_close) end
close_io()
click to toggle source
# File lib/io_request/client.rb, line 116 def close_io begin @io_r&.close rescue StandardError => e IORequest.logger.debug "Failed to close read IO: #{e}" end begin @io_w&.close rescue StandardError => e IORequest.logger.debug "Failed to close write IO: #{e}" end IORequest.logger.debug(prog_name) { 'Closed IO streams' } end
data_transition_iteration()
click to toggle source
# File lib/io_request/client.rb, line 160 def data_transition_iteration message = @mutex_r.synchronize { Message.read_from(@io_r) } IORequest.logger.debug(prog_name) { "Received message: #{message}" } if message.request? in_thread(name: 'responding') { handle_request(message) } else handle_response(message) end end
data_transition_loop()
click to toggle source
# File lib/io_request/client.rb, line 146 def data_transition_loop IORequest.logger.debug(prog_name) { 'Starting data transition loop' } loop do data_transition_iteration rescue ZeroSizeMessageError IORequest.logger.debug(prog_name) { 'Connection was closed from the other side' } break rescue StandardError => e IORequest.logger.debug(prog_name) { "Data transition unknown error: #{e}" } break end close_internal end
handle_request(message)
click to toggle source
# File lib/io_request/client.rb, line 170 def handle_request(message) data = {} data = @on_request&.call(message.data) if defined?(@on_request) response = Message.new(data, type: :response, to: message.id) send_response(response) end
handle_response(message)
click to toggle source
# File lib/io_request/client.rb, line 177 def handle_response(message) @responses_access_mutex.synchronize do @responses[message.to.to_s] = message @responses_access_cv.broadcast end end
send_request_and_wait_for_response(request, timeout)
click to toggle source
# File lib/io_request/client.rb, line 204 def send_request_and_wait_for_response(request, timeout) @mutex_w.synchronize do IORequest.logger.debug(prog_name) { "Sending message: #{request}" } request.write_to(@io_w) end wait_for_response(request, timeout) end
send_response(response)
click to toggle source
# File lib/io_request/client.rb, line 184 def send_response(response) @mutex_w.synchronize do IORequest.logger.debug(prog_name) { "Sending response: #{response}" } begin response.write_to(@io_w) rescue IOError => e IORequest.logger.debug(prog_name) { "Failed to write response message: #{e}" } end end end
send_zero_size_request()
click to toggle source
# File lib/io_request/client.rb, line 195 def send_zero_size_request @mutex_w.synchronize do IORequest.logger.debug(prog_name) { 'Sending zero size message' } @io_w.write([0].pack('S')) end rescue StandardError => e IORequest.logger.debug(prog_name) { "Failed to send zero-sized message(#{e})" } end
wait_for_response(request, timeout)
click to toggle source
# File lib/io_request/client.rb, line 212 def wait_for_response(request, timeout) IORequest.logger.debug(prog_name) { "Waiting for response for #{request}" } waiting_start_time = Time.now @responses_access_mutex.synchronize do response = nil until response @responses_access_cv.wait(@responses_access_mutex, 1) if @responses_access_mutex.owned? # NOTE: Only accessing responses hash if thread owns access mutex response = @responses.delete(request.id.to_s) end raise RequestTimeoutError if timeout && (Time.now - waiting_start_time >= timeout) end IORequest.logger.debug(prog_name) { "Found response: #{response}" } response end end