class Bricolage::StreamingLoad::Job

Constants

LoadLog
MAX_RETRY

Attributes

data_ds[R]
job_id[R]

For tests

manifest[R]
process_id[R]
task[R]

Public Class Methods

new(context:, ctl_ds:, data_ds:, log_table: 'strload_load_logs', task_id:, force: false, logger:)
# File lib/bricolage/streamingload/job.rb, line 22
# Builds a job for one streaming-load task. No connection is opened here;
# the task record, job id and manifest are populated while the job runs
# (see #execute_task and #do_load).
#
# context   - passed to JobParams.load when the task is loaded
# ctl_ds    - data source of the control DB
# data_ds   - data source of the data DB (replaced by the task's own ds in #do_load)
# log_table - name of the load-log table handed to DataConnection
# task_id   - id of the task to execute
# force     - when true, re-run a task even if it already succeeded
# logger    - logger used for progress and error messages
def initialize(context:, ctl_ds:, data_ds:, log_table: 'strload_load_logs', task_id:, force: false, logger:)
  @context, @ctl_ds, @data_ds = context, ctl_ds, data_ds
  @log_table = log_table
  @task_id, @force = task_id, force
  @logger = logger

  # Filled in during execution.
  @task = @job_id = @manifest = nil
end

Public Instance Methods

do_load(task, job_id)
# File lib/bricolage/streamingload/job.rb, line 152
# Loads the objects of +task+ into its destination table under +job_id+.
#
# Resolves the task's JobParams, switches @data_ds to the task's data source,
# writes a manifest for the task's object URLs (kept in @manifest) and then,
# inside a DataConnection, either loads through the work table (when the
# params enable one) or loads straight into the destination table.
# A LoadLog carrying @task_id and +job_id+ is passed to the load call.
#
# Returns whatever the DataConnection.open block returns.
def do_load(task, job_id)
  params = JobParams.load(@context, task.task_class, task.schema_name, task.table_name)
  @data_ds = params.ds
  @manifest = ManifestFile.create(ds: params.ctl_bucket, job_id: job_id, object_urls: task.object_urls, logger: @logger)
  load_log = LoadLog.new.tap {|entry|
    entry.task_id = @task_id
    entry.job_id = job_id
  }
  DataConnection.open(params.ds, log_table: @log_table, logger: @logger) do |conn|
    if params.enable_work_table?
      conn.load_with_work_table(params.work_table, @manifest, params.load_options_string, params.sql_source, load_log)
    else
      conn.load_objects(params.dest_table, @manifest, params.load_options_string, load_log)
    end
  end
end
execute(fail_fast: false)

Returns true if the SQS message should be deleted; returns false if the SQS message should be kept.

# File lib/bricolage/streamingload/job.rb, line 45
# Runs #execute_task and maps its outcome onto what the caller should do with
# the SQS message that triggered it:
#   true  - delete the message
#   false - keep the message
#
# Cancelled and duplicated jobs count as handled (true); deferred jobs and
# connection/job errors keep the message (false). On a control or data DB
# connection failure, #wait_for_connection polls that DB before returning,
# unless +fail_fast+ is set.
def execute(fail_fast: false)
  execute_task
  true
rescue JobCancelled => ex
  Raven.capture_exception(ex)
  true
rescue JobDuplicated
  true
rescue JobDefered
  false
rescue ControlConnectionFailed, DataConnectionFailed => ex
  @logger.error ex.message
  Raven.capture_exception(ex)
  unless fail_fast
    # Control-connection failures take precedence, matching the order the
    # two exception classes are checked in.
    if ex.is_a?(ControlConnectionFailed)
      wait_for_connection('ctl', @ctl_ds)
    else
      wait_for_connection('data', @data_ds)
    end
  end
  false
rescue JobFailure, JobError => ex
  @logger.error ex.message
  Raven.capture_exception(ex)
  false
rescue Exception => ex
  # NOTE(review): this also catches SignalException/SystemExit and then
  # returns true (delete the message); confirm that is intended.
  @logger.exception ex
  Raven.capture_exception(ex)
  true
