module Sequel::CsvToParquet
Public Instance Methods
load_csv(local_csv_path, into_table, opts={})

Load a CSV file into an existing parquet table. By default, this assumes the CSV file has headers that match the column names in the parquet table. If that isn't true, the :headers or :mapping option should be specified.

This works by adding the CSV file to HDFS via hdfs dfs -put, then creating an external CSV table in Impala, then inserting into the parquet table from the CSV table.
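For example, loading a headered CSV file into an existing parquet table might look like the following minimal sketch; the connection URL, file path, and table name are hypothetical, and it assumes the extension is registered as the :csv_to_parquet Database extension (as in sequel-impala).

  require 'sequel'

  # Hypothetical connection; any Impala connection set up for sequel-impala works.
  DB = Sequel.connect('impala://impala-host:21050/default')
  DB.extension :csv_to_parquet

  # Append /data/events.csv, whose header row matches the parquet table's
  # column names, to the existing parquet table :events.
  DB.load_csv('/data/events.csv', :events)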
Options:
- :empty_null : Convert empty CSV cells to \N when adding to HDFS, so Impala will treat them as NULL instead of the empty string. Defaults to using 2 sed processes to convert empty cells. Can be set to :perl to use a single perl process. Can be set to :ruby to do the processing inside the ruby process, which will also convert quoted CSV cells (which Hive/Impala do not support) to escaped CSV cells (which Hive/Impala do support).
- :headers : Specify the headers to use in the CSV file, assuming the CSV file does not contain headers. If :skip_header is set to true, this will ignore the existing headers in the file.
- :hdfs_tmp_dir : The temporary HDFS directory to use when uploading.
- :mapping : Override the mapping of the CSV columns to the parquet table columns. By default, assumes the CSV header names are the same as the parquet table columns, and uses both. If specified, this should be a hash with parquet column symbol keys, with the value being the value to insert into the parquet table. This can be used to transform the data from the CSV table when loading it into the parquet table (see the sketch after this list).
- :overwrite : Set to true to overwrite existing data in the parquet table with the information from the CSV file. The default is to append the data to the existing parquet table.
- :skip_header : Specifies that the first row contains headers and should be skipped when copying the CSV file to HDFS. If not specified, headers are skipped unless the :headers option is given.
- :tmp_table : The temporary table name to use for the CSV table.
- :types : Specify the types to use for the temporary CSV table. By default, it introspects the parquet table to get the type information, and uses the type for the matching column name.
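To illustrate how these options combine, here is a hedged sketch with made-up table, file, and column names; the Sequel.function transform in :mapping only shows the kind of expression a mapping value can carry.

  # CSV file without a header row, loaded into the :users parquet table.
  DB.load_csv('/data/users_no_header.csv', :users,
    :headers    => 'id,name,signup_date',       # names for the CSV columns
    :types      => 'int,string,timestamp',      # types for the temporary CSV table
    :mapping    => {
      :id          => :id,
      :name        => Sequel.function(:upper, :name),  # transform while loading
      :signup_date => :signup_date
    },
    :empty_null => :ruby,                       # convert empty cells to NULL in-process
    :overwrite  => true)                        # replace existing rows instead of appending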
# File lib/sequel/extensions/csv_to_parquet.rb
def load_csv(local_csv_path, into_table, opts={})
  tmp_num = SecureRandom.hex(8)
  hdfs_tmp_dir = opts[:hdfs_tmp_dir] || "/tmp/cvs-#{tmp_num}"
  hdfs_tmp_file = "#{hdfs_tmp_dir}/#{File.basename(local_csv_path)}"
  tmp_table = opts[:tmp_table] || "csv_#{tmp_num}"

  skip_header = opts.fetch(:skip_header, !opts.has_key?(:headers))
  mapping = opts[:mapping]
  overwrite = opts[:overwrite]

  raw_data = File.open(local_csv_path, 'rb')

  if columns = opts[:headers]
    columns = columns.split(',') if columns.is_a?(String)
    raw_data.readline if skip_header
  else
    columns = raw_data.readline.chomp.split(',').map(&:downcase).map(&:to_sym)
  end
  raw_data.seek(raw_data.pos, IO::SEEK_SET)

  into_table_columns = describe(into_table) rescue nil

  if types = opts[:types]
    types = types.split(',') if types.is_a?(String)
  elsif (into_table_columns)
    sch = Hash[into_table_columns.map { |h| [h[:name].downcase.to_sym, h[:type]] }]
    types = columns.map { |col| sch[col] || "string" }
  else
    types = ["string"] * columns.length
  end

  unless types.length == columns.length
    raise ArgumentError, "number of types doesn't match number of columns"
  end

  system("hdfs", "dfs", "-mkdir", hdfs_tmp_dir)
  hdfs_put = ['hdfs', 'dfs', '-put', '-', hdfs_tmp_file]

  case opts[:empty_null]
  when nil, false
    system(*hdfs_put, :in=>raw_data)
  when :ruby
    error_in_thread = nil
    csv_data, input = IO.pipe
    csv_thread = Thread.new do
      begin
        comma = ','.freeze
        comma_rep = '\\,'.freeze
        nl = "\n".freeze
        null = '\\N'.freeze
        empty = ''.freeze

        write_col = lambda do |col, after|
          if !col || col == empty
            col = null
          else
            col.gsub!(nl, empty)
            col.gsub!(comma, comma_rep)
          end
          input.write(col)
          input.write(after)
        end

        raw_data.seek(0, IO::SEEK_SET)
        CSV.open(raw_data) do |csv|
          csv.shift if skip_header
          csv.each do |row|
            last = row.pop
            row.each do |col|
              write_col.call(col, comma)
            end
            write_col.call(last, nl)
          end
        end
      ensure
        input.close
        csv_data.close
      end
    end
    system(*hdfs_put, :in=>csv_data)
    csv_thread.join
  when :perl
    Open3.pipeline(
      ['perl', '-p', '-e', 's/(^|,)(?=,|$)/\\1\\\\N/g', {:in=>raw_data}],
      hdfs_put
    )
  else
    Open3.pipeline(
      ['sed', '-r', 's/(^|,)(,|$)/\\1\\\\N\\2/g', {:in=>raw_data}],
      ['sed', '-r', 's/(^|,)(,|$)/\\1\\\\N\\2/g'],
      hdfs_put
    )
  end

  create_table(tmp_table, :external=>true, :field_term=>',', :field_escape=>'\\', :location=>hdfs_tmp_dir) do
    columns.zip(types) do |c, t|
      column c, t
    end
  end

  ds = from(into_table)
  ds = ds.insert_overwrite if overwrite

  if mapping
    table_columns, csv_columns = mapping.to_a.transpose
  else
    table_columns = csv_columns = into_table_columns.map { |h| h[:name].to_sym }
  end
  ds.insert(table_columns, from(tmp_table).select(*csv_columns))
ensure
  raw_data.close if raw_data && !raw_data.closed?

  system("hdfs", "dfs", "-rm", hdfs_tmp_file)
  system("hdfs", "dfs", "-rmdir", hdfs_tmp_dir)
  drop_table?(tmp_table)
end
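For illustration, the following standalone sketch (not part of the extension) mirrors the per-row rewrite the :empty_null => :ruby branch above performs: empty cells become \N, embedded newlines are dropped, and commas inside cells are backslash-escaped so the unquoted output parses cleanly in Hive/Impala. The input row is made up.

  require 'csv'

  # Hypothetical input row; the quoted cells are what Hive/Impala cannot read directly.
  row = CSV.parse_line('"a","","c,d"')     # => ["a", "", "c,d"]

  escaped = row.map do |col|
    if col.nil? || col.empty?
      '\N'                                 # NULL marker recognized by Impala
    else
      col.delete("\n").gsub(',') { '\,' }  # drop newlines, escape embedded commas
    end
  end

  puts escaped.join(',')                   # prints: a,\N,c\,d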