class Kura::Client

Public Class Methods

new(default_project_id: nil, email_address: nil, private_key: nil, scope: nil, http_options: {timeout: 60}, default_retries: 5) click to toggle source
# File lib/kura/client.rb, line 12
# Builds an authenticated BigQuery client.
#
# default_project_id:: project used by methods called without an explicit
#                      project; when nil, the id of the first entry returned
#                      by #projects is used.
# email_address::      issuer for the Signet OAuth2 client.
# private_key::        signing key for the Signet OAuth2 client.
# scope::              extra OAuth scope (single value or Array) merged with
#                      the BigQuery scope.
# http_options::       Hash; its :timeout value is written to the
#                      google-api-client default timeouts.
# default_retries::    written to Google::Apis::RequestOptions.default.retries.
#
# When both email_address and private_key are present a Signet client is
# used; otherwise Google application default credentials are used. In both
# cases an access token is fetched before the service object is created.
#
# NOTE: this writes to Faraday.default_connection and to the Google::Apis
# default option objects, which are not owned by this instance.
def initialize(default_project_id: nil, email_address: nil, private_key: nil, scope: nil, http_options: {timeout: 60}, default_retries: 5)
  @default_project_id = default_project_id
  extra_scopes =
    if scope
      scope.is_a?(Array) ? scope : [scope]
    else
      []
    end
  @scope = ["https://www.googleapis.com/auth/bigquery"] | extra_scopes
  @email_address = email_address
  @private_key = private_key

  if @email_address && @private_key
    auth = Signet::OAuth2::Client.new(
      token_credential_uri: "https://accounts.google.com/o/oauth2/token",
      audience: "https://accounts.google.com/o/oauth2/token",
      scope: @scope,
      issuer: @email_address,
      signing_key: @private_key)
    # MEMO: signet-0.6.1 depends on Faraday.default_connection, so the
    # timeout has to be set there before the token request is made.
    Faraday.default_connection.options.timeout = 60
  else
    auth = Google::Auth.get_application_default(@scope)
  end
  auth.fetch_access_token!

  Google::Apis::RequestOptions.default.retries = default_retries
  # Support google-api-client.gem 0.10.x and 0.11.x, which keep the timeout
  # settings on different option objects.
  # see https://github.com/google/google-api-ruby-client/blob/master/MIGRATING.md
  timeout = http_options[:timeout]
  client_defaults = Google::Apis::ClientOptions.default
  if client_defaults.respond_to?(:open_timeout_sec)
    client_defaults.open_timeout_sec = timeout
    client_defaults.read_timeout_sec = timeout
    client_defaults.send_timeout_sec = timeout
  else
    request_defaults = Google::Apis::RequestOptions.default
    request_defaults.open_timeout_sec = timeout
    request_defaults.timeout_sec = timeout
  end

  @api = Google::Apis::BigqueryV2::BigqueryService.new
  @api.authorization = auth

  @default_project_id = self.projects.first.id if @default_project_id.nil?
end

Public Instance Methods

_convert_tabledata_field(x, field_info, convert_numeric_to_float: true) click to toggle source
# File lib/kura/client.rb, line 256
# Converts one raw tabledata cell into a Ruby value, driven by the field's
# schema entry (a Hash with string keys "type", "mode" and, for records,
# "fields").
#
# x::                        raw cell value.
# field_info::               schema entry for the cell.
# convert_numeric_to_float:: when true NUMERIC values go through Float(),
#                            otherwise they are returned unchanged.
#
# Returns nil for a nil value in a NULLABLE (or mode-less) field. Unknown
# types are returned unchanged. Integer()/Float() raise on values they cannot
# convert.
def _convert_tabledata_field(x, field_info, convert_numeric_to_float: true)
  if x.nil?
    # Tables created by the new BigQuery console may have schema entries
    # without a mode, so a missing mode is treated like NULLABLE.
    mode = field_info["mode"]
    return nil if mode == "NULLABLE" || mode.nil?
  end

  case field_info["type"]
  when "STRING" then x.to_s
  when "INTEGER" then Integer(x)
  when "FLOAT"
    # These spellings are mapped explicitly rather than passed to Float().
    case x
    when "Infinity" then Float::INFINITY
    when "-Infinity" then -Float::INFINITY
    when "NaN" then Float::NAN
    else Float(x)
    end
  when "BOOLEAN" then x.to_s == "true"
  when "TIMESTAMP" then Time.at(Float(x)).utc.iso8601(6)
  when "RECORD"
    _convert_tabledata_row(x, field_info["fields"], convert_numeric_to_float: convert_numeric_to_float)
  when "NUMERIC"
    convert_numeric_to_float ? Float(x) : x
  else
    x
  end
end
_convert_tabledata_row(row, schema, convert_numeric_to_float: true) click to toggle source
# File lib/kura/client.rb, line 293
# Converts one tabledata row into a Hash keyed by field name.
#
# row::    object exposing the cells through #f, or a Hash with an "f" entry.
# schema:: Array of schema entries (Hashes with string keys), paired with the
#          cells by position.
#
# Each cell is round-tripped through JSON so it can be read with string keys.
# REPEATED fields are converted element by element.
def _convert_tabledata_row(row, schema, convert_numeric_to_float: true)
  cells = row.respond_to?(:f) ? row.f : row["f"]
  converted = {}
  cells.zip(schema).each do |cell, field|
    cell = JSON.parse(cell.to_json)
    converted[field["name"]] =
      if field["mode"] == "REPEATED"
        cell["v"].map do |item|
          _convert_tabledata_field(item["v"], field, convert_numeric_to_float: convert_numeric_to_float)
        end
      else
        _convert_tabledata_field(cell["v"], field, convert_numeric_to_float: convert_numeric_to_float)
      end
  end
  converted
end
batch() { || ... } click to toggle source
# File lib/kura/client.rb, line 79
# Runs the given block inside @api.batch.
#
# While the block runs, @api is replaced by the object yielded by
# @api.batch, so calls made through this client inside the block go to it.
# The original @api is restored afterwards, including when the block raises.
def batch
  @api.batch do |batch_api|
    saved_api = @api
    @api = batch_api
    begin
      yield
    ensure
      @api = saved_api
    end
  end
