class RocketJob::Sliced::Slices

Attributes

all[R]
collection_name[RW]
slice_class[RW]
slice_size[RW]

Public Class Methods

new(collection_name:, slice_class: Sliced::Slice, slice_size: 100) click to toggle source

Parameters

collection_name: [String]
  Name of the collection to create
slice_size: [Integer]
  Number of records to store in each slice
  Default: 100
slice_class: [class]
  Slice class to use to hold records.
  Default: RocketJob::Sliced::Slice
# File lib/rocket_job/sliced/slices.rb, line 20
# Sets up access to the slices held in one collection.
#
# Parameters
#   collection_name: name of the collection; passed to `Sliced::Slice.with_collection`.
#   slice_class:     class used to hold records. Default: Sliced::Slice
#   slice_size:      number of records per slice. Default: 100
def initialize(collection_name:, slice_class: Sliced::Slice, slice_size: 100)
  @collection_name = collection_name
  @slice_size      = slice_size
  @slice_class     = slice_class

  # Deliberately built from `Sliced::Slice` rather than `slice_class`: every slice in the
  # collection is the same type anyway, so this avoids having to add `_type` as an index.
  @all = Sliced::Slice.with_collection(collection_name)
end

Public Instance Methods

<<(slice, input_slice = nil)
Alias for: insert
append(slice, input_slice) click to toggle source

Append to an existing slice if already present

# File lib/rocket_job/sliced/slices.rb, line 99
# Adds records to the slice already stored under `input_slice.id`.
#
# Parameters
#   slice:       a Slice (its `records` are used), or the records themselves.
#   input_slice: its `id` is used to look up the existing slice.
#
# When no slice with that id is found, the call is handed to `insert` and its
# result is returned. Otherwise the existing slice is saved and returned.
def append(slice, input_slice)
  target = all.where(id: input_slice.id).first
  return insert(slice, input_slice) unless target

  additions = slice.is_a?(Slice) ? slice.records : slice
  target.records += additions
  target.save!
  target
end
completed() click to toggle source

Forwardable generates invalid warnings on these methods.

# File lib/rocket_job/sliced/slices.rb, line 131
# Forwards `completed` to `all` and returns its result.
def completed
  slices = all
  slices.completed
end
create(params = {}) click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 33
# Builds a slice from `params` via `new`, calls `save` on it, and returns the slice.
# The return value of `save` is discarded.
def create(params = {})
  new(params).tap(&:save)
end
create!(params = {}) click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 39
# Builds a slice from `params` via `new`, calls `save!` on it, and returns the slice.
# Anything raised by `save!` propagates to the caller.
def create!(params = {})
  new(params).tap(&:save!)
end
create_indexes() click to toggle source

Index for find_and_modify only if it is not already present

# File lib/rocket_job/sliced/slices.rb, line 112
# Creates the unique `{state: 1, _id: 1}` index unless an index named
# "state_1__id_1" is already listed for the collection.
#
# Returns the result of `create_one` when the index is created, otherwise nil.
def create_indexes
  already_indexed =
    begin
      all.collection.indexes.any? { |index| index["name"] == "state_1__id_1" }
    rescue Mongo::Error::OperationFailure
      # Listing the indexes failed, so treat the index as missing.
      false
    end
  return if already_indexed

  all.collection.indexes.create_one({state: 1, _id: 1}, unique: true)
end
drop() click to toggle source

Drop this collection when it is no longer needed

# File lib/rocket_job/sliced/slices.rb, line 126
# Drops the underlying collection by calling `drop` on `all.collection`.
def drop
  collection = all.collection
  collection.drop
end
each(&block) click to toggle source

Returns output slices in the order of their id which is usually the order in which they were written.

# File lib/rocket_job/sliced/slices.rb, line 47
# Iterates over `all` sorted by ascending `id`, passing the given block to `each`.
def each(&block)
  ordered = all.sort(id: 1)
  ordered.each(&block)
end
failed() click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 135
# Forwards `failed` to `all` and returns its result.
def failed
  slices = all
  slices.failed
end
first() click to toggle source

Mongoid does not apply ordering, so an explicit sort is added (hence `rubocop:disable Style/RedundantSort`).

