class Embulk::Parser::FluentS3LogParserPlugin

Public Class Methods

transaction(config) { |task, columns| ... }
# File lib/embulk/parser/fluent-s3-log.rb, line 9
# Builds the parser task and output schema from +config+ and hands both to
# the caller's block.
#
# The schema always begins with two fixed columns, "time" (timestamp, index 0)
# and "key" (string, index 1); the user-configured "columns" follow from
# index 2, in the order they were configured.
#
# @param config object answering #param("columns", :array) with an array of
#   column specs, each having "name" and "type" keys
# @yieldparam task [Hash] { :columns => the configured column specs }
# @yieldparam columns [Array<Column>] fixed columns followed by configured ones
# @return whatever the block returns
def self.transaction(config, &control)
  # configuration code:
  task = { :columns => config.param("columns", :array) }

  # NOTE(review): run reads @task["columns"] (String key) while this stores
  # :columns (Symbol key); that only works if the task is key-normalised before
  # it reaches run -- confirm against the framework.
  configured = task[:columns].map.with_index(2) do |spec, index|
    Column.new(index, spec["name"], spec["type"].to_sym)
  end
  leading = [
    Column.new(0, "time", :timestamp),
    Column.new(1, "key", :string)
  ]

  yield(task, leading + configured)
end

Public Instance Methods

run(file_input)
# File lib/embulk/parser/fluent-s3-log.rb, line 23
# Parses every file from +file_input+ line by line and emits one record per
# line to page_builder, then finishes the page builder.
#
# Each line is expected to look like "<time>\t<key>\t<json>" (presumably the
# Fluentd out_file layout -- confirm). The time is parsed with
# "%Y-%m-%dT%H:%M:%S%Z", the key is passed through as-is, and the JSON object
# is expanded into column values by make_record using @task["columns"].
#
# @param file_input [#next_file] yields files until it returns nil; each file
#   must respond to #read
# @return the result of page_builder.finish
def run(file_input)
  while file = file_input.next_file
    StringIO.new(file.read).each_line do |line|
      # Blank lines (e.g. a trailing empty line) carry no record; without this
      # guard Time.strptime would receive nil and raise TypeError.
      next if line.strip.empty?

      # Split into at most 3 fields so a tab inside the JSON payload (legal
      # JSON whitespace) stays part of the payload instead of truncating it.
      time_str, key, payload = line.chomp.split("\t", 3)
      record = [
        Time.strptime(time_str, "%Y-%m-%dT%H:%M:%S%Z"),
        key
      ]
      record.concat(make_record(@task["columns"], JSON.parse(payload)))
      page_builder.add(record)
    end
  end
  page_builder.finish
end

Private Instance Methods

make_record(schema, e)
# File lib/embulk/parser/fluent-s3-log.rb, line 41
# Converts one decoded JSON object into column values ordered by +schema+.
#
# A missing (nil) value is treated as "" before conversion, so it becomes
# "" for string, 0 for long, 0.0 for double, false for boolean and nil for
# timestamp columns.
#
# @param schema [Array<Hash>] column specs with "name" and "type" keys, plus
#   "format" (a Time.strptime format) for timestamp columns
# @param e [Hash] the decoded JSON object for one input line
# @return [Array] converted values, one per schema entry, in schema order
# @raise [RuntimeError] if a column type is not string, long, double,
#   boolean or timestamp
def make_record(schema, e)
  schema.map do |c|
    val = e[c["name"]]
    v = val.nil? ? "" : val
    type = c["type"]
    case type
    when "string"
      v
    when "long"
      v.to_i
    when "double"
      v.to_f
    when "boolean"
      # JSON.parse yields native true/false (and numbers), which have no
      # #downcase; compare on the string form so both native and textual
      # booleans work.
      ["yes", "true", "1"].include?(v.to_s.downcase)
    when "timestamp"
      # Numeric values (e.g. epoch seconds) have no #empty?; use the string
      # form for both the emptiness check and parsing.
      text = v.to_s
      text.empty? ? nil : Time.strptime(text, c["format"])
    else
      raise "Unsupported type #{type}"
    end
  end
end