class Egis::Table

Interface for Athena table manipulation.

It is recommended to create table objects using {Egis::Database#table} method.

@!attribute [r] database

@return [Egis::Database]

@!attribute [r] name

@return [String] Athena table name

@!attribute [r] schema

@return [Egis::TableSchema] table's schema object

Constants

DEFAULT_OPTIONS

Attributes

database[R]
name[R]
options[R]
output_downloader[R]
output_parser[R]
partitions_generator[R]
schema[R]
table_data_wiper[R]
table_ddl_generator[R]

Public Class Methods

new(database, name, schema, location, options: {}, client: Egis::Client.new, partitions_generator: Egis::PartitionsGenerator.new, table_ddl_generator: Egis::TableDDLGenerator.new, output_downloader: Egis::OutputDownloader.new(client.aws_s3_client), output_parser: Egis::OutputParser.new, s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client), table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner)) click to toggle source
# File lib/egis/table.rb, line 19
# Builds a table object and wires up its collaborators.
#
# +client+ and +s3_cleaner+ are not stored on the instance: they are only used to build the
# default values of +output_downloader+, +s3_cleaner+ and +table_data_wiper+.
#
# @param [Egis::Database] database database the table belongs to
# @param [String] name Athena table name
# @param [Egis::TableSchema] schema table's schema object
# @param location raw table location; {#location} passes it through +Egis.mode.s3_path+
# @param [Hash] options overrides merged on top of +DEFAULT_OPTIONS+
def initialize(database, name, schema, location, options: {},
               client: Egis::Client.new,
               partitions_generator: Egis::PartitionsGenerator.new,
               table_ddl_generator: Egis::TableDDLGenerator.new,
               output_downloader: Egis::OutputDownloader.new(client.aws_s3_client),
               output_parser: Egis::OutputParser.new,
               s3_cleaner: Egis::S3Cleaner.new(client.aws_s3_client),
               table_data_wiper: Egis::TableDataWiper.new(s3_cleaner: s3_cleaner))
  # Table identity.
  @database = database
  @name = name
  @schema = schema
  @location = location

  # Caller-supplied options win over the defaults.
  @options = DEFAULT_OPTIONS.merge(options)

  # SQL generation helpers.
  @partitions_generator = partitions_generator
  @table_ddl_generator = table_ddl_generator

  # Query output handling and S3 data removal.
  @output_downloader = output_downloader
  @output_parser = output_parser
  @table_data_wiper = table_data_wiper
end

Public Instance Methods

add_partitions(partitions) click to toggle source

Creates partitions with all possible combinations of given partition values.

@example

table.add_partitions(year: [2000, 2001], type: ['user'])

@param [Hash] partitions

@return [void]

# File lib/egis/table.rb, line 74
# Adds partitions in permissive mode (the SQL is generated with +permissive: true+).
#
# @param [Hash] partitions partition values passed to the partitions generator
# @return whatever the synchronous +database.execute_query+ call returns
def add_partitions(partitions)
  sql = partitions_generator.to_sql(name, partitions, permissive: true)

  # Run synchronously, flagged as a system execution.
  database.execute_query(sql, async: false, system_execution: true)
end
add_partitions!(partitions) click to toggle source

