class Kura::Client
Public Class Methods
new(default_project_id: nil, email_address: nil, private_key: nil, scope: nil, http_options: {timeout: 60}, default_retries: 5)
# File lib/kura/client.rb, line 12
def initialize(default_project_id: nil, email_address: nil, private_key: nil, scope: nil, http_options: {timeout: 60}, default_retries: 5)
  @default_project_id = default_project_id
  @scope = ["https://www.googleapis.com/auth/bigquery"] | (scope ? (scope.is_a?(Array) ? scope : [scope]) : [])
  @email_address = email_address
  @private_key = private_key
  if @email_address and @private_key
    auth = Signet::OAuth2::Client.new(
      token_credential_uri: "https://accounts.google.com/o/oauth2/token",
      audience: "https://accounts.google.com/o/oauth2/token",
      scope: @scope,
      issuer: @email_address,
      signing_key: @private_key)
    # MEMO: signet-0.6.1 depends on Faraday.default_connection
    Faraday.default_connection.options.timeout = 60
    auth.fetch_access_token!
  else
    auth = Google::Auth.get_application_default(@scope)
    auth.fetch_access_token!
  end
  Google::Apis::RequestOptions.default.retries = default_retries
  # support both google-api-client 0.10.x and 0.11.x
  # see https://github.com/google/google-api-ruby-client/blob/master/MIGRATING.md
  if Google::Apis::ClientOptions.default.respond_to?(:open_timeout_sec)
    Google::Apis::ClientOptions.default.open_timeout_sec = http_options[:timeout]
    Google::Apis::ClientOptions.default.read_timeout_sec = http_options[:timeout]
    Google::Apis::ClientOptions.default.send_timeout_sec = http_options[:timeout]
  else
    Google::Apis::RequestOptions.default.open_timeout_sec = http_options[:timeout]
    Google::Apis::RequestOptions.default.timeout_sec = http_options[:timeout]
  end
  @api = Google::Apis::BigqueryV2::BigqueryService.new
  @api.authorization = auth
  if @default_project_id.nil?
    @default_project_id = self.projects.first.id
  end
end
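A minimal construction sketch. The project id, key file, and service-account address below are hypothetical; with no credentials given, the client falls back to Google application default credentials (e.g. GOOGLE_APPLICATION_CREDENTIALS).

require "kura"

# Application default credentials:
client = Kura::Client.new(default_project_id: "my-project")

# Service account credentials:
key = OpenSSL::PKey::RSA.new(File.read("service-account.pem"))
client = Kura::Client.new(
  default_project_id: "my-project",
  email_address: "sa@my-project.iam.gserviceaccount.com",
  private_key: key,
  http_options: {timeout: 120})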
Public Instance Methods
_convert_tabledata_field(x, field_info, convert_numeric_to_float: true)
# File lib/kura/client.rb, line 256
def _convert_tabledata_field(x, field_info, convert_numeric_to_float: true)
  if x.nil? and (field_info["mode"] == "NULLABLE" or field_info["mode"].nil?)
    # Tables created by the new BigQuery console may have a schema without mode...
    return nil
  end
  case field_info["type"]
  when "STRING"
    x.to_s
  when "INTEGER"
    Integer(x)
  when "FLOAT"
    case x
    when "Infinity"
      Float::INFINITY
    when "-Infinity"
      -Float::INFINITY
    when "NaN"
      Float::NAN
    else
      Float(x)
    end
  when "BOOLEAN"
    x.to_s == "true"
  when "TIMESTAMP"
    Time.at(Float(x)).utc.iso8601(6)
  when "RECORD"
    _convert_tabledata_row(x, field_info["fields"], convert_numeric_to_float: convert_numeric_to_float)
  when "NUMERIC"
    if convert_numeric_to_float
      Float(x)
    else
      x
    end
  else
    x
  end
end
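A few concrete conversions, as computed by the method above:

client._convert_tabledata_field("42",  {"type" => "INTEGER"})   # => 42
client._convert_tabledata_field("NaN", {"type" => "FLOAT"})     # => Float::NAN
client._convert_tabledata_field(nil,   {"mode" => "NULLABLE"})  # => nil
client._convert_tabledata_field("0",   {"type" => "TIMESTAMP"}) # => "1970-01-01T00:00:00.000000Z"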
_convert_tabledata_row(row, schema, convert_numeric_to_float: true)
# File lib/kura/client.rb, line 293
def _convert_tabledata_row(row, schema, convert_numeric_to_float: true)
  (row.respond_to?(:f) ? row.f : row["f"]).zip(schema).each_with_object({}) do |(v, s), tbl|
    v = JSON.parse(v.to_json)
    if s["mode"] == "REPEATED"
      tbl[s["name"]] = v["v"].map{|c| _convert_tabledata_field(c["v"], s, convert_numeric_to_float: convert_numeric_to_float) }
    else
      tbl[s["name"]] = _convert_tabledata_field(v["v"], s, convert_numeric_to_float: convert_numeric_to_float)
    end
  end
end
batch() { || ... }
# File lib/kura/client.rb, line 79
def batch
  @api.batch do |api|
    original_api, @api = @api, api
    begin
      yield
    ensure
      @api = original_api
    end
  end
end
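A usage sketch: calls issued inside the block are sent as one batched HTTP request, so each call should pass a result callback. Dataset ids are hypothetical.

client.batch do
  client.dataset("dataset_a") {|d, err| puts d.id if d }
  client.dataset("dataset_b") {|d, err| puts d.id if d }
end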
cancel_job(job, location: nil, project_id: @default_project_id, &blk)
# File lib/kura/client.rb, line 698
def cancel_job(job, location: nil, project_id: @default_project_id, &blk)
  case job
  when String
    jobid = job
  when Google::Apis::BigqueryV2::Job
    project_id = job.job_reference.project_id
    jobid = job.job_reference.job_id
  else
    raise TypeError, "Kura::Client#cancel_job accepts String(job-id) or Google::Apis::BigqueryV2::Job"
  end
  if blk
    @api.cancel_job(project_id, jobid, location: location) do |r, e|
      j = (r && r.job)
      j.kura_api = self if j
      blk.call(j, e)
    end
  else
    @api.cancel_job(project_id, jobid, location: location).job.tap{|j| j.kura_api = self if j }
  end
end
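A usage sketch; the job id is hypothetical.

job = client.query("SELECT 1")                   # wait: nil returns the running job immediately
client.cancel_job(job)                           # accepts the Job object itself
client.cancel_job("job_abc123", location: "US")  # or a job-id string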
copy(src_dataset_id, src_table_id, dest_dataset_id, dest_table_id, mode: :truncate, src_project_id: @default_project_id, dest_project_id: @default_project_id, job_project_id: @default_project_id, job_id: nil, wait: nil, dry_run: false, **kwrest, &blk)
# File lib/kura/client.rb, line 624
def copy(src_dataset_id, src_table_id, dest_dataset_id, dest_table_id, mode: :truncate,
         src_project_id: @default_project_id, dest_project_id: @default_project_id,
         job_project_id: @default_project_id, job_id: nil, wait: nil, dry_run: false, **kwrest, &blk)
  write_disposition = mode_to_write_disposition(mode)
  configuration = Google::Apis::BigqueryV2::JobConfiguration.new(
    copy: Google::Apis::BigqueryV2::JobConfigurationTableCopy.new(
      destination_table: Google::Apis::BigqueryV2::TableReference.new(
        project_id: dest_project_id,
        dataset_id: dest_dataset_id,
        table_id: dest_table_id,
      ),
      source_table: Google::Apis::BigqueryV2::TableReference.new(
        project_id: src_project_id,
        dataset_id: src_dataset_id,
        table_id: src_table_id,
      ),
      write_disposition: write_disposition,
    )
  )
  if dry_run
    configuration.dry_run = true
    wait = nil
  end
  kwrest.each do |kw, opt|
    if configuration.copy.respond_to?("#{kw}=")
      configuration.copy.__send__("#{kw}=", opt)
    else
      raise ArgumentError, "Unknown keyword argument for Kura::Client#copy: #{kw}"
    end
  end
  insert_job(configuration, wait: wait, job_id: job_id, project_id: job_project_id, &blk)
end
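A usage sketch with hypothetical dataset/table ids:

# Replace dest_table with the contents of src_table, blocking up to 300s:
client.copy("src_dataset", "src_table", "dest_dataset", "dest_table",
            mode: :truncate, wait: 300)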
dataset(dataset_id, project_id: @default_project_id, &blk)
# File lib/kura/client.rb, line 119
def dataset(dataset_id, project_id: @default_project_id, &blk)
  if blk
    @api.get_dataset(project_id, dataset_id) do |result, err|
      if err.respond_to?(:status_code) and err.status_code == 404
        result = nil
        err = nil
      end
      blk.call(result, err)
    end
  else
    @api.get_dataset(project_id, dataset_id)
  end
rescue
  return nil if $!.respond_to?(:status_code) and $!.status_code == 404
  process_error($!)
end
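A usage sketch (dataset id hypothetical). A missing dataset yields nil rather than an error:

ds = client.dataset("my_dataset")
puts ds.id if ds   # nil when the dataset does not exist (404)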
datasets(project_id: @default_project_id, all: false, limit: 1000, &blk)
# File lib/kura/client.rb, line 104
def datasets(project_id: @default_project_id, all: false, limit: 1000, &blk)
  all = normalize_parameter(all)
  if blk
    @api.list_datasets(project_id, all: all, max_results: limit) do |result, err|
      result &&= (result.datasets || [])
      blk.call(result, err)
    end
  else
    result = @api.list_datasets(project_id, all: all, max_results: limit)
    result.datasets || []
  end
rescue
  process_error($!)
end
delete_dataset(dataset_id, project_id: @default_project_id, delete_contents: false, &blk)
# File lib/kura/client.rb, line 151
def delete_dataset(dataset_id, project_id: @default_project_id, delete_contents: false, &blk)
  delete_contents = normalize_parameter(delete_contents)
  @api.delete_dataset(project_id, dataset_id, delete_contents: delete_contents, &blk)
rescue
  return nil if $!.respond_to?(:status_code) and $!.status_code == 404
  process_error($!)
end
delete_model(dataset_id, model_id, project_id: @default_project_id, &blk)
# File lib/kura/client.rb, line 799
def delete_model(dataset_id, model_id, project_id: @default_project_id, &blk)
  @api.delete_model(project_id, dataset_id, model_id, &blk)
rescue
  return nil if $!.respond_to?(:status_code) and $!.status_code == 404
  process_error($!)
end
delete_routine(dataset_id, routine_id, project_id: @default_project_id, &blk)
# File lib/kura/client.rb, line 837
def delete_routine(dataset_id, routine_id, project_id: @default_project_id, &blk)
  @api.delete_routine(project_id, dataset_id, routine_id, &blk)
rescue
  return nil if $!.respond_to?(:status_code) and $!.status_code == 404
  process_error($!)
end
delete_table(dataset_id, table_id, project_id: @default_project_id, &blk)
# File lib/kura/client.rb, line 249
def delete_table(dataset_id, table_id, project_id: @default_project_id, &blk)
  @api.delete_table(project_id, dataset_id, table_id, &blk)
rescue
  return nil if $!.respond_to?(:status_code) and $!.status_code == 404
  process_error($!)
end
extract(dataset_id, table_id, dest_uris, compression: "NONE", destination_format: "CSV", field_delimiter: ",", print_header: true, project_id: @default_project_id, job_project_id: @default_project_id, job_id: nil, wait: nil, dry_run: false, **kwrest, &blk)
# File lib/kura/client.rb, line 581
def extract(dataset_id, table_id, dest_uris, compression: "NONE", destination_format: "CSV",
            field_delimiter: ",", print_header: true, project_id: @default_project_id,
            job_project_id: @default_project_id, job_id: nil, wait: nil, dry_run: false, **kwrest, &blk)
  dest_uris = [ dest_uris ] if dest_uris.is_a?(String)
  configuration = Google::Apis::BigqueryV2::JobConfiguration.new(
    extract: Google::Apis::BigqueryV2::JobConfigurationExtract.new(
      compression: compression,
      destination_format: destination_format,
      source_table: Google::Apis::BigqueryV2::TableReference.new(
        project_id: project_id,
        dataset_id: dataset_id,
        table_id: table_id,
      ),
      destination_uris: dest_uris,
    )
  )
  if dry_run
    configuration.dry_run = true
    wait = nil
  end
  if destination_format == "CSV"
    configuration.extract.field_delimiter = field_delimiter
    configuration.extract.print_header = normalize_parameter(print_header)
  end
  kwrest.each do |kw, opt|
    if configuration.extract.respond_to?("#{kw}=")
      configuration.extract.__send__("#{kw}=", opt)
    else
      raise ArgumentError, "Unknown keyword argument for Kura::Client#extract: #{kw}"
    end
  end
  insert_job(configuration, wait: wait, job_id: job_id, project_id: job_project_id, &blk)
end
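A usage sketch; the bucket and table names are hypothetical.

# Export a table to GCS as gzip-compressed newline-delimited JSON:
client.extract("my_dataset", "my_table", "gs://my-bucket/out-*.json.gz",
               destination_format: "NEWLINE_DELIMITED_JSON", compression: "GZIP",
               wait: 600)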
insert_dataset(dataset_id, project_id: @default_project_id, &blk)
# File lib/kura/client.rb, line 136
def insert_dataset(dataset_id, project_id: @default_project_id, &blk)
  case dataset_id
  when String
    obj = Google::Apis::BigqueryV2::Dataset.new(dataset_reference: Google::Apis::BigqueryV2::DatasetReference.new(project_id: project_id, dataset_id: dataset_id))
  when Hash
    obj = Google::Apis::BigqueryV2::Dataset.new(**dataset_id)
  when Google::Apis::BigqueryV2::Dataset
    obj = dataset_id
  end
  @api.insert_dataset(project_id, obj, &blk)
rescue
  process_error($!)
end
insert_job(configuration, job_id: nil, project_id: @default_project_id, media: nil, wait: nil, &blk)
# File lib/kura/client.rb, line 379
def insert_job(configuration, job_id: nil, project_id: @default_project_id, media: nil, wait: nil, &blk)
  job_object = Google::Apis::BigqueryV2::Job.new
  job_object.configuration = configuration
  if job_id
    job_object.job_reference = Google::Apis::BigqueryV2::JobReference.new
    job_object.job_reference.project_id = project_id
    job_object.job_reference.job_id = job_id
  end
  if wait
    job = @api.insert_job(project_id, job_object, upload_source: media)
    job.kura_api = self
    wait_job(job, wait, &blk)
  else
    if blk
      @api.insert_job(project_id, job_object, upload_source: media) do |r, err|
        if r
          r.kura_api = self
        end
        blk.call(r, err)
      end
    else
      job = @api.insert_job(project_id, job_object, upload_source: media)
      job.kura_api = self
      job
    end
  end
rescue
  process_error($!)
end
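A low-level usage sketch: submit a raw JobConfiguration (here a query) under an explicit job id. The job-id scheme is hypothetical.

config = Google::Apis::BigqueryV2::JobConfiguration.new(
  query: Google::Apis::BigqueryV2::JobConfigurationQuery.new(
    query: "SELECT 1", use_legacy_sql: false))
job = client.insert_job(config, job_id: "my-job-#{Time.now.to_i}", wait: 60)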
insert_routine(dataset_id, routine_id, body, project_id: @default_project_id, routine_type: "PROCEDURE", language: "SQL", arguments: [], return_type: nil, imported_libraries: [], description: nil)
# File lib/kura/client.rb, line 844
def insert_routine(dataset_id, routine_id, body, project_id: @default_project_id, routine_type: "PROCEDURE",
                   language: "SQL", arguments: [], return_type: nil, imported_libraries: [], description: nil)
  @api.insert_routine(
    project_id,
    dataset_id,
    Google::Apis::BigqueryV2::Routine.new(
      routine_reference: Google::Apis::BigqueryV2::RoutineReference.new(
        project_id: project_id,
        dataset_id: dataset_id,
        routine_id: routine_id
      ),
      arguments: arguments,
      definition_body: body,
      imported_libraries: imported_libraries,
      language: language,
      return_type: return_type,
      routine_type: routine_type,
      description: description
    ))
end
insert_table(dataset_id, table_id, project_id: @default_project_id, expiration_time: nil, friendly_name: nil, schema: nil, description: nil, query: nil, external_data_configuration: nil, use_legacy_sql: false, time_partitioning: nil, clustering_fields: [], &blk)
# File lib/kura/client.rb, line 201
def insert_table(dataset_id, table_id, project_id: @default_project_id, expiration_time: nil,
                 friendly_name: nil, schema: nil, description: nil, query: nil,
                 external_data_configuration: nil, use_legacy_sql: false,
                 time_partitioning: nil, clustering_fields: [], &blk)
  if expiration_time
    expiration_time = (expiration_time.to_f * 1000.0).to_i
  end
  if query
    view = { query: query, use_legacy_sql: !!use_legacy_sql }
  elsif external_data_configuration
    # external_data_configuration is passed through to the Table object as-is
  elsif schema
    schema = { fields: normalize_schema(schema) }
  end
  table = Google::Apis::BigqueryV2::Table.new(
    table_reference: {project_id: project_id, dataset_id: dataset_id, table_id: table_id},
    friendly_name: friendly_name,
    description: description,
    schema: schema,
    expiration_time: expiration_time,
    view: view,
    external_data_configuration: external_data_configuration)
  if time_partitioning
    table.time_partitioning = Google::Apis::BigqueryV2::TimePartitioning.new(**time_partitioning)
  end
  if clustering_fields and clustering_fields.size > 0
    table.clustering = Google::Apis::BigqueryV2::Clustering.new(fields: clustering_fields)
  end
  @api.insert_table(project_id, dataset_id, table, &blk)
rescue
  process_error($!)
end
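Two usage sketches with hypothetical ids:

# A partitioned, clustered table:
client.insert_table("my_dataset", "events",
                    schema: [{name: "ts", type: "TIMESTAMP", mode: "REQUIRED"},
                             {name: "user", type: "STRING"}],
                    time_partitioning: {type: "DAY", field: "ts"},
                    clustering_fields: ["user"])

# A view, by passing the query: keyword:
client.insert_table("my_dataset", "events_view",
                    query: "SELECT user FROM my_dataset.events")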
insert_tabledata(dataset_id, table_id, rows, project_id: @default_project_id, ignore_unknown_values: false, skip_invalid_rows: false, template_suffix: nil)
# File lib/kura/client.rb, line 341
def insert_tabledata(dataset_id, table_id, rows, project_id: @default_project_id,
                     ignore_unknown_values: false, skip_invalid_rows: false, template_suffix: nil)
  request = Google::Apis::BigqueryV2::InsertAllTableDataRequest.new
  request.ignore_unknown_values = ignore_unknown_values
  request.skip_invalid_rows = skip_invalid_rows
  if template_suffix
    request.template_suffix = template_suffix
  end
  request.rows = rows.map do |r|
    case r
    when Google::Apis::BigqueryV2::InsertAllTableDataRequest::Row
      r
    when Hash
      row = Google::Apis::BigqueryV2::InsertAllTableDataRequest::Row.new
      if r.keys.map(&:to_s) == %w{ insert_id json }
        row.insert_id = r[:insert_id] || r["insert_id"]
        row.json = r[:json] || r["json"]
      else
        row.json = r
      end
      row
    else
      raise ArgumentError, "invalid row for BigQuery tabledata.insertAll #{r.inspect}"
    end
  end
  @api.insert_all_table_data(project_id, dataset_id, table_id, request)
rescue
  process_error($!)
end
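A usage sketch (ids hypothetical). A plain hash is sent as the row payload; a hash with exactly insert_id/json keys supplies a deduplication id:

client.insert_tabledata("my_dataset", "events", [
  {ts: Time.now.utc.iso8601, user: "alice"},
  {insert_id: "evt-42", json: {ts: Time.now.utc.iso8601, user: "bob"}},
])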
job(job_id, location: nil, project_id: @default_project_id, fields: nil, &blk)
# File lib/kura/client.rb, line 685
def job(job_id, location: nil, project_id: @default_project_id, fields: nil, &blk)
  if blk
    @api.get_job(project_id, job_id, location: location, fields: fields) do |j, e|
      j.kura_api = self if j
      blk.call(j, e)
    end
  else
    @api.get_job(project_id, job_id, location: location, fields: fields).tap{|j| j.kura_api = self if j }
  end
rescue
  process_error($!)
end
job_finished?(r)
# File lib/kura/client.rb, line 719
def job_finished?(r)
  if r.status.state == "DONE"
    if r.status.error_result
      if r.status.errors
        raise Kura::ApiError.new(r.status.errors.map(&:reason).join(","),
                                 r.status.errors.map{|e|
                                   msg = "reason=#{e.reason} message=#{e.message}"
                                   msg += " location=#{e.location}" if e.location
                                   msg += " debug_info=#{e.debug_info}" if e.debug_info
                                   msg
                                 }.join("\n"))
      else
        e = r.status.error_result
        msg = "reason=#{e.reason} message=#{e.message}"
        msg += " location=#{e.location}" if e.location
        msg += " debug_info=#{e.debug_info}" if e.debug_info
        raise Kura::ApiError.new(e.reason, msg)
      end
    end
    return true
  end
  return false
end
jobs(project_id: @default_project_id, all_users: nil, max_creation_time: nil, min_creation_time: nil, max_results: nil, page_token: nil, parent_job_id: nil, projection: nil, state_filter: nil)
# File lib/kura/client.rb, line 664
def jobs(project_id: @default_project_id, all_users: nil, max_creation_time: nil, min_creation_time: nil,
         max_results: nil, page_token: nil, parent_job_id: nil, projection: nil, state_filter: nil)
  @api.list_jobs(
    project_id,
    all_users: all_users,
    max_creation_time: max_creation_time,
    min_creation_time: min_creation_time,
    max_results: max_results,
    page_token: page_token,
    parent_job_id: parent_job_id,
    projection: projection,
    state_filter: state_filter)
end
list_tabledata(dataset_id, table_id, project_id: @default_project_id, start_index: 0, max_result: 100, page_token: nil, schema: nil, convert_numeric_to_float: true, &blk)
# File lib/kura/client.rb, line 315
def list_tabledata(dataset_id, table_id, project_id: @default_project_id, start_index: 0, max_result: 100,
                   page_token: nil, schema: nil, convert_numeric_to_float: true, &blk)
  if schema.nil?
    _t = table(dataset_id, table_id, project_id: project_id)
    if _t
      schema = _t.schema.fields
    else
      raise Kura::ApiError.new("notFound", "Not found: Table #{project_id}:#{dataset_id}.#{table_id}")
    end
  end
  schema = schema.map{|s| JSON.parse(s.to_json) }
  if blk
    @api.list_table_data(project_id, dataset_id, table_id, max_results: max_result, start_index: start_index, page_token: page_token) do |r, err|
      if r
        r = format_tabledata(r, schema, convert_numeric_to_float: convert_numeric_to_float)
      end
      blk.call(r, err)
    end
  else
    r = @api.list_table_data(project_id, dataset_id, table_id, max_results: max_result, start_index: start_index, page_token: page_token)
    format_tabledata(r, schema, convert_numeric_to_float: convert_numeric_to_float)
  end
rescue
  process_error($!)
end
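A paging sketch with hypothetical ids; see format_tabledata below for the shape of the returned hash:

page = client.list_tabledata("my_dataset", "events", max_result: 10)
page[:total_rows]   # => Integer
page[:rows]         # => [{"ts" => "...", "user" => "alice"}, ...]
if page[:next_token]
  client.list_tabledata("my_dataset", "events", page_token: page[:next_token])
end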
load(dataset_id, table_id, source_uris=nil, schema: nil, delimiter: ",", field_delimiter: delimiter, mode: :append, allow_jagged_rows: false, max_bad_records: 0, ignore_unknown_values: false, allow_quoted_newlines: false, quote: '"', skip_leading_rows: 0, source_format: "CSV", autodetect: false, range_partitioning: nil, time_partitioning: nil, project_id: @default_project_id, job_project_id: @default_project_id, job_id: nil, file: nil, wait: nil, dry_run: false, **kwrest, &blk)
# File lib/kura/client.rb, line 509
def load(dataset_id, table_id, source_uris=nil, schema: nil, delimiter: ",", field_delimiter: delimiter,
         mode: :append, allow_jagged_rows: false, max_bad_records: 0, ignore_unknown_values: false,
         allow_quoted_newlines: false, quote: '"', skip_leading_rows: 0, source_format: "CSV",
         autodetect: false, range_partitioning: nil, time_partitioning: nil,
         project_id: @default_project_id, job_project_id: @default_project_id,
         job_id: nil, file: nil, wait: nil, dry_run: false, **kwrest, &blk)
  write_disposition = mode_to_write_disposition(mode)
  source_uris = [source_uris] if source_uris.is_a?(String)
  case range_partitioning
  when Hash
    range_partitioning = Google::Apis::BigqueryV2::RangePartitioning.new(**range_partitioning)
  end
  case time_partitioning
  when Hash
    time_partitioning = Google::Apis::BigqueryV2::TimePartitioning.new(**time_partitioning)
  end
  configuration = Google::Apis::BigqueryV2::JobConfiguration.new(
    load: Google::Apis::BigqueryV2::JobConfigurationLoad.new(
      destination_table: Google::Apis::BigqueryV2::TableReference.new(
        project_id: project_id,
        dataset_id: dataset_id,
        table_id: table_id,
      ),
      write_disposition: write_disposition,
      allow_jagged_rows: normalize_parameter(allow_jagged_rows),
      max_bad_records: max_bad_records,
      ignore_unknown_values: normalize_parameter(ignore_unknown_values),
      source_format: source_format,
      range_partitioning: range_partitioning,
      time_partitioning: time_partitioning,
    )
  )
  if dry_run
    configuration.dry_run = true
    wait = nil
  end
  if schema
    configuration.load.schema = Google::Apis::BigqueryV2::TableSchema.new(fields: normalize_schema(schema))
  end
  if source_format == "CSV"
    configuration.load.field_delimiter = field_delimiter
    configuration.load.allow_quoted_newlines = normalize_parameter(allow_quoted_newlines)
    configuration.load.quote = quote
    configuration.load.skip_leading_rows = skip_leading_rows
    configuration.load.autodetect = autodetect
  elsif source_format == "NEWLINE_DELIMITED_JSON"
    configuration.load.autodetect = autodetect
  end
  unless file
    configuration.load.source_uris = source_uris
  end
  kwrest.each do |kw, opt|
    if configuration.load.respond_to?("#{kw}=")
      configuration.load.__send__("#{kw}=", opt)
    else
      raise ArgumentError, "Unknown keyword argument for Kura::Client#load: #{kw}"
    end
  end
  insert_job(configuration, media: file, wait: wait, job_id: job_id, project_id: job_project_id, &blk)
end
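Two usage sketches; bucket, file, and table names are hypothetical.

# Load CSV from GCS, skipping the header row:
client.load("my_dataset", "events", "gs://my-bucket/events-*.csv",
            schema: [{name: "ts", type: "TIMESTAMP"}, {name: "user", type: "STRING"}],
            skip_leading_rows: 1, mode: :append, wait: 600)

# Upload a local file via file: instead of source URIs:
client.load("my_dataset", "events", file: "events.csv", autodetect: true, wait: 600)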
model(dataset_id, model_id, project_id: @default_project_id, &blk)
# File lib/kura/client.rb, line 782
def model(dataset_id, model_id, project_id: @default_project_id, &blk)
  if blk
    @api.get_model(project_id, dataset_id, model_id) do |result, err|
      if err.respond_to?(:status_code) and err.status_code == 404
        result = nil
        err = nil
      end
      blk.call(result, err)
    end
  else
    @api.get_model(project_id, dataset_id, model_id)
  end
rescue
  return nil if $!.respond_to?(:status_code) and $!.status_code == 404
  process_error($!)
end
models(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk)
Models API
# File lib/kura/client.rb, line 769
def models(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk)
  if blk
    @api.list_models(project_id, dataset_id, max_results: limit, page_token: page_token) do |result, err|
      result &&= (result.models || [])
      blk.call(result, err)
    end
  else
    @api.list_models(project_id, dataset_id, max_results: limit, page_token: page_token)
  end
rescue
  process_error($!)
end
normalize_parameter(v)
# File lib/kura/client.rb, line 50
def normalize_parameter(v)
  case v
  when nil
    nil
  else
    v.to_s
  end
end
patch_dataset(dataset_id, project_id: @default_project_id, access: nil, description: :na, default_table_expiration_ms: :na, friendly_name: :na, &blk)
# File lib/kura/client.rb, line 159
def patch_dataset(dataset_id, project_id: @default_project_id, access: nil, description: :na,
                  default_table_expiration_ms: :na, friendly_name: :na, &blk)
  obj = Google::Apis::BigqueryV2::Dataset.new(dataset_reference: Google::Apis::BigqueryV2::DatasetReference.new(project_id: project_id, dataset_id: dataset_id))
  obj.access = access if access
  obj.default_table_expiration_ms = default_table_expiration_ms if default_table_expiration_ms != :na
  obj.description = description if description != :na
  obj.friendly_name = friendly_name if friendly_name != :na
  @api.patch_dataset(project_id, dataset_id, obj, &blk)
rescue
  process_error($!)
end
patch_table(dataset_id, table_id, project_id: @default_project_id, expiration_time: :na, friendly_name: :na, description: :na, &blk)
# File lib/kura/client.rb, line 236
def patch_table(dataset_id, table_id, project_id: @default_project_id, expiration_time: :na,
                friendly_name: :na, description: :na, &blk)
  if expiration_time != :na and not(expiration_time.nil?)
    expiration_time = (expiration_time.to_f * 1000.0).to_i
  end
  table = Google::Apis::BigqueryV2::Table.new(table_reference: {project_id: project_id, dataset_id: dataset_id, table_id: table_id})
  table.friendly_name = friendly_name if friendly_name != :na
  table.description = description if description != :na
  table.expiration_time = expiration_time if expiration_time != :na
  @api.patch_table(project_id, dataset_id, table_id, table, &blk)
rescue
  process_error($!)
end
projects(limit: 1000, &blk)
# File lib/kura/client.rb, line 90
def projects(limit: 1000, &blk)
  if blk
    @api.list_projects(max_results: limit) do |result, err|
      result &&= result.projects
      blk.call(result, err)
    end
  else
    result = @api.list_projects(max_results: limit)
    result.projects
  end
rescue
  process_error($!)
end
query(sql, mode: :truncate, dataset_id: nil, table_id: nil, allow_large_result: true, allow_large_results: allow_large_result, flatten_results: true, priority: "INTERACTIVE", use_query_cache: true, user_defined_function_resources: nil, use_legacy_sql: false, maximum_billing_tier: nil, maximum_bytes_billed: nil, external_data_configuration: nil, project_id: @default_project_id, job_project_id: @default_project_id, job_id: nil, wait: nil, dry_run: false, **kwrest, &blk)
# File lib/kura/client.rb, line 409
def query(sql, mode: :truncate,
          dataset_id: nil, table_id: nil,
          allow_large_result: true,
          allow_large_results: allow_large_result, # for backward compatibility
          flatten_results: true,
          priority: "INTERACTIVE",
          use_query_cache: true,
          user_defined_function_resources: nil,
          use_legacy_sql: false,
          maximum_billing_tier: nil,
          maximum_bytes_billed: nil,
          external_data_configuration: nil,
          project_id: @default_project_id,
          job_project_id: @default_project_id,
          job_id: nil, wait: nil, dry_run: false, **kwrest, &blk)
  configuration = Google::Apis::BigqueryV2::JobConfiguration.new(
    query: Google::Apis::BigqueryV2::JobConfigurationQuery.new(
      query: sql,
      allow_large_results: normalize_parameter(allow_large_results),
      flatten_results: normalize_parameter(flatten_results),
      priority: priority,
      use_query_cache: normalize_parameter(use_query_cache),
      use_legacy_sql: use_legacy_sql,
    )
  )
  if mode
    configuration.query.write_disposition = mode_to_write_disposition(mode)
  end
  if dry_run
    configuration.dry_run = true
    wait = nil
  end
  if maximum_billing_tier
    configuration.query.maximum_billing_tier = maximum_billing_tier
  end
  if maximum_bytes_billed
    configuration.query.maximum_bytes_billed = maximum_bytes_billed
  end
  if dataset_id and table_id
    configuration.query.destination_table = Google::Apis::BigqueryV2::TableReference.new(project_id: project_id, dataset_id: dataset_id, table_id: table_id)
  end
  if user_defined_function_resources
    configuration.query.user_defined_function_resources = Array(user_defined_function_resources).map do |r|
      r = r.to_s
      if r.start_with?("gs://")
        Google::Apis::BigqueryV2::UserDefinedFunctionResource.new(resource_uri: r)
      else
        Google::Apis::BigqueryV2::UserDefinedFunctionResource.new(inline_code: r)
      end
    end
  end
  if external_data_configuration
    configuration.query.table_definitions = external_data_configuration
  end
  kwrest.each do |kw, opt|
    if configuration.query.respond_to?("#{kw}=")
      configuration.query.__send__("#{kw}=", opt)
    else
      raise ArgumentError, "Unknown keyword argument for Kura::Client#query: #{kw}"
    end
  end
  insert_job(configuration, wait: wait, job_id: job_id, project_id: job_project_id, &blk)
end
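Two usage sketches with hypothetical ids; attribute names on the returned job follow google-api-client's BigQuery models.

# Write query results into a destination table and wait up to 10 minutes:
client.query("SELECT user, COUNT(*) AS n FROM my_dataset.events GROUP BY user",
             dataset_id: "my_dataset", table_id: "user_counts",
             mode: :truncate, wait: 600)

# Dry-run to estimate cost without executing:
job = client.query("SELECT * FROM my_dataset.events", dry_run: true)
job.statistics.query.total_bytes_processed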
routine(dataset_id, routine_id, project_id: @default_project_id, &blk)
# File lib/kura/client.rb, line 820
def routine(dataset_id, routine_id, project_id: @default_project_id, &blk)
  if blk
    @api.get_routine(project_id, dataset_id, routine_id) do |result, err|
      if err.respond_to?(:status_code) and err.status_code == 404
        result = nil
        err = nil
      end
      blk.call(result, err)
    end
  else
    @api.get_routine(project_id, dataset_id, routine_id)
  end
rescue
  return nil if $!.respond_to?(:status_code) and $!.status_code == 404
  process_error($!)
end
routines(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk)
Routines API
# File lib/kura/client.rb, line 807
def routines(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk)
  if blk
    @api.list_routines(project_id, dataset_id, max_results: limit, page_token: page_token) do |result, err|
      result &&= (result.routines || [])
      blk.call(result, err)
    end
  else
    @api.list_routines(project_id, dataset_id, max_results: limit, page_token: page_token)
  end
rescue
  process_error($!)
end
table(dataset_id, table_id, project_id: @default_project_id, &blk)
# File lib/kura/client.rb, line 184
def table(dataset_id, table_id, project_id: @default_project_id, &blk)
  if blk
    @api.get_table(project_id, dataset_id, table_id) do |result, err|
      if err.respond_to?(:status_code) and err.status_code == 404
        result = nil
        err = nil
      end
      blk.call(result, err)
    end
  else
    @api.get_table(project_id, dataset_id, table_id)
  end
rescue
  return nil if $!.respond_to?(:status_code) and $!.status_code == 404
  process_error($!)
end
tables(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk)
# File lib/kura/client.rb, line 170
def tables(dataset_id, project_id: @default_project_id, limit: 1000, page_token: nil, &blk)
  if blk
    @api.list_tables(project_id, dataset_id, max_results: limit, page_token: page_token) do |result, err|
      result &&= (result.tables || [])
      blk.call(result, err)
    end
  else
    result = @api.list_tables(project_id, dataset_id, max_results: limit, page_token: page_token)
    result.tables || []
  end
rescue
  process_error($!)
end
wait_job(job, timeout=60*10, location: nil, project_id: @default_project_id) { |j| ... }
# File lib/kura/client.rb, line 743
def wait_job(job, timeout=60*10, location: nil, project_id: @default_project_id)
  case job
  when String
    job_id = job
  when Google::Apis::BigqueryV2::Job
    project_id = job.job_reference.project_id
    job_id = job.job_reference.job_id
    location = job.job_reference.location
  else
    raise TypeError, "Kura::Client#wait_job accepts String(job-id) or Google::Apis::BigqueryV2::Job"
  end
  expire = Time.now + timeout
  while expire > Time.now
    j = job(job_id, project_id: project_id, location: location)
    if job_finished?(j)
      return j
    end
    if block_given?
      yield j
    end
    sleep 1
  end
  raise Kura::TimeoutError, "wait job timeout"
end
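A polling sketch:

job = client.query("SELECT 1")          # returns the running job (wait: nil)
done = client.wait_job(job, 300) do |j| # yields once per second while running
  puts j.status.state                   # e.g. "RUNNING"
end
done.status.state                       # => "DONE"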
Private Instance Methods
format_tabledata(r, schema, convert_numeric_to_float: true)
# File lib/kura/client.rb, line 304
def format_tabledata(r, schema, convert_numeric_to_float: true)
  {
    total_rows: r.total_rows.to_i,
    next_token: r.page_token,
    rows: (r.rows || []).map do |row|
      _convert_tabledata_row(row, schema, convert_numeric_to_float: convert_numeric_to_float)
    end
  }
end
mode_to_write_disposition(mode)
# File lib/kura/client.rb, line 371
def mode_to_write_disposition(mode)
  unless %i{ append truncate empty }.include?(mode)
    raise "mode option should be one of :append, :truncate, :empty but got #{mode}"
  end
  "WRITE_#{mode.to_s.upcase}"
end
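The mapping is mechanical:

mode_to_write_disposition(:append)   # => "WRITE_APPEND"
mode_to_write_disposition(:truncate) # => "WRITE_TRUNCATE"
mode_to_write_disposition(:empty)    # => "WRITE_EMPTY"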
normalize_schema(schema)
# File lib/kura/client.rb, line 477
def normalize_schema(schema)
  schema.map do |s|
    if s.respond_to?(:[])
      f = {
        name: (s[:name] || s["name"]),
        type: (s[:type] || s["type"]),
        mode: (s[:mode] || s["mode"]),
      }
      if (desc = (s[:description] || s["description"]))
        f[:description] = desc
      end
      if (sub_fields = (s[:fields] || s["fields"]))
        f[:fields] = normalize_schema(sub_fields)
      end
    else
      f = {
        name: s.name,
        type: s.type,
        mode: s.mode,
      }
      if s.respond_to?(:description)
        f[:description] = s.description
      end
      if (sub_fields = s.fields)
        f[:fields] = normalize_schema(sub_fields)
      end
    end
    f
  end
end
process_error(err)
# File lib/kura/client.rb, line 59
def process_error(err)
  if err.respond_to?(:body) and err.body
    begin
      jobj = JSON.parse(err.body)
      error = jobj["error"]
      reason = error["errors"].map{|e| e["reason"]}.join(",")
      errors = error["errors"].map{|e| e["message"] }.join("\n")
    rescue JSON::ParserError
      reason = err.status_code.to_s
      errors = "HTTP Status: #{err.status_code}\nHeaders: #{err.header.inspect}\nBody:\n#{err.body}"
    end
    raise Kura::ApiError.new(reason, errors)
  elsif err.is_a?(SystemCallError) or err.is_a?(OpenSSL::SSL::SSLError)
    raise Kura::ApiError.new("internalError", err.message)
  else
    raise err
  end
end