class Egis::Client

The most fundamental {Egis} class. Provides an interface for executing Athena queries.

@yieldparam config [Egis::Configuration] Egis configuration block; if missing, Egis will use the global configuration provided by {Egis.configure}.

See configuration instructions {Egis.configure}.

@see Egis.configure

@example Create client and execute asynchronous query

client = Egis::Client.new
status = client.execute_query('SELECT * FROM my_table;')

while status.in_progress?
  # do something useful
  # ...
  status = client.query_status(status.id)
end

status.output_location.url # s3://my-bucket/result/path

@example Execute synchronous query and fetch results

status = client.execute_query('SELECT MAX(time), MIN(id) FROM my_table;', async: false)
status.fetch_result(schema: [:timestamp, :int]) # [[2020-05-04 11:19:03 +0200, 7]]

Constants

QUERY_STATUS_MAPPING

Attributes

aws_athena_client[R]
aws_s3_client[R]
configuration[R]
s3_location_parser[R]

Public Class Methods

new(aws_client_provider: Egis::AwsClientProvider.new, s3_location_parser: Egis::S3LocationParser.new, &block) click to toggle source
# File lib/egis/client.rb, line 43
# Builds a client together with the AWS clients it talks to.
#
# When a block is given, a duplicate of the global Egis configuration is
# customized through `configure`; otherwise the global configuration object
# itself is used.
#
# @param [Egis::AwsClientProvider] aws_client_provider source of the Athena and S3 clients
# @param [Egis::S3LocationParser] s3_location_parser parser stored for later S3 URL handling
# @yieldparam config [Egis::Configuration] per-client configuration block
def initialize(aws_client_provider: Egis::AwsClientProvider.new,
               s3_location_parser: Egis::S3LocationParser.new,
               &block)
  @s3_location_parser = s3_location_parser
  @configuration =
    if block_given?
      Egis.configuration.dup.configure(&block)
    else
      Egis.configuration
    end

  # Both AWS clients are built from the same resolved configuration.
  @aws_athena_client = aws_client_provider.athena_client(configuration)
  @aws_s3_client = aws_client_provider.s3_client(configuration)
end

Public Instance Methods

database(database_name) click to toggle source

Creates an {Egis::Database} object with the given name. Calling this method doesn't create the Athena database yet.

@param [String] database_name
@return [Egis::Database]

# File lib/egis/client.rb, line 58
# Returns a new Database object built from the given name and bound to this
# client (passed as the `client:` keyword).
#
# @param [String] database_name
# @return [Egis::Database]
def database(database_name)
  Database.new(database_name, client: self)
end
execute_query(query, work_group: nil, database: nil, output_location: nil, async: true, system_execution: false) click to toggle source

Executes an Athena query. By default, queries are executed asynchronously.

@param [String] query SQL query to execute
@param [Boolean] async Decide whether you want to run the query asynchronously or block execution until it finishes
@param [String] work_group Change the Athena work group the query will be executed in.
@param [String] database Run the query in the context of a specific database (implicit table references are expected to be in the given database).
@param [String] output_location S3 URL of the desired output location. By default, Athena uses the location defined by the workgroup.
@param [Boolean] system_execution When true, the query is logged at debug level as a system query instead of at info level.

@return [Egis::QueryStatus]

# File lib/egis/client.rb, line 74
# Starts an Athena query and returns its status.
#
# The query is always started and logged first. If `Egis.mode.async(async)`
# is truthy, the current status is fetched once and returned right away.
# Otherwise the call blocks until polling stops and the last status is
# returned, provided it reports `finished?`.
#
# @param [String] query SQL query to execute
# @param [String] work_group work group for the query (falls back to the configured one)
# @param [String] database database used as the query execution context
# @param [String] output_location S3 URL for the query results
# @param [Boolean] async requested execution style; the final decision is made by `Egis.mode`
# @param [Boolean] system_execution forwarded to the logging helper to pick the log level
# @return [Egis::QueryStatus]
# @raise [Egis::Errors::QueryExecutionError] when a blocking query ends without `finished?` being true
def execute_query(query, work_group: nil, database: nil, output_location: nil, async: true, system_execution: false)
  athena = aws_athena_client
  request = query_execution_params(query, work_group, database, output_location)
  query_id = athena.start_query_execution(request).query_execution_id

  log_query_execution(query, query_id, system_execution)

  if Egis.mode.async(async)
    query_status(query_id)
  else
    final_status = wait_for_query_to_finish(query_id)

    raise Egis::Errors::QueryExecutionError, final_status.message unless final_status.finished?

    final_status
  end
end
query_status(query_id) click to toggle source

Check the status of asynchronous query execution.

