class LogStash::Codecs::AvroDataFile
Logstash Codec - Avro Data File¶ ↑
This plugin is used to process logstash events that represent Avro data files, like the S3 input can produce.
Options¶ ↑
-
“temporary_directory“ - optional. Specifies a directory to store temporary files in
-
“decorate_events“ - will add avro schema metadata to the events.
Usage¶ ↑
input {
stdin { codec => 'avro-data-file' }
}
Attributes
tempfile[RW]
temporary_directory[R]
Public Instance Methods
decode(data)
click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 40 def decode(data) merge(data) end
encode(_event)
click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 63 def encode(_event) raise 'Not implemented' end
flush() { |event| ... }
click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 44 def flush tempfile.flush return unless block_given? Avro::DataFile.open(tempfile.path, 'r') do |reader| schema = reader.datum_reader.writers_schema reader.each do |avro_message| event = LogStash::Event.new(avro_message) decorate_event(event, schema) if decorate_events? yield event end end rescue => e puts e @logger.error('Avro parse error', error: e) ensure reset end
register()
click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 34 def register require 'fileutils' FileUtils.mkdir_p(temporary_directory) unless Dir.exist?(temporary_directory) reset end
Private Instance Methods
decorate_event(event, schema)
click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 91 def decorate_event(event, schema) event.set('[@metadata][avro][type]', schema.type) if schema.is_a?(Avro::Schema::NamedSchema) event.set('[@metadata][avro][name]', schema.name) event.set('[@metadata][avro][namespace]', schema.namespace) end end
decorate_events?()
click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 87 def decorate_events? @decorate_events end
merge(bytes)
click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 72 def merge(bytes) tempfile.write(bytes) end
reset()
click to toggle source
# File lib/logstash/codecs/avro-data-file.rb, line 76 def reset unless tempfile.nil? begin File.unlink(tempfile.path) tempfile.close rescue Errno::ENOENT # rubocop:disable Lint/HandleExceptions end end self.tempfile = Tempfile.create('', temporary_directory) end