module RocketJob::Batch::IO

IO methods for sliced jobs
Public Instance Methods
Download the output data into the supplied file, io, IOStreams::Path, or IOStreams::Stream. Returns [Integer] the number of records / lines downloaded.
Parameters
stream [String | IO | IOStreams::Path | IOStreams::Stream]
  Full path and file name to stream the output into,
  Or, an IO stream that responds to: :write,
  Or, an IOStreams path such as IOStreams::Paths::File, or IOStreams::Paths::S3
Example: Zip
  # Since csv is not known to RocketJob it is ignored
  job.download('myfile.csv.zip')
Example: Encrypted Zip
  job.download('myfile.csv.zip.enc')
Example: Explicitly set the streams
  path = IOStreams.path('myfile.ze').stream(:zip).stream(:enc)
  job.download(path)
Example: Supply custom options
  path = IOStreams.path('myfile.csv.enc').option(:enc, compress: false)
  job.download(path)
Example: Supply custom options. Set the file name within the zip file.
  path = IOStreams.path('myfile.csv.zip').option(:zip, zip_file_name: 'myfile.csv')
  job.download(path)
Example: Download into a tempfile, or stream, using the original file name to determine the streams to apply:
  tempfile = Tempfile.new('my_project')
  stream   = IOStreams.stream(tempfile).file_name('myfile.gz.enc')
  job.download(stream)
Example: Add a header and/or trailer record to the downloaded file:
  IOStreams.path('/tmp/file.txt.gz').writer do |writer|
    writer << "Header\n"
    job.download do |line|
      writer << line + "\n"
    end
    writer << "Trailer\n"
  end
Example: Add a header and/or trailer record to the downloaded file, letting the line writer add the line breaks:
  IOStreams.path('/tmp/file.txt.gz').writer(:line) do |writer|
    writer << "Header"
    job.download do |line|
      writer << line
    end
    writer << "Trailer"
  end
Notes:
- The records are returned in `_id` order. Usually this is the order in which the records were originally loaded.
  # File lib/rocket_job/batch/io.rb, line 443
  def download(stream = nil, category: :main, header_line: nil, **args, &block)
    raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?

    category          = output_category(category) unless category.is_a?(Category::Output)
    output_collection = output(category)

    # Store the output file name in the category
    category.file_name = stream if !block && (stream.is_a?(String) || stream.is_a?(IOStreams::Path))

    header_line ||= category.render_header

    return output_collection.download(header_line: header_line, &block) if block

    raise(ArgumentError, "Missing mandatory `stream` or `category.file_name`") unless stream || category.file_name

    if output_collection.slice_class.binary_format
      binary_header_line = output_collection.slice_class.to_binary(header_line) if header_line

      # Don't overwrite supplied stream options if any
      stream = stream&.is_a?(IOStreams::Stream) ? stream.dup : IOStreams.new(category.file_name)
      stream.remove_from_pipeline(output_collection.slice_class.binary_format)
      stream.writer(**args) do |io|
        # TODO: Binary formats should return the record count, instead of the slice count.
        output_collection.download(header_line: binary_header_line) { |record| io.write(record) }
      end
    else
      # TODO: Add category to named tags to aid problem determination
      #       And RJ Download metric with duration
      IOStreams.new(stream || category.file_name).writer(:line, **args) do |io|
        output_collection.download(header_line: header_line) { |record| io << record }
      end
    end
  end
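Example: Download a named output category. A minimal sketch based on the signature above; the :errors category name is hypothetical and would need to be listed in the job's output_categories:

  job.download('results.csv')                    # default :main output category
  job.download('errors.csv', category: :errors)  # hypothetical named category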
Returns [RocketJob::Sliced::Input] input collection for holding input slices
Parameters:
category [Symbol | RocketJob::Category::Input]
  The category or the name of the category to access or upload data into.
  Default: None (uses the single default input collection for this job)
  Validates: This value must be one of those listed in #input_categories
  # File lib/rocket_job/batch/io.rb, line 16
  def input(category = :main)
    category = input_category(category)

    (@inputs ||= {})[category.name] ||= category.data_store(self)
  end
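A minimal usage sketch: the returned RocketJob::Sliced::Input collection can be worked with directly, as the `# Equivalent to calling:` example further below also shows:

  job.input.insert([1, 2, 3])  # insert a slice into the default :main input category
  job.input(:main)             # the same collection, fetched explicitly by name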
Returns [RocketJob::Sliced::Output] the output collection for holding output slices. Returns nil if no output is being collected.
Parameters:
category [Symbol | RocketJob::Category::Output]
  The category or the name of the category to access or download data from.
  Default: None (uses the single default output collection for this job)
  Validates: This value must be one of those listed in #output_categories
  # File lib/rocket_job/batch/io.rb, line 30
  def output(category = :main)
    category = output_category(category)

    (@outputs ||= {})[category.name] ||= category.data_store(self)
  end
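A minimal usage sketch; the :errors category name is hypothetical and assumes the job defines a second output category:

  job.output             # default :main output collection
  job.output(:errors)    # hypothetical named output category
  # Output records are typically read back via the block form of download:
  job.download { |record| puts record }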
Upload a sliced range of integer requests as arrays of start and end ids, starting with the last range first.
Returns [Integer] the number of slices uploaded.
Uploads one range per slice so that the response can return multiple records for each slice processed. Useful when the highest integer values should be processed before the lower ranges, for example when processing every record in a database based on its id column.
Example
  job.input_category.slice_size = 100
  job.upload_integer_range_in_reverse_order(200, 421)

  # Equivalent to calling:
  job.input.insert([400, 421])
  job.input.insert([300, 399])
  job.input.insert([200, 299])
Notes:
- Only call from one thread at a time against a single instance of this job.
- The record_count for the job is set to: last_id - start_id + 1.
- If an exception is raised while uploading data, the input collection is cleared out so that if a job is retried during an upload failure, data is not duplicated.
  # File lib/rocket_job/batch/io.rb, line 272
  def upload(object = nil, category: :main, file_name: nil, stream_mode: nil, on_first: nil, columns: nil, slice_batch_size: nil, **args, &block)
    input_collection = input(category)

    if block
      raise(ArgumentError, "Cannot supply both an object to upload, and a block.") if object
      if stream_mode || columns || slice_batch_size || args.size > 0
        raise(ArgumentError, "Unknown keyword arguments when uploading a block. Only accepts :category, :file_name, or :on_first")
      end

      category           = input_category(category)
      category.file_name = file_name if file_name

      # Extract the header line during the upload when applicable.
      extract_header = category.extract_header_callback(on_first)

      count             = input_collection.upload(on_first: extract_header, slice_batch_size: slice_batch_size, &block)
      self.record_count = (record_count || 0) + count
      return count
    end

    count =
      case object
      when Range
        if file_name || stream_mode || on_first || args.size > 0
          raise(ArgumentError, "Unknown keyword arguments when uploading a Range. Only accepts :category, :columns, or :slice_batch_size")
        end

        first = object.first
        last  = object.last
        if first < last
          input_collection.upload_integer_range(first, last, slice_batch_size: slice_batch_size || 1_000)
        else
          input_collection.upload_integer_range_in_reverse_order(last, first, slice_batch_size: slice_batch_size || 1_000)
        end
      when Mongoid::Criteria
        if file_name || stream_mode || on_first || args.size > 0
          raise(ArgumentError, "Unknown keyword arguments when uploading a Mongoid::Criteria. Only accepts :category, :columns, or :slice_batch_size")
        end

        input_collection.upload_mongo_query(object, columns: columns, slice_batch_size: slice_batch_size, &block)
      when defined?(ActiveRecord::Relation) ? ActiveRecord::Relation : false
        if file_name || stream_mode || on_first || args.size > 0
          raise(ArgumentError, "Unknown keyword arguments when uploading an ActiveRecord::Relation. Only accepts :category, :columns, or :slice_batch_size")
        end

        input_collection.upload_arel(object, columns: columns, slice_batch_size: slice_batch_size, &block)
      else
        raise(ArgumentError, "Unknown keyword argument :columns when uploading a file") if columns

        category = input_category(category)

        # Extract the header line during the upload when applicable.
        extract_header = category.extract_header_callback(on_first)

        path = category.upload_path(object, original_file_name: file_name)

        input_collection.upload(on_first: extract_header, slice_batch_size: slice_batch_size) do |io|
          path.each(stream_mode || :line, **args) { |line| io << line }
        end
      end

    self.record_count = (record_count || 0) + count
    count
  end
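Based on the branches in the source above, a sketch of the main calling styles; the file name, the User model, and the column names are illustrative only:

  # File: the IOStreams pipeline is inferred from the file name.
  job.upload('import.csv.gz')

  # Integer Range: ascending uploads in order, descending uploads in reverse order.
  job.upload(1..100_000)
  job.upload(100_000..1)

  # ActiveRecord::Relation (User is a hypothetical model).
  job.upload(User.where(active: true), columns: %w[id email])

  # Block: write records directly into input slices.
  job.upload do |records|
    records << 'first line'
    records << 'second line'
  end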
@deprecated
  # File lib/rocket_job/batch/io.rb, line 339
  def upload_arel(arel, *column_names, category: :main, &block)
    count             = input(category).upload_arel(arel, columns: column_names, &block)
    self.record_count = (record_count || 0) + count
    count
  end
@deprecated
  # File lib/rocket_job/batch/io.rb, line 353
  def upload_integer_range(start_id, last_id, category: :main, slice_batch_size: 1_000)
    count             = input(category).upload_integer_range(start_id, last_id, slice_batch_size: slice_batch_size)
    self.record_count = (record_count || 0) + count
    count
  end
@deprecated
  # File lib/rocket_job/batch/io.rb, line 360
  def upload_integer_range_in_reverse_order(start_id, last_id, category: :main, slice_batch_size: 1_000)
    count             = input(category).upload_integer_range_in_reverse_order(start_id, last_id, slice_batch_size: slice_batch_size)
    self.record_count = (record_count || 0) + count
    count
  end
@deprecated
  # File lib/rocket_job/batch/io.rb, line 346
  def upload_mongo_query(criteria, *column_names, category: :main, &block)
    count             = input(category).upload_mongo_query(criteria, columns: column_names, &block)
    self.record_count = (record_count || 0) + count
    count
  end
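Each of the deprecated helpers above is a thin wrapper over the corresponding input collection call; a sketch of equivalent calls through the current upload API (the User model and column names are illustrative):

  # Deprecated:
  job.upload_arel(User.all, :id, :email)
  job.upload_integer_range(1, 10_000)
  job.upload_integer_range_in_reverse_order(1, 10_000)

  # Current equivalents:
  job.upload(User.all, columns: [:id, :email])
  job.upload(1..10_000)   # ascending range
  job.upload(10_000..1)   # descending range uploads in reverse order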
Upload the supplied slice for processing by workers
Updates the record_count after adding the records
Returns [Integer] the number of records uploaded
Parameters
slice [Array<Hash | Array | String | Integer | Float | Symbol | Regexp | Time>]
  All elements in `slice` must be serializable to BSON.
  For example, the following types are not supported: Date
Notes:
- The caller should implement `:slice_size`, since the entire slice is saved as-is.
- Not thread-safe. Only call from one thread at a time.
  # File lib/rocket_job/batch/io.rb, line 382
  def upload_slice(slice, category: :main)
    input(category).insert(slice)
    count             = slice.size
    self.record_count = (record_count || 0) + count
    count
  end
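A minimal sketch; the caller builds each slice and keeps it within the category's slice_size:

  slice = [{'id' => 1, 'name' => 'Jack'}, {'id' => 2, 'name' => 'Jill'}]
  job.upload_slice(slice)  # returns 2 and increments record_count by 2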