class Bricolage::StreamingLoad::Job
Constants
- LoadLog
- MAX_RETRY
Attributes
data_ds[R]
job_id[R]
For tests
manifest[R]
process_id[R]
task[R]
Public Class Methods
new(context:, ctl_ds:, data_ds:, log_table: 'strload_load_logs', task_id:, force: false, logger:)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 22
#
# Builds a job for a single streaming-load task. No connection is opened and
# nothing is loaded here; call #execute to run the task.
#
# context   - forwarded to JobParams.load by #do_load
# ctl_ds    - data source of the control DB (used via ControlConnection)
# data_ds   - data source of the target DB; #do_load replaces it with the
#             data source resolved from the task's JobParams
# log_table - load log table name handed to DataConnection.open
# task_id   - id of the task this job executes
# force     - handed to ControlConnection#begin_job; a successful forced job
#             is committed with the 'forced' message
# logger    - logger used for all progress/error messages
def initialize(context:, ctl_ds:, data_ds:, log_table: 'strload_load_logs', task_id:, force: false, logger:)
  # What to load and how to report it.
  @task_id = task_id
  @force = force
  @context = context
  @logger = logger

  # Where to connect.
  @ctl_ds = ctl_ds
  @data_ds = data_ds
  @log_table = log_table

  # Filled in later by execute_task / do_load.
  @task = @job_id = @manifest = nil
end
Public Instance Methods
do_load(task, job_id)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 152
#
# Performs the actual data load for +task+ under +job_id+.
#
# Steps:
#   1. resolve the task's JobParams (data source, target table, load options)
#   2. create a manifest of task.object_urls via ManifestFile.create
#   3. open the data connection and load, either through the work table
#      (when params.enable_work_table?) or directly into params.dest_table
#
# Side effects: sets @data_ds to the resolved data source (this is what
# #execute polls after a DataConnectionFailed) and @manifest to the created
# manifest. The return value is whatever DataConnection.open returns;
# execute_task ignores it.
def do_load(task, job_id)
  params = JobParams.load(@context, task.task_class, task.schema_name, task.table_name)
  @data_ds = params.ds

  @manifest = ManifestFile.create(
    ds: params.ctl_bucket,
    job_id: job_id,
    object_urls: task.object_urls,
    logger: @logger
  )

  load_log = LoadLog.new.tap do |entry|
    entry.task_id = @task_id
    entry.job_id = job_id
  end

  DataConnection.open(params.ds, log_table: @log_table, logger: @logger) do |conn|
    if params.enable_work_table?
      conn.load_with_work_table(params.work_table, @manifest, params.load_options_string, params.sql_source, load_log)
    else
      conn.load_objects(params.dest_table, @manifest, params.load_options_string, load_log)
    end
  end
end
execute(fail_fast: false)
click to toggle source
Returns true when the SQS message should be deleted, and false when the SQS message should be kept.
# File lib/bricolage/streamingload/job.rb, line 45
#
# Runs the task and translates the outcome into what should happen to the
# SQS message that carried it:
#   true  -> the message may be deleted
#   false -> the message must be kept
#
# fail_fast - when true, a lost ctl/data connection is reported and false is
#             returned immediately instead of polling via #wait_for_connection.
#
# The rescue clauses are order-sensitive (first match wins), so keep them in
# this order if the exception classes are ever related by inheritance.
def execute(fail_fast: false)
  execute_task
  true
rescue JobCancelled => ex
  # execute_task raises this once the retry limit is reached: report and drop.
  Raven.capture_exception(ex)
  true
rescue JobDuplicated
  true
rescue JobDefered
  false
rescue ControlConnectionFailed, DataConnectionFailed => ex
  @logger.error ex.message
  Raven.capture_exception(ex)
  unless fail_fast
    if ex.is_a?(ControlConnectionFailed)
      wait_for_connection('ctl', @ctl_ds)
    else
      wait_for_connection('data', @data_ds)
    end
  end
  false
rescue JobFailure, JobError => ex
  @logger.error ex.message
  Raven.capture_exception(ex)
  false
rescue Exception => ex
  # Catch-all: log, report, and drop the message.
  # NOTE(review): this also catches Interrupt/SignalException/SystemExit —
  # confirm that swallowing them here is intended.
  @logger.exception ex
  Raven.capture_exception(ex)
  true