# File lib/rocket_job/sliced/slices.rb, line 149
# Returns the first slice when sorted by ascending `_id`.
# The sort is explicit because Mongoid does not apply an ordering by itself.
def first
  ascending = all.sort("_id" => 1)
  ascending.first
end
group_exceptions() click to toggle source

Returns [Array<Struct>] grouped exceptions by class name, and unique exception messages by exception class.

Each struct consists of:

class_name: [String]
  Exception class name.

count: [Integer]
  Number of exceptions with this class.

messages: [Array<String>]
  Unique list of error messages.
# File lib/rocket_job/sliced/slices.rb, line 171
# Summarises failed slices by exception class using an aggregation on the collection.
#
# Returns one struct per exception class, with the members:
#   class_name: value of `exception.class_name` shared by the group.
#   count:      number of failed slices in the group.
#   messages:   set of distinct `exception.message` values in the group.
def group_exceptions
  pipeline = [
    # Only slices in the failed state are considered.
    {"$match" => {state: "failed"}},
    {
      "$group" => {
        _id:      {error_class: "$exception.class_name"},
        messages: {"$addToSet" => "$exception.message"},
        count:    {"$sum" => 1}
      }
    }
  ]
  summary = Struct.new(:class_name, :count, :messages)

  all.collection.aggregate(pipeline).map do |group|
    summary.new(group["_id"]["error_class"], group["count"], group["messages"])
  end
end
insert(slice, input_slice = nil) click to toggle source

Insert a new slice into the collection

Returns [RocketJob::Sliced::Slice] the slice that was written (the supplied slice, or the one built from the Array)

Parameters

slice [RocketJob::Sliced::Slice | Array]
  The slice to write to the slices collection
  If slice is an Array, it will be converted to a Slice before inserting
  into the slices collection

input_slice [RocketJob::Sliced::Slice]
  The input slice to which this slice corresponds
  The id of the input slice is copied across
  If the insert results in a duplicate record it is ignored, to support
  restarting of jobs that failed in the middle of processing.
  A warning is logged that the slice has already been processed.

Note:

`slice_size` is not enforced.
However many records are present in the slice will be written as a
single slice to the slices collection
# File lib/rocket_job/sliced/slices.rb, line 73
# Saves a slice into the collection and returns that slice.
#
# Parameters
#   slice:       a Slice, or anything else, which is wrapped with `new(records: slice)`.
#   input_slice: optional; when given, its `id` and `first_record_number` are copied
#                onto the slice before it is saved.
#
# A Mongo::Error::OperationFailure whose message contains "E11000" is logged as a
# warning and otherwise ignored, since it means the job was restarted and the slice
# was already processed. Any other OperationFailure is re-raised.
def insert(slice, input_slice = nil)
  slice = slice.is_a?(Slice) ? slice : new(records: slice)

  if input_slice
    # Keep the output slice tied to the input slice it was produced from.
    slice.id                  = input_slice.id
    slice.first_record_number = input_slice.first_record_number
  end

  begin
    slice.save!
  rescue Mongo::Error::OperationFailure => error
    duplicate = error.message.include?("E11000")
    raise(error) unless duplicate

    logger.warn "Skipped already processed slice# #{slice.id}"
  end
  slice
end
Also aliased as: <<
insert_many(slices) click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 93
# Writes several slices with a single `insert_many` call on the collection.
# Each slice is converted with `as_document`; nothing is written, and nil is
# returned, when there are no documents.
def insert_many(slices)
  documents = slices.map(&:as_document)
  return unless documents.present?

  all.collection.insert_many(documents)
end
last() click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 153
# Returns the last slice by `_id`: sorts descending on `_id` and takes the first result.
def last
  descending = all.sort("_id" => -1)
  descending.first
end
new(params = {}) click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 29
# Instantiates `slice_class` with `params`, adding this instance's `collection_name`.
# A `collection_name` key already present in `params` is overwritten.
def new(params = {})
  attributes = params.merge(collection_name: collection_name)
  slice_class.new(attributes)
end
queued() click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 139
# Forwards `queued` to `all` and returns its result.
def queued
  slices = all
  slices.queued
end
running() click to toggle source
# File lib/rocket_job/sliced/slices.rb, line 143
# Forwards `running` to `all` and returns its result.
def running
  slices = all
  slices.running
end