class Bricolage::StreamingLoad::Job::DataConnection
Public Class Methods
new(ds, log_table:, logger: ds.logger)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 203
# Prepares a data connection wrapper; no connection is opened here.
#
# ds        - data source object; kept as-is and asked to open a connection
#             by the instance-level #open.
# log_table - name of the load log table, interpolated into the SQL issued
#             by #get_job_status and #write_load_log.
# logger    - logger to report to; falls back to ds.logger when omitted.
def initialize(ds, log_table:, logger: ds.logger)
  @logger = logger
  @log_table = log_table
  @ds = ds
  # Stays nil until #open assigns the live connection.
  @connection = nil
end
open(ds, log_table:, logger: ds.logger, &block)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 199
# Convenience entry point: builds an instance from the given arguments and
# immediately calls the instance-level #open with the caller's block.
#
# ds        - data source, forwarded unchanged to +new+.
# log_table - load log table name, forwarded unchanged to +new+.
# logger    - logger, forwarded unchanged to +new+; defaults to ds.logger.
# block     - handed to the instance-level #open.
#
# Returns whatever the instance-level #open returns.
#
# Defined with +self+ rather than the class constant so the definition does
# not depend on the name of the enclosing class.
def self.open(ds, log_table:, logger: ds.logger, &block)
  new(ds, log_table: log_table, logger: logger).open(&block)
end
Public Instance Methods
execute_copy(dest_table, manifest, options)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 243
# Issues a COPY statement that loads the manifest at manifest.url into
# +dest_table+ and logs a success message afterwards.
#
# dest_table - destination table name, interpolated into the statement.
# manifest   - object responding to #url and #credential_string.
# options    - extra COPY options text, appended verbatim to the statement.
#
# The statement is squeezed to a single line (whitespace runs become one
# space) before it is sent. +s+ is a helper defined outside this method
# (presumably SQL string quoting -- confirm against its definition).
#
# Raises JobError in place of JobFailure when the failure message mentions
# stl_load_errors; any other JobFailure is re-raised unchanged.
def execute_copy(dest_table, manifest, options)
  copy_stmt = <<-EndSQL.strip.gsub(/\s+/, ' ')
      copy #{dest_table}
      from #{s manifest.url}
      credentials #{s manifest.credential_string}
      manifest
      statupdate false
      compupdate false
      #{options}
      ;
  EndSQL
  @connection.execute(copy_stmt)
  @logger.info "load succeeded: #{manifest.url}"
rescue JobFailure => e
  # We cannot resolve this load error by retry, give up now.
  raise JobError, e.message if /stl_load_errors/ =~ e.message
  raise
end
get_job_status(job_id)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 219
# Reports whether the load log table holds a row for +job_id+.
#
# job_id - job identifier, interpolated directly into the query.
#
# Returns the string 'success' when the row count is greater than zero,
# otherwise 'failure'.
def get_job_status(job_id)
  count_sql = "select count(*) from #{@log_table} where job_id = #{job_id}"
  logged_rows = @connection.query_value(count_sql).to_i
  return 'success' if logged_rows > 0

  'failure'
end
load_objects(dest_table, manifest, options, log)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 236
# Loads the manifest straight into +dest_table+ and records the load log
# entry, both inside a single @connection.transaction block.
#
# dest_table - destination table name, passed to #execute_copy.
# manifest   - manifest object, passed to #execute_copy.
# options    - COPY options, passed to #execute_copy.
# log        - load log record, passed to #write_load_log.
def load_objects(dest_table, manifest, options, log)
  @connection.transaction do |_txn|
    execute_copy(dest_table, manifest, options)
    write_load_log(log)
  end
end
load_with_work_table(work_table, manifest, options, sql_source, log)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 224
# Loads the manifest into +work_table+, runs +sql_source+, writes the load
# log entry, and finishes with txn.truncate_and_commit on the work table --
# all inside one @connection.transaction block.
#
# work_table - staging table name; emptied first, truncated at the end.
# manifest   - manifest object, passed to #execute_copy.
# options    - COPY options, passed to #execute_copy.
# sql_source - SQL executed after the COPY (presumably moves rows from the
#              work table to their final destination -- confirm with caller).
# log        - load log record, passed to #write_load_log.
def load_with_work_table(work_table, manifest, options, sql_source, log)
  @connection.transaction do |txn|
    # NOTE: This transaction ends with truncation, this DELETE does nothing
    # from the second time. So don't worry about DELETE cost here.
    clear_sql = "delete from #{work_table}"
    @connection.execute(clear_sql)
    execute_copy(work_table, manifest, options)
    @connection.execute(sql_source)
    write_load_log(log)
    txn.truncate_and_commit(work_table)
  end
end
open() { |self| ... }
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 210
# Opens a connection through the data source, stores it in @connection and
# yields this object to the caller's block.
#
# Returns whatever @ds.open returns.
#
# Raises DataConnectionFailed when a ConnectionError surfaces anywhere in
# this method, including from inside the caller's block; the original
# message is embedded in the new one.
def open(&block)
  @ds.open do |live_conn|
    @connection = live_conn
    yield self
  end
rescue ConnectionError => e
  raise DataConnectionFailed, "data connection failed: #{e.message}"
end
write_load_log(log)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 263
# Inserts one row into the load log table for the given log record, using
# the database's current_timestamp as the finish time.
#
# log - object responding to #task_id and #job_id; both values are
#       interpolated directly into the statement.
def write_load_log(log)
  insert_sql = "insert into #{@log_table} (task_id, job_id, finish_time) values (#{log.task_id}, #{log.job_id}, current_timestamp)"
  @connection.execute(insert_sql)
end