end
cancel_job(job, location: nil, project_id: @default_project_id, &blk) click to toggle source
# File lib/kura/client.rb, line 698
# Requests cancellation of a job.
#
# job::        a job id (String) or a Google::Apis::BigqueryV2::Job. For a
#              Job object the project id and job id come from its
#              job_reference.
# location::   job location. When omitted and a Job object is given, the
#              location recorded in its job_reference is used (the same
#              source #wait_job reads).
# project_id:: project of the job; ignored when a Job object is given.
#
# With a block, the block is called with (job, error). Without a block the
# job from the cancel response is returned. The returned/yielded job has
# kura_api set to this client when it is not nil.
#
# Raises TypeError for any other kind of +job+ argument.
def cancel_job(job, location: nil, project_id: @default_project_id, &blk)
  case job
  when String
    jobid = job
  when Google::Apis::BigqueryV2::Job
    project_id = job.job_reference.project_id
    jobid = job.job_reference.job_id
    # An explicitly passed location still wins; otherwise fall back to the
    # one carried by the job so the request targets the job's location.
    location ||= job.job_reference.location
  else
    raise TypeError, "Kura::Client#cancel_job accept String(job-id) or Google::Apis::BigqueryV2::Job"
  end
  if blk
    @api.cancel_job(project_id, jobid, location: location) do |r, e|
      j = (r && r.job)
      j.kura_api = self if j
      blk.call(j, e)
    end
  else
    @api.cancel_job(project_id, jobid, location: location).job.tap{|j| j.kura_api = self if j }
  end
end
copy(src_dataset_id, src_table_id, dest_dataset_id, dest_table_id, mode: :truncate, src_project_id: @default_project_id, dest_project_id: @default_project_id, job_project_id: @default_project_id, job_id: nil, wait: nil, dry_run: false, **kwrest, &blk) click to toggle source
# File lib/kura/client.rb, line 624
# Submits a table copy job.
#
# mode::    :append, :truncate or :empty; translated to a write disposition
#           by mode_to_write_disposition.
# wait::    forwarded to insert_job; forced to nil for a dry run.
# dry_run:: when truthy, marks the job configuration as a dry run.
# kwrest::  extra keywords assigned to the copy configuration through its
#           "<name>=" writers; an unknown name raises ArgumentError.
#
# Returns whatever insert_job returns for the built configuration.
def copy(src_dataset_id, src_table_id, dest_dataset_id, dest_table_id,
         mode: :truncate,
         src_project_id: @default_project_id,
         dest_project_id: @default_project_id,
         job_project_id: @default_project_id,
         job_id: nil,
         wait: nil,
         dry_run: false,
         **kwrest,
         &blk)
  write_disposition = mode_to_write_disposition(mode)
  destination_table = Google::Apis::BigqueryV2::TableReference.new(
    project_id: dest_project_id,
    dataset_id: dest_dataset_id,
    table_id: dest_table_id,
  )
  source_table = Google::Apis::BigqueryV2::TableReference.new(
    project_id: src_project_id,
    dataset_id: src_dataset_id,
    table_id: src_table_id,
  )
  copy_settings = Google::Apis::BigqueryV2::JobConfigurationTableCopy.new(
    destination_table: destination_table,
    source_table: source_table,
    write_disposition: write_disposition,
  )
  configuration = Google::Apis::BigqueryV2::JobConfiguration.new(copy: copy_settings)

  if dry_run
    configuration.dry_run = true
    wait = nil
  end

  kwrest.each do |kw, opt|
    unless configuration.copy.respond_to?("#{kw}=")
      raise ArgumentError, "Unknown keyword argument for Kura::Client#copy: #{kw}"
    end
    configuration.copy.__send__("#{kw}=", opt)
  end

  insert_job(configuration, wait: wait, job_id: job_id, project_id: job_project_id, &blk)
end
dataset(dataset_id, project_id: @default_project_id, &blk) click to toggle source
# File lib/kura/client.rb, line 119
# Fetches a dataset.
#
# Without a block, returns the dataset, or nil when the API error reports
# status code 404. With a block, the block receives (result, error); a 404
# error is reported to the block as (nil, nil).
# Other errors go through process_error.
def dataset(dataset_id, project_id: @default_project_id, &blk)
  return @api.get_dataset(project_id, dataset_id) unless blk

  @api.get_dataset(project_id, dataset_id) do |result, err|
    if err.respond_to?(:status_code) && err.status_code == 404
      blk.call(nil, nil)
    else
      blk.call(result, err)
    end
  end
rescue => e
  not_found = e.respond_to?(:status_code) && e.status_code == 404
  not_found ? nil : process_error(e)
end
datasets(project_id: @default_project_id, all: false, limit: 1000, &blk) click to toggle source
# File lib/kura/client.rb, line 104
# Lists datasets of a project.
#
# all::   passed through normalize_parameter before being sent.
# limit:: sent as max_results.
#
# Without a block, returns the datasets of the response, or [] when the
# response has none. With a block, the block receives (datasets, error),
# where datasets is nil when the API gave no result.
# Errors go through process_error.
def datasets(project_id: @default_project_id, all: false, limit: 1000, &blk)
  all = normalize_parameter(all)
  unless blk
    listing = @api.list_datasets(project_id, all: all, max_results: limit)
    return listing.datasets || []
  end

  @api.list_datasets(project_id, all: all, max_results: limit) do |result, err|
    blk.call(result && (result.datasets || []), err)
  end
rescue => e
  process_error(e)
end
delete_dataset(dataset_id, project_id: @default_project_id, delete_contents: false, &blk) click to toggle source
# File lib/kura/client.rb, line 151
# Deletes a dataset.
#
# delete_contents:: passed through normalize_parameter before being sent.
#
# Returns the API call's result, or nil when the raised error reports status
# code 404. Other errors go through process_error. The block, if any, is
# handed to the API call unchanged.
def delete_dataset(dataset_id, project_id: @default_project_id, delete_contents: false, &blk)
  @api.delete_dataset(project_id, dataset_id, delete_contents: normalize_parameter(delete_contents), &blk)
rescue => e
  not_found = e.respond_to?(:status_code) && e.status_code == 404
  not_found ? nil : process_error(e)
end
delete_model(dataset_id, model_id, project_id: @default_project_id, &blk) click to toggle source
# File lib/kura/client.rb, line 799
# Deletes a model.
#
# Returns the API call's result, or nil when the raised error reports status
# code 404. Other errors go through process_error. The block, if any, is
# handed to the API call unchanged.
def delete_model(dataset_id, model_id, project_id: @default_project_id, &blk)
  @api.delete_model(project_id, dataset_id, model_id, &blk)
