class RR::Replicators::TwoWayReplicator
This replicator implements a two way replication. Options:
-
:
left_change_handling
, :right_change_handling
: Handling of records that were changed only in the named database. Can be any of the following:-
:
ignore
: No action. -
:
replicate
: Updates other database accordingly. Default Setting -
Proc
object: If a Proc object is given, it is responsible for dealing with the record. Called with the following parameters:-
replication_helper: The current
ReplicationHelper
instance. -
difference: A
ReplicationDifference
instance describing the change
-
-
-
:
replication_conflict_handling
: Handling of conflicting record changes. Can be any of the following:-
:
ignore
: No action. Default Setting -
:
left_wins
: The right database is updated accordingly. -
:
right_wins
: The left database is updated accordingly. -
:
later_wins
: The more recent change is replicated. (If both changes have same age: left change is replicated) -
:
earlier_wins
: The less recent change is replicated. (If both records have same age: left change is replicated) -
Proc
object: If a Proc object is given, it is responsible for dealing with the record. Called with the following parameters:-
replication_helper: The current
ReplicationHelper
instance. -
difference: A
ReplicationDifference
instance describing the changes
-
-
-
:
logged_replication_events
: Specifies which types of replications are logged. Is either a single value or an array of multiple ones. Default: [:ignored_conflicts] Possible values:-
:
ignored_changes
: log ignored (but not replicated) non-conflict changes -
:
all_changes
: log all non-conflict changes -
:
ignored_conflicts
: log ignored (but not replicated) conflicts -
:
all_conflicts
: log all conflicts
-
Example of using a Proc object for custom behaviour:
lambda do |rep_helper, diff| # if specified as replication_conflict_handling option, logs all # conflicts to a text file File.open('/var/log/rubyrep_conflict_log', 'a') do |f| f.puts <<-end_str #{Time.now}: conflict * in table #{diff.changes[:left].table} * for record '#{diff.changes[:left].key}' * change type in left db: '#{diff.changes[:left].type}' * change type in right db: '#{diff.changes[:right].type}' end_str end end
Constants
- CONFLICT_STATE_MATRIX
Specifies how to clear conflicts. The outer hash keys describe the type of the winning change. The inner hash keys describe the type of the loosing change. The inser hash values describe the action to take on the loosing side.
- MAX_REPLICATION_ATTEMPTS
How often a replication will be attempted (in case it fails because the record in question was removed from the source or inserted into the target database after the
ReplicationDifference
was loaded- OTHER_SIDE
Shortcut to calculate the “other” database.
Attributes
The current ReplicationHelper
object
Public Class Methods
Provides default option for the replicator. Optional. Returns a hash with key => value pairs.
# File lib/rubyrep/replicators/two_way_replicator.rb, line 65 def self.default_options { :left_change_handling => :replicate, :right_change_handling => :replicate, :replication_conflict_handling => :ignore, :logged_replication_events => [:ignored_conflicts], } end
Initializes the TwoWayReplicator
Raises an ArgumentError if any of the replication options is invalid.
Parameters:
-
rep_helper
: TheReplicationHelper
object providing information and utility functions.
# File lib/rubyrep/replicators/two_way_replicator.rb, line 132 def initialize(rep_helper) self.rep_helper = rep_helper validate_change_handling_options validate_conflict_handling_options validate_logging_options end
Public Instance Methods
Helper for execution of insert / update / delete attempts. Wraps those attempts into savepoints and handles exceptions.
Note: Savepoints have to be used for PostgreSQL (as a failed SQL statement will otherwise invalidate the complete transaction.)
-
action
: short description of change (e. g.: “update” or “delete”) -
source_db
: either :left
or :right
- source database of replication -
target_db
: either :left
or :right
- target database of replication -
diff
: the currentReplicationDifference
instance -
remaining_attempts
: the number of remaining replication attempts for this difference
# File lib/rubyrep/replicators/two_way_replicator.rb, line 272 def attempt_change(action, source_db, target_db, diff, remaining_attempts) begin rep_helper.session.send(target_db).execute "savepoint rr_#{action}_#{remaining_attempts}" yield unless rep_helper.new_transaction? rep_helper.session.send(target_db).execute "release savepoint rr_#{action}_#{remaining_attempts}" end rescue Exception => e rep_helper.session.send(target_db).execute "rollback to savepoint rr_#{action}_#{remaining_attempts}" diff.amend replicate_difference diff, remaining_attempts - 1, "#{action} failed with #{e.message}" end end
Attempts to delete the source record from the target database.
-
if +source_db is :
left
, then the record is deleted in database
-
:right
.
-
source_db
: either :left
or :right
- source database of replication -
diff
: the currentReplicationDifference
instance -
remaining_attempts
: the number of remaining replication attempts for this difference -
target_key
: a column_name => value hash identifying the source record
# File lib/rubyrep/replicators/two_way_replicator.rb, line 294 def attempt_delete(source_db, diff, remaining_attempts, target_key) change = diff.changes[source_db] target_db = OTHER_SIDE[source_db] target_table = rep_helper.corresponding_table(source_db, change.table) attempt_change('delete', source_db, target_db, diff, remaining_attempts) do number_updated = rep_helper.delete_record target_db, target_table, target_key if number_updated == 0 diff.amend replicate_difference diff, remaining_attempts - 1, "target record for delete vanished" else log_replication_outcome source_db, diff end end end
Attempts to read the specified record from the source database and insert it into the target database. Retries if insert fails due to missing source or suddenly existing target record.
-
source_db
: either :left
or :right
- source database of replication -
diff
: the currentReplicationDifference
instance -
remaining_attempts
: the number of remaining replication attempts for this difference -
source_key
: a column_name => value hash identifying the source record
# File lib/rubyrep/replicators/two_way_replicator.rb, line 211 def attempt_insert(source_db, diff, remaining_attempts, source_key) source_change = diff.changes[source_db] source_table = source_change.table target_db = OTHER_SIDE[source_db] target_table = rep_helper.corresponding_table(source_db, source_table) values = rep_helper.load_record source_db, source_table, source_key if values == nil diff.amend replicate_difference diff, remaining_attempts - 1, "source record for insert vanished" else attempt_change('insert', source_db, target_db, diff, remaining_attempts) do rep_helper.insert_record target_db, target_table, values log_replication_outcome source_db, diff end end end
Attempts to read the specified record from the source database and update the specified record in the target database. Retries if update fails due to missing source
-
source_db
: either :left
or :right
- source database of replication -
diff
: the currentReplicationDifference
instance -
remaining_attempts
: the number of remaining replication attempts for this difference -
source_key
: a column_name => value hash identifying the source record -
target_key
: a column_name => value hash identifying the source record
# File lib/rubyrep/replicators/two_way_replicator.rb, line 237 def attempt_update(source_db, diff, remaining_attempts, source_key, target_key) source_change = diff.changes[source_db] source_table = source_change.table target_db = OTHER_SIDE[source_db] target_table = rep_helper.corresponding_table(source_db, source_table) values = rep_helper.load_record source_db, source_table, source_key if values == nil diff.amend replicate_difference diff, remaining_attempts - 1, "source record for update vanished" else attempt_change('update', source_db, target_db, diff, remaining_attempts) do number_updated = rep_helper.update_record target_db, target_table, values, target_key if number_updated == 0 diff.amend replicate_difference diff, remaining_attempts - 1, "target record for update vanished" else log_replication_outcome source_db, diff end end end end
Helper function that clears a conflict by taking the change from the specified winning database and updating the other database accordingly.
-
source_db
: the winning database (either :left
or :right
) -
diff
: theReplicationDifference
instance -
remaining_attempts
: the number of remaining replication attempts for this difference
# File lib/rubyrep/replicators/two_way_replicator.rb, line 161 def clear_conflict(source_db, diff, remaining_attempts) source_change = diff.changes[source_db] target_db = OTHER_SIDE[source_db] target_change = diff.changes[target_db] target_action = CONFLICT_STATE_MATRIX[source_change.type][target_change.type] source_key = source_change.type == :update ? source_change.new_key : source_change.key target_key = target_change.type == :update ? target_change.new_key : target_change.key case target_action when :insert attempt_insert source_db, diff, remaining_attempts, source_key when :update attempt_update source_db, diff, remaining_attempts, source_key, target_key when :delete attempt_delete source_db, diff, remaining_attempts, target_key end end
Logs replication of the specified difference as per configured :replication_conflict_logging
/ :left_change_logging
/ :right_change_logging
options.
-
winner
: Either the winner database (:left
or :right
) or :ignore
-
diff
: theReplicationDifference
instance
# File lib/rubyrep/replicators/two_way_replicator.rb, line 183 def log_replication_outcome(winner, diff) options = rep_helper.options_for_table(diff.changes[:left].table) option_values = [options[:logged_replication_events]].flatten # make sure I have an array if diff.type == :conflict return unless option_values.include?(:all_conflicts) or option_values.include?(:ignored_conflicts) return if winner != :ignore and not option_values.include?(:all_conflicts) outcome = {:left => 'left_won', :right => 'right_won', :ignore => 'ignored'}[winner] else return unless option_values.include?(:all_changes) or option_values.include?(:ignored_changes) return if winner != :ignore and not option_values.include?(:all_changes) outcome = winner == :ignore ? 'ignored' : 'replicated' end rep_helper.log_replication_outcome diff, outcome end
Called to replicate the specified difference.
-
:
diff
:ReplicationDifference
instance -
:
remaining_attempts
: how many more times a replication will be attempted -
:
previous_failure_description
: why the previous replication attempt failed
# File lib/rubyrep/replicators/two_way_replicator.rb, line 314 def replicate_difference(diff, remaining_attempts = MAX_REPLICATION_ATTEMPTS, previous_failure_description = nil) raise Exception, previous_failure_description || "max replication attempts exceeded" if remaining_attempts == 0 options = rep_helper.options_for_table(diff.changes[:left].table) if diff.type == :left or diff.type == :right key = diff.type == :left ? :left_change_handling : :right_change_handling option = options[key] if option == :ignore log_replication_outcome :ignore, diff elsif option == :replicate source_db = diff.type change = diff.changes[source_db] case change.type when :insert attempt_insert source_db, diff, remaining_attempts, change.key when :update attempt_update source_db, diff, remaining_attempts, change.new_key, change.key when :delete attempt_delete source_db, diff, remaining_attempts, change.key end else # option must be a Proc option.call rep_helper, diff end elsif diff.type == :conflict option = options[:replication_conflict_handling] if option == :ignore log_replication_outcome :ignore, diff elsif option == :left_wins clear_conflict :left, diff, remaining_attempts elsif option == :right_wins clear_conflict :right, diff, remaining_attempts elsif option == :later_wins winner_db = diff.changes[:left].last_changed_at >= diff.changes[:right].last_changed_at ? :left : :right clear_conflict winner_db, diff, remaining_attempts elsif option == :earlier_wins winner_db = diff.changes[:left].last_changed_at <= diff.changes[:right].last_changed_at ? :left : :right clear_conflict winner_db, diff, remaining_attempts else # option must be a Proc option.call rep_helper, diff end end end
Verifies if the :left_change_handling
/ :right_change_handling
options are valid. Raises an ArgumentError if an option is invalid
# File lib/rubyrep/replicators/two_way_replicator.rb, line 91 def validate_change_handling_options [:left_change_handling, :right_change_handling].each do |key| rep_helper.session.configuration.each_matching_option(key) do |table_spec, value| unless value.respond_to? :call verify_option table_spec, [:ignore, :replicate], key, value end end end end
Verifies if the given :replication_conflict_handling
options are valid. Raises an ArgumentError if an option is invalid.
# File lib/rubyrep/replicators/two_way_replicator.rb, line 103 def validate_conflict_handling_options rep_helper.session.configuration.each_matching_option(:replication_conflict_handling) do |table_spec, value| unless value.respond_to? :call verify_option table_spec, [:ignore, :left_wins, :right_wins, :later_wins, :earlier_wins], :replication_conflict_handling, value end end end
Verifies if the given :replication_logging
option /options is / are valid. Raises an ArgumentError if invalid
# File lib/rubyrep/replicators/two_way_replicator.rb, line 115 def validate_logging_options rep_helper.session.configuration.each_matching_option(:logged_replication_events) do |table_spec, values| values = [values].flatten # ensure that I have an array values.each do |value| verify_option table_spec, [:ignored_changes, :all_changes, :ignored_conflicts, :all_conflicts], :logged_replication_events, value end end end
Checks if an option is configured correctly. Raises an ArgumentError if not.
-
table_spec
: the table specification to which the option belongs. May benil
. -
valid_option_values
: array of valid option values -
option_key
: the key of the option that is to be checked -
option_value
: the value of the option that is to be checked
# File lib/rubyrep/replicators/two_way_replicator.rb, line 79 def verify_option(table_spec, valid_option_values, option_key, option_value) unless valid_option_values.include? option_value message = "" message << "#{table_spec.inspect}: " if table_spec message << "#{option_value.inspect} not a valid #{option_key.inspect} option" raise ArgumentError.new(message) end end