class Riak::Client::BeefcakeProtobuffsBackend::CrdtOperator
Serializes and writes CRDT operations from {Riak::Crdt::Operation} members into protobuffs, and writes them to a Riak
cluster. @api private
Attributes
backend[R]
Public Class Methods
new(backend)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 22 def initialize(backend) @backend = backend end
Public Instance Methods
operate(bucket, key, bucket_type, operation, options = {})
click to toggle source
Serializes and writes CRDT operations.
# File lib/riak/client/beefcake/crdt_operator.rb, line 27 def operate(bucket, key, bucket_type, operation, options = {}) serialized = serialize(operation) args = { bucket: bucket, key: key, type: bucket_type, op: serialized, return_body: true, }.merge options request = DtUpdateReq.new args begin return backend.protocol do |p| p.write :DtUpdateReq, request p.expect :DtUpdateResp, DtUpdateResp, empty_body_acceptable: true end rescue ProtobuffsErrorResponse => e raise unless e.message =~ /precondition/ raise CrdtError::PreconditionError.new e.message end end
serialize(operations)
click to toggle source
Serializes CRDT operations without writing them.
# File lib/riak/client/beefcake/crdt_operator.rb, line 49 def serialize(operations) return serialize [operations] unless operations.is_a? Enumerable serialize_wrap operations end
Private Instance Methods
inner_serialize(operation)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 98 def inner_serialize(operation) case operation.type when :counter serialize_inner_counter operation when :flag serialize_flag operation when :register serialize_register operation when :set serialize_inner_set operation when :map serialize_inner_map operation else raise ArgumentError, t('crdt.unknown_inner_field', symbol: operation.type.inspect) end end
inner_serialize_delete(operation)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 116 def inner_serialize_delete(operation) MapField.new( name: operation.name, type: type_symbol_to_type_enum(operation.type) ) end
inner_serialize_group(operations)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 82 def inner_serialize_group(operations) updates, deletes = operations.partition do |op| op.value.is_a? Riak::Crdt::Operation::Update end serialized_updates = updates.map do |operation| inner_serialize operation.value end serialized_deletes = deletes.map do |operation| inner_serialize_delete operation.value end { updates: serialized_updates, removes: serialized_deletes } end
serialize_counter(counter_ops)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 123 def serialize_counter(counter_ops) amount = counter_ops.inject(0){|m, o| m += o.value } CounterOp.new(increment: amount) end
serialize_flag(flag_op)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 140 def serialize_flag(flag_op) operation_value = flag_op.value ? MapUpdate::FlagOp::ENABLE : MapUpdate::FlagOp::DISABLE MapUpdate.new( field: MapField.new( name: flag_op.name, type: MapField::MapFieldType::FLAG ), flag_op: operation_value ) end
serialize_group(operations)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 67 def serialize_group(operations) case operations.first.type when :counter serialize_counter operations when :hll serialize_hyper_log_log operations when :set serialize_set operations when :map serialize_map operations else raise ArgumentError, t('crdt.unknown_field', symbol: operation.type.inspect) end end
serialize_hyper_log_log(hyper_log_log_ops)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 175 def serialize_hyper_log_log(hyper_log_log_ops) adds = ::Set.new hyper_log_log_ops.each do |o| adds.add [o.value[:add]] if o.value[:add] end HllOp.new( adds: adds.to_a.flatten ) end
serialize_inner_counter(counter_op)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 128 def serialize_inner_counter(counter_op) MapUpdate.new( field: MapField.new( name: counter_op.name, type: MapField::MapFieldType::COUNTER ), counter_op: CounterOp.new( increment: counter_op.value ) ) end
serialize_inner_map(map_op)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 207 def serialize_inner_map(map_op) inner_op = map_op.value if inner_op.is_a? Riak::Crdt::Operation::Delete return MapUpdate.new(field: MapField.new( name: map_op.name, type: MapField::MapFieldType::MAP ), map_op: MapOp.new( removes: inner_serialize_delete(inner_op)) ) end inner_serialized = inner_serialize inner_op MapUpdate.new( field: MapField.new( name: map_op.name, type: MapField::MapFieldType::MAP ), map_op: MapOp.new( updates: [inner_serialized] )) end
serialize_inner_set(set_op)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 186 def serialize_inner_set(set_op) value = set_op.value or nil MapUpdate.new( field: MapField.new( name: set_op.name, type: MapField::MapFieldType::SET ), set_op: SetOp.new( adds: value[:add], removes: value[:remove] ) ) end
serialize_map(map_ops)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 201 def serialize_map(map_ops) inner_serialized = inner_serialize_group map_ops MapOp.new(inner_serialized) end
serialize_register(register_op)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 151 def serialize_register(register_op) MapUpdate.new( field: MapField.new( name: register_op.name, type: MapField::MapFieldType::REGISTER ), register_op: register_op.value ) end
serialize_set(set_ops)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 161 def serialize_set(set_ops) adds = ::Set.new removes = ::Set.new set_ops.each do |o| adds.add [o.value[:add]] if o.value[:add] removes.merge [o.value[:remove]] if o.value[:remove] end SetOp.new( adds: adds.to_a.flatten, removes: removes.to_a.flatten ) end
serialize_wrap(operations)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 56 def serialize_wrap(operations) raise ArgumentError, t('crdt.serialize_no_ops') if operations.empty? ops = serialize_group operations DtOp.new(wrap_field_for(operations) => ops) end
type_symbol_to_type_enum(sym)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 230 def type_symbol_to_type_enum(sym) MapField::MapFieldType.const_get sym.to_s.upcase end
wrap_field_for(ops)
click to toggle source
# File lib/riak/client/beefcake/crdt_operator.rb, line 63 def wrap_field_for(ops) "#{ops.first.type.to_s}_op".to_sym end