rescue => e
  not_found = e.respond_to?(:status_code) && e.status_code == 404
  not_found ? nil : process_error(e)
end
delete_routine(dataset_id, routine_id, project_id: @default_project_id, &blk) click to toggle source
# File lib/kura/client.rb, line 837
# Deletes a routine.
#
# Returns the API call's result, or nil when the raised error reports status
# code 404. Other errors go through process_error. The block, if any, is
# handed to the API call unchanged.
def delete_routine(dataset_id, routine_id, project_id: @default_project_id, &blk)
  @api.delete_routine(project_id, dataset_id, routine_id, &blk)
rescue => e
  not_found = e.respond_to?(:status_code) && e.status_code == 404
  not_found ? nil : process_error(e)
end
delete_table(dataset_id, table_id, project_id: @default_project_id, &blk) click to toggle source
# File lib/kura/client.rb, line 249
# Deletes a table.
#
# Returns the API call's result, or nil when the raised error reports status
# code 404. Other errors go through process_error. The block, if any, is
# handed to the API call unchanged.
def delete_table(dataset_id, table_id, project_id: @default_project_id, &blk)
  @api.delete_table(project_id, dataset_id, table_id, &blk)
rescue => e
  not_found = e.respond_to?(:status_code) && e.status_code == 404
  not_found ? nil : process_error(e)
end
extract(dataset_id, table_id, dest_uris, compression: "NONE", destination_format: "CSV", field_delimiter: ",", print_header: true, project_id: @default_project_id, job_project_id: @default_project_id, job_id: nil, wait: nil, dry_run: false, **kwrest, &blk) click to toggle source
# File lib/kura/client.rb, line 581
# Submits an extract (export) job for a table.
#
# dest_uris::       a single URI String or a list of URIs.
# field_delimiter:: applied only when destination_format is "CSV".
# print_header::    applied only when destination_format is "CSV"; passed
#                   through normalize_parameter.
# wait::            forwarded to insert_job; forced to nil for a dry run.
# kwrest::          extra keywords assigned to the extract configuration
#                   through its "<name>=" writers; an unknown name raises
#                   ArgumentError.
#
# Returns whatever insert_job returns for the built configuration.
def extract(dataset_id, table_id, dest_uris,
            compression: "NONE",
            destination_format: "CSV",
            field_delimiter: ",",
            print_header: true,
            project_id: @default_project_id,
            job_project_id: @default_project_id,
            job_id: nil,
            wait: nil,
            dry_run: false,
            **kwrest,
            &blk)
  dest_uris = [dest_uris] if dest_uris.is_a?(String)
  source_table = Google::Apis::BigqueryV2::TableReference.new(
    project_id: project_id,
    dataset_id: dataset_id,
    table_id: table_id,
  )
  extract_settings = Google::Apis::BigqueryV2::JobConfigurationExtract.new(
    compression: compression,
    destination_format: destination_format,
    source_table: source_table,
    destination_uris: dest_uris,
  )
  configuration = Google::Apis::BigqueryV2::JobConfiguration.new(extract: extract_settings)

  if dry_run
    configuration.dry_run = true
    wait = nil
  end

  if destination_format == "CSV"
    configuration.extract.field_delimiter = field_delimiter
    configuration.extract.print_header = normalize_parameter(print_header)
  end

  kwrest.each do |kw, opt|
    unless configuration.extract.respond_to?("#{kw}=")
      raise ArgumentError, "Unknown keyword argument for Kura::Client#extract: #{kw}"
    end
    configuration.extract.__send__("#{kw}=", opt)
  end

  insert_job(configuration, wait: wait, job_id: job_id, project_id: job_project_id, &blk)
end
insert_dataset(dataset_id, project_id: @default_project_id, &blk) click to toggle source
# File lib/kura/client.rb, line 136
# Creates a dataset.
#
# dataset_id:: one of
#              * String - the dataset id; a Dataset with a matching
#                dataset_reference is built,
#              * Hash - keyword attributes for
#                Google::Apis::BigqueryV2::Dataset.new,
#              * Google::Apis::BigqueryV2::Dataset - used as is.
# project_id:: project in which the dataset is created.
#
# The block, if any, is handed to the API call unchanged.
# Raises TypeError for any other kind of +dataset_id+ (instead of sending a
# request without a body). API errors go through process_error.
def insert_dataset(dataset_id, project_id: @default_project_id, &blk)
  case dataset_id
  when String
    obj = Google::Apis::BigqueryV2::Dataset.new(dataset_reference: Google::Apis::BigqueryV2::DatasetReference.new(project_id: project_id, dataset_id: dataset_id))
  when Hash
    obj = Google::Apis::BigqueryV2::Dataset.new(**dataset_id)
  when Google::Apis::BigqueryV2::Dataset
    obj = dataset_id
  else
    raise TypeError, "Kura::Client#insert_dataset accept String(dataset-id), Hash or Google::Apis::BigqueryV2::Dataset"
  end

  @api.insert_dataset(project_id, obj, &blk)
rescue
  process_error($!)
end
insert_job(configuration, job_id: nil, project_id: @default_project_id, media: nil, wait: nil, &blk) click to toggle source
# File lib/kura/client.rb, line 379
# Submits a job built from +configuration+.
#
# job_id::     when given, a job reference with this id and +project_id+ is
#              attached to the job.
# project_id:: project the job is inserted into.
# media::      passed to the API as upload_source.
# wait::       when truthy, the job is inserted synchronously and handed to
#              wait_job together with this value and the block.
#
# Without +wait+: with a block, the block receives (job, error); without a
# block, the inserted job is returned. Jobs handed back have kura_api set to
# this client. Errors go through process_error.
def insert_job(configuration, job_id: nil, project_id: @default_project_id, media: nil, wait: nil, &blk)
  job_object = Google::Apis::BigqueryV2::Job.new
  job_object.configuration = configuration
  if job_id
    reference = Google::Apis::BigqueryV2::JobReference.new
    reference.project_id = project_id
    reference.job_id = job_id
    job_object.job_reference = reference
  end

  if wait
    submitted = @api.insert_job(project_id, job_object, upload_source: media)
    submitted.kura_api = self
    wait_job(submitted, wait, &blk)
  elsif blk
    @api.insert_job(project_id, job_object, upload_source: media) do |result, err|
      result.kura_api = self if result
      blk.call(result, err)
    end
  else
    submitted = @api.insert_job(project_id, job_object, upload_source: media)
    submitted.kura_api = self
    submitted
  end
rescue => e
  process_error(e)
