class Avro::IPC::Requestor
Base class for the client side of a protocol interaction.
Attributes
local_protocol[R]
remote_hash[R]
remote_protocol[R]
send_protocol[RW]
transport[R]
Public Class Methods
new(local_protocol, transport)
click to toggle source
# File lib/avro/ipc.rb
# Sets up a requestor that speaks +local_protocol+ over +transport+.
#
# The remote protocol, the remote hash and the send-protocol flag all
# start out as nil.
def initialize(local_protocol, transport)
  @local_protocol, @transport = local_protocol, transport
  @remote_protocol = @remote_hash = @send_protocol = nil
end
Public Instance Methods
read_call_response(message_name, decoder)
click to toggle source
# File lib/avro/ipc.rb
# Decodes the call-response part of a reply for +message_name+.
#
# Layout of a call response:
#   * response metadata, a map with values of type bytes
#   * a one-byte boolean error flag, then either
#     - flag false: the response, serialized per the message's response schema
#     - flag true:  the error, serialized per the message's error union schema
#
# Returns the decoded response datum when the error flag is false.
# Raises AvroError when the message is unknown to the remote or the local
# protocol, and raises whatever read_error builds when the error flag is true.
def read_call_response(message_name, decoder)
  # The metadata map is read but its value is not used.
  _response_metadata = META_READER.read(decoder)

  # Writer side: the message as declared by the remote protocol.
  remote_message = remote_protocol.messages[message_name]
  unless remote_message
    raise AvroError, "Unknown remote message: #{message_name}"
  end

  # Reader side: the message as declared by the local protocol.
  local_message = local_protocol.messages[message_name]
  raise AvroError, "Unknown local message: #{message_name}" unless local_message

  if decoder.read_boolean
    # Error flag set: fall back to SYSTEM_ERROR_SCHEMA when a side
    # declares no errors of its own.
    writers_schema = remote_message.errors || SYSTEM_ERROR_SCHEMA
    readers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
    raise read_error(writers_schema, readers_schema, decoder)
  end

  read_response(remote_message.response, local_message.response, decoder)
end
read_error(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/ipc.rb
# Reads one datum from +decoder+ using the given writer's and reader's
# schemas and wraps it in an AvroRemoteError, which is returned (not raised).
def read_error(writers_schema, readers_schema, decoder)
  error_datum = Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
  AvroRemoteError.new(error_datum)
end
read_handshake_response(decoder)
click to toggle source
# File lib/avro/ipc.rb
# Reads the handshake response from +decoder+ and updates this requestor's
# handshake state according to its 'match' field.
#
#   'BOTH'   - clears send_protocol.
#   'CLIENT' - stores the parsed 'serverProtocol' and the 'serverHash',
#              clears send_protocol.
#   'NONE'   - stores the parsed 'serverProtocol' and the 'serverHash',
#              sets send_protocol to true.
#
# Returns true for 'BOTH' and 'CLIENT', false for 'NONE'.
#
# Raises AvroError when 'CLIENT' or 'NONE' arrives while send_protocol is
# already set, or when the match value is none of the three above.
def read_handshake_response(decoder)
  handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder)
  # Held in a local so the fallback branch can report the offending value.
  match = handshake_response['match']

  case match
  when 'BOTH'
    self.send_protocol = false
    true
  when 'CLIENT'
    raise AvroError, 'Handshake failure. match == CLIENT' if send_protocol
    self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
    self.remote_hash = handshake_response['serverHash']
    self.send_protocol = false
    true
  when 'NONE'
    raise AvroError, 'Handshake failure. match == NONE' if send_protocol
    self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
    self.remote_hash = handshake_response['serverHash']
    self.send_protocol = true
    false
  else
    raise AvroError, "Unexpected match: #{match}"
  end
