class Avro::IO::DatumReader

Attributes

readers_schema[RW]
writers_schema[RW]

Public Class Methods

match_schemas(writers_schema, readers_schema) click to toggle source
    # File lib/avro/io.rb
# Schema-compatibility check used by the reader before decoding.
#
# Pure delegation: the answer comes from Avro::SchemaCompatibility.match_schemas,
# whose truthiness #read_data uses to decide whether to raise.
def self.match_schemas(writers_schema, readers_schema)
  checker = Avro::SchemaCompatibility
  checker.match_schemas(writers_schema, readers_schema)
end
new(writers_schema=nil, readers_schema=nil) click to toggle source
    # File lib/avro/io.rb
# Creates a reader. Both schemas are optional here. If the reader's schema
# is still unset when #read runs, #read falls back to the writer's schema.
def initialize(writers_schema = nil, readers_schema = nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end

Public Instance Methods

read(decoder) click to toggle source
    # File lib/avro/io.rb
# Decodes one datum from +decoder+.
#
# When no reader's schema has been set, the writer's schema is stored as
# the reader's schema. Data is then read exactly as it was written.
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end
read_array(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Decodes an array datum into a Ruby Array.
#
# Items are read block by block. Each block starts with a long item count,
# and a count of zero terminates the array. A negative count means its
# absolute value is the item count and a long byte size follows. That size
# is consumed and ignored here.
# Every item is decoded via #read_data with the item schemas.
def read_array(writers_schema, readers_schema, decoder)
  items = []
  until (count = decoder.read_long).zero?
    if count.negative?
      count = -count
      decoder.read_long # block byte size; not needed when decoding item by item
    end
    count.times do
      items << read_data(writers_schema.items, readers_schema.items, decoder)
    end
  end
  items
end
read_data(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Decodes one datum written with +writers_schema+ and resolves it against
# +readers_schema+.
#
# Raises SchemaMatchException if the schemas do not match.
# Raises SchemaMatchException if the reader is a union with no branch
# matching a non-union writer.
# Raises AvroError for a writer schema type with no decoding rule.
# The decoded value goes through the reader schema's type adapter before
# it is returned.
def read_data(writers_schema, readers_schema, decoder)
  compatible = self.class.match_schemas(writers_schema, readers_schema)
  raise SchemaMatchException.new(writers_schema, readers_schema) unless compatible

  # A non-union value read through a union reader schema is decoded with the
  # first reader branch that the writer's schema matches.
  if writers_schema.type_sym != :union && readers_schema.type_sym == :union
    branch = readers_schema.schemas.find do |candidate|
      self.class.match_schemas(writers_schema, candidate)
    end
    raise SchemaMatchException.new(writers_schema, readers_schema) unless branch

    return read_data(writers_schema, branch, decoder)
  end

  # Dispatch on the writer's type: that is how the bytes were laid out.
  datum =
    case writers_schema.type_sym
    when :null    then decoder.read_null
    when :boolean then decoder.read_boolean
    when :string  then decoder.read_string
    when :int     then decoder.read_int
    when :long    then decoder.read_long
    when :float   then decoder.read_float
    when :double  then decoder.read_double
    when :bytes   then decoder.read_bytes
    when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
    when :enum    then read_enum(writers_schema, readers_schema, decoder)
    when :array   then read_array(writers_schema, readers_schema, decoder)
    when :map     then read_map(writers_schema, readers_schema, decoder)
    when :union   then read_union(writers_schema, readers_schema, decoder)
    when :record, :error, :request
      read_record(writers_schema, readers_schema, decoder)
    else
      raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
    end

  readers_schema.type_adapter.decode(datum)
end
read_default_value(field_schema, default_value) click to toggle source
    # File lib/avro/io.rb
# Converts a JSON default value into the in-memory value for +field_schema+.
#
# Arrays, maps, unions and records are handled recursively.
# - Ints and longs are coerced with Integer(); floats and doubles with Float().
# - Other primitives, enums and fixed values are returned unchanged.
# - A union's default is interpreted with the union's first branch only.
# - A record default takes each nested field's value from +default_value+.
#   It falls back to that field's own default only when the value is
#   missing (nil). An explicit `false` is kept.
#
# Raises AvroError for a schema type with no default handling here.
def read_default_value(field_schema, default_value)
  case field_schema.type_sym
  when :null
    nil
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :array
    default_value.map { |json_val| read_default_value(field_schema.items, json_val) }
  when :map
    default_value.each_with_object({}) do |(key, json_val), read_map|
      read_map[key] = read_default_value(field_schema.values, json_val)
    end
  when :union
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    field_schema.fields.each_with_object({}) do |field, read_record|
      json_val = default_value[field.name]
      # Test for nil, not truthiness: a truthiness check would also replace
      # an explicit `false` with the nested field's default.
      json_val = field.default if json_val.nil?
      read_record[field.name] = read_default_value(field.type, json_val)
    end
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end
read_enum(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Decodes an enum. The stream holds an int position into the writer's
# symbol list.
#
# If the reader does not list that symbol and defines a default, the
# reader's default is returned. Otherwise the writer's symbol is returned
# as-is, even when the reader does not know it.
def read_enum(writers_schema, readers_schema, decoder)
  position = decoder.read_int
  symbol = writers_schema.symbols[position]
  unknown_to_reader = !readers_schema.symbols.include?(symbol)

  # This implementation deviates from the spec by always returning
  # a symbol.
  unknown_to_reader && readers_schema.default ? readers_schema.default : symbol
end
read_fixed(writers_schema, _readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Decodes a fixed value. It is exactly writers_schema.size raw bytes, read
# straight from the decoder. The reader's schema is not consulted.
def read_fixed(writers_schema, _readers_schema, decoder)
  byte_count = writers_schema.size
  decoder.read(byte_count)
end
read_map(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Decodes a map datum into a Ruby Hash keyed by string.
#
# Entries arrive in blocks with the same framing as arrays. A long count
# starts each block, zero ends the map, and a negative count is followed
# by a byte size that is read and ignored. Each entry is a string key
# followed by a value, which is decoded via #read_data with the value
# schemas.
def read_map(writers_schema, readers_schema, decoder)
  entries = {}
  until (count = decoder.read_long).zero?
    if count.negative?
      count = -count
      decoder.read_long # block byte size; not needed when decoding entry by entry
    end
    count.times do
      key = decoder.read_string
      entries[key] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
  end
  entries
end
read_record(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Decodes a record into a Hash keyed by field name.
#
# Writer fields are visited in the order they were written:
# - a field the reader knows by name is decoded and stored under that name;
# - a field the reader knows only by alias is decoded and stored under the
#   reader's field name;
# - any other field is skipped.
# Reader fields still missing afterwards are filled from their defaults.
# Raises AvroError when such a field has no default.
def read_record(writers_schema, readers_schema, decoder)
  by_name = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |wfield|
    if (direct = by_name[wfield.name])
      value = read_data(wfield.type, direct.type, decoder)
      record[wfield.name] = value
    elsif readers_schema.fields_by_alias.key?(wfield.name)
      aliased = readers_schema.fields_by_alias[wfield.name]
      value = read_data(wfield.type, aliased.type, decoder)
      record[aliased.name] = value
    else
      skip_data(wfield.type, decoder)
    end
  end

  by_name.each do |name, rfield|
    next if record.key?(name)
    raise AvroError, "Missing data for #{rfield.type} with no default" unless rfield.default?

    value = read_default_value(rfield.type, rfield.default)
    record[rfield.name] = value
  end

  record
end
read_union(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Decodes a union value. The stream holds a long index into the writer's
# union branches, then the datum encoded with that branch's schema. The
# datum is resolved against +readers_schema+ via #read_data.
#
# Raises AvroError when the encoded index does not name one of the writer's
# branches.
def read_union(writers_schema, readers_schema, decoder)
  index_of_schema = decoder.read_long
  branches = writers_schema.schemas
  # Check explicitly. Array#[] treats a negative index as counting from the
  # end and returns nil past the end. Corrupt input would then be decoded
  # with the wrong branch, or fail later in an unrelated way.
  unless index_of_schema >= 0 && index_of_schema < branches.size
    raise AvroError, "Union branch index #{index_of_schema} is out of range (#{branches.size} branches)"
  end

  read_data(branches[index_of_schema], readers_schema, decoder)
end
skip_array(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips an array datum by walking its blocks and skipping each item with
# the writer's item schema.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end
skip_data(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Advances +decoder+ past one datum encoded with +writers_schema+ without
# building a value.
#
# Primitive types are skipped by the decoder itself. Composite types
# delegate to the matching skip_* helper. Raises AvroError for an
# unrecognised schema type.
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end
skip_enum(_writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips an enum datum. It is a single int position, so the writer's schema
# is not needed.
def skip_enum(_writers_schema, decoder)
  decoder.skip_int
end
skip_fixed(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips a fixed datum by advancing the decoder writers_schema.size bytes.
def skip_fixed(writers_schema, decoder)
  byte_count = writers_schema.size
  decoder.skip(byte_count)
end
skip_map(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips a map datum by walking its blocks. Each entry's string key is
# skipped, then its value is skipped with the writer's value schema.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end
skip_record(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips a record datum by skipping each of the writer's fields in order.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |field|
    skip_data(field.type, decoder)
  end
end
skip_union(writers_schema, decoder) click to toggle source
    # File lib/avro/io.rb
# Skips a union datum. It reads the long branch index, then skips the datum
# encoded with that branch of +writers_schema+.
#
# Raises AvroError when the index does not name one of the writer's
# branches.
def skip_union(writers_schema, decoder)
  index = decoder.read_long
  branches = writers_schema.schemas
  # Array#[] would wrap a negative index to the end of the list, and return
  # nil past the end. Corrupt input would then be skipped with the wrong
  # schema instead of being rejected.
  unless index >= 0 && index < branches.size
    raise AvroError, "Union branch index #{index} is out of range (#{branches.size} branches)"
  end

  skip_data(branches[index], decoder)
end

Private Instance Methods

skip_blocks(decoder, &blk) click to toggle source
    # File lib/avro/io.rb
# Walks the block framing shared by arrays and maps, stopping at a zero
# count.
#
# For a positive count the given block runs once per item, so the caller
# can skip each item. A negative count is followed by the block's byte
# size, and the whole block is skipped in one step without calling the
# per-item block.
def skip_blocks(decoder, &each_item)
  until (count = decoder.read_long).zero?
    if count.negative?
      decoder.skip(decoder.read_long)
    else
      count.times(&each_item)
    end
  end
end