class Eco::API::Session::Batch::Job
@attr_reader name [String] the name of this `batch job`
@attr_reader type [Symbol] a valid batch operation
@attr_reader sets [Array<Symbol>] the parts of the person model this batch is supposed to affect
@attr_reader usecase [Eco::API::UseCases::UseCase, nil] when provided: the `usecase` that generated this `batch job`
@attr_reader status [Eco::API::Session::Batch::Status] if launched: the `status` of the `batch`
@attr_reader feedback [Eco::API::Session::Batch::Feedback] helper class for feedback and end-user decision making
Attributes
Public Class Methods
@param e [Eco::API::Common::Session::Environment] requires a session environment, as any child of `Eco::API::Common::Session::BaseSession`.
@param name [String] the name of this `batch job`.
@param type [Symbol] a valid batch operation.
@param usecase [Eco::API::UseCases::UseCase, nil] when provided: the `usecase` that generated this `batch job`.
This is necessary to know the `options` used to run the usecase, which could modify the `Batch::Job` behaviour.
Calls superclass method Eco::API::Common::Session::BaseSession::new
# File lib/eco/api/session/batch/job.rb, line 37
def initialize(e, name:, type:, sets:, usecase: nil)
  raise "A name is required to refer a job. Given: #{name}" if !name
  raise "Type should be one of #{self.class.types}. Given: #{type}" unless self.class.valid_type?(type)
  raise "Sets should be some of #{self.class.sets}. Given: #{sets}" unless self.class.valid_sets?(sets)
  raise "usecase must be a Eco::API::UseCases::UseCase object. Given: #{usecase.class}" if usecase && !usecase.is_a?(Eco::API::UseCases::UseCase)
  super(e)
  @name     = name
  @type     = type
  @sets     = [sets].flatten.compact
  @usecase  = usecase
  @feedback = Eco::API::Session::Batch::Feedback.new(job: self)
  reset
end
# File lib/eco/api/session/batch/job.rb, line 22
def valid_sets?(value)
  sts = [value].flatten
  sts.all? { |s| sets.include?(s) }
end
# File lib/eco/api/session/batch/job.rb, line 18
def valid_type?(value)
  types.include?(value)
end
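Both validators reduce to membership checks against the class-level `types` and `sets`. A minimal standalone sketch of the same logic (the `TYPES` and `SETS` constants below are illustrative, not the gem's actual values):

```ruby
# Illustrative constants; the real gem defines its own valid types/sets.
TYPES = %i[get create update delete].freeze
SETS  = %i[core details account].freeze

def valid_type?(value)
  TYPES.include?(value)
end

def valid_sets?(value)
  # Accepts a single symbol or an array of symbols.
  [value].flatten.all? { |s| SETS.include?(s) }
end

valid_type?(:update)          # => true
valid_sets?(%i[core account]) # => true
valid_sets?(:unknown)         # => false
```

Note that `[value].flatten` lets callers pass either `:core` or `[:core, :account]` without a separate code path.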
Public Instance Methods
Adds one entry (or several) to the job queue.
@param entry [Ecoportal::API::V1::Person, Enumerable<Person>] the person(s) we want to update, carrying the changes to be done.
@param unique [Boolean] specifies if repeated entries should be avoided in the queue.
@yield [person] callback before launching the batch job request against the server.
@yieldparam person [Person] the current person object that should be treated by the callback before launching the batch.
@return [Eco::API::Session::Batch::Job] this `Batch::Job`.
# File lib/eco/api/session/batch/job.rb, line 95
def add(entry, unique: true, &block)
  case entry
  when Enumerable
    entry.each {|e| add(e, unique: unique, &block)}
  else
    unless !entry
      unless unique && @queue_hash.key?(entry)
        @queue_hash[entry] = true
        @queue.push(entry)
        @callbacks[entry] = Proc.new if block_given?
      end
    end
  end
  self
end
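The method pairs an `Array` (which preserves insertion order) with a `Hash` (which gives O(1) duplicate detection), and stores an optional per-entry callback. A self-contained sketch of that queue pattern, with `MiniQueue` as an illustrative stand-in (not a class from the gem):

```ruby
# Minimal sketch of the dedup-queue pattern used by #add.
class MiniQueue
  def initialize
    @queue      = []  # preserves insertion order
    @queue_hash = {}  # O(1) membership test for `unique: true`
    @callbacks  = {}  # optional per-entry delayed callback
  end

  def add(entry, unique: true, &block)
    if entry.is_a?(Enumerable)
      entry.each { |e| add(e, unique: unique, &block) }
    elsif entry
      unless unique && @queue_hash.key?(entry)
        @queue_hash[entry] = true
        @queue.push(entry)
        @callbacks[entry] = block if block
      end
    end
    self # chainable, like the real Batch::Job#add
  end

  def to_a; @queue.dup; end
end

q = MiniQueue.new
q.add("anna").add(%w[bob anna])
q.to_a # => ["anna", "bob"]
```

Returning `self` is what makes chained calls like `job.add(a).add(b)` possible.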
Creates an empty `Batch::Job` with the same behaviour as the current one.
@note
* this job will not be linked to the `Batch::Jobs` model of the current session
* mostly used for error handlers
@return [Eco::API::Session::Batch::Job]
# File lib/eco/api/session/batch/job.rb, line 65
def dup(name = "ad-hoc:job-from:#{self.name}", usecase: self.usecase)
  self.class.new(enviro, name: name, type: type, sets: sets, usecase: usecase)
end
@return [Boolean] `true` if the current batch job is a result of an error_handler
# File lib/eco/api/session/batch/job.rb, line 75
def error_handler?
  usecase? && usecase.is_a?(Eco::API::Error::Handler)
end
@see Eco::API::Session::Batch::Status#errors?
@return [Boolean] `true` if there were Server errors, `false` otherwise
# File lib/eco/api/session/batch/job.rb, line 131
def errors?
  status && status.errors?
end
Processes the `queue` and, unless `simulate` is `true`, launches against the server:
1. pre-processes the queue, obtaining the `requests`:
   - if the entries of `queue` have pending _callbacks_ (delayed changes), it processes them
   - unless type == `:create`: if there's a defined `api_excluded` _callback_ it calls it (see {Eco::API::Session::Config::People#api_excluded})
   - transforms the result to a `Eco::API::Organization::People` object
   - if there are `api policies` defined, it passes the entries through them in order (see {Eco::API::Session::Config#policies})
     - this step is **skipped** if the option `-skip-api-policies` was used in the command line
   - at this point all the transformations have taken place...
   - only includes the entries that, after all of the above, still hold pending changes (`!as_update.empty?`) to be launched as an update
2. pre-launch checks against the `requests`:
   - it generates `stats` (`Eco::API::Session::Batch::RequestStats`) out of the requests
   - if there is a batch policy declared for the current job `type`, it checks compliance against `stats` (see {Eco::API::Session::Batch::Policies})
     - a non-compliant batch will stop the current session by raising an `Exception`
     - this step is **skipped** if the option `-skip-batch-policy` was used in the command line
3. if we are **not** in `dry-run` (or `simulate`), it:
   - backs up the raw queries (`requests`) launched to the Server
   - **launches the batch** request against the _Server_ (see {Eco::API::Session::Batch#launch})
   - links the resulting batch `status` to this `Batch::Job` (see {Eco::API::Session::Batch::Status})
   - prints any `errors` replied by the _Server_
4. the post-launch kicks in, and:
   - for successful requests, it consolidates the associated entries (see `Ecoportal::API::V1::Person#consolidate!`)
   - launches specific error handlers, if there were **errors** from the Server as a result of the `batch.launch`, and there are `Error::Handlers` defined
@return [Eco::API::Session::Batch::Status]
# File lib/eco/api/session/batch/job.rb, line 165
def launch(simulate: false)
  pqueue = processed_queue
  @requests = as_update(pqueue)
  pre_checks(requests, simulate: simulate)

  if simulate
    if options.dig(:requests, :backup)
      req_backup = as_update(pqueue, add_feedback: false)
      backup_update(req_backup, simulate: simulate)
    end
  else
    if pqueue.length > 0
      req_backup = as_update(pqueue, add_feedback: false)
      backup_update(req_backup)
      session.batch.launch(pqueue, method: type).tap do |job_status|
        @status = job_status
        status.root = self
        status.errors.print
      end
    end
  end

  unless requests.empty? || !simulate
    logger.info("--- simulate mode (dry-run) -- job '#{name}' -- this would have launched #{type.to_s.upcase}")
  end

  post_launch(queue: pqueue, simulate: simulate)
  @pending = false
  return status
end
@return [Hash] options the root `usecase` is run with
# File lib/eco/api/session/batch/job.rb, line 85
def options
  usecase? ? usecase.options : {}
end
@return [Boolean] `true` if this `batch job` has not been launched yet
# File lib/eco/api/session/batch/job.rb, line 112
def pending?
  @pending
end
Helper/shortcut to obtain a people object out of `input`.
@note if `input` is not provided, it will use `queue`
@return [Eco::API::Organization::People]
# File lib/eco/api/session/batch/job.rb, line 138
def people(input = @queue)
  Eco::API::Organization::People.new(input)
end
@see Eco::API::Session::Batch::Feedback#request_stats
# File lib/eco/api/session/batch/job.rb, line 125
def request_stats(requests = nil)
  feedback.request_stats(requests || self.requests)
end
@note it requires `launch` to have been invoked first
@raise [Exception] if `launch` has not been invoked yet
@return [Enumerable<Hash>] the last requests that the queue will generate
# File lib/eco/api/session/batch/job.rb, line 119
def requests
  raise "Method misuse. Firstly 'launch' should be invoked" unless instance_variable_defined?(:@requests)
  @requests
end
# File lib/eco/api/session/batch/job.rb, line 52
def reset
  @queue      = []
  @queue_hash = {}
  @callbacks  = {}
  @pending    = true
  @status     = nil
end
@return [Eco::API::Session::Batch::Jobs] group of subjobs of this `Batch::Job`
# File lib/eco/api/session/batch/job.rb, line 70
def subjobs
  @subjobs ||= Eco::API::Session::Batch::Jobs.new(enviro, name: "childs-of:#{self.name}")
end
Provides a text summary of the current status including:
1. stats of the changes introduced by the job in the different parts of the person model
2. whether the job is compliant with the batch policy
3. error messages in case there were errors from the server
@note if `launch` was not invoked, it specifies so
@return [String] the summary
# File lib/eco/api/session/batch/job.rb, line 202
def summary
  [].tap do |msg|
    if pending?
      msg << "PENDING - Batch #{type.to_s.upcase} - job '#{name}' - length: #{@queue.length}"
    else
      msg << feedback.generate(requests, only_stats: true)
      if batch_policy && !batch_policy.compliant?(request_stats)
        msg << "Batch Policy Uncompliance:"
        msg << batch_policy.uncompliance(request_stats)
      end
      msg << status.errors.message unless !status
      msg << subjobs_summary
    end
  end.join("\n")
end
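The `[].tap { … }.join("\n")` idiom above collects an arbitrary number of conditional lines and renders them as one string. A condensed, self-contained sketch of the pattern (`summary_for` and its parameters are illustrative, not part of the gem):

```ruby
# Conditionally accumulate report lines, then join them once at the end.
def summary_for(pending:, name:, count:)
  [].tap do |msg|
    if pending
      msg << "PENDING - job '#{name}' - length: #{count}"
    else
      msg << "DONE - job '#{name}'"
    end
  end.join("\n")
end

summary_for(pending: true, name: "upsert", count: 3)
# => "PENDING - job 'upsert' - length: 3"
```

Compared with concatenating strings directly, building an array first means each branch can contribute zero or more lines without worrying about separators.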
@return [Boolean] was this `batch job` generated by a `usecase`? (`Eco::API::UseCases::UseCase`)
# File lib/eco/api/session/batch/job.rb, line 80
def usecase?
  !!usecase
end
Private Instance Methods
Filters excluded entries out of the api update, provided there is a config definition to exclude entries and the current batch is not a creation batch.
# File lib/eco/api/session/batch/job.rb, line 253
def api_included(full_queue)
  return full_queue if type == :create
  return full_queue unless excluded_callback = session.config.people.api_excluded
  excluded = options.dig(:include, :excluded)
  if excluded.is_a?(Hash) && excluded[:only]
    full_queue.select {|entry| excluded_callback.call(entry, session, options, self)}
  elsif options.dig(:include, :excluded)
    full_queue
  else
    full_queue.select {|entry| !excluded_callback.call(entry, session, options, self)}
  end
end
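The method implements a three-way branch: keep only the excluded entries, keep everything, or (the default) drop the excluded ones. A standalone sketch of that branch, where `excluded_cb` stands in for the configured `api_excluded` callback and the entries are plain hashes (all names here are illustrative):

```ruby
entries     = [{ name: "a", excluded: true }, { name: "b", excluded: false }]
excluded_cb = ->(entry) { entry[:excluded] }

def select_api_entries(full_queue, excluded_cb, options)
  excluded = options.dig(:include, :excluded)
  if excluded.is_a?(Hash) && excluded[:only]
    full_queue.select { |e| excluded_cb.call(e) }  # only the excluded entries
  elsif excluded
    full_queue                                     # keep everything
  else
    full_queue.reject { |e| excluded_cb.call(e) }  # default: drop excluded
  end
end

select_api_entries(entries, excluded_cb, {}).map { |e| e[:name] }
# => ["b"]
```

`Hash#dig` makes the nested option lookup safe when intermediate keys are missing, returning `nil` rather than raising.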
Applies the changes introduced by api policies
# File lib/eco/api/session/batch/job.rb, line 267
def apply_policies(pre_queue)
  people(pre_queue).tap do |entries|
    policies = session.policies
    unless policies.empty? || options.dig(:skip, :api_policies)
      policies.launch(people: entries, session: session, options: options, job: self)
    end
  end
end
# File lib/eco/api/session/batch/job.rb, line 228
def as_update(data, *args)
  if data.is_a?(Array)
    data.map do |e|
      feedback.as_update(e, *args)
    end.compact.select {|e| e && !e.empty?}
  else
    feedback.as_update(data, *args)
  end
end
Keep a copy of the requests for future reference
# File lib/eco/api/session/batch/job.rb, line 343
def backup_update(requests, simulate: false)
  dry_run = simulate ? "_dry_run" : ""
  dir  = config.people.requests_folder
  file = File.join(dir, "#{type}_data#{dry_run}.json")
  file_manager.save_json(requests, file, :timestamp)
end
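The backup file name encodes both the batch type and whether the run was a dry-run, so real and simulated request dumps never overwrite each other. A minimal sketch of just the naming scheme (`backup_filename` is an illustrative helper, not a gem method):

```ruby
# Derive the backup file name from the batch type and the simulate flag.
def backup_filename(type, simulate)
  suffix = simulate ? "_dry_run" : ""
  "#{type}_data#{suffix}.json"
end

backup_filename(:update, true)  # => "update_data_dry_run.json"
backup_filename(:create, false) # => "create_data.json"
```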
Shortcut to get the batch (belt) policy
# File lib/eco/api/session/batch/job.rb, line 277
def batch_policy
  unless options.dig(:skip, :batch_policy)
    @batch_policy ||= session.config.batch_policies[self.type]
  end
end
Runs after the batch has been launched to the server:
1. `consolidate!` the person model for entries that succeeded (person.doc -> person.original_doc)
2. if there were errors: launches the specific error handlers defined for each type of error
# File lib/eco/api/session/batch/job.rb, line 304
def post_launch(queue: [], simulate: false)
  if !simulate && status
    status.queue.map do |entry|
      if status.success?(entry)
        if type == :create && entry.respond_to?(:id=)
          entry.id = status[entry].body["id"]
        end
        entry.consolidate! if entry.respond_to?(:consolidate!)
      #else
      #  do not entry.reset! (keep track on changes still)
      end
    end
    # launch error_handlers
    handlers = session.config.error_handlers
    if status.errors.any? && !handlers.empty? && !error_handler?
      err_types = status.errors.by_type
      logger.debug("(#{self.name}) got these error types: #{err_types.keys}")
      handlers.each do |handler|
        if entries = err_types[handler.name]
          handler_job = subjobs_add("#{self.name} => #{handler.name}", usecase: handler)
          logger.debug("Running error handler #{handler.name}")
          handler.launch(people: people(entries), session: session, options: options, job: handler_job)
          logger.debug("Launching job of error handler: #{handler_job.name}")
          handler_job.launch(simulate: simulate)
        end
      end
    end
  elsif simulate
    fake_id = 111111111111111111111111
    queue.map do |entry|
      if type == :create && entry.respond_to?(:id=)
        entry.id = fake_id.to_s
        fake_id += 1
      end
      entry.consolidate! if entry.respond_to?(:consolidate!)
    end
  end
end
Checks batch policy compliance and displays the feedback on request stats
# File lib/eco/api/session/batch/job.rb, line 284
def pre_checks(requests, simulate: false)
  only_stats = options.dig(:feedback, :only_stats)
  max_chars  = simulate ? 2500 : 800
  msg = feedback.generate(requests, max_chars: max_chars, only_stats: only_stats)
  logger.info(msg)

  # batch_policy
  stats = request_stats(requests)
  if simulate && batch_policy && !batch_policy.compliant?(stats)
    logger.warn("Batch Policy Uncompliance: this and next batches will be aborted!")
    logger.warn(batch_policy.uncompliance(stats))
  elsif batch_policy
    # will throw an Exception if the policy request_stats is not compliant
    batch_policy.validate!(stats)
  end
end
# File lib/eco/api/session/batch/job.rb, line 238
def processed_queue
  @queue.each {|e| @callbacks[e].call(e) if @callbacks.key?(e)}
  apply_policies(api_included(@queue)).select do |e|
    !as_update(e).empty?
  end.select do |e|
    next true unless e.is_a?(Ecoportal::API::V1::Person)
    next true unless e.new?
    # new people should either have account or details
    e.account || e.details
  end
end
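The core idea is: run any delayed per-entry callbacks first, then keep only entries that still carry pending changes. A self-contained sketch of that flow, using plain hashes as stand-in entries (all names illustrative):

```ruby
# Entries are plain hashes here; `changes` plays the role of the
# pending update payload that as_update would compute.
entries   = [{ id: 1, changes: {} }, { id: 2, changes: {} }]
callbacks = { 1 => ->(e) { e[:changes][:name] = "Ann" } }

# 1. process delayed callbacks, which may introduce changes
entries.each { |e| callbacks[e[:id]]&.call(e) }

# 2. keep only entries that still hold pending changes
pending = entries.reject { |e| e[:changes].empty? }
pending.map { |e| e[:id] } # => [1]
```

Entry 2 never received a callback and has no changes, so it is filtered out before any request would be built.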
Adds a job tied to the current job. Used with error handlers that need their own job to run.
# File lib/eco/api/session/batch/job.rb, line 352
def subjobs_add(name = "ad-hoc:job-from:#{self.name}", usecase: self.usecase, &block)
  dup(name, usecase: usecase).tap do |subjob|
    subjobs.add(subjob, &block)
  end
end
# File lib/eco/api/session/batch/job.rb, line 221
def subjobs_summary
  return "" unless subjobs.count > 0
  [].tap do |msg|
    subjobs.map {|subjob| msg << subjob.summary}
  end.join("\n")
end