class Embulk::Filter::Jsoncolumn

Public Class Methods

transaction(config, in_schema) { |task, out_columns| ... } click to toggle source
# File lib/embulk/filter/jsoncolumn.rb, line 9
def self.transaction(config, in_schema, &control)
  # configuration code:
  task = {
    'schema' => config.param("schema", :array),
    'columns' =>
      config.param('schema', :array, :default => []).inject({}){|a, col|
        a[col['name']] = col['type'].to_sym
        a
      }
  }

  columns = task['columns'].map.with_index{|(name, type), i|
    Column.new(i, name, type)
  }

  #out_columns = in_schema + columns
  out_columns = columns

  yield(task, out_columns)
end

Public Instance Methods

add(page) click to toggle source
# File lib/embulk/filter/jsoncolumn.rb, line 37
def add(page)
  # filtering code:
  page.each do |records|
    records.each do |record|
      r = JSON.parse(record)
      page_builder.add(make_record(task['schema'], r))
    end
  end
end
close() click to toggle source
# File lib/embulk/filter/jsoncolumn.rb, line 34
def close
end
find_by_path(e, path) click to toggle source
# File lib/embulk/filter/jsoncolumn.rb, line 83
def find_by_path(e, path)
  JsonPath.on(e, path).first
end
finish() click to toggle source
# File lib/embulk/filter/jsoncolumn.rb, line 87
def finish
  page_builder.finish
end
init() click to toggle source
# File lib/embulk/filter/jsoncolumn.rb, line 30
def init
  # initialization code:
end
make_record(schema, e) click to toggle source
# File lib/embulk/filter/jsoncolumn.rb, line 47
def make_record(schema, e)
  schema.map do |c|
    name = c["name"]
    path = c["path"]
    val = path.nil? ? e[name] : find_by_path(e, path)

    puts "PATH: #{path}"
    puts "VAL: #{val}"

    v = val.nil? ? "" : val
    type = c["type"]
    case type
      when "string"
        v
      when "long"
        v.to_i
      when "double"
        v.to_f
      when "boolean"
        if v.nil?
          nil
        elsif v.kind_of?(String)
          ["yes", "true", "1"].include?(v.downcase)
        elsif v.kind_of?(Numeric)
          !v.zero?
        else
          !!v
        end
      when "timestamp"
        v.empty? ? nil : Time.strptime(v, c["format"])
      else
        raise "Unsupported type #{type}"
    end
  end
end