module Sequel::CsvToParquet

Public Instance Methods

load_csv(local_csv_path, into_table, opts={})

Load a CSV file into an existing parquet table. By default, assumes the CSV file has headers that match the column names in the parquet table. If this isn't true, the :headers or :mapping option should be specified.

This works by adding the CSV file to HDFS via hdfs dfs -put, then creating an external CSV table in Impala, and then inserting into the parquet table from the CSV table.
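A minimal usage sketch, assuming the extension is loaded as a Database extension (the connection URL, file path, and table name below are illustrative):

  require 'sequel'

  DB = Sequel.connect('impala://impala-host:21050/default')
  DB.extension :csv_to_parquet

  # events.csv is assumed to have a header row whose names match the
  # columns of the existing parquet table.
  DB.load_csv('/data/events.csv', :events)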

Options:

:empty_null

Convert empty CSV cells to \N when adding to HDFS, so Impala will treat them as NULL instead of as the empty string. Defaults to using 2 sed processes to convert empty cells. Can be set to :perl to use a single perl process, or to :ruby to do the processing inside the ruby process, which will also convert quoted CSV cells (which Hive/Impala do not support) to escaped CSV cells (which Hive/Impala do support). See the examples after this list.

:headers

Specify the headers to use for the CSV file, assuming the CSV file does not contain headers. If :skip_header is also set to true, the existing header row in the file is ignored. See the examples after this list.

:hdfs_tmp_dir

The temporary HDFS directory to use when uploading.

:mapping

Override the mapping of the CSV columns to the parquet table columns. By default, assumes the CSV header names are the same as the parquet table column names, and uses them for both sides of the insert. If specified, this should be a hash with parquet column symbols as keys, with each value being the value to insert into that parquet column. This can be used to transform the data from the CSV table when loading it into the parquet table. See the examples after this list.

:overwrite

Set to true to overwrite existing data in the parquet table with the information from the CSV file. The default is to append the data to the existing parquet table.

:skip_header

Specifies that the first row contains headers and should be skipped when copying the CSV file to HDFS. If not specified, headers are skipped unless the :headers option is given.

:tmp_table

The temporary table name to use for the CSV table.

:types

Specify the types to use for the temporary CSV table. By default, it introspects the parquet table to get the type information, and uses the type for the matching column name.
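As an illustration of the :empty_null conversion, a row such as

  a,,c

is rewritten before upload to

  a,\N,c

which Impala's text format reads as (a, NULL, c). Choosing the :ruby strategy is just an option to load_csv (the file and table names here are hypothetical):

  DB.load_csv('/data/users.csv', :users, :empty_null=>:ruby)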
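A headerless CSV file can be described explicitly via :headers and :types, given here as comma separated strings (the column names and types are hypothetical):

  DB.load_csv('/data/raw.csv', :events,
    :headers=>'id,name,created_at',
    :types=>'int,string,timestamp')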
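A sketch of :mapping combined with :overwrite, transforming a CSV column while replacing the table's existing contents (the column names are hypothetical):

  # Insert the CSV's user_id column into the parquet table's id column,
  # uppercase the name column, and overwrite existing rows.
  DB.load_csv('/data/users.csv', :users,
    :overwrite=>true,
    :mapping=>{:id=>:user_id, :name=>Sequel.function(:upper, :name)})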

# File lib/sequel/extensions/csv_to_parquet.rb
def load_csv(local_csv_path, into_table, opts={})
  tmp_num = SecureRandom.hex(8)
  hdfs_tmp_dir = opts[:hdfs_tmp_dir] || "/tmp/cvs-#{tmp_num}"
  hdfs_tmp_file = "#{hdfs_tmp_dir}/#{File.basename(local_csv_path)}"
  tmp_table = opts[:tmp_table] || "csv_#{tmp_num}"

  # Skip the header row by default, unless explicit headers were given.
  skip_header = opts.fetch(:skip_header, !opts.has_key?(:headers))
  mapping = opts[:mapping]
  overwrite = opts[:overwrite]

  raw_data = File.open(local_csv_path, 'rb')

  if columns = opts[:headers]
    # Accept a comma separated header string, normalizing to symbols
    # so the type introspection below matches by name.
    columns = columns.split(',').map(&:to_sym) if columns.is_a?(String)
    raw_data.readline if skip_header
  else
    # No explicit headers, so read the column names from the first row.
    columns = raw_data.readline.chomp.split(',').map(&:downcase).map(&:to_sym)
  end
  # Sync the file descriptor's offset with the buffered read position,
  # as the descriptor is handed directly to subprocesses below.
  raw_data.seek(raw_data.pos, IO::SEEK_SET)

  into_table_columns = describe(into_table) rescue nil

  if types = opts[:types]
    types = types.split(',') if types.is_a?(String)
  elsif into_table_columns
    # Introspect the parquet table, using the type of the column with
    # the matching name, and defaulting to string.
    sch = Hash[into_table_columns.map{|h| [h[:name].downcase.to_sym, h[:type]]}]
    types = columns.map{|col| sch[col] || "string"}
  else
    types = ["string"] * columns.length
  end

  unless types.length == columns.length
    raise ArgumentError, "number of types doesn't match number of columns"
  end

  system("hdfs", "dfs", "-mkdir", hdfs_tmp_dir)
  hdfs_put = ['hdfs', 'dfs', '-put', '-', hdfs_tmp_file]

  case opts[:empty_null]
  when nil, false
    # No conversion, stream the file to HDFS as-is.
    system(*hdfs_put, :in=>raw_data)
  when :ruby
    # Rewrite the CSV inside this process: empty cells become \N, quoted
    # cells become backslash-escaped cells, and embedded newlines are
    # removed.  The rewritten data is piped to hdfs dfs -put.
    csv_data, input = IO.pipe
    csv_thread = Thread.new do
      begin
        comma = ','.freeze
        comma_rep = '\\,'.freeze
        nl = "\n".freeze
        null = '\\N'.freeze
        empty = ''.freeze

        write_col = lambda do |col, after|
          if !col || col == empty
            col = null
          else
            col.gsub!(nl, empty)
            col.gsub!(comma, comma_rep)
          end
          input.write(col)
          input.write(after)
        end

        # Rewind and reparse from the start; the header row, if present,
        # is skipped below.  CSV.new is used since raw_data is an open
        # IO, not a path.
        raw_data.seek(0, IO::SEEK_SET)
        csv = CSV.new(raw_data)
        csv.shift if skip_header
        csv.each do |row|
          last = row.pop
          row.each do |col|
            write_col.call(col, comma)
          end
          write_col.call(last, nl)
        end
      ensure
        input.close
        csv_data.close
      end
    end
    system(*hdfs_put, :in=>csv_data)
    csv_thread.join
  when :perl
    # A single perl process; the lookahead handles consecutive empty
    # cells in one pass.
    Open3.pipeline(
      ['perl', '-p', '-e', 's/(^|,)(?=,|$)/\\1\\\\N/g', {:in=>raw_data}],
      hdfs_put
    )
  else
    # Two sed processes: the substitution consumes the delimiter that
    # follows, so a second pass handles consecutive empty cells.
    Open3.pipeline(
      ['sed', '-r', 's/(^|,)(,|$)/\\1\\\\N\\2/g', {:in=>raw_data}],
      ['sed', '-r', 's/(^|,)(,|$)/\\1\\\\N\\2/g'],
      hdfs_put
    )
  end

  # Create an external table backed by the uploaded CSV file.
  create_table(tmp_table, :external=>true, :field_term=>',', :field_escape=>'\\', :location=>hdfs_tmp_dir) do
    columns.zip(types) do |c, t|
      column c, t
    end
  end

  ds = from(into_table)
  ds = ds.insert_overwrite if overwrite

  if mapping
    table_columns, csv_columns = mapping.to_a.transpose
  else
    # By default, use the CSV header names for both sides of the insert.
    table_columns = csv_columns = columns
  end
  ds.insert(table_columns, from(tmp_table).select(*csv_columns))

ensure
  raw_data.close if raw_data && !raw_data.closed?

  # Best-effort cleanup of the temporary HDFS file, directory, and table.
  system("hdfs", "dfs", "-rm", hdfs_tmp_file)
  system("hdfs", "dfs", "-rmdir", hdfs_tmp_dir)
  drop_table?(tmp_table)
end