class Bricolage::StreamingLoad::Job::ControlConnection
Constants
- JobInfo
- MAX_MESSAGE_LENGTH
- TaskInfo
Public Class Methods
new(ds, logger = ds.logger)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 278
# Builds a control-connection wrapper around the data source +ds+.
#
# ds::     data source object; #open calls +ds.open+ on it, and +ds.logger+
#          is read when no explicit logger is given.
# logger:: logger kept on this instance as @logger (defaults to +ds.logger+).
#
# No connection is established here; @connection stays nil until #open runs.
def initialize(ds, logger = ds.logger)
  @ds = ds
  # The logger argument used to be accepted but silently dropped; keep it.
  @logger = logger
  @connection = nil
end
open(ds, logger = ds.logger, &block)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 274
# Convenience entry point: instantiates a ControlConnection for +ds+ and
# immediately invokes the instance-level #open, forwarding the given block.
# Returns whatever the instance-level #open returns.
def ControlConnection.open(ds, logger = ds.logger, &block)
  control = new(ds, logger)
  control.open(&block)
end
Public Instance Methods
abort_job(job_id, status, message)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 422
# Records +status+ and +message+ as the result of job +job_id+.
# Pure delegation to #write_job_result; its return value is passed through.
def abort_job(job_id, status, message)
  write_job_result job_id, status, message