@param [String] query_id Query id from {Egis::QueryStatus} returned by the {#execute_query} method
@return [Egis::QueryStatus]

# File lib/egis/client.rb, line 96
# Fetches the current Athena execution record for a query and wraps it in an
# {Egis::QueryStatus}.
#
# The raw Athena state string is translated through QUERY_STATUS_MAPPING;
# `fetch` is used, so a state missing from the mapping raises KeyError.
#
# @param [String] query_id id of the query execution to look up
# @return [Egis::QueryStatus]
def query_status(query_id)
  execution = aws_athena_client.get_query_execution(query_execution_id: query_id).query_execution
  state = execution.status.state

  Egis.logger.debug { "Checking query status (#{query_id}): #{state}" }

  execution_id = execution.query_execution_id
  mapped_state = QUERY_STATUS_MAPPING.fetch(state)
  reason = execution.status.state_change_reason
  location = parse_output_location(execution)

  Egis::QueryStatus.new(execution_id, mapped_state, reason, location, client: self)
end

Private Instance Methods

database_name(name) click to toggle source
# File lib/egis/client.rb, line 159
# Passes the database name through the current Egis mode and returns
# whatever the mode answers.
# NOTE(review): presumably the mode may rewrite the name (e.g. in a testing
# mode) — confirm against the mode implementations.
#
# @param [String] name
def database_name(name)
  Egis.mode.database_name(name)
end
log_query_execution(query, query_id, system_execution) click to toggle source
# File lib/egis/client.rb, line 127
# Logs the start of a query with runs of whitespace collapsed to single spaces.
#
# System queries are logged at debug level, all others at info level. The
# message is built inside the logger block, so the string work only happens
# if the logger evaluates that block.
#
# @param [String] query SQL text being executed
# @param [String] query_id id of the started query execution
# @param [Boolean] system_execution truthy for system queries
def log_query_execution(query, query_id, system_execution)
  logger = Egis.logger

  return logger.info { "Executing query (#{query_id}): #{query.gsub(/\s+/, ' ')}" } unless system_execution

  logger.debug { "Executing system query (#{query_id}): #{query.gsub(/\s+/, ' ')}" }
end
parse_output_location(query_execution) click to toggle source
# File lib/egis/client.rb, line 147
# Builds a QueryOutputLocation from the output location URL stored in the
# query execution's result configuration.
#
# The URL is handed to the S3 location parser; the first two values it
# returns are used as bucket and path.
#
# @param query_execution Athena query execution record
# @return [QueryOutputLocation]
def parse_output_location(query_execution)
  location_url = query_execution.result_configuration.output_location

  s3_bucket, s3_path = s3_location_parser.parse_url(location_url)

  QueryOutputLocation.new(location_url, s3_bucket, s3_path)
end
query_execution_params(query, work_group, database, output_location) click to toggle source
# File lib/egis/client.rb, line 117
# Assembles the parameter hash passed to Athena's start_query_execution.
#
# `:query_string` is always present. The optional keys are added only when
# their value is truthy:
# * `:work_group` - the explicit work group, else the configured one
# * `:query_execution_context` - the database name after `database_name`
# * `:result_configuration` - the output location after `translate_path`
#
# @param [String] query SQL text
# @param [String, nil] work_group explicit work group override
# @param [String, nil] database database for the execution context
# @param [String, nil] output_location S3 URL for the results
# @return [Hash]
def query_execution_params(query, work_group, database, output_location)
  effective_work_group = work_group || configuration.work_group

  { query_string: query }.tap do |request|
    request[:work_group] = effective_work_group if effective_work_group
    request[:query_execution_context] = { database: database_name(database) } if database
    request[:result_configuration] = { output_location: translate_path(output_location) } if output_location
  end
end
translate_path(s3_url) click to toggle source
# File lib/egis/client.rb, line 155
# Passes the S3 URL through the current Egis mode and returns whatever the
# mode answers.
# NOTE(review): presumably the mode may remap the location (e.g. in a
# testing mode) — confirm against the mode implementations.
#
# @param [String] s3_url
def translate_path(s3_url)
  Egis.mode.s3_path(s3_url)
end
wait_for_query_to_finish(query_id) click to toggle source
# File lib/egis/client.rb, line 135
# Polls the query status until it is neither queued nor running, then
# returns that status.
#
# Before every poll the method sleeps for the duration returned by the
# configured `query_status_backoff` callable, which receives the 1-based
# attempt number. There is no upper bound on the number of attempts.
#
# @param [String] query_id id of the query execution to wait for
# @return [Egis::QueryStatus] first status that is not queued or running
def wait_for_query_to_finish(query_id)
  attempt = 0

  loop do
    attempt += 1
    sleep(configuration.query_status_backoff.call(attempt))

    current = query_status(query_id)
    break current unless current.queued? || current.running?
  end
end