end
insert_routine(dataset_id, routine_id, body, project_id: @default_project_id, routine_type: "PROCEDURE", language: "SQL", arguments: [], return_type: nil, imported_libraries: [], description: nil) click to toggle source
# File lib/kura/client.rb, line 844
# Creates a routine in a dataset.
#
# body:: stored as the routine's definition_body.
# The remaining keywords are copied onto the Routine object under the same
# names (routine_type, language, arguments, return_type,
# imported_libraries, description).
#
# Returns the result of the API call. Errors raised by the API call are not
# translated here.
def insert_routine(dataset_id,
                   routine_id,
                   body,
                   project_id: @default_project_id,
                   routine_type: "PROCEDURE",
                   language: "SQL",
                   arguments: [],
                   return_type: nil,
                   imported_libraries: [],
                   description: nil)
  reference = Google::Apis::BigqueryV2::RoutineReference.new(
    project_id: project_id,
    dataset_id: dataset_id,
    routine_id: routine_id
  )
  routine = Google::Apis::BigqueryV2::Routine.new(
    routine_reference: reference,
    arguments: arguments,
    definition_body: body,
    imported_libraries: imported_libraries,
    language: language,
    return_type: return_type,
    routine_type: routine_type,
    description: description
  )
  @api.insert_routine(project_id, dataset_id, routine)
end
insert_table(dataset_id, table_id, project_id: @default_project_id, expiration_time: nil, friendly_name: nil, schema: nil, description: nil, query: nil, external_data_configuration: nil, use_legacy_sql: false, time_partitioning: nil, clustering_fields: [], &blk) click to toggle source
# File lib/kura/client.rb, line 201
# Creates a table, view or external table.
#
# expiration_time::             when given, converted with
#                               (to_f * 1000.0).to_i before being sent.
# query::                       when given, the table is created with a view
#                               definition built from it and use_legacy_sql.
# external_data_configuration:: passed to the Table as is.
# schema::                      run through normalize_schema only when
#                               neither query nor external_data_configuration
#                               is given; otherwise passed as received.
# time_partitioning::           Hash of keywords for TimePartitioning.new.
# clustering_fields::           when non-empty, set as the clustering fields.
#
# The block, if any, is handed to the API call unchanged.
# Errors go through process_error.
def insert_table(dataset_id, table_id, project_id: @default_project_id, expiration_time: nil,
                 friendly_name: nil, schema: nil, description: nil,
                 query: nil, external_data_configuration: nil,
                 use_legacy_sql: false,
                 time_partitioning: nil,
                 clustering_fields: [],
                 &blk)
  expiration_time = (expiration_time.to_f * 1000.0).to_i if expiration_time

  view = nil
  if query
    view = { query: query, use_legacy_sql: !!use_legacy_sql }
  elsif !external_data_configuration && schema
    schema = { fields: normalize_schema(schema) }
  end

  table = Google::Apis::BigqueryV2::Table.new(
    table_reference: {project_id: project_id, dataset_id: dataset_id, table_id: table_id},
    friendly_name: friendly_name,
    description: description,
    schema: schema,
    expiration_time: expiration_time,
    view: view,
    external_data_configuration: external_data_configuration)
  table.time_partitioning = Google::Apis::BigqueryV2::TimePartitioning.new(**time_partitioning) if time_partitioning
  if clustering_fields && clustering_fields.size > 0
    table.clustering = Google::Apis::BigqueryV2::Clustering.new(fields: clustering_fields)
  end

  @api.insert_table(project_id, dataset_id, table, &blk)
rescue => e
  process_error(e)
end
insert_tabledata(dataset_id, table_id, rows, project_id: @default_project_id, ignore_unknown_values: false, skip_invalid_rows: false, template_suffix: nil) click to toggle source
# File lib/kura/client.rb, line 341
# Streams rows into a table with tabledata.insertAll.
#
# rows:: Enumerable whose elements are either
#        * InsertAllTableDataRequest::Row objects (used as is), or
#        * Hashes. A Hash whose only keys are insert_id and json (Symbol or
#          String, in any order) is treated as an envelope: its values become
#          the row's insert_id and json. Any other Hash is used as the row's
#          json payload.
# ignore_unknown_values, skip_invalid_rows:: copied onto the request.
# template_suffix:: copied onto the request when given.
#
# Returns the result of the API call.
# Raises ArgumentError for a row of any other type. Errors raised in this
# method are handed to process_error.
def insert_tabledata(dataset_id, table_id, rows, project_id: @default_project_id, ignore_unknown_values: false, skip_invalid_rows: false, template_suffix: nil)
  request = Google::Apis::BigqueryV2::InsertAllTableDataRequest.new
  request.ignore_unknown_values = ignore_unknown_values
  request.skip_invalid_rows = skip_invalid_rows
  if template_suffix
    request.template_suffix = template_suffix
  end
  request.rows = rows.map do |r|
    case r
    when Google::Apis::BigqueryV2::InsertAllTableDataRequest::Row
      r
    when Hash
      row = Google::Apis::BigqueryV2::InsertAllTableDataRequest::Row.new
      # Compare the key set order-independently: {json: ..., insert_id: ...}
      # is the same envelope as {insert_id: ..., json: ...}.
      if r.keys.map(&:to_s).sort == %w{ insert_id json }
        row.insert_id = r[:insert_id] || r["insert_id"]
        row.json = r[:json] || r["json"]
      else
        row.json = r
      end
      row
    else
      raise ArgumentError, "invalid row for BigQuery tabledata.insertAll #{r.inspect}"
    end
  end

  @api.insert_all_table_data(project_id, dataset_id, table_id, request)
rescue
  process_error($!)
end
job(job_id, location: nil, project_id: @default_project_id, fields: nil, &blk) click to toggle source
# File lib/kura/client.rb, line 685
# Fetches a job.
#
# location, fields:: forwarded to the API call.
#
# Without a block, returns the job. With a block, the block receives
# (job, error). A non-nil job has kura_api set to this client.
# Errors go through process_error.
def job(job_id, location: nil, project_id: @default_project_id, fields: nil, &blk)
  unless blk
    fetched = @api.get_job(project_id, job_id, location: location, fields: fields)
    fetched.kura_api = self if fetched
    return fetched
  end

  @api.get_job(project_id, job_id, location: location, fields: fields) do |fetched, err|
    fetched.kura_api = self if fetched
    blk.call(fetched, err)
  end
rescue => e
  process_error(e)
