class BigQueryAdapter::Connection

Driver for a BigQuery connection.

Constants

Result

Attributes

project[R]

Public Class Methods

new(project:, keyfile:, timeout: nil, datasets: []) click to toggle source
# File lib/big_query_adapter/connection.rb, line 11
# Builds a connection bound to one project.
#
# @param project [String] project identifier; stored and passed to the client
# @param keyfile [String] credentials handed to Google::Cloud::Bigquery.new
# @param timeout [#to_i, nil] converted with #to_i and stored only when truthy
# @param datasets [Array] dataset ids kept for later filtering; empty by default
def initialize(project:, keyfile:, timeout: nil, datasets: [])
  client_options = { project: project, keyfile: keyfile }

  @project = project
  @bigquery = Google::Cloud::Bigquery.new(**client_options)
  @dataset_ids = datasets
  # Leave @timeout unset when no timeout was supplied.
  @timeout = timeout.to_i if timeout
end

Public Instance Methods

columns(table_name) click to toggle source
# File lib/big_query_adapter/connection.rb, line 43
# Lists the schema fields of a table.
#
# @param table_name [String] name passed through to #table_schema
# @return [Array] the schema's fields, or an empty array when the schema
#   reports no fields (nil)
def columns(table_name)
  fields = table_schema(table_name).fields
  fields.nil? ? [] : fields
end
run(statement) click to toggle source
# File lib/big_query_adapter/connection.rb, line 21
# Executes a statement through the BigQuery client and wraps the outcome.
#
# @param statement [String] query text handed to the client's #query
# @return [Result] column names (as strings, taken from the first row's keys)
#   and one array of values per row; both are empty when the results are not
#   complete or contain no rows
def run(statement)
  options = {}
  # Annotated as milliseconds in the existing code — confirm against the
  # client version in use.
  options[:timeout] = @timeout if @timeout

  # Splat the options so they arrive as keyword arguments: Ruby 3+ no longer
  # converts a trailing positional Hash into keywords.
  results = @bigquery.query(statement, **options)
  return Result.new([], []) unless results.complete?

  columns = results.empty? ? [] : results.first.keys.map(&:to_s)
  Result.new(columns, results.map(&:values))
end
tables() click to toggle source
# File lib/big_query_adapter/connection.rb, line 36
# Lists the distinct table names known to this connection, with each table
# reference name passed through #table_ref_wildcard_name so that names it
# maps to the same wildcard appear only once.
#
# @return [Array<String>] unique names in first-seen order
def tables
  wildcard_names = table_refs.map do |ref|
    table_ref_wildcard_name(table_ref_name(ref))
  end
  wildcard_names.uniq
end

Private Instance Methods

datasets() click to toggle source
# File lib/big_query_adapter/connection.rb, line 74
# Returns the datasets this connection works with.
#
# With no configured dataset ids, everything the client returns from
# #datasets is used; otherwise only datasets whose dataset_id is among the
# configured ids are kept.
def datasets
  wanted_ids = @dataset_ids
  if wanted_ids.empty?
    @bigquery.datasets
  else
    @bigquery.datasets.select { |dataset| wanted_ids.include?(dataset.dataset_id) }
  end
end
partitioned_table?(table_ref_name) click to toggle source
# File lib/big_query_adapter/connection.rb, line 65
# Tells whether a table name looks like one shard of a date-sharded table,
# i.e. it ends in an underscore followed by exactly eight digits that form a
# valid calendar date (YYYYMMDD).
#
# Requiring exactly eight digits keeps this check in line with #table_ref,
# which resolves wildcard names using an eight-digit suffix.
#
# @param table_ref_name [String] table name, e.g. "project.dataset.events_20200101"
# @return [Boolean] always true or false, never nil
def partitioned_table?(table_ref_name)
  suffix = table_ref_name.to_s[/_([0-9]{8})\z/, 1]
  return false unless suffix

  Date.valid_date?(suffix[0, 4].to_i, suffix[4, 2].to_i, suffix[6, 2].to_i)
end
table_ref(table_name) click to toggle source
# File lib/big_query_adapter/connection.rb, line 87
# Finds the table reference whose name matches +table_name+.
#
# A name ending in "_*" is treated as a wildcard: it resolves to the first
# reference whose name is the wildcard prefix followed by exactly eight
# digits. Any other name must match a reference name exactly.
#
# @param table_name [String] e.g. "project.dataset.table" or
#   "project.dataset.prefix_*"
# @return [Object, nil] the matching element of #table_refs, or nil when
#   nothing matches
def table_ref(table_name)
  unless table_name.end_with?('_*')
    return table_refs.find { |ref| table_ref_name(ref) == table_name }
  end

  # Escape the prefix (the dots in a qualified name are literal, not "any
  # character") and anchor both ends so look-alikes such as
  # "prefix_20200101_tmp" are not picked up.
  shard_pattern = /\A#{Regexp.escape(table_name.chomp('*'))}[0-9]{8}\z/
  table_refs.find { |ref| table_ref_name(ref).match?(shard_pattern) }
end
table_ref_name(table_ref) click to toggle source
# File lib/big_query_adapter/connection.rb, line 51
# Builds the dotted name of a table reference from its project, dataset and
# table ids.
#
# @param table_ref [#project_id, #dataset_id, #table_id]
# @return [String] "project_id.dataset_id.table_id"
def table_ref_name(table_ref)
  format('%s.%s.%s', table_ref.project_id, table_ref.dataset_id, table_ref.table_id)
end
table_ref_wildcard_name(table_ref_name) click to toggle source
# File lib/big_query_adapter/connection.rb, line 55
# Converts the name of a date-sharded table into its wildcard form by
# replacing the last underscore-separated segment with "*". Names that
# #partitioned_table? rejects are returned untouched.
#
# @param table_ref_name [String]
# @return [String] e.g. "p.d.events_*" for "p.d.events_20200101"
def table_ref_wildcard_name(table_ref_name)
  return table_ref_name unless partitioned_table?(table_ref_name)

  # Drop the trailing segment and keep everything before it.
  *prefix_parts, _last_segment = table_ref_name.split('_')
  "#{prefix_parts.join('_')}_*"
end
table_refs() click to toggle source
# File lib/big_query_adapter/connection.rb, line 81
# Collects the table_ref of every table in every dataset returned by
# #datasets, flattened into a single list.
#
# @return [Array] table references in dataset order, then table order
def table_refs
  datasets.flat_map do |dataset|
    dataset.tables.map(&:table_ref)
  end
end
table_schema(table_name) click to toggle source
# File lib/big_query_adapter/connection.rb, line 98
# Fetches the schema of the table identified by +table_name+.
#
# The name is resolved with #table_ref, then the table is requested from the
# client's service object by dataset id and table id.
#
# @param table_name [String]
# @return [Object] whatever the fetched table exposes as #schema
def table_schema(table_name)
  ref = table_ref(table_name)
  table = @bigquery.service.get_table(ref.dataset_id, ref.table_id)
  table.schema
end