class Embulk::Guess::Msgpack

Public Instance Methods

guess(config, sample_buffer)
# File lib/embulk/guess/msgpack.rb, line 7
# Guesses msgpack parser settings from a sample of the input.
#
# Steps, in order:
# 1. Bail out unless the configured parser type is "msgpack" (or unset).
# 2. Load every jar found under the plugin's classpath directory.
# 3. Unless both "file_encoding" and "row_encoding" are configured, probe the
#    leading msgpack headers of the sample to choose them.
# 4. Decode every complete value in the sample into rows and derive a column
#    schema via Embulk::Guess::SchemaGuess.
#
# @param config [Hash] config; only the "parser" entry is read here
# @param sample_buffer [String] sample bytes, handed to new_unpacker
#   (presumably a binary Ruby String -- confirm against the caller)
# @return [Hash] {} when nothing can be guessed (other parser type, sample
#   does not start with an array or map header, 3 or fewer rows decoded, or a
#   MessagePackException was raised); otherwise
#   {"parser" => {"type", "row_encoding", "file_encoding", "columns"}}
def guess(config, sample_buffer)
  # Read the parser section once; a nil "parser" entry is treated like a
  # missing one for both the type check and the encoding lookup.
  parser_config = config["parser"] || {}
  return {} unless parser_config.fetch("type", "msgpack") == "msgpack"

  # The msgpack Java classes come from jars shipped next to the plugin.
  classpath = File.expand_path('../../../../classpath', __FILE__)
  Dir["#{classpath}/*.jar"].each {|jar| require jar }

  file_encoding = parser_config["file_encoding"]
  row_encoding = parser_config["row_encoding"]

  # Probing replaces BOTH values even when only one of them is missing.
  unless file_encoding && row_encoding
    uk = new_unpacker(sample_buffer)
    begin
      uk.unpackArrayHeader
      begin
        # Two nested array headers: one big array whose rows are arrays.
        uk.unpackArrayHeader
        file_encoding = "array"
        row_encoding = "array"
      rescue org.msgpack.core.MessageTypeException
        # NOTE(review): an outer array whose first element is a map also lands
        # here, so the array/map combination is never guessed -- confirm
        # whether that is intended.
        file_encoding = "sequence"
        row_encoding = "array"
      end
    rescue org.msgpack.core.MessageTypeException
      uk = new_unpacker(sample_buffer)  # TODO unpackArrayHeader consumes buffer (unexpectedly)
      begin
        uk.unpackMapHeader
        file_encoding = "sequence"
        row_encoding = "map"
      rescue org.msgpack.core.MessageTypeException
        return {}  # not a msgpack
      end
    end
  end

  # Start over from the beginning of the sample to decode the rows.
  uk = new_unpacker(sample_buffer)

  # Skip the enclosing array header so the rest reads like a sequence.
  uk.unpackArrayHeader if file_encoding == "array"

  rows = []
  begin
    loop { rows << JSON.parse(uk.unpackValue.toJson) }
  rescue java.io.EOFException
    # The sample ran out; keep the rows decoded so far.
  end

  # Too few rows to derive a schema from.
  return {} if rows.size <= 3

  # Any other row_encoding leaves schema nil.
  case row_encoding
  when "map"
    schema = Embulk::Guess::SchemaGuess.from_hash_records(rows)
  when "array"
    # Array rows carry no names; generate c0, c1, ... up to the widest row.
    column_count = rows.map {|r| r.size }.max
    column_names = column_count.times.map {|i| "c#{i}" }
    schema = Embulk::Guess::SchemaGuess.from_array_records(column_names, rows)
  end

  {
    "parser" => {
      "type" => "msgpack",
      "row_encoding" => row_encoding,
      "file_encoding" => file_encoding,
      "columns" => schema,
    }
  }
rescue org.msgpack.core.MessagePackException
  {}
end
new_unpacker(sample_buffer)
# File lib/embulk/guess/msgpack.rb, line 84
# Creates a new msgpack unpacker over the bytes of +sample_buffer+.
#
# @param sample_buffer [Object] anything responding to +to_java_bytes+
# @return the unpacker returned by MessagePack.newDefaultUnpacker
def new_unpacker(sample_buffer)
  sample_bytes = sample_buffer.to_java_bytes
  org.msgpack.core.MessagePack.newDefaultUnpacker(sample_bytes)
end