module RocketJob::Batch::Worker

Attributes

rocket_job_record_number[RW]

While working on a slice, the number of the record currently being processed is available via this reader

rocket_job_slice[RW]

While working on a slice, the current slice is available via this reader

Public Instance Methods

rocket_job_active_workers(server_name = nil) click to toggle source

Returns [Array<ActiveWorker>] All workers actively working on this job

# File lib/rocket_job/batch/worker.rb, line 96
# Returns [Array<ActiveWorker>] the workers currently active on this job.
#
# server_name [String|nil]
#   When supplied, only workers on that server are returned. During
#   :processing the worker name is matched against this value as a literal
#   prefix; it is escaped so characters such as "." in host names match
#   literally instead of acting as regex wildcards.
def rocket_job_active_workers(server_name = nil)
  servers = []
  case sub_state
  when :before, :after
    # In the before/after phases the job itself records its worker.
    if running? && (server_name.nil? || worker_on_server?(server_name))
      servers << ActiveWorker.new(worker_name, started_at, self)
    end
  when :processing
    # While processing, each running input slice records the worker handling it.
    query = input.running
    query = query.where(worker_name: /\A#{Regexp.escape(server_name.to_s)}/) if server_name
    query.each do |slice|
      servers << ActiveWorker.new(slice.worker_name, slice.started_at, self)
    end
  end
  servers
end
rocket_job_batch_callbacks(worker) click to toggle source

Run Batch before and after callbacks

# File lib/rocket_job/batch/worker.rb, line 310
# Runs the batch-level callbacks that belong to the job's current sub_state.
#
# worker: the worker running this job; only its #name is used here.
#
# :before -> runs the before_batch callbacks, then checks for completion
#            (handles jobs with zero records).
# :after  -> runs the after_batch callbacks.
def rocket_job_batch_callbacks(worker)
  case sub_state
  when :before
    # If this is the first worker to pickup this job
    rocket_job_batch_run_before_callbacks
    # Check for 0 record jobs
    rocket_job_batch_complete?(worker.name) if running?
  when :after
    # `case` compares with ===, so this must be the bare symbol; a boolean
    # expression such as `sub_state == :after` would never match.
    rocket_job_batch_run_after_callbacks
  end
end
rocket_job_batch_complete?(worker_name) click to toggle source

Checks for completion and runs the after_batch callbacks if defined. Returns true if the job is now complete, aborted, or failed (or was no longer running).

# File lib/rocket_job/batch/worker.rb, line 201
# Checks whether all input slices have been processed and, if so, moves the
# job into the after_batch phase and runs the after_batch callbacks.
#
# worker_name [String]
#   Name of the worker performing the check; written to the job when this
#   worker performs the state transition.
#
# Returns [true|false]
#   true  when the job is not running, when only failed slices remain (the job
#         is failed if it may fail), or when no input slices remain.
#   false when record_count is not set, or when unprocessed slices remain.
def rocket_job_batch_complete?(worker_name)
  return true unless running?
  # NOTE(review): record_count presumably stays nil until all input has been uploaded — confirm.
  return false unless record_count

  # Only failed slices left?
  input_count  = input.count
  failed_count = input.failed.count
  if failed_count.positive? && (input_count == failed_count)
    # Reload to pull in any counters or other data that was modified.
    reload unless new_record?
    rocket_job_batch_fail!(worker_name) if may_fail?
    return true
  end

  # Any work left?
  return false if input_count.positive?

  # If the job was not saved to the queue, do not save any changes
  if new_record?
    rocket_job_batch_run_after_callbacks(false)
    return true
  end

  # Complete job iff no other worker has already completed it
  # Must set write concern to at least 1 since we need the nModified back
  # The update only matches while the job is still running/processing, so it
  # acts as a compare-and-set: only the worker whose update modifies the
  # document goes on to run the after_batch callbacks.
  result = self.class.with(write: {w: 1}) do |query|
    query.
      where(id: id, state: :running, sub_state: :processing).
      update("$set" => {sub_state: "after", worker_name: worker_name})
  end

  # Reload to pull in any counters or other data that was modified.
  reload

  if result.modified_count.positive?
    # This worker won the transition to the :after sub_state.
    rocket_job_batch_run_after_callbacks(false)
  elsif aborted?
    # Repeat cleanup in case this worker was still running when the job was aborted
    cleanup!
  end
  true
end
rocket_job_batch_fail!(worker_name) click to toggle source

Fail the job

# File lib/rocket_job/batch/worker.rb, line 245
# Fails the job because some of its slices failed to process.
#
# For a persisted job the state change is made with a conditional update, so
# only one worker fails it. If another worker already moved the job out of
# running/processing, this method returns without touching the job.
# For an unsaved (new) job, `fail` is used instead of `fail!`.
#
# worker_name [String] name of the worker failing the job.
def rocket_job_batch_fail!(worker_name)
  unless new_record?
    # Fail job iff no other worker has already finished it.
    # Write concern must be at least 1 so that nModified is returned.
    outcome = self.class.with(write: {w: 1}) do |query|
      query.
        where(id: id, state: :running, sub_state: :processing).
        update({"$set" => {state: "failed", worker_name: worker_name}})
    end
    return unless outcome.modified_count.positive?
  end

  failure_message = "#{input.failed.count} slices failed to process"
  self.exception  = JobException.new(message: failure_message)
  if new_record?
    fail(worker_name, failure_message)
  else
    fail!(worker_name, failure_message)
  end
end
rocket_job_batch_perform(slice, record) { |record| ... } click to toggle source

Perform a single record within the current slice.

# File lib/rocket_job/batch/worker.rb, line 175
# Performs a single record within the current slice.
#
# slice  : the slice being processed; its current_record_number is exposed via
#          rocket_job_record_number while the record runs.
# record : the record to process.
#
# Yields the (possibly replaced) record to the block when one is given,
# otherwise passes it to #perform.
#
# When perform callbacks are defined they wrap the work, and before/around
# callbacks may modify @rocket_job_input and @rocket_job_output.
#
# Returns the result of processing the record.
def rocket_job_batch_perform(slice, record)
  @rocket_job_record_number = slice.current_record_number

  # Fast path: without perform callbacks, process the record directly.
  if _perform_callbacks.empty?
    return yield(record) if block_given?

    return perform(record)
  end

  @rocket_job_input  = record
  @rocket_job_output = nil

  run_callbacks(:perform) do
    current_input      = @rocket_job_input
    @rocket_job_output = block_given? ? yield(current_input) : perform(current_input)
  end

  outcome = @rocket_job_output
  @rocket_job_input = @rocket_job_output = nil
  outcome
end
rocket_job_batch_run_after_callbacks(save_before = true) click to toggle source

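Run the after_batch callbacks, if any, and then complete the job. When callbacks are present and save_before is true, the current state is saved before they run; the job is saved again (or completed) afterwards unless it is new or destroyed.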

# File lib/rocket_job/batch/worker.rb, line 288
# Runs the after_batch callbacks (if any), then completes the job.
#
# When after_batch callbacks exist, sub_state is set to :after and, when
# save_before is true and the job is neither new nor destroyed, saved before
# the callbacks run.
#
# Afterwards:
# - a new or destroyed job is completed in memory via `complete` (if it may complete);
# - any other job uses `complete!`, or `save!` when it may not complete.
#
# save_before [true|false] whether to save the :after sub_state before running the callbacks.
def rocket_job_batch_run_after_callbacks(save_before = true)
  unless _after_batch_callbacks.empty?
    self.sub_state = :after
    save! if save_before && !(new_record? || destroyed?)
    logger.measure_info(
      "after_batch",
      metric:             "#{self.class.name}/after_batch",
      log_exception:      :full,
      on_exception_level: :error,
      silence:            log_level
    ) { run_callbacks(:after_batch) }
  end

  # Re-evaluated here because the callbacks above may have changed it.
  transient = new_record? || destroyed?
  if transient
    complete if may_complete?
  elsif may_complete?
    complete!
  else
    save!
  end
end
rocket_job_batch_run_before_callbacks() click to toggle source

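Run the before_batch callbacks, if any, and then move the job into the processing sub-state. The current state is saved before the callbacks run (when present) and again afterwards, unless the job is new or destroyed.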

# File lib/rocket_job/batch/worker.rb, line 268
# Runs the before_batch callbacks (if any), then moves the job into the
# :processing sub_state.
#
# When before_batch callbacks exist, sub_state is set to :before and saved
# before the callbacks run. In every case sub_state is then set to :processing
# and saved. Saves are skipped for jobs that are new or destroyed at the time
# of each save.
def rocket_job_batch_run_before_callbacks
  persist_unless_transient = -> { save! unless new_record? || destroyed? }

  unless _before_batch_callbacks.empty?
    self.sub_state = :before
    persist_unless_transient.call
    logger.measure_info(
      "before_batch",
      metric:             "#{self.class.name}/before_batch",
      log_exception:      :full,
      on_exception_level: :error,
      silence:            log_level
    ) { run_callbacks(:before_batch) }
  end

  self.sub_state = :processing
  persist_unless_transient.call
end
rocket_job_batch_throttled?(slice, worker) click to toggle source
# File lib/rocket_job/batch/worker.rb, line 113
# Checks whether the given slice is blocked by one of this job class's batch throttles.
#
# When a throttle matches, the slice is reset to the queued state (no worker,
# no start time), and the throttle's filter is added to the worker's current filter.
#
# Returns [true|false] true when the slice is throttled, false otherwise.
def rocket_job_batch_throttled?(slice, worker)
  throttles = self.class.rocket_job_batch_throttles
  filter    = throttles.matching_filter(self, slice)
  if filter
    # Restore retrieved slice so that other workers can process it later.
    slice.set(worker_name: nil, state: :queued, started_at: nil)
    worker.add_to_current_filter(filter)
    true
  else
    false
  end
end
rocket_job_perform_slice(slice, &block) click to toggle source

Perform an individual slice without running the slice callbacks (the per-record perform callbacks still run).

# File lib/rocket_job/batch/worker.rb, line 145
# Performs every remaining record in a slice without running the slice
# callbacks, writing each record's result to the slice's output.
#
# slice.processing_record_number is the 1-based number of the record currently
# being processed. A value greater than 1 means an earlier attempt stopped part
# way through the slice. In that case processing resumes with that record, and
# output is appended; otherwise every record is processed from the start.
#
# block: optional; passed to #rocket_job_batch_perform for each record.
#
# Returns [Integer] the number of records processed in this call.
def rocket_job_perform_slice(slice, &block)
  slice.processing_record_number ||= 0
  append                         = false

  first_record_number = slice.processing_record_number.to_i
  records =
    if first_record_number > 1
      append = true
      logger.info("Resuming previously incomplete slice from record number #{slice.processing_record_number}")
      # Resume with the record that was being processed when the slice stopped.
      slice.records[(first_record_number - 1)..-1]
    else
      # Reprocess all records in this slice.
      first_record_number            = 1
      slice.processing_record_number = 0
      slice.records
    end

  count = 0
  RocketJob::Sliced::Writer::Output.collect(self, input_slice: slice, append: append) do |writer|
    records.each_with_index do |record, offset|
      # Assign rather than increment. On a resume the counter already holds the
      # resumed record's number, so incrementing would make it one too high and
      # cause a failing record to be skipped on the next resume.
      slice.processing_record_number = first_record_number + offset
      SemanticLogger.named_tagged(record: slice.current_record_number) do
        writer << rocket_job_batch_perform(slice, record, &block)
        count += 1
      end
    end
  end
  count
end
rocket_job_process_slice(slice, &block) click to toggle source

Process a single slice from Mongo. Once the slice has been successfully processed it is removed from the input collection. Returns [Integer] the number of records successfully processed.

# File lib/rocket_job/batch/worker.rb, line 126
# Processes a single input slice, running the slice callbacks around the work.
#
# slice: the input slice to process; exposed via rocket_job_slice while it runs.
# block: optional; passed through to #rocket_job_perform_slice and on to
#        #rocket_job_batch_perform, which yields each record to it instead of
#        calling #perform.
#
# Once the slice has been processed it is destroyed (removed from the input collection).
#
# Returns [Integer] the number of records processed, or 0 when the job is no
# longer running once the before_slice callbacks have run.
def rocket_job_process_slice(slice, &block)
  @rocket_job_slice = slice
  count             = 0

  run_callbacks(:slice) do
    # Allow before_slice callbacks to fail, complete or abort this slice.
    # NOTE(review): this `return` exits the whole method from inside the
    # callback block. On this path the slice is not destroyed, and
    # @rocket_job_slice is not reset to nil — confirm that is intended.
    return 0 unless running?

    count = rocket_job_perform_slice(slice, &block)
  end
  @rocket_job_slice = nil

  # On successful completion remove the slice from the input queue
  # TODO: Add option to complete slice instead of destroying it to retain input data.
  slice.destroy
  count
end
rocket_job_work(worker, re_raise_exceptions = false) click to toggle source

Processes records in each available slice for this job. Slices are processed one at a time to allow for concurrent calls to this method to increase throughput. Processing will continue until there are no more slices available for this job.

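Returns [true|false] true when a slice was throttled or no slice was available and the job was not yet complete (a filter is added to the worker in both cases); false when the job completed or stopped running, the worker is shutting down, or the re-check interval elapsed.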

Slices are destroyed after their records are successfully processed

If an exception was thrown the entire slice of records is marked as failed.

Thread-safe, can be called by multiple threads at the same time

# File lib/rocket_job/batch/worker.rb, line 29
# Processes available input slices for this job, one slice at a time. It stops
# when the worker shuts down, the job completes or stops running, no slice is
# available, or Config.re_check_seconds has elapsed.
#
# worker              : the worker doing the work (#name, #shutdown? and
#                       #add_to_current_filter are used).
# re_raise_exceptions : passed to fail_on_exception! on the job and on each slice.
#
# Returns [true|false]
#   true  when a slice was throttled, or when no slice was available and
#         rocket_job_batch_complete? did not report completion. In the latter
#         case the job's throttle filter is added to the worker.
#   false when the job is no longer running after the batch callbacks, when
#         rocket_job_batch_complete? reports true, when the worker is shutting
#         down, or when the re-check interval has elapsed.
#
# Raises RuntimeError when the job is not running.
def rocket_job_work(worker, re_raise_exceptions = false)
  raise "Job must be started before calling #rocket_job_work" unless running?

  start_time = Time.now
  # Outside the :processing phase, run the before/after batch callbacks first.
  if sub_state != :processing
    fail_on_exception!(re_raise_exceptions) { rocket_job_batch_callbacks(worker) }
    return false unless running?
  end

  SemanticLogger.named_tagged(job: id.to_s) do
    until worker.shutdown?
      slice = input.next_slice(worker.name)
      if slice
        # Grab a slice before checking the throttle to reduce concurrency race condition.
        return true if slice.fail_on_exception!(re_raise_exceptions) { rocket_job_batch_throttled?(slice, worker) }
        # NOTE(review): presumably the slice is marked failed when the throttle check raised — confirm.
        next if slice.failed?

        slice.fail_on_exception!(re_raise_exceptions) { rocket_job_process_slice(slice) }
      elsif record_count && fail_on_exception!(re_raise_exceptions) { rocket_job_batch_complete?(worker.name) }
        # No slices left and the job is complete (or no longer running).
        return false
      else
        logger.debug "No more work available for this job"
        worker.add_to_current_filter(throttle_filter_id)
        return true
      end

      # Allow new jobs with a higher priority to interrupt this job
      break if (Time.now - start_time) >= Config.re_check_seconds
    end
  end
  false
end
work_first_slice(&block) click to toggle source

Prior to a job being made available for processing it can be processed one slice at a time.

For example, to extract the header row which would be in the first slice.

Returns [Integer] the number of records processed in the slice

Note: The slice will be removed from processing when this method completes

@deprecated Please open a ticket if you need this behavior.

# File lib/rocket_job/batch/worker.rb, line 72
# Processes the first input slice before the job is made available for
# processing, e.g. to extract a header row. Only valid from within
# before_batch callbacks.
#
# Waits for the first slice to arrive, sleeping 5 seconds between checks and
# giving up after 11 sleeps. The slice is started and then processed (and
# removed) via #rocket_job_process_slice.
#
# Returns [Integer] the number of records processed, or 0 when no slice arrived.
# Raises RuntimeError when sub_state is not :before.
#
# @deprecated Please open a ticket if you need this behavior.
def work_first_slice(&block)
  raise "#work_first_slice can only be called from within before_batch callbacks" unless sub_state == :before

  # TODO: Make these settings configurable
  wait_seconds = 5
  max_retries  = 10
  retries      = 0
  while input.first.nil? && retries <= max_retries
    logger.info "First slice has not arrived yet, sleeping for #{wait_seconds} seconds"
    sleep wait_seconds
    retries += 1
  end

  first_slice = input.first
  # No records processed
  return 0 unless first_slice

  # TODO: Persist that the first slice is being processed by this worker
  first_slice.start
  rocket_job_process_slice(first_slice, &block)
end