class Akasha::CommandRouter::OptimisticTransactor
Default command transactor providing optional optimistic concurrency. Works by loading aggregate from the repo by id, having it handle the command, and saving changes to the aggregate to the repository.
Constants
- MAX_CONFLICT_RETRIES
The default maximum number of retries when conflict is detected.
- MAX_CONFLICT_RETRY_INTERVAL
An upper limit for a retry interval.
- MIN_CONFLICT_RETRY_INTERVAL
A lower limit for a retry interval.
Public Instance Methods
call(aggregate_class, command, aggregate_id, options, **data)
click to toggle source
Have an aggregate handle a command.
-
`aggregate_class` - aggregate class you want to handle the command,
-
`command` - command the aggregate will process, corresponding to a method of the aggregate class.
-
`aggregate_id` - id of the aggregate instance the command is for,
-
`options`:
- concurrency - `:optimistic` or `:none` (default: `:optimistic`); - revision - set to aggregate revision to detect conflicts while saving aggregates (requires `concurrency == :optimistic`); `nil` to just save without concurrency control; - max_conflict_retries - how many times to retry processing a command if a conflict is detected (`ConflictError`); default: MAX_CONFLICT_RETRIES; - min_conflict_retry_interval - minimum time to sleep between retries; default MIN_CO_RETRY_INTERVAL; - max_conflict_retry_interval - maximum time to sleep between retries; default MIN_CO_RETRY_INTERVAL.
-
`data`- command payload.
# File lib/akasha/command_router/optimistic_transactor.rb, line 30 def call(aggregate_class, command, aggregate_id, options, **data) max_conflict_retries = options.fetch(:max_conflict_retries, MAX_CONFLICT_RETRIES) min_conflict_retry_interval = options.fetch(:min_conflict_retry_interval, MIN_CONFLICT_RETRY_INTERVAL) max_conflict_retry_interval = options.fetch(:max_conflict_retry_interval, MAX_CONFLICT_RETRY_INTERVAL) with_retries(base_sleep_seconds: min_conflict_retry_interval, max_sleep_seconds: max_conflict_retry_interval, max_tries: 1 + max_conflict_retries, rescue: [Akasha::ConflictError]) do handle_command(aggregate_class, command, aggregate_id, options, **data) end end
Protected Instance Methods
handle_command(aggregate_class, command, aggregate_id, options, **data)
click to toggle source
# File lib/akasha/command_router/optimistic_transactor.rb, line 42 def handle_command(aggregate_class, command, aggregate_id, options, **data) concurrency, revision = parse_options!(options) aggregate = aggregate_class.find_or_create(aggregate_id) check_conflict!(aggregate, revision) if concurrency == :optimistic aggregate.public_send(command, **data) aggregate.save!(concurrency: concurrency) end
Private Instance Methods
check_conflict!(aggregate, revision)
click to toggle source
# File lib/akasha/command_router/optimistic_transactor.rb, line 61 def check_conflict!(aggregate, revision) return if revision.nil? || revision == aggregate.revision raise StaleRevisionError, "Conflict detected; expected: #{revision} got: #{aggregate.revision}" end
parse_options!(options)
click to toggle source
# File lib/akasha/command_router/optimistic_transactor.rb, line 52 def parse_options!(options) concurrency = options[:concurrency] || :optimistic revision = options[:revision] if concurrency == :none && !revision.nil? raise ArgumentError, "Unexpected revision #{revision.inspect} when concurrency set to #{concurrency.inspect}" end [concurrency, revision] end