class Avro::IPC::Responder

Base class for the server side of a protocol interaction.

Attributes

local_hash[R]
local_protocol[R]
protocol_cache[R]

Public Class Methods

new(local_protocol) click to toggle source
    # File lib/avro/ipc.rb
# Creates a responder that serves +local_protocol+.
#
# Keeps the protocol, records the value of its +md5+ as the local hash, and
# seeds the protocol cache with the local protocol stored under that hash.
#
# local_protocol:: the protocol object this responder serves; it must
#                  respond to +md5+.
def initialize(local_protocol)
  @local_protocol = local_protocol
  @local_hash = local_protocol.md5
  @protocol_cache = { @local_hash => local_protocol }
end

Public Instance Methods

call(_local_message, _request) click to toggle source
    # File lib/avro/ipc.rb
# Hook that performs the server's actual work (compare to a handler in
# Thrift). +respond+ invokes it with the local message and the decoded
# request.
#
# This base implementation ignores both arguments and always raises
# NotImplementedError.
def call(_local_message, _request)
  raise(NotImplementedError)
end
process_handshake(decoder, encoder, connection=nil) click to toggle source
    # File lib/avro/ipc.rb
# Runs the responder side of the handshake.
#
# decoder::    source the handshake request is read from.
# encoder::    sink the handshake response is written to.
# connection:: optional transport; when it reports +is_connected?+ its
#              stored protocol is returned and nothing is read or written.
#
# Returns the remote protocol, or a falsy value when the client's hash is
# not in the cache and the request carried no client protocol.
def process_handshake(decoder, encoder, connection=nil)
  return connection.protocol if connection && connection.is_connected?

  request = HANDSHAKE_RESPONDER_READER.read(decoder)

  # Work out the remote protocol: use the cache first, otherwise parse the
  # protocol text sent by the client and remember it under the client's hash.
  client_hash = request['clientHash']
  client_protocol = request['clientProtocol']
  remote_protocol = protocol_cache[client_hash]
  if client_protocol && !remote_protocol
    remote_protocol = Avro::Protocol.parse(client_protocol)
    protocol_cache[client_hash] = remote_protocol
  end

  # Grade the client's guess of our protocol. An unknown remote protocol is
  # always 'NONE', whether or not the server hash matched.
  match =
    if !remote_protocol
      'NONE'
    elsif local_hash == request['serverHash']
      'BOTH'
    else
      'CLIENT'
    end

  response = { 'match' => match }
  unless match == 'BOTH'
    # The client lacks (or mis-guessed) our protocol, so send it along.
    response['serverProtocol'] = local_protocol.to_s
    response['serverHash'] = local_hash
  end

  HANDSHAKE_RESPONDER_WRITER.write(response, encoder)

  connection.protocol = remote_protocol if connection && match != 'NONE'

  remote_protocol
end
read_request(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/ipc.rb
# Decodes one request datum from +decoder+.
#
# A DatumReader is built from +writers_schema+ and +readers_schema+ and the
# value it reads from +decoder+ is returned.
def read_request(writers_schema, readers_schema, decoder)
  Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
end
respond(call_request, transport=nil) click to toggle source

Called by a server to deserialize a request, compute and serialize a response or error. Compare to 'handle()' in Thrift.

    # File lib/avro/ipc.rb
# Called by a server to deserialize a request, compute a response, and
# serialize either the response or an error. Compare to 'handle()' in Thrift.
#
# call_request:: the encoded request as a string; it is wrapped in a StringIO
#                and decoded with a BinaryDecoder.
# transport::    optional connection object handed to +process_handshake+.
#
# Returns the encoded reply as a binary string: the handshake response (if
# one was written) followed by the response metadata, an error flag, and then
# the response datum or the error datum.
#
# Errors raised by +call+ are reported to the client using the message's
# error schema (or SYSTEM_ERROR_SCHEMA). An Avro::AvroError raised anywhere
# else after the handshake is reported as a system error.
def respond(call_request, transport=nil)
  buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
  buffer_writer = StringIO.new(String.new('', encoding: 'BINARY'))
  buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
  error = nil
  response_metadata = {}
  # Offset in buffer_writer where the call response begins; stays nil until
  # the handshake has completed.
  payload_start = nil

  begin
    remote_protocol = process_handshake(buffer_decoder, buffer_encoder, transport)
    payload_start = buffer_writer.pos
    # handshake failure: reply with the handshake response only
    return buffer_writer.string unless remote_protocol

    # read request using remote protocol
    _request_metadata = META_READER.read(buffer_decoder)
    remote_message_name = buffer_decoder.read_string

    # get remote and local request schemas so we can do
    # schema resolution (one fine day)
    remote_message = remote_protocol.messages[remote_message_name]
    unless remote_message
      raise AvroError.new("Unknown remote message: #{remote_message_name}")
    end
    local_message = local_protocol.messages[remote_message_name]
    unless local_message
      raise AvroError.new("Unknown local message: #{remote_message_name}")
    end
    writers_schema = remote_message.request
    readers_schema = local_message.request
    request = read_request(writers_schema, readers_schema, buffer_decoder)

    # perform server logic; any failure becomes an error reply rather than
    # escaping from respond
    begin
      response = call(local_message, request)
    rescue AvroRemoteError => e
      error = e
    rescue Exception => e # rubocop:disable Lint/RescueException
      error = AvroRemoteError.new(e.to_s)
    end

    # write response using local protocol
    META_WRITER.write(response_metadata, buffer_encoder)
    buffer_encoder.write_boolean(!!error)
    if error.nil?
      writers_schema = local_message.response
      write_response(writers_schema, response, buffer_encoder)
    else
      writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
      write_error(writers_schema, error, buffer_encoder)
    end
  rescue Avro::AvroError => e
    error = AvroRemoteException.new(e.to_s)
    # Previously the system error was written to a throwaway encoder and
    # never reached the returned string. Write it into the real buffer
    # instead, first discarding anything half-written after the handshake.
    # If the handshake itself raised (payload_start is nil), the buffer is
    # returned as it stands.
    # NOTE(review): assumes BinaryEncoder writes straight through to
    # buffer_writer without buffering of its own - confirm.
    if payload_start
      buffer_writer.truncate(payload_start)
      buffer_writer.pos = payload_start
      META_WRITER.write(response_metadata, buffer_encoder)
      buffer_encoder.write_boolean(true)
      write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
    end
  end
  buffer_writer.string
end
write_error(writers_schema, error_exception, encoder) click to toggle source
    # File lib/avro/ipc.rb
# Serializes an error to +encoder+.
#
# The error is written as its string form (+error_exception.to_s+) by a
# DatumWriter built from +writers_schema+.
def write_error(writers_schema, error_exception, encoder)
  Avro::IO::DatumWriter.new(writers_schema).write(error_exception.to_s, encoder)
end
write_response(writers_schema, response_datum, encoder) click to toggle source
    # File lib/avro/ipc.rb
# Serializes +response_datum+ to +encoder+ with a DatumWriter built from
# +writers_schema+.
def write_response(writers_schema, response_datum, encoder)
  Avro::IO::DatumWriter.new(writers_schema).write(response_datum, encoder)
end