class Embulk::Parser::FluentS3LogParserPlugin
Public Class Methods
transaction(config) { |task, columns| ... }
click to toggle source
# File lib/embulk/parser/fluent-s3-log.rb, line 9
# Builds the task hash and the output schema, then passes both to the
# caller's block.
#
# The schema always starts with a "time" timestamp column (index 0) and a
# "key" string column (index 1). The entries of the "columns" config
# parameter follow in their configured order, numbered from 2.
#
# @param config [#param] configuration object; only the "columns" array is read
# @yield [task, columns] the task hash and the complete column list
# @return the value returned by the block
def self.transaction(config, &control)
  column_defs = config.param("columns", :array)
  task = { :columns => column_defs }

  # Configured columns are numbered from 2 to leave room for the two
  # fixed leading columns.
  configured = column_defs.each.with_index(2).map do |definition, index|
    Column.new(index, definition["name"], definition["type"].to_sym)
  end

  leading = [
    Column.new(0, "time", :timestamp),
    Column.new(1, "key", :string)
  ]

  yield(task, leading + configured)
end
Public Instance Methods
run(file_input)
click to toggle source
# File lib/embulk/parser/fluent-s3-log.rb, line 23
# Reads every file offered by +file_input+ and adds one record per line to
# the page builder, then finishes the page builder.
#
# Each non-empty line is split into three tab-separated fields:
#   1. a timestamp parsed with "%Y-%m-%dT%H:%M:%S%Z"
#   2. a key, stored as-is
#   3. a JSON document, converted by make_record using @task["columns"]
#
# @param file_input [#next_file] source of files; each file must respond to
#   +read+ and return the file content as a String
def run(file_input)
  schema = @task["columns"]

  while (file = file_input.next_file)
    file.read.each_line do |line|
      # An empty line carries no record; skip it instead of letting
      # Time.strptime abort the whole run.
      next if line.chomp.empty?

      # Limit the split to three fields so that tab characters inside the
      # JSON payload stay part of the payload.
      time_text, key, payload = line.split("\t", 3)

      record = [Time.strptime(time_text, "%Y-%m-%dT%H:%M:%S%Z"), key]
      record += make_record(schema, JSON.parse(payload))
      page_builder.add(record)
    end
  end

  page_builder.finish
end
Private Instance Methods
make_record(schema, e)
click to toggle source
# File lib/embulk/parser/fluent-s3-log.rb, line 41
# Converts one parsed JSON object into an array of values, one per schema
# entry and in schema order.
#
# A missing or nil value is replaced by "" before conversion, which yields
# "" for string, 0 for long, 0.0 for double, false for boolean and nil for
# timestamp.
#
# @param schema [Array<Hash>] column definitions with "name" and "type" keys;
#   timestamp columns also supply "format" (a strptime pattern)
# @param e [Hash] parsed JSON object, keyed by column name
# @return [Array] converted values
# @raise [RuntimeError] when a column type is not one of
#   string, long, double, boolean, timestamp
def make_record(schema, e)
  schema.map do |c|
    raw = e[c["name"]]
    # Only nil is defaulted; a JSON false must stay distinguishable.
    v = raw.nil? ? "" : raw
    type = c["type"]

    case type
    when "string"
      v
    when "long"
      v.to_i
    when "double"
      v.to_f
    when "boolean"
      # JSON may deliver real booleans or numbers here, which do not
      # respond to downcase; compare their string form instead.
      ["yes", "true", "1"].include?(v.to_s.downcase)
    when "timestamp"
      # Non-string values (e.g. a numeric year) do not respond to empty?
      # and cannot be passed to strptime directly.
      text = v.to_s
      text.empty? ? nil : Time.strptime(text, c["format"])
    else
      raise "Unsupported type #{type}"
    end
  end
end