class Avro::DataFile::Writer

Attributes

block_count[RW]
buffer_encoder[R]
buffer_writer[R]
codec[R]
datum_writer[R]
encoder[R]
meta[R]
sync_marker[R]
writer[R]

Public Class Methods

generate_sync_marker()

Return 16 random bytes to use as the sync marker of a new file.
   # File lib/avro/data_file.rb
# Generates a fresh sync marker for a new container file.
#
# Returns a 16-byte binary String produced by OpenSSL's random generator.
def self.generate_sync_marker
  marker_size = 16
  OpenSSL::Random.random_bytes(marker_size)
end
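new(writer, datum_writer, writers_schema=nil, codec=nil, meta={})

Create a writer. If writers_schema is given, start a new file and write its header. Otherwise, read the existing file's header and append to that file.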
    # File lib/avro/data_file.rb
 # Creates a writer over +writer+. It either starts a new container file or
 # appends to an existing one.
 #
 # writer         - the IO the container file is written to. When appending,
 #                  it is also read (to recover the header) and then seeked to
 #                  its end.
 # datum_writer   - serializer for individual data. Its writers_schema is set
 #                  here to the schema being written.
 # writers_schema - schema for a new file. When nil, the existing file's header
 #                  supplies the schema, the codec and the sync marker.
 # codec          - codec for a new file (resolved via DataFile.get_codec).
 #                  Ignored when appending; the file's own codec is used.
 # meta           - extra header metadata for a new file (nil is treated as
 #                  empty). The hash is copied, so the caller's hash is never
 #                  mutated.
 def initialize(writer, datum_writer, writers_schema=nil, codec=nil, meta={})
   @writer = writer
   @encoder = IO::BinaryEncoder.new(@writer)
   @datum_writer = datum_writer
   # Copy before adding the avro.* entries below. Writing into a hash the
   # caller still owns would mutate it, and would raise on a frozen hash.
   @meta = (meta || {}).dup
   @buffer_writer = StringIO.new(+'', 'w')
   @buffer_writer.set_encoding('BINARY') if @buffer_writer.respond_to?(:set_encoding)
   @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
   @block_count = 0

   if writers_schema
     # New file: generate a marker, record codec and schema, emit the header.
     @sync_marker = Writer.generate_sync_marker
     @codec = DataFile.get_codec(codec)
     @meta['avro.codec'] = @codec.codec_name.to_s
     @meta['avro.schema'] = writers_schema.to_s
     datum_writer.writers_schema = writers_schema
     write_header
   else
     # Append: read the existing header to recover the settings the file
     # was written with.
     dfr = Reader.new(writer, Avro::IO::DatumReader.new)

     # FIXME(jmhodges): collect arbitrary metadata
     @sync_marker = dfr.sync_marker
     @meta['avro.codec'] = dfr.meta['avro.codec']
     @codec = DataFile.get_codec(@meta['avro.codec'])

     # Reuse the schema the existing file was written with.
     schema_from_file = dfr.meta['avro.schema']
     @meta['avro.schema'] = schema_from_file
     datum_writer.writers_schema = Schema.parse(schema_from_file)

     # Position at the end of the file so new blocks follow the existing ones.
     writer.seek(0, ::IO::SEEK_END)
   end
 end

Public Instance Methods

<<(datum)

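Append a datum to the file. The datum is buffered and written out as part of a block once the buffer reaches SYNC_INTERVAL bytes.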

    # File lib/avro/data_file.rb
# Appends +datum+ to the file.
#
# The datum is serialized into the in-memory block buffer. Once the buffer
# holds at least SYNC_INTERVAL bytes, the pending block is written out.
def <<(datum)
  datum_writer.write(datum, buffer_encoder)
  self.block_count += 1

  buffer_full = buffer_writer.tell >= SYNC_INTERVAL
  write_block if buffer_full
end
close()

Flush any buffered data and close the underlying writer.
    # File lib/avro/data_file.rb
# Flushes any buffered block and closes the underlying writer.
#
# The writer is closed even when flushing raises, so a failed flush does not
# leak the IO. The flush error still propagates to the caller.
#
# Returns the result of the writer's #close.
def close
  begin
    flush
  ensure
    # Runs on success and on error alike. The captured value is only used
    # when flush completed normally.
    close_result = writer.close
  end
  close_result
end
flush()

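Flush the current state of the file: write out any buffered block and flush the underlying writer. (Metadata is written only once, in the header, when the file is created.)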

    # File lib/avro/data_file.rb
# Pushes everything buffered so far to the underlying writer.
#
# Any pending block is written first (write_block does nothing when the
# buffer is empty). Then the writer's own #flush is called, and its result
# is returned.
def flush
  write_block
  target = writer
  target.flush
end
sync()

Return the current position as a value that may be passed to DataFileReader.seek(long). Forces the end of the current block, emitting a synchronization marker.

    # File lib/avro/data_file.rb
# Ends the current block (emitting its sync marker) and reports where the
# writer now stands.
#
# Returns the writer's #tell after the block has been written.
def sync
  write_block
  writer.tell # offset just past the block emitted above
end

Private Instance Methods

write_block()

Write the buffered data as a single block, followed by the sync marker.

TODO(jmhodges): make a schema for blocks and use datum_writer.

TODO(jmhodges): do we really need the number of items in the block?

    # File lib/avro/data_file.rb
# Writes the buffered data as one block, if anything has been buffered.
#
# The block is written in this order:
# 1. the item count (via encoder.write_long);
# 2. the byte size of the codec's output (via encoder.write_long);
# 3. the codec's output itself;
# 4. the sync marker.
# Afterwards the buffer is emptied and the item count is reset to zero.
#
# TODO(jmhodges): make a schema for blocks and use datum_writer
# TODO(jmhodges): do we really need the number of items in the block?
def write_block
  return unless block_count.positive?

  encoder.write_long(block_count)
  payload = codec.compress(buffer_writer.string)
  payload_size = payload.respond_to?(:bytesize) ? payload.bytesize : payload.size
  encoder.write_long(payload_size)

  writer.write(payload)
  writer.write(sync_marker)

  # Start the next block with an empty buffer.
  buffer_writer.truncate(0)
  buffer_writer.rewind
  self.block_count = 0
end
write_header()

Write the file header: the magic bytes, the metadata, and the sync marker.
    # File lib/avro/data_file.rb
# Writes the container file header.
#
# The header consists of the MAGIC bytes, then the metadata hash (written by
# datum_writer against META_SCHEMA through the encoder), then the sync
# marker. Returns the result of writing the sync marker.
def write_header
  writer.write(MAGIC)
  datum_writer.write_data(META_SCHEMA, meta, encoder)
  writer.write(sync_marker) # closes the header; blocks follow
end