(see #add_partitions) Raises an error when a partition already exists.

# File lib/egis/table.rb, line 83
# Strict counterpart of +add_partitions+: the SQL is generated with +permissive: false+.
#
# @param [Hash] partitions partition values passed to the partitions generator
# @return whatever the synchronous +database.execute_query+ call returns
def add_partitions!(partitions)
  sql = partitions_generator.to_sql(name, partitions, permissive: false)

  # Run synchronously, flagged as a system execution.
  database.execute_query(sql, async: false, system_execution: true)
end
create() click to toggle source

Creates table in Athena.

@return [void]

# File lib/egis/table.rb, line 46
# Logs the table creation, then runs the DDL generated with +permissive: true+.
#
# @return whatever the synchronous +database.execute_query+ call returns
def create
  log_table_creation

  ddl = table_ddl_generator.create_table_sql(self, permissive: true)
  database.execute_query(ddl, async: false, system_execution: true)
end
create!() click to toggle source

Same as {#create}, but raises an error when a table with the given name already exists.

@return [void]

# File lib/egis/table.rb, line 58
# Logs the table creation, then runs the DDL generated with +permissive: false+.
#
# @return whatever the synchronous +database.execute_query+ call returns
def create!
  log_table_creation

  ddl = table_ddl_generator.create_table_sql(self, permissive: false)
  database.execute_query(ddl, async: false, system_execution: true)
end
discover_partitions() click to toggle source

Tells Athena to automatically discover the table's partitions by scanning the table's S3 location. This operation might take a long time when there is a large number of partitions. If that's the case, use {#add_partitions} instead of this method to define partitions manually.

@return [void]

# File lib/egis/table.rb, line 95
# Runs +MSCK REPAIR TABLE+ for this table synchronously.
#
# @return whatever the synchronous +database.execute_query+ call returns
def discover_partitions
  repair_statement = "MSCK REPAIR TABLE #{name};"
  database.execute_query(repair_statement, async: false, system_execution: true)
end
download_data() click to toggle source

Downloads table contents into memory. Mostly useful for testing purposes.

@return [Array] Array of arrays with row values.

# File lib/egis/table.rb, line 127
# Selects every row of the table, downloads the query output and parses it using the
# table's column types.
#
# @return the value produced by +output_parser.parse+
def download_data
  select_all = "SELECT * FROM #{name};"
  query_status = database.execute_query(select_all, async: false, system_execution: true)

  # The query result itself lives at the output location reported by the query status.
  raw_output = output_downloader.download(query_status.output_location)
  output_parser.parse(raw_output, column_types)
end
format() click to toggle source

@return Table data format

# File lib/egis/table.rb, line 146
# Table data format, read from the +:format+ option.
#
# @return the value stored under +:format+ in the table options
# @raise [KeyError] when the options contain no +:format+ key
def format
  table_options = options
  table_options.fetch(:format)
end
location() click to toggle source

@return [String] table location URL

# File lib/egis/table.rb, line 153
# Table location as resolved by the current Egis mode.
#
# @return the result of passing the raw location through +Egis.mode.s3_path+
def location
  current_mode = Egis.mode
  current_mode.s3_path(@location)
end
upload_data(rows) click to toggle source

Insert data into the table. Mostly useful for testing purposes.

@example Insert with array of arrays

table.upload_data([
    ['hello world', 'mx', 1],
    ['hello again', 'us', 2]
])

@example Insert with array of hashes

table.upload_data([
    {message: 'hello world', country: 'mx', type: 1},
    {message: 'hello again', country: 'us', type: 2}
])

@param [Array] rows Array of arrays or hashes with row values

@return [void]

# File lib/egis/table.rb, line 117
# Inserts the given rows into the table with a single synchronous INSERT query.
#
# An empty +rows+ collection is a no-op: an INSERT statement with no value tuples is not
# valid SQL, so no query is sent at all.
#
# @param [Array] rows Array of arrays or hashes with row values
# @return [void]
def upload_data(rows)
  return if rows.empty?

  insert_sql = data_insert_query(rows)
  database.execute_query(insert_sql, async: false, system_execution: true)
end
wipe_data(partitions: nil) click to toggle source

Removes table's content on S3. Optionally, you can limit files removed to specific partitions.

@param [Hash] partitions Partition values to remove. Follows the same argument format as {#add_partitions}.

@return [void]

# File lib/egis/table.rb, line 139
# Delegates data removal for this table to the table data wiper.
#
# @param [Hash, nil] partitions partition values forwarded to the wiper as-is (+nil+ by default)
# @return whatever +table_data_wiper.wipe_table_data+ returns
def wipe_data(partitions: nil)
  wiper = table_data_wiper
  wiper.wipe_table_data(self, partitions)
end

Private Instance Methods

all_columns() click to toggle source
# File lib/egis/table.rb, line 170
# Schema columns followed by the schema partitions, as one collection.
def all_columns
  regular_columns = schema.columns
  partition_columns = schema.partitions
  regular_columns + partition_columns
end
column_types() click to toggle source
# File lib/egis/table.rb, line 166
# Types of all columns, in the same order as +all_columns+.
def column_types
  all_columns.map { |column| column.type }
end
data_insert_query(rows) click to toggle source
# File lib/egis/table.rb, line 174
    # Builds one INSERT statement containing a value tuple per row.
    #
    # Each row is first turned into SQL literals, then rendered as a tuple; the tuples are
    # separated by a comma and a newline.
    def data_insert_query(rows)
      value_tuples = rows.map do |row|
        row_values_statement(row_literal_values(row))
      end
      row_clause = value_tuples.join(",\n")

      <<~SQL
        INSERT INTO #{name} VALUES
        #{row_clause}
      SQL
    end
log_table_creation() click to toggle source
# File lib/egis/table.rb, line 162
# Logs an info-level message announcing the table creation.
# The message is built inside the block passed to the logger.
def log_table_creation
  Egis.logger.info do
    "Creating table #{database.name}.#{name} located in #{location}"
  end
end
row_literal_values(row) click to toggle source
# File lib/egis/table.rb, line 184
# Converts one row into a list of SQL literals, one per column of +all_columns+.
#
# Hash rows are read by column name; any other row is read by column position.
def row_literal_values(row)
  keyed_by_name = row.is_a?(Hash)

  all_columns.each_with_index.map do |column, position|
    raw_value = keyed_by_name ? row[column.name] : row[position]
    Egis::Types.serializer(column.type).literal(raw_value)
  end
end
row_values_statement(row) click to toggle source
# File lib/egis/table.rb, line 191
# Renders a list of literals as a parenthesised, comma-separated SQL tuple.
def row_values_statement(row)
  joined_literals = row.join(', ')
  "(#{joined_literals})"
end