class Parqueteur::Converter

Constants

DEFAULT_BATCH_SIZE

Public Class Methods

column(name, type, options = {}, &block) click to toggle source
# File lib/parqueteur/converter.rb, line 15
# Declares a column on this converter class.
#
# The arguments (and the optional block) are forwarded unchanged to
# Parqueteur::Column.new; the resulting column is then added to the
# class-level collection returned by `columns`.
#
# @param name column name, forwarded to Parqueteur::Column
# @param type column type, forwarded to Parqueteur::Column
# @param options [Hash] extra options, forwarded to Parqueteur::Column
# @return whatever `columns.add` returns
def self.column(name, type, options = {}, &block)
  definition = Parqueteur::Column.new(name, type, options, &block)
  columns.add(definition)
end
columns() click to toggle source
# File lib/parqueteur/converter.rb, line 11
# Returns the column collection for this class, creating an empty
# Parqueteur::ColumnCollection on first access and reusing it afterwards.
#
# The collection lives in a class-level instance variable, so each class
# object holds its own collection.
def self.columns
  return @columns if defined?(@columns) && @columns

  @columns = Parqueteur::ColumnCollection.new
end
convert(input, **kwargs) click to toggle source
# File lib/parqueteur/converter.rb, line 27
# One-shot helper: builds a converter for +input+ and returns the result of
# its `to_io`.
#
# @param input forwarded to `new`
# @param kwargs forwarded to `new`
# @return the value returned by `to_io`
def self.convert(input, **kwargs)
  converter = new(input, **kwargs)
  converter.to_io
end
convert_to(input, output_path, **kwargs) click to toggle source
# File lib/parqueteur/converter.rb, line 31
# One-shot helper: builds a converter for +input+ and writes its output to
# +output_path+.
#
# @param input forwarded to `new`
# @param output_path destination handed to the converter's `write`
# @param kwargs forwarded to `new`
# @return the value returned by `write`
def self.convert_to(input, output_path, **kwargs)
  new(input, **kwargs).write(output_path)
end
inline(&block) click to toggle source
# File lib/parqueteur/converter.rb, line 7
# Creates an anonymous subclass of the receiver and evaluates +block+ in the
# context of that new class, so declarations such as `column` and `transform`
# can be made inline.
#
# @return [Class] the new anonymous subclass
def self.inline(&block)
  subclass = Class.new(self)
  # Mirror Class.new's block semantics: run the block with the new class as
  # both `self` and the block argument.
  subclass.class_exec(subclass, &block) if block
  subclass
end
new(input, **kwargs) click to toggle source

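@param input [Enumerable] an enumerable object
@option kwargs [Integer] :batch_size number of records per write batch (defaults to DEFAULT_BATCH_SIZE)
@option kwargs [Symbol] :compression compression codec, e.g. :gzip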

# File lib/parqueteur/converter.rb, line 38
# Builds a converter around +input+.
#
# @param input source records; wrapped via Parqueteur::Input.from
# @option kwargs [Integer] :batch_size slice size used when writing
#   (defaults to DEFAULT_BATCH_SIZE)
# @option kwargs [Symbol, String, false, nil] :compression codec name,
#   stored as a Symbol; nil or false means no compression
def initialize(input, **kwargs)
  @input = Parqueteur::Input.from(input)
  @batch_size = kwargs.fetch(:batch_size, DEFAULT_BATCH_SIZE)
  # Safe navigation only skips nil, so `false&.to_sym` would raise
  # NoMethodError. Treat every falsy value as "no compression", which is
  # how #write already interprets nil and false.
  compression = kwargs[:compression]
  @compression = compression ? compression.to_sym : nil
end
transform(method_name = nil, &block) click to toggle source
# File lib/parqueteur/converter.rb, line 23
# Registers a per-record transform on this converter class.
#
# Transforms are later applied to each record in registration order: a
# Symbol is dispatched as an instance method receiving the record, anything
# else is invoked with `call`.
#
# @param method_name [Symbol, nil] name of an instance method to invoke
# @param block callable used when no method name is given
# @raise [ArgumentError] when neither a method name nor a block is supplied
# @return [Array] the updated list of transforms
def self.transform(method_name = nil, &block)
  step = method_name || block
  # Registering nil would only surface later, as a NoMethodError in the
  # middle of a conversion; fail at the declaration site instead.
  raise ArgumentError, 'transform requires a method name or a block' if step.nil?

  transforms << step
end
transforms() click to toggle source
# File lib/parqueteur/converter.rb, line 19
# Returns the list of transforms registered on this class, creating an
# empty Array on first access and reusing it afterwards.
#
# @return [Array]
def self.transforms
  return @transforms if defined?(@transforms) && @transforms

  @transforms = []
end

Public Instance Methods

split(size, batch_size: nil, compression: nil) click to toggle source
# File lib/parqueteur/converter.rb, line 44
# Lazily converts the input in slices of +size+ records, yielding one
# temporary file per slice.
#
# Each yielded file is closed and deleted as soon as the consumer is done
# with it, so its content must be read (or copied) during iteration.
#
# @param size [Integer] number of records per yielded file
# @param batch_size overrides the converter's batch size when truthy
# @param compression overrides the converter's compression when truthy
# @return [Enumerator] enumerator of temporary files
def split(size, batch_size: nil, compression: nil)
  Enumerator.new do |yielder|
    options = {
      batch_size: batch_size || @batch_size,
      compression: compression || @compression
    }
    @input.each_slice(size) do |records|
      file = self.class.new(records, **options).to_tmpfile
      begin
        yielder << file
      ensure
        # Runs even when the consumer breaks out of or raises during
        # iteration, so the temporary file is never left behind.
        file.close
        file.unlink
      end
    end
  end
end
split_by_io(size, batch_size: nil, compression: nil) click to toggle source
# File lib/parqueteur/converter.rb, line 60
# Lazily converts the input in slices of +size+ records, yielding the
# `to_io` result of a fresh converter for each slice.
#
# @param size [Integer] number of records per yielded IO
# @param batch_size overrides the converter's batch size when truthy
# @param compression overrides the converter's compression when truthy
# @return [Enumerator] enumerator of per-slice IO objects
def split_by_io(size, batch_size: nil, compression: nil)
  Enumerator.new do |yielder|
    effective_batch_size = batch_size || @batch_size
    effective_compression = compression || @compression
    slice_options = { batch_size: effective_batch_size, compression: effective_compression }

    @input.each_slice(size) do |chunk|
      slice_converter = self.class.new(chunk, **slice_options)
      yielder.yield(slice_converter.to_io)
    end
  end
end
to_arrow_table(options = {}) click to toggle source
# File lib/parqueteur/converter.rb, line 110
# Converts the input to Parquet in a temporary file and loads that file back
# as an Arrow::Table.
#
# @param options [Hash] forwarded to `to_tmpfile`
# @return the table returned by Arrow::Table.load
def to_arrow_table(options = {})
  file = to_tmpfile(options)
  begin
    Arrow::Table.load(file.path, format: :parquet)
  ensure
    # Remove the temporary file even when loading fails.
    file.close
    file.unlink
  end
end
to_blob(options = {}) click to toggle source
# File lib/parqueteur/converter.rb, line 118
# Converts the input to Parquet and returns the file content as a String.
#
# @param options [Hash] forwarded to `to_tmpfile`
# @return [String] content read from the temporary file
def to_blob(options = {})
  tmpfile = to_tmpfile(options)
  begin
    tmpfile.read
  ensure
    # Release the temporary file right away, as #to_io and #to_arrow_table
    # do, instead of leaving it open until garbage collection.
    tmpfile.close
    tmpfile.unlink
  end
end
to_io(options = {}) click to toggle source
# File lib/parqueteur/converter.rb, line 102
# Converts the input to Parquet and returns the result as an in-memory
# StringIO.
#
# @param options [Hash] forwarded to `to_tmpfile`
# @return [StringIO] IO wrapping the content read from the temporary file
def to_io(options = {})
  tmpfile = to_tmpfile(options)
  begin
    StringIO.new(tmpfile.read)
  ensure
    # Remove the temporary file even when reading fails.
    tmpfile.close
    tmpfile.unlink
  end
end
to_tmpfile(options = {}) click to toggle source
# File lib/parqueteur/converter.rb, line 94
# Converts the input to Parquet inside a new temporary file.
#
# The caller owns the returned file and is responsible for closing and
# unlinking it.
#
# @param options [Hash] keyword options forwarded to `write`
# @return [Tempfile] binary-mode temporary file, rewound to the start
def to_tmpfile(options = {})
  tempfile = Tempfile.new
  begin
    tempfile.binmode
    write(tempfile.path, **options)
    tempfile.rewind
  rescue StandardError
    # The caller never receives the file when writing fails, so remove it
    # here before propagating the error.
    tempfile.close!
    raise
  end
  tempfile
end
write(path, batch_size: nil, compression: nil) click to toggle source
# File lib/parqueteur/converter.rb, line 73
# Writes the input to +path+ as a Parquet file, one Arrow table per slice
# of records.
#
# @param path destination path handed to Arrow::FileOutputStream.open
# @param batch_size slice size; falls back to the converter's value when nil
# @param compression codec; falls back to the converter's value when nil.
#   No compression is configured when the effective value is nil or false.
# @return [true]
def write(path, batch_size: nil, compression: nil)
  codec = compression.nil? ? @compression : compression
  slice_size = batch_size.nil? ? @batch_size : batch_size

  schema = self.class.columns.arrow_schema
  properties = Parquet::WriterProperties.new
  properties.set_compression(codec) if codec

  # NOTE(review): the `false` argument is presumably the "append" flag - confirm.
  Arrow::FileOutputStream.open(path, false) do |stream|
    Parquet::ArrowFileWriter.open(schema, stream, properties) do |parquet_writer|
      @input.each_slice(slice_size) do |batch|
        # NOTE(review): 1024 is presumably the chunk size used by write_table - confirm.
        parquet_writer.write_table(build_arrow_table(batch), 1024)
      end
    end
  end

  true
end

Private Instance Methods

build_arrow_table(records) click to toggle source
# File lib/parqueteur/converter.rb, line 124
# Builds an Arrow::Table from a batch of records.
#
# Every record first passes through the class-level transforms in order
# (a Symbol is sent to this instance with the record, anything else is
# invoked with `call`). The value for each declared column is then collected
# from the record, using nil when the record lacks that key, and each
# column's values are converted with the column type's `build_value_array`.
#
# @param records enumerable of records responding to `key?` and `[]`
# @return [Arrow::Table]
def build_arrow_table(records)
  pipeline = self.class.transforms
  declared_columns = self.class.columns

  # One empty bucket per declared column, keyed by column name.
  buckets = declared_columns.each_with_object({}) { |col, acc| acc[col.name] = [] }

  records.each do |record|
    row = pipeline.reduce(record) do |current, step|
      step.is_a?(Symbol) ? __send__(step, current) : step.call(current)
    end

    buckets.each do |name, cells|
      cells << (row.key?(name) ? row[name] : nil)
    end
  end

  arrays = buckets.each_with_object({}) do |(name, cells), acc|
    acc[name] = declared_columns.find(name).type.build_value_array(cells)
  end

  Arrow::Table.new(arrays)
end