end
job_finished?(r) click to toggle source
# File lib/kura/client.rb, line 719
# Tells whether a job has finished.
#
# Returns false unless the job's status state is "DONE", and true when it is
# "DONE" without an error_result.
#
# Raises Kura::ApiError when the finished job carries an error_result. If
# the status has an errors list, the error is built from all of its entries
# (reasons joined by ",", messages joined by newlines); otherwise it is built
# from the error_result alone.
def job_finished?(r)
  return false unless r.status.state == "DONE"

  failure = r.status.error_result
  return true unless failure

  # One line per error: reason and message, plus location/debug_info if set.
  describe = lambda do |e|
    msg = "reason=#{e.reason} message=#{e.message}"
    msg += " location=#{e.location}" if e.location
    msg += " debug_info=#{e.debug_info}" if e.debug_info
    msg
  end

  if r.status.errors
    reasons = r.status.errors.map(&:reason).join(",")
    details = r.status.errors.map { |e| describe.call(e) }.join("\n")
    raise Kura::ApiError.new(reasons, details)
  end
  raise Kura::ApiError.new(failure.reason, describe.call(failure))
end
jobs(project_id: @default_project_id, all_users: nil, max_creation_time: nil, min_creation_time: nil, max_results: nil, page_token: nil, parent_job_id: nil, projection: nil, state_filter: nil) click to toggle source
# File lib/kura/client.rb, line 664
# Lists jobs of a project.
#
# Every keyword other than project_id is forwarded to the API call under the
# same name. Returns the result of the API call. Errors raised by the API
# call are not translated here.
def jobs(project_id: @default_project_id,
         all_users: nil,
         max_creation_time: nil,
         min_creation_time: nil,
         max_results: nil,
         page_token: nil,
         parent_job_id: nil,
         projection: nil,
         state_filter: nil)
  filters = {
    all_users: all_users,
    max_creation_time: max_creation_time,
    min_creation_time: min_creation_time,
    max_results: max_results,
    page_token: page_token,
    parent_job_id: parent_job_id,
    projection: projection,
    state_filter: state_filter,
  }
  @api.list_jobs(project_id, **filters)
end
list_tabledata(dataset_id, table_id, project_id: @default_project_id, start_index: 0, max_result: 100, page_token: nil, schema: nil, convert_numeric_to_float: true, &blk) click to toggle source
# File lib/kura/client.rb, line 315
# Reads rows of a table with tabledata.list and converts them with
# format_tabledata.
#
# start_index, max_result, page_token:: forwarded to the API call
#                                        (max_result as max_results).
# schema::  list of field definitions used to convert the rows. When nil, the
#           table is looked up once and its schema fields are used.
# convert_numeric_to_float:: forwarded to format_tabledata.
#
# Without a block, returns the formatted result. With a block, the block
# receives (formatted_result, error); the result is nil when the API gave
# none.
#
# Raises Kura::ApiError ("notFound") when the schema has to be looked up and
# the table does not exist. Errors raised in this method are handed to
# process_error.
def list_tabledata(dataset_id, table_id, project_id: @default_project_id, start_index: 0, max_result: 100, page_token: nil, schema: nil, convert_numeric_to_float: true, &blk)
  if schema.nil?
    # Fetch the table only once and reuse it for the schema.
    _t = table(dataset_id, table_id, project_id: project_id)
    unless _t
      raise Kura::ApiError.new("notFound", "Not found: Table #{project_id}:#{dataset_id}.#{table_id}")
    end
    schema = _t.schema.fields
  end
  # Round-trip through JSON so schema entries can be read with string keys.
  schema = schema.map{|s| JSON.parse(s.to_json) }

  if blk
    @api.list_table_data(project_id, dataset_id, table_id, max_results: max_result, start_index: start_index, page_token: page_token) do |r, err|
      if r
        r = format_tabledata(r, schema, convert_numeric_to_float: convert_numeric_to_float)
      end
      blk.call(r, err)
    end
  else
    r = @api.list_table_data(project_id, dataset_id, table_id, max_results: max_result, start_index: start_index, page_token: page_token)
    format_tabledata(r, schema, convert_numeric_to_float: convert_numeric_to_float)
  end
rescue
  process_error($!)
end
load(dataset_id, table_id, source_uris=nil, schema: nil, delimiter: ",", field_delimiter: delimiter, mode: :append, allow_jagged_rows: false, max_bad_records: 0, ignore_unknown_values: false, allow_quoted_newlines: false, quote: '"', skip_leading_rows: 0, source_format: "CSV", autodetect: false, range_partitioning: nil, time_partitioning: nil, project_id: @default_project_id, job_project_id: @default_project_id, job_id: nil, file: nil, wait: nil, dry_run: false, **kwrest, &blk) click to toggle source
# File lib/kura/client.rb, line 509
# Submits a load job into a table.
#
# source_uris::  a single URI String or a list of URIs; only set on the job
#                when no +file+ is given.
# file::         passed to insert_job as media.
# schema::       when given, run through normalize_schema and set as the
#                load schema.
# mode::         :append, :truncate or :empty; translated to a write
#                disposition by mode_to_write_disposition.
# range_partitioning, time_partitioning::
#                a Hash is turned into the matching partitioning object;
#                anything else is passed as is.
# field_delimiter, allow_quoted_newlines, quote, skip_leading_rows::
#                applied only when source_format is "CSV".
# autodetect::   applied when source_format is "CSV" or
#                "NEWLINE_DELIMITED_JSON".
# wait::         forwarded to insert_job; forced to nil for a dry run.
# kwrest::       extra keywords assigned to the load configuration through
#                its "<name>=" writers; an unknown name raises ArgumentError.
#
# Returns whatever insert_job returns for the built configuration.
def load(dataset_id, table_id, source_uris=nil,
         schema: nil, delimiter: ",", field_delimiter: delimiter, mode: :append,
         allow_jagged_rows: false, max_bad_records: 0,
         ignore_unknown_values: false,
         allow_quoted_newlines: false,
         quote: '"', skip_leading_rows: 0,
         source_format: "CSV",
         autodetect: false,
         range_partitioning: nil,
         time_partitioning: nil,
         project_id: @default_project_id,
         job_project_id: @default_project_id,
         job_id: nil,
         file: nil, wait: nil,
         dry_run: false,
         **kwrest,
         &blk)
  write_disposition = mode_to_write_disposition(mode)
  source_uris = [source_uris] if source_uris.is_a?(String)
  if range_partitioning.is_a?(Hash)
    range_partitioning = Google::Apis::BigqueryV2::RangePartitioning.new(**range_partitioning)
  end
  if time_partitioning.is_a?(Hash)
    time_partitioning = Google::Apis::BigqueryV2::TimePartitioning.new(**time_partitioning)
  end

  destination_table = Google::Apis::BigqueryV2::TableReference.new(
    project_id: project_id,
    dataset_id: dataset_id,
    table_id: table_id,
  )
  load_settings = Google::Apis::BigqueryV2::JobConfigurationLoad.new(
    destination_table: destination_table,
    write_disposition: write_disposition,
    allow_jagged_rows: normalize_parameter(allow_jagged_rows),
    max_bad_records: max_bad_records,
    ignore_unknown_values: normalize_parameter(ignore_unknown_values),
    source_format: source_format,
    range_partitioning: range_partitioning,
    time_partitioning: time_partitioning,
  )
  configuration = Google::Apis::BigqueryV2::JobConfiguration.new(load: load_settings)

  if dry_run
    configuration.dry_run = true
    wait = nil
  end

  if schema
    configuration.load.schema = Google::Apis::BigqueryV2::TableSchema.new(fields: normalize_schema(schema))
  end

  if source_format == "CSV"
    configuration.load.field_delimiter = field_delimiter
    configuration.load.allow_quoted_newlines = normalize_parameter(allow_quoted_newlines)
    configuration.load.quote = quote
    configuration.load.skip_leading_rows = skip_leading_rows
    configuration.load.autodetect = autodetect
  elsif source_format == "NEWLINE_DELIMITED_JSON"
    configuration.load.autodetect = autodetect
  end

  configuration.load.source_uris = source_uris unless file

  kwrest.each do |kw, opt|
    unless configuration.load.respond_to?("#{kw}=")
      raise ArgumentError, "Unknown keyword argument for Kura::Client#load: #{kw}"
    end
    configuration.load.__send__("#{kw}=", opt)
  end

  insert_job(configuration, media: file, wait: wait, job_id: job_id, project_id: job_project_id, &blk)
