class Bricolage::StreamingLoad::Job::ControlConnection

Constants

JobInfo
MAX_MESSAGE_LENGTH
TaskInfo

Public Class Methods

new(ds, logger = ds.logger) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 278
def initialize(ds, logger = ds.logger)
  @ds = ds
  @connection = nil
end
open(ds, logger = ds.logger, &block) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 274
def ControlConnection.open(ds, logger = ds.logger, &block)
  new(ds, logger).open(&block)
end

Public Instance Methods

abort_job(job_id, status, message) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 422
def abort_job(job_id, status, message)
  write_job_result(job_id, status, message)
end
begin_job(task_id, process_id, force) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 391
        def begin_job(task_id, process_id, force)
          job_id = @connection.query_value(<<-EndSQL)
            insert into strload_jobs
                ( task_id
                , process_id
                , status
                , start_time
                )
            select
                task_id
                , #{s process_id}
                , 'running'
                , current_timestamp
            from
                strload_tasks
            where
                task_id = #{task_id}
                and (#{force ? 'true' : 'false'} or task_id not in (select task_id from strload_jobs where status = 'success'))
            returning job_id
            ;
          EndSQL
          return job_id ? job_id.to_i : nil
        end
commit_duplicated_job(task_id, process_id) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 458
        def commit_duplicated_job(task_id, process_id)
          job_id = @connection.query_value(<<-EndSQL)
            insert into strload_jobs
                ( task_id
                , process_id
                , status
                , start_time
                , finish_time
                , message
                )
            select
                #{task_id}
                , #{s process_id}
                , 'duplicated'
                , current_timestamp
                , current_timestamp
                , ''
            returning job_id
            ;
          EndSQL
          return job_id
        end
commit_job(job_id, message = nil) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 415
def commit_job(job_id, message = nil)
  @connection.transaction {|txn|
    write_job_result job_id, 'success', (message || '')
    update_loaded_flag job_id
  }
end
fix_job_status(job_id, status) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 377
        def fix_job_status(job_id, status)
          @connection.update(<<-EndSQL)
            update
                strload_jobs
            set
                status = #{s status}
                , message = 'status fixed: ' || message
            where
                job_id = #{job_id}
                and status = 'unknown'
            ;
          EndSQL
        end
load_jobs(task_id) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 344
        def load_jobs(task_id)
          records = @connection.query_rows(<<-EndSQL)
            select
                job_id
                , status
            from
                strload_jobs
            where
                task_id = #{task_id}
            order by
                start_time
            ;
          EndSQL
          records.map {|rec| JobInfo.new(rec['job_id'].to_i, rec['status']) }
        end
load_object_urls(task_id) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 362
        def load_object_urls(task_id)
          urls = @connection.query_values(<<-EndSQL)
            select
                o.object_url
            from
                strload_tasks t
                inner join strload_task_objects tob using (task_id)
                inner join strload_objects o using (object_id)
            where
                t.task_id = #{task_id}
            ;
          EndSQL
          urls
        end
load_task(task_id) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 319
        def load_task(task_id)
          rec = @connection.query_row(<<-EndSQL) or raise JobError, "no such task: #{task_id}"
            select
                tsk.task_class
                , tbl.schema_name
                , tbl.table_name
                , tbl.disabled
            from
                strload_tasks tsk
                inner join strload_tables tbl using (table_id)
            where
                tsk.task_id = #{task_id}
            ;
          EndSQL
          TaskInfo.new(
            task_id,
            rec['task_class'],
            rec['schema_name'],
            rec['table_name'],
            (rec['disabled'] != 'f'),
            load_object_urls(task_id),
            load_jobs(task_id)
          )
        end
open() { |self| ... } click to toggle source
# File lib/bricolage/streamingload/job.rb, line 283
def open(&block)
  @ds.open {|conn|
    @connection = conn
    yield self
  }
rescue ConnectionError => ex
  raise ControlConnectionFailed, "control connection failed: #{ex.message}"
end
update_loaded_flag(job_id) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 440
        def update_loaded_flag(job_id)
          @connection.execute(<<-EndSQL)
            update
                strload_objects
            set
                loaded = true
            where
                object_id in (
                  select
                      object_id
                  from
                      strload_task_objects
                  where task_id = (select task_id from strload_jobs where job_id = #{job_id})
                )
            ;
          EndSQL
        end
write_job_result(job_id, status, message) click to toggle source
# File lib/bricolage/streamingload/job.rb, line 428
        def write_job_result(job_id, status, message)
          @connection.execute(<<-EndSQL)
            update
                strload_jobs
            set
                (status, finish_time, message) = (#{s status}, current_timestamp, #{s message[0, MAX_MESSAGE_LENGTH]})
            where
                job_id = #{job_id}
            ;
          EndSQL
        end