end
begin_job(task_id, process_id, force)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 391
# Inserts a 'running' row into strload_jobs for task +task_id+ and returns
# the new job id.
#
# The insert selects from strload_tasks and is skipped when the task already
# has a job whose status is 'success', unless +force+ is truthy.
#
# task_id::    interpolated into the SQL as-is.
# process_id:: passed through the +s+ helper (defined outside this view;
#              presumably SQL string quoting -- confirm).
# force::      truthy to bypass the "already succeeded" check.
#
# Returns the job id as an Integer, or nil when the query yields no value.
def begin_job(task_id, process_id, force)
  force_literal = force ? 'true' : 'false'
  sql = <<-EndSQL
      insert into strload_jobs
          ( task_id
          , process_id
          , status
          , start_time
          )
      select
          task_id
          , #{s process_id}
          , 'running'
          , current_timestamp
      from
          strload_tasks
      where
          task_id = #{task_id}
          and (#{force_literal} or task_id not in (select task_id from strload_jobs where status = 'success'))
      returning job_id
      ;
  EndSQL
  new_job_id = @connection.query_value(sql)
  return nil unless new_job_id
  new_job_id.to_i
end
commit_duplicated_job(task_id, process_id)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 458
# Inserts an already-finished job row with status 'duplicated' for task
# +task_id+: start_time and finish_time are both current_timestamp and the
# message is empty.
#
# Returns the job id exactly as @connection.query_value yields it (no
# Integer conversion is applied here, unlike #begin_job).
def commit_duplicated_job(task_id, process_id)
  sql = <<-EndSQL
      insert into strload_jobs
          ( task_id
          , process_id
          , status
          , start_time
          , finish_time
          , message
          )
      select
          #{task_id}
          , #{s process_id}
          , 'duplicated'
          , current_timestamp
          , current_timestamp
          , ''
      returning job_id
      ;
  EndSQL
  @connection.query_value(sql)
end
commit_job(job_id, message = nil)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 415
# Marks job +job_id+ as 'success' and flags its objects as loaded, both
# inside a single @connection.transaction block.
#
# message:: optional result message; nil is recorded as an empty string.
def commit_job(job_id, message = nil)
  result_message = message || ''
  @connection.transaction do |_txn|
    write_job_result job_id, 'success', result_message
    update_loaded_flag job_id
  end
end
fix_job_status(job_id, status)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 377
# Rewrites the status of job +job_id+ to +status+, but only when its
# current status is 'unknown'. The existing message is prefixed with
# 'status fixed: '.
#
# Returns the result of @connection.update.
def fix_job_status(job_id, status)
  sql = <<-EndSQL
      update
          strload_jobs
      set
          status = #{s status}
          , message = 'status fixed: ' || message
      where
          job_id = #{job_id}
          and status = 'unknown'
      ;
  EndSQL
  @connection.update(sql)
end
load_jobs(task_id)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 344
# Fetches every job recorded for task +task_id+, ordered by start_time.
#
# Returns an Array of JobInfo built from each row's 'job_id' (converted
# with to_i) and 'status' columns.
def load_jobs(task_id)
  sql = <<-EndSQL
      select
          job_id
          , status
      from
          strload_jobs
      where
          task_id = #{task_id}
      order by
          start_time
      ;
  EndSQL
  @connection.query_rows(sql).map do |row|
    JobInfo.new(row['job_id'].to_i, row['status'])
  end
end
load_object_urls(task_id)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 362
# Looks up the object URLs attached to task +task_id+ by joining
# strload_tasks, strload_task_objects and strload_objects.
#
# Returns the result of @connection.query_values unchanged.
def load_object_urls(task_id)
  sql = <<-EndSQL
      select
          o.object_url
      from
          strload_tasks t
          inner join strload_task_objects tob using (task_id)
          inner join strload_objects o using (object_id)
      where
          t.task_id = #{task_id}
      ;
  EndSQL
  @connection.query_values(sql)
end
load_task(task_id)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 319
# Loads task +task_id+ together with its target table, object URLs and
# job history.
#
# Returns a TaskInfo built from the task row, #load_object_urls and
# #load_jobs. The 'disabled' column is treated as true for any value
# other than the string 'f'.
#
# Raises JobError when the query returns no row for +task_id+.
def load_task(task_id)
  sql = <<-EndSQL
      select
          tsk.task_class
          , tbl.schema_name
          , tbl.table_name
          , tbl.disabled
      from
          strload_tasks tsk
          inner join strload_tables tbl using (table_id)
      where
          tsk.task_id = #{task_id}
      ;
  EndSQL
  row = @connection.query_row(sql)
  raise JobError, "no such task: #{task_id}" unless row

  disabled = row['disabled'] != 'f'
  TaskInfo.new(
    task_id,
    row['task_class'],
    row['schema_name'],
    row['table_name'],
    disabled,
    load_object_urls(task_id),
    load_jobs(task_id)
  )
end
open() { |self| ... }
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 283
# Opens a connection through the data source, stores it in @connection and
# yields this object to the caller's block.
#
# Returns whatever +@ds.open+ returns.
#
# Raises ControlConnectionFailed when a ConnectionError is raised anywhere
# inside this method, including from within the yielded block.
def open(&block)
  @ds.open do |connection|
    @connection = connection
    yield self
  end
rescue ConnectionError => e
  raise ControlConnectionFailed, "control connection failed: #{e.message}"
end
update_loaded_flag(job_id)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 440
# Sets loaded = true on every strload_objects row that belongs to the task
# of job +job_id+ (resolved through strload_jobs and strload_task_objects).
#
# Returns the result of @connection.execute.
def update_loaded_flag(job_id)
  sql = <<-EndSQL
      update
          strload_objects
      set
          loaded = true
      where
          object_id in (
            select
                object_id
            from
                strload_task_objects
            where task_id = (select task_id from strload_jobs where job_id = #{job_id})
          )
      ;
  EndSQL
  @connection.execute(sql)
end
write_job_result(job_id, status, message)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 428
# Stores the final result of job +job_id+: sets status, finish_time
# (current_timestamp) and message on its strload_jobs row.
#
# job_id::  interpolated into the SQL as-is.
# status::  passed through the +s+ helper (defined outside this view;
#           presumably SQL string quoting -- confirm).
# message:: result text, cut to at most MAX_MESSAGE_LENGTH characters.
#           nil is recorded as an empty string.
#
# Returns the result of @connection.execute.
def write_job_result(job_id, status, message)
  # to_s guards against a nil message, which previously raised
  # NoMethodError on nil[0, MAX_MESSAGE_LENGTH]; for a String it is a no-op.
  truncated_message = message.to_s[0, MAX_MESSAGE_LENGTH]
  @connection.execute(<<-EndSQL)
      update
          strload_jobs
      set
          (status, finish_time, message) = (#{s status}, current_timestamp, #{s truncated_message})
      where
          job_id = #{job_id}
      ;
  EndSQL
end