end
model(dataset_id, model_id, project_id: @default_project_id, &blk) click to toggle source
# File lib/kura/client.rb, line 782
# Fetches a model.
#
# Without a block, returns the model, or nil when the API error reports
# status code 404. With a block, the block receives (result, error); a 404
# error is reported to the block as (nil, nil).
# Other errors go through process_error.
def model(dataset_id, model_id, project_id: @default_project_id, &blk)
  return @api.get_model(project_id, dataset_id, model_id) unless blk

  @api.get_model(project_id, dataset_id, model_id) do |result, err|
    if err.respond_to?(:status_code) && err.status_code == 404
      blk.call(nil, nil)
    else
      blk.call(result, err)
    end
  end
rescue => e
  not_found = e.respond_to?(:status_code) && e.status_code == 404
  not_found ? nil : process_error(e)
end
models(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk) click to toggle source

Models API

# File lib/kura/client.rb, line 769
# Lists models of a dataset.
#
# limit::      sent as max_results.
# page_token:: forwarded to the API call.
#
# With a block, the block receives (models, error), where models is the
# response's model list ([] when the response has none, nil when the API gave
# no result).
# NOTE: without a block the raw list response is returned, not an Array;
# callers read the list from it themselves.
# Errors go through process_error.
def models(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk)
  return @api.list_models(project_id, dataset_id, max_results: limit, page_token: page_token) unless blk

  @api.list_models(project_id, dataset_id, max_results: limit, page_token: page_token) do |result, err|
    blk.call(result && (result.models || []), err)
  end
rescue => e
  process_error(e)
end
normalize_parameter(v) click to toggle source
# File lib/kura/client.rb, line 50
# Turns a parameter value into its String form, keeping nil as nil.
def normalize_parameter(v)
  v.nil? ? nil : v.to_s
end
patch_dataset(dataset_id, project_id: @default_project_id, access: nil, description: :na, default_table_expiration_ms: :na, friendly_name: :na, &blk) click to toggle source
# File lib/kura/client.rb, line 159
# Patches attributes of a dataset.
#
# access:: set on the patch only when truthy.
# description, default_table_expiration_ms, friendly_name::
#          set on the patch unless left at the :na placeholder, so nil can be
#          passed explicitly.
#
# The block, if any, is handed to the API call unchanged.
# Errors go through process_error.
def patch_dataset(dataset_id, project_id: @default_project_id, access: nil, description: :na, default_table_expiration_ms: :na, friendly_name: :na, &blk)
  reference = Google::Apis::BigqueryV2::DatasetReference.new(project_id: project_id, dataset_id: dataset_id)
  patch = Google::Apis::BigqueryV2::Dataset.new(dataset_reference: reference)
  patch.access = access if access
  patch.default_table_expiration_ms = default_table_expiration_ms unless default_table_expiration_ms == :na
  patch.description = description unless description == :na
  patch.friendly_name = friendly_name unless friendly_name == :na
  @api.patch_dataset(project_id, dataset_id, patch, &blk)
rescue => e
  process_error(e)
end
patch_table(dataset_id, table_id, project_id: @default_project_id, expiration_time: :na, friendly_name: :na, description: :na, &blk) click to toggle source
# File lib/kura/client.rb, line 236
# Patches attributes of a table.
#
# expiration_time, friendly_name, description::
#   set on the patch unless left at the :na placeholder, so nil can be passed
#   explicitly. A non-nil expiration_time is converted with
#   (to_f * 1000.0).to_i first.
#
# The block, if any, is handed to the API call unchanged.
# Errors go through process_error.
def patch_table(dataset_id, table_id, project_id: @default_project_id, expiration_time: :na, friendly_name: :na, description: :na, &blk)
  unless expiration_time == :na || expiration_time.nil?
    expiration_time = (expiration_time.to_f * 1000.0).to_i
  end
  reference = {project_id: project_id, dataset_id: dataset_id, table_id: table_id}
  patch = Google::Apis::BigqueryV2::Table.new(table_reference: reference)
  patch.friendly_name = friendly_name unless friendly_name == :na
  patch.description = description unless description == :na
  patch.expiration_time = expiration_time unless expiration_time == :na
  @api.patch_table(project_id, dataset_id, table_id, patch, &blk)
rescue => e
  process_error(e)