end
read_response(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/ipc.rb
# Reads one datum from +decoder+ with a DatumReader built from the given
# writer's and reader's schemas, and returns it.
def read_response(writers_schema, readers_schema, decoder)
  Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
end
remote_hash=(new_remote_hash)
click to toggle source
# File lib/avro/ipc.rb
# Stores the new remote hash on this requestor and records it in
# REMOTE_HASHES under the transport's remote name.
def remote_hash=(new_remote_hash)
  @remote_hash = new_remote_hash
  peer_name = transport.remote_name
  REMOTE_HASHES[peer_name] = remote_hash
end
remote_protocol=(new_remote_protocol)
click to toggle source
# File lib/avro/ipc.rb
# Stores the new remote protocol on this requestor and records it in
# REMOTE_PROTOCOLS under the transport's remote name.
def remote_protocol=(new_remote_protocol)
  @remote_protocol = new_remote_protocol
  peer_name = transport.remote_name
  REMOTE_PROTOCOLS[peer_name] = remote_protocol
end
request(message_name, request_datum)
click to toggle source
# File lib/avro/ipc.rb
# Writes a request message and reads a response or error message.
#
# The handshake request and the call request are encoded into a single
# binary buffer, handed to the transport, and the reply is decoded.
# When the handshake response reports no matching schema, the whole
# request is issued again.
def request(message_name, request_datum)
  # Encode handshake + call into one binary buffer.
  outgoing = StringIO.new(String.new('', encoding: 'BINARY'))
  encoder = Avro::IO::BinaryEncoder.new(outgoing)
  write_handshake_request(encoder)
  write_call_request(message_name, request_datum, encoder)

  # Send it; transceive blocks until the reply is available.
  payload = outgoing.string
  reply = transport.transceive(payload)

  # Decode the handshake first, then the call response if it matched.
  decoder = Avro::IO::BinaryDecoder.new(StringIO.new(reply))
  return request(message_name, request_datum) unless read_handshake_response(decoder)

  read_call_response(message_name, decoder)
end
write_call_request(message_name, request_datum, encoder)
click to toggle source
# File lib/avro/ipc.rb
# Encodes a call request for +message_name+ onto +encoder+.
#
# Layout of a call request:
#   * request metadata, a map with values of type bytes
#   * the message name, an Avro string
#   * the message parameters, serialized according to the message's
#     request declaration
#
# Raises AvroError when the local protocol has no such message.
def write_call_request(message_name, request_datum, encoder)
  # TODO request metadata (not yet implemented); an empty map is written.
  META_WRITER.write({}, encoder)

  message = local_protocol.messages[message_name]
  raise AvroError, "Unknown message: #{message_name}" unless message

  encoder.write_string(message.name)
  write_request(message.request, request_datum, encoder)
end
write_handshake_request(encoder)
click to toggle source
# File lib/avro/ipc.rb
# Encodes the handshake request onto +encoder+.
#
# 'clientHash' is the local protocol's md5. 'serverHash' is the entry in
# REMOTE_HASHES for the transport's remote name; when there is none, the
# local hash is sent instead and the local protocol is taken as the remote
# protocol. 'clientProtocol' is included only when send_protocol is set.
def write_handshake_request(encoder)
  client_hash = local_protocol.md5
  server_hash = REMOTE_HASHES[transport.remote_name]

  unless server_hash
    # No hash recorded for this remote: use our own hash and protocol.
    server_hash = client_hash
    self.remote_protocol = local_protocol
  end

  handshake = { 'clientHash' => client_hash, 'serverHash' => server_hash }
  handshake['clientProtocol'] = local_protocol.to_s if send_protocol

  HANDSHAKE_REQUESTOR_WRITER.write(handshake, encoder)
end
write_request(request_schema, request_datum, encoder)
click to toggle source
# File lib/avro/ipc.rb
# Writes +request_datum+ to +encoder+ with a DatumWriter built from
# +request_schema+.
def write_request(request_schema, request_datum, encoder)
  Avro::IO::DatumWriter.new(request_schema).write(request_datum, encoder)
end