class Bricolage::StreamingLoad::Job::DataConnection

Public Class Methods

new(ds, log_table:, logger: ds.logger) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 203
def initialize(ds, log_table:, logger: ds.logger)
  @ds = ds
  @log_table = log_table
  @logger = logger
  @connection = nil
end
open(ds, log_table:, logger: ds.logger, &block) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 199
def DataConnection.open(ds, log_table:, logger: ds.logger, &block)
  new(ds, log_table: log_table, logger: logger).open(&block)
end

Public Instance Methods

execute_copy(dest_table, manifest, options) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 243
        def execute_copy(dest_table, manifest, options)
          @connection.execute(<<-EndSQL.strip.gsub(/\s+/, ' '))
              copy #{dest_table}
              from #{s manifest.url}
              credentials #{s manifest.credential_string}
              manifest
              statupdate false
              compupdate false
              #{options}
              ;
          EndSQL
          @logger.info "load succeeded: #{manifest.url}"
        rescue JobFailure => ex
          if /stl_load_errors/ =~ ex.message
            # We cannot resolve this load error by retry, give up now.
            raise JobError, ex.message
          end
          raise
        end
get_job_status(job_id) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 219
def get_job_status(job_id)
  count = @connection.query_value("select count(*) from #{@log_table} where job_id = #{job_id}")
  count.to_i > 0 ? 'success' : 'failure'
end
load_objects(dest_table, manifest, options, log) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 236
def load_objects(dest_table, manifest, options, log)
  @connection.transaction {|txn|
    execute_copy dest_table, manifest, options
    write_load_log log
  }
end
load_with_work_table(work_table, manifest, options, sql_source, log) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 224
def load_with_work_table(work_table, manifest, options, sql_source, log)
  @connection.transaction {|txn|
    # NOTE: This transaction ends with truncation, this DELETE does nothing
    # from the second time.  So don't worry about DELETE cost here.
    @connection.execute("delete from #{work_table}")
    execute_copy work_table, manifest, options
    @connection.execute sql_source
    write_load_log log
    txn.truncate_and_commit work_table
  }
end
open() { |self| ... } click to toggle source
# File lib/bricolage/streamingload/job.rb, line 210
def open(&block)
  @ds.open {|conn|
    @connection = conn
    yield self
  }
rescue ConnectionError => ex
  raise DataConnectionFailed, "data connection failed: #{ex.message}"
end
write_load_log(log) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 263
def write_load_log(log)
  @connection.execute("insert into #{@log_table} (task_id, job_id, finish_time) values (#{log.task_id}, #{log.job_id}, current_timestamp)")
end