module AcidicJob
rubocop:disable Metrics/ModuleLength, Metrics/AbcSize, Metrics/MethodLength
Represents an action to perform a no-op. One possible option for a return from an atomic_phase
block.
Represents an action to set a new recovery point. One possible option for a return from an atomic_phase
block.
Represents an action to set a new API response (which will be stored onto an idempotency key). One possible option for a return from an atomic_phase
block.
Constants
- IDEMPOTENCY_KEY_LOCK_TIMEOUT
Number of seconds passed which we consider a held idempotency key lock to be defunct and eligible to be locked again by a different job run. We try to unlock keys on our various failure conditions, but software is buggy, and this might not happen 100% of the time, so this is a hedge against it.
- VERSION
Public Instance Methods
&block &block
# File lib/acidic_job.rb, line 54 def idempotently(with:) # set accessors for each argument passed in to ensure they are available # to the step methods the job will have written define_accessors_for_passed_arguments(with) # execute the block to gather the info on what phases are defined for this job defined_steps = yield # [:create_ride_and_audit_record, :create_stripe_charge, :send_receipt] # convert the array of steps into a hash of recovery_points and callable actions phases = define_atomic_phases(defined_steps) # { create_ride_and_audit_record: <#Method >, ... } # find or create an Key record (our idempotency key) to store all information about this job # side-effect: will set the @key instance variable # # A key concept here is that if two requests try to insert or update within # close proximity, one of the two will be aborted by Postgres because we're # using a transaction with SERIALIZABLE isolation level. It may not look # it, but this code is safe from races. ensure_idempotency_key_record(job_id, defined_steps.first) # if the key record is already marked as finished, immediately return its result return @key.succeeded? if @key.finished? # otherwise, we will enter a loop to process each required step of the job 100.times do # our `phases` hash uses Symbols for keys recovery_point = @key.recovery_point.to_sym case recovery_point when Key::RECOVERY_POINT_FINISHED.to_sym break else raise UnknownRecoveryPoint unless phases.key? recovery_point atomic_phase @key, phases[recovery_point] end end # the loop will break once the job is finished, so simply report the status @key.succeeded? end
# File lib/acidic_job.rb, line 98 def step(method_name) @_steps ||= [] @_steps << method_name @_steps end
Private Instance Methods
# File lib/acidic_job.rb, line 106 def atomic_phase(key, proc = nil, &block) error = false phase_callable = (proc || block) begin key.with_lock do phase_result = phase_callable.call phase_result.call(key: key) end rescue StandardError => e error = e raise e ensure # If we're leaving under an error condition, try to unlock the idempotency # key right away so that another request can try again. begin key.update_columns(locked_at: nil, error_object: error) if error.present? rescue StandardError => e # We're already inside an error condition, so swallow any additional # errors from here and just send them to logs. puts "Failed to unlock key #{key.id} because of #{e}." end end end
# File lib/acidic_job.rb, line 168 def define_accessors_for_passed_arguments(passed_arguments) passed_arguments.each do |accessor, value| # the reader method may already be defined self.class.attr_reader accessor unless respond_to?(accessor) # but we should always update the value to match the current value instance_variable_set("@#{accessor}", value) end true end
# File lib/acidic_job.rb, line 179 def define_atomic_phases(defined_steps) defined_steps << Key::RECOVERY_POINT_FINISHED {}.tap do |phases| defined_steps.each_cons(2).map do |enter_method, exit_method| phases[enter_method] = lambda do method(enter_method).call if exit_method.to_s == Key::RECOVERY_POINT_FINISHED Response.new else RecoveryPoint.new(exit_method) end end end end end
# File lib/acidic_job.rb, line 132 def ensure_idempotency_key_record(key_val, first_step) isolation_level = case ActiveRecord::Base.connection.adapter_name.downcase.to_sym when :sqlite :read_uncommitted else :serializable end serialized_job_info = serialize ActiveRecord::Base.transaction(isolation: isolation_level) do @key = Key.find_by(idempotency_key: key_val) if @key # Programs enqueuing multiple jobs with different parameters but the # same idempotency key is a bug. raise MismatchedIdempotencyKeyAndJobArguments if @key.job_args != serialized_job_info["arguments"] # Only acquire a lock if the key is unlocked or its lock has expired # because the original job was long enough ago. raise LockedIdempotencyKey if @key.locked_at && @key.locked_at > Time.current - IDEMPOTENCY_KEY_LOCK_TIMEOUT # Lock the key and update latest run unless the job is already finished. @key.update!(last_run_at: Time.current, locked_at: Time.current) unless @key.finished? else @key = Key.create!( idempotency_key: key_val, locked_at: Time.current, last_run_at: Time.current, recovery_point: first_step, job_name: serialized_job_info["job_class"], job_args: serialized_job_info["arguments"] ) end end end