end
projects(limit: 1000, &blk) click to toggle source
# File lib/kura/client.rb, line 90
# Lists projects.
#
# limit:: sent as max_results.
#
# Without a block, returns the projects of the response. With a block, the
# block receives (projects, error), where projects is nil when the API gave
# no result. Errors go through process_error.
def projects(limit: 1000, &blk)
  return @api.list_projects(max_results: limit).projects unless blk

  @api.list_projects(max_results: limit) do |result, err|
    blk.call(result && result.projects, err)
  end
rescue => e
  process_error(e)
end
query(sql, mode: :truncate, dataset_id: nil, table_id: nil, allow_large_result: true, allow_large_results: allow_large_result, flatten_results: true, priority: "INTERACTIVE", use_query_cache: true, user_defined_function_resources: nil, use_legacy_sql: false, maximum_billing_tier: nil, maximum_bytes_billed: nil, external_data_configuration: nil, project_id: @default_project_id, job_project_id: @default_project_id, job_id: nil, wait: nil, dry_run: false, **kwrest, &blk) click to toggle source
# File lib/kura/client.rb, line 409
# Submits a query job.
#
# mode::                 when truthy, translated to a write disposition by
#                        mode_to_write_disposition.
# dataset_id, table_id:: when both are given, the destination table (in
#                        +project_id+).
# allow_large_result::   older spelling, used as the default of
#                        allow_large_results.
# allow_large_results, flatten_results, use_query_cache::
#                        passed through normalize_parameter.
# user_defined_function_resources::
#                        one value or a list; entries starting with "gs://"
#                        become resource URIs, all others inline code.
# external_data_configuration:: set as the query's table definitions.
# maximum_billing_tier, maximum_bytes_billed:: set only when given.
# wait::                 forwarded to insert_job; forced to nil for a dry run.
# kwrest::               extra keywords assigned to the query configuration
#                        through its "<name>=" writers; an unknown name
#                        raises ArgumentError.
#
# Returns whatever insert_job returns for the built configuration.
def query(sql, mode: :truncate,
          dataset_id: nil, table_id: nil,
          allow_large_result: true, # for backward compatibility
          allow_large_results: allow_large_result,
          flatten_results: true,
          priority: "INTERACTIVE",
          use_query_cache: true,
          user_defined_function_resources: nil,
          use_legacy_sql: false,
          maximum_billing_tier: nil,
          maximum_bytes_billed: nil,
          external_data_configuration: nil,
          project_id: @default_project_id,
          job_project_id: @default_project_id,
          job_id: nil,
          wait: nil,
          dry_run: false,
          **kwrest,
          &blk)
  query_settings = Google::Apis::BigqueryV2::JobConfigurationQuery.new(
    query: sql,
    allow_large_results: normalize_parameter(allow_large_results),
    flatten_results: normalize_parameter(flatten_results),
    priority: priority,
    use_query_cache: normalize_parameter(use_query_cache),
    use_legacy_sql: use_legacy_sql,
  )
  configuration = Google::Apis::BigqueryV2::JobConfiguration.new(query: query_settings)

  configuration.query.write_disposition = mode_to_write_disposition(mode) if mode

  if dry_run
    configuration.dry_run = true
    wait = nil
  end

  configuration.query.maximum_billing_tier = maximum_billing_tier if maximum_billing_tier
  configuration.query.maximum_bytes_billed = maximum_bytes_billed if maximum_bytes_billed

  if dataset_id && table_id
    configuration.query.destination_table = Google::Apis::BigqueryV2::TableReference.new(project_id: project_id, dataset_id: dataset_id, table_id: table_id)
  end

  if user_defined_function_resources
    resources = Array(user_defined_function_resources).map do |resource|
      source = resource.to_s
      if source.start_with?("gs://")
        Google::Apis::BigqueryV2::UserDefinedFunctionResource.new(resource_uri: source)
      else
        Google::Apis::BigqueryV2::UserDefinedFunctionResource.new(inline_code: source)
      end
    end
    configuration.query.user_defined_function_resources = resources
  end

  configuration.query.table_definitions = external_data_configuration if external_data_configuration

  kwrest.each do |kw, opt|
    unless configuration.query.respond_to?("#{kw}=")
      raise ArgumentError, "Unknown keyword argument for Kura::Client#query: #{kw}"
    end
    configuration.query.__send__("#{kw}=", opt)
  end

  insert_job(configuration, wait: wait, job_id: job_id, project_id: job_project_id, &blk)
end
routine(dataset_id, routine_id, project_id: @default_project_id, &blk) click to toggle source
# File lib/kura/client.rb, line 820
# Fetches a routine.
#
# Without a block, returns the routine, or nil when the API error reports
# status code 404. With a block, the block receives (result, error); a 404
# error is reported to the block as (nil, nil).
# Other errors go through process_error.
def routine(dataset_id, routine_id, project_id: @default_project_id, &blk)
  return @api.get_routine(project_id, dataset_id, routine_id) unless blk

  @api.get_routine(project_id, dataset_id, routine_id) do |result, err|
    if err.respond_to?(:status_code) && err.status_code == 404
      blk.call(nil, nil)
    else
      blk.call(result, err)
    end
  end
rescue => e
  not_found = e.respond_to?(:status_code) && e.status_code == 404
  not_found ? nil : process_error(e)
end
routines(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk) click to toggle source

Routines API

# File lib/kura/client.rb, line 807
# Lists routines of a dataset.
#
# limit::      sent as max_results.
# page_token:: forwarded to the API call.
#
# With a block, the block receives (routines, error), where routines is the
# response's routine list ([] when the response has none, nil when the API
# gave no result).
# NOTE: without a block the raw list response is returned, not an Array;
# callers read the list from it themselves.
# Errors go through process_error.
def routines(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk)
  return @api.list_routines(project_id, dataset_id, max_results: limit, page_token: page_token) unless blk

  @api.list_routines(project_id, dataset_id, max_results: limit, page_token: page_token) do |result, err|
    blk.call(result && (result.routines || []), err)
  end
rescue => e
  process_error(e)
end
table(dataset_id, table_id, project_id: @default_project_id, &blk) click to toggle source
# File lib/kura/client.rb, line 184
# Fetches a table.
#
# Without a block, returns the table, or nil when the API error reports
# status code 404. With a block, the block receives (result, error); a 404
# error is reported to the block as (nil, nil).
# Other errors go through process_error.
def table(dataset_id, table_id, project_id: @default_project_id, &blk)
  return @api.get_table(project_id, dataset_id, table_id) unless blk

  @api.get_table(project_id, dataset_id, table_id) do |result, err|
    if err.respond_to?(:status_code) && err.status_code == 404
      blk.call(nil, nil)
    else
      blk.call(result, err)
    end
  end
rescue => e
  not_found = e.respond_to?(:status_code) && e.status_code == 404
  not_found ? nil : process_error(e)
end
tables(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk) click to toggle source
# File lib/kura/client.rb, line 170
# Lists tables of a dataset.
#
# limit::      sent as max_results.
# page_token:: forwarded to the API call.
#
# Without a block, returns the tables of the response, or [] when the
# response has none. With a block, the block receives (tables, error), where
# tables is nil when the API gave no result.
# Errors go through process_error.
def tables(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk)
  unless blk
    listing = @api.list_tables(project_id, dataset_id, max_results: limit, page_token: page_token)
    return listing.tables || []
  end

  @api.list_tables(project_id, dataset_id, max_results: limit, page_token: page_token) do |result, err|
    blk.call(result && (result.tables || []), err)
  end
rescue => e
  process_error(e)
end
wait_job(job, timeout=60*10, location: nil, project_id: @default_project_id) { |j| ... } click to toggle source
# File lib/kura/client.rb, line 743
# Polls a job until it finishes or the timeout passes.
#
# job::     a job id (String) or a Google::Apis::BigqueryV2::Job. For a Job
#           object the project id, job id and location are all taken from its
#           job_reference, replacing the keyword arguments.
# timeout:: seconds to keep polling.
#
# The job is re-fetched with #job on every round and checked with
# #job_finished? (which raises for a failed job). After an unfinished round
# the block, if given, receives the fetched job, then the method sleeps one
# second.
#
# Returns the finished job.
# Raises TypeError for any other kind of +job+ argument and
# Kura::TimeoutError when the deadline passes first.
def wait_job(job, timeout=60*10, location: nil, project_id: @default_project_id)
  case job
  when String
    job_id = job
  when Google::Apis::BigqueryV2::Job
    reference = job.job_reference
    project_id = reference.project_id
    job_id = reference.job_id
    location = reference.location
  else
    raise TypeError, "Kura::Client#wait_job accept String(job-id) or Google::Apis::BigqueryV2::Job"
  end

  deadline = Time.now + timeout
  until deadline <= Time.now
    current = job(job_id, project_id: project_id, location: location)
    return current if job_finished?(current)
    yield current if block_given?
    sleep 1
  end
  raise Kura::TimeoutError, "wait job timeout"
end

Private Instance Methods

format_tabledata(r, schema, convert_numeric_to_float: true) click to toggle source
# File lib/kura/client.rb, line 304
# Turns a tabledata list response into a plain Hash with
# :total_rows (Integer via to_i), :next_token (the response's page token)
# and :rows (each row converted by _convert_tabledata_row; [] when the
# response has no rows).
def format_tabledata(r, schema, convert_numeric_to_float: true)
  total = r.total_rows.to_i
  token = r.page_token
  converted = (r.rows || []).map do |row|
    _convert_tabledata_row(row, schema, convert_numeric_to_float: convert_numeric_to_float)
  end
  { total_rows: total, next_token: token, rows: converted }
end
mode_to_write_disposition(mode) click to toggle source
# File lib/kura/client.rb, line 371
# Maps :append / :truncate / :empty to the matching "WRITE_..." disposition
# string. Any other value (including the String spellings) raises a
# RuntimeError.
def mode_to_write_disposition(mode)
  return "WRITE_#{mode.to_s.upcase}" if [:append, :truncate, :empty].include?(mode)

  raise "mode option should be one of :append, :truncate, :empty but #{mode}"
end
normalize_schema(schema) click to toggle source
# File lib/kura/client.rb, line 477
# Converts a list of field definitions into Hashes with Symbol keys
# (:name, :type, :mode, plus :description / :fields where applicable).
#
# Entries responding to #[] are read by key (Symbol first, then String);
# :description and :fields are only added when present. Other entries are
# read through #name, #type, #mode and #fields; for those, :description is
# added whenever the entry responds to #description, even if it is nil.
# Nested fields are normalized recursively.
def normalize_schema(schema)
  schema.map do |field|
    if field.respond_to?(:[])
      normalized = {
        name: field[:name] || field["name"],
        type: field[:type] || field["type"],
        mode: field[:mode] || field["mode"],
      }
      description = field[:description] || field["description"]
      normalized[:description] = description if description
      children = field[:fields] || field["fields"]
      normalized[:fields] = normalize_schema(children) if children
    else
      normalized = { name: field.name, type: field.type, mode: field.mode }
      normalized[:description] = field.description if field.respond_to?(:description)
      children = field.fields
      normalized[:fields] = normalize_schema(children) if children
    end
    normalized
  end
end
process_error(err) click to toggle source
# File lib/kura/client.rb, line 59
# Re-raises an error raised while talking to the API, translating it to
# Kura::ApiError where possible. This method always raises.
#
# * Errors with a body: the body is parsed as JSON and the entries of
#   error.errors supply the reason (joined by ",") and message (joined by
#   newlines). When the body is not JSON, or does not contain an
#   error.errors list of objects, the reason is the HTTP status code and the
#   message contains the status, headers and raw body - so an unexpected body
#   shape can no longer hide the original API error behind a NoMethodError.
# * SystemCallError / OpenSSL::SSL::SSLError: raised as Kura::ApiError with
#   reason "internalError".
# * Anything else is re-raised unchanged.
def process_error(err)
  if err.respond_to?(:body) and err.body
    reason = errors = nil
    begin
      jobj = JSON.parse(err.body)
      error = jobj.is_a?(Hash) ? jobj["error"] : nil
      details = error.is_a?(Hash) ? error["errors"] : nil
      if details.is_a?(Array) and details.all?{|e| e.is_a?(Hash) }
        reason = details.map{|e| e["reason"]}.join(",")
        errors = details.map{|e| e["message"] }.join("\n")
      end
    rescue JSON::ParserError, TypeError
      # Not parseable as JSON: use the HTTP-level description below.
    end
    if reason.nil?
      reason = err.status_code.to_s
      errors = "HTTP Status: #{err.status_code}\nHeaders: #{err.header.inspect}\nBody:\n#{err.body}"
    end
    raise Kura::ApiError.new(reason, errors)
  elsif err.is_a?(SystemCallError) or err.is_a?(OpenSSL::SSL::SSLError)
    raise Kura::ApiError.new("internalError", err.message)
  else
    raise err
  end
end