end
execute_task()
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 81
#
# Runs one load task end to end:
#   1. (control DB) load the task; defer it if disabled; if the previous job's
#      status is unknown, look up the real status in the data DB and fix it;
#      register a new job via begin_job.
#   2. run #do_load and commit the job in the control DB.
#   3. on a load error, record the aborted job in the control DB and re-raise.
#
# Raises:
#   JobDefered              - the task is disabled (nothing is recorded)
#   JobDuplicated           - begin_job returned no job id (already succeeded, not forced)
#   JobCancelled            - a JobFailure/JobError occurred and the task's
#                             failure_count has reached MAX_RETRY
#   ControlConnectionFailed, DataConnectionFailed, JobFailure, JobError - re-raised
#   JobError                - wraps any other exception raised by the load/commit
def execute_task
  @process_id = "#{Socket.gethostname}-#{$$}"
  @logger.info "execute task: task_id=#{@task_id} force=#{@force} process_id=#{@process_id}"
  ctl = ControlConnection.new(@ctl_ds, @logger)

  ctl.open {
    @task = ctl.load_task(@task_id)
    @logger.info "task details: task_id=#{@task_id} table=#{@task.schema_name}.#{@task.table_name}"
    if @task.disabled
      # We do not record disabled job in the DB.
      @logger.info "task is disabled; defer task: task_id=#{@task_id}"
      raise JobDefered, "defered: task_id=#{@task_id}"
    end

    if @task.unknown_state?
      # The previous job was left as 'unknown' (the DataConnectionFailed handler
      # below records that status); ask the data DB what really happened.
      true_status = DataConnection.open(@data_ds, log_table: @log_table, logger: @logger) {|data|
        data.get_job_status(@task.last_job_id)
      }
      @logger.info "fixiating unknown job status: job_id=#{@task.last_job_id}, status=(unknown->#{true_status})"
      @task.fix_last_job_status true_status
      ctl.fix_job_status @task.last_job_id, true_status
      @logger.info "job status fixed."
    end

    @job_id = ctl.begin_job(@task_id, @process_id, @force)
    unless @job_id
      @logger.warn "task is already succeeded and not forced; discard task: task_id=#{@task_id}"
      ctl.commit_duplicated_job @task_id, @process_id
      raise JobDuplicated, "duplicated: task_id=#{@task_id}"
    end
  }

  begin
    do_load @task, @job_id
    ctl.open {
      ctl.commit_job @job_id, (@force ? 'forced' : nil)
    }
  rescue ControlConnectionFailed
    # The control DB is unreachable, so nothing can be recorded; let #execute handle it.
    # Must stay before `rescue Exception` so it is not wrapped in JobError.
    raise
  rescue DataConnectionFailed => ex
    # Presumably the load's outcome cannot be confirmed without the data DB,
    # so the job is recorded as 'unknown' and resolved on the next run.
    ctl.open {
      ctl.abort_job @job_id, 'unknown', first_message_line(ex)
    }
    raise
  rescue JobFailure => ex
    ctl.open { abort_job_with_retry_info(ctl, 'failure', ex) }
    raise
  rescue JobError => ex
    ctl.open { abort_job_with_retry_info(ctl, 'error', ex) }
    raise
  rescue Exception => ex
    @logger.exception ex
    ctl.open {
      ctl.abort_job @job_id, 'error', first_message_line(ex)
    }
    raise JobError, "#{ex.class}: #{ex.message}"
  end
end

# Records the current job (@job_id) as aborted with +status+ and the first line
# of +ex+'s message. When the task has failed before, the message is prefixed
# with "(retry#N) " or, on the last allowed attempt, "(retry#N FINAL) ".
# Raises JobCancelled once @task.failure_count has reached MAX_RETRY.
private def abort_job_with_retry_info(ctl, status, ex)
  fail_count = @task.failure_count
  final_retry = (fail_count >= MAX_RETRY)
  retry_msg = (fail_count > 0) ? "(retry\##{fail_count}#{final_retry ? ' FINAL' : ''}) " : ''
  ctl.abort_job(@job_id, status, retry_msg + first_message_line(ex))
  raise JobCancelled, "retry count exceeds limit: task_id=#{@task_id}" if final_retry
end

# First line of +ex+'s message with surrounding whitespace removed.
# Returns "" for an empty message: "".lines is [], so a bare .first would be nil
# and .strip would raise NoMethodError while recording the abort.
private def first_message_line(ex)
  ex.message.lines.first.to_s.strip
end
wait_for_connection(type, ds)
click to toggle source
# File lib/bricolage/streamingload/job.rb, line 168
#
# Polls +ds+ after a lost connection until it can be opened again.
#
# type          - label used in log messages (#execute passes 'ctl' or 'data')
# ds            - data source probed with ds.open {}
# poll_interval - seconds to sleep after each failed attempt (default 15)
# max_attempts  - number of attempts before giving up
#                 (default 120; 120 * 15 s = 30 min)
#
# Returns true as soon as a connection succeeds, false after max_attempts
# failed attempts. Only ConnectionError counts as "still down"; any other
# exception from ds.open propagates to the caller.
def wait_for_connection(type, ds, poll_interval: 15, max_attempts: 120)
  @logger.warn "loader: #{type} DB connection lost; polling..."
  start_time = Time.now
  max_attempts.times do
    begin
      ds.open {}
      @logger.warn "loader: #{type} DB connection recovered; return to normal state"
      return true
    rescue ConnectionError
      # Still unreachable; sleep and try again.
    end
    sleep poll_interval
  end
  # Could not get a connection in time, so return to the queue loop.
  # Next job may fail too, but we should not stop receiving the task queue for
  # too long, because it contains control tasks.
  @logger.warn "loader: #{type} DB connection still failing (since #{start_time}); give up."
  false
end