end
execute_task()
# File lib/bricolage/streamingload/job.rb, line 81
# Runs a single load task end to end and records the outcome in the control DB.
#
# 1. Loads the task record (@task) from the control DB. A disabled task raises
#    JobDefered without recording a job.
# 2. If the task's last job is in an unknown state, asks the data DB for that
#    job's real status and writes it back to the task and the control DB.
# 3. Starts a job (@job_id). When ctl.begin_job returns nil (the task already
#    succeeded and +@force+ is false) a duplicated job is committed and
#    JobDuplicated is raised.
# 4. Loads the data via #do_load and commits the job ('forced' when +@force+).
#
# Failure handling for step 4:
# - ControlConnectionFailed is re-raised untouched.
# - DataConnectionFailed aborts the job as 'unknown' and is re-raised.
# - JobFailure / JobError abort the job as 'failure' / 'error' and are
#   re-raised, except that JobCancelled is raised instead once the task's
#   failure count has reached MAX_RETRY.
# - Any other exception is logged, aborts the job as 'error', and is
#   re-raised as JobError.
def execute_task
  @process_id = "#{Socket.gethostname}-#{$$}"
  @logger.info "execute task: task_id=#{@task_id} force=#{@force} process_id=#{@process_id}"
  ctl = ControlConnection.new(@ctl_ds, @logger)

  ctl.open {
    @task = ctl.load_task(@task_id)
    @logger.info "task details: task_id=#{@task_id} table=#{@task.schema_name}.#{@task.table_name}"
    if @task.disabled
      # We do not record disabled job in the DB.
      @logger.info "task is disabled; defer task: task_id=#{@task_id}"
      raise JobDefered, "defered: task_id=#{@task_id}"
    end

    if @task.unknown_state?
      true_status = DataConnection.open(@data_ds, log_table: @log_table, logger: @logger) {|data|
        data.get_job_status(@task.last_job_id)
      }
      @logger.info "fixiating unknown job status: job_id=#{@task.last_job_id}, status=(unknown->#{true_status})"
      @task.fix_last_job_status true_status
      ctl.fix_job_status @task.last_job_id, true_status
      @logger.info "job status fixed."
    end

    @job_id = ctl.begin_job(@task_id, @process_id, @force)
    unless @job_id
      @logger.warn "task is already succeeded and not forced; discard task: task_id=#{@task_id}"
      ctl.commit_duplicated_job @task_id, @process_id
      raise JobDuplicated, "duplicated: task_id=#{@task_id}"
    end
  }

  begin
    do_load @task, @job_id
    ctl.open {
      ctl.commit_job @job_id, (@force ? 'forced' : nil)
    }
  rescue ControlConnectionFailed
    # Presumably nothing can be recorded when the control DB itself is the
    # connection that failed, so just propagate.
    raise
  rescue DataConnectionFailed => ex
    # The load outcome cannot be known here; mark the job 'unknown'
    # (presumably what @task.unknown_state? detects on a later run).
    ctl.open {
      ctl.abort_job @job_id, 'unknown', ex.message.lines.first.strip
    }
    raise
  rescue JobFailure => ex
    abort_job_with_retry_note ctl, 'failure', ex
    raise
  rescue JobError => ex
    abort_job_with_retry_note ctl, 'error', ex
    raise
  rescue Exception => ex
    @logger.exception ex
    ctl.open {
      ctl.abort_job @job_id, 'error', ex.message.lines.first.strip
    }
    raise JobError, "#{ex.class}: #{ex.message}"
  end
end

# Aborts @job_id with +status+ and the first line of +ex+'s message. When the
# task has failed before, the message is prefixed with "(retry#N) ", or with
# "(retry#N FINAL) " once N reaches MAX_RETRY. In the final case JobCancelled
# is raised (from inside the ctl.open block, as before) instead of returning.
def abort_job_with_retry_note(ctl, status, ex)
  ctl.open {
    fail_count = @task.failure_count
    final_retry = (fail_count >= MAX_RETRY)
    retry_msg = (fail_count > 0) ? "(retry\##{fail_count}#{final_retry ? ' FINAL' : ''}) " : ''
    ctl.abort_job @job_id, status, retry_msg + ex.message.lines.first.strip
    raise JobCancelled, "retry count exceeds limit: task_id=#{@task_id}" if final_retry
  }
end
private :abort_job_with_retry_note
wait_for_connection(type, ds)
# File lib/bricolage/streamingload/job.rb, line 168
# Polls +ds+ until a connection can be opened again.
#
# Tries ds.open up to 120 times, sleeping 15 seconds after each failed try
# (about 30 minutes in total). Only ConnectionError counts as "still down";
# any other error propagates. +type+ is used only in log messages.
#
# Returns true as soon as a connection succeeds, false after giving up.
def wait_for_connection(type, ds)
  @logger.warn "loader: #{type} DB connection lost; polling..."
  gave_up_after = Time.now
  poll_interval_sec = 15
  max_polls = 120  # 120 * 15s = 30 min

  max_polls.times do
    begin
      ds.open {}
      @logger.warn "loader: #{type} DB connection recovered; return to normal state"
      return true
    rescue ConnectionError
      # still unreachable; try again after the pause below
    end
    sleep poll_interval_sec
  end

  # Could not get a connection in 30 minutes, now we return to the queue loop.
  # Next job may fail too, but we should not stop to receive the task queue too long,
  # because it contains control tasks.
  @logger.warn "loader: #{type} DB connection still failing (since #{gave_up_after}); give up."
  false
end