Register CRDTs
Last-Write-Wins Register (LWW Register)
Integer LWW Register (state-based)
Ref: A comprehensive study of Convergent and Commutative Replicated Data Types
Scenario
Last write wins (LWW) is a popular way to resolve conflicts caused by concurrent updates in many systems; Cassandra is a good example of a database well known for this approach.
One issue with the last-write-wins approach is its inherent risk of data loss: we get an easy-to-use API at the price of automatically discarding potentially useful data.
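To make the data-loss trade-off concrete, here is a minimal sketch that is independent of the CRDT library used in the example code. The `Stamped` type and `lwwMerge` function are hypothetical names introduced only for illustration, and plain numbers stand in for timestamps.

```typescript
// Hypothetical minimal register value: payload plus (time, replica) stamp.
type Stamped<T> = { value: T; at: number; replica: string };

// Keep the write with the larger (time, replica) pair; the other one is dropped.
function lwwMerge<T>(a: Stamped<T>, b: Stamped<T>): Stamped<T> {
  if (a.at !== b.at) return a.at > b.at ? a : b;
  return a.replica > b.replica ? a : b;
}

// Two replicas write concurrently, one tick apart.
const fromA: Stamped<string> = { value: "draft from A", at: 1000, replica: "A" };
const fromB: Stamped<string> = { value: "draft from B", at: 1001, replica: "B" };

// Both merge orders converge on the same winner...
const merged1 = lwwMerge(fromA, fromB);
const merged2 = lwwMerge(fromB, fromA);
console.log(merged1.value, merged2.value);
// ...but the losing write ("draft from A") is silently discarded.
```

Both replicas end up with the same value, which is the convergence guarantee, yet nothing in the result records that another write ever existed.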
Example code
// A timestamp pairs a wall-clock time with the replica ID, which breaks ties.
type Timestamp = [timestamp: Date, replicaId: string];
type LWWRegister<T> = {
  Timestamp: Timestamp;
  Value: T | null;
};
type Operation<T> = [dateTime: Date, value: T | null];
type Endpoint<T> = CRDT.Endpoint<
  LWWRegister<T>,
  T | null,
  T | null,
  Operation<T>
>;
const crdt: CRDT.Crdt<
  LWWRegister<unknown>,
  unknown | null,
  unknown | null,
  Operation<unknown>
> = {
  Default: {
    // Earliest representable Date, so any real write beats the default.
    Timestamp: [new Date(-8640000000000000), ""],
    Value: null,
  },
  Query: (crdt) => crdt.Value,
  Prepare: (_, value) => [new Date(), value],
  Effect: (existing, e) => {
    const [at, value] = e.Data;
    const timestamp: Timestamp = [at, e.Origin];
    // Relational operators on arrays compare their string forms, so compare
    // the time first and fall back to the replica ID on a tie.
    const [existingAt, existingReplica] = existing.Timestamp;
    const isNewer =
      existingAt.getTime() < at.getTime() ||
      (existingAt.getTime() === at.getTime() && existingReplica < e.Origin);
    return isNewer ? { Timestamp: timestamp, Value: value } : existing;
  },
};
const props = <T>(db: CRDT.Database, replica: string, ctx: CRDT.ActorContext) =>
  CRDT.replicator(crdt, db, replica, ctx);
const update = async <T>(
  value: T | null,
  ref: Endpoint<T>
): Promise<T | null> => {
  const command: CRDT.Command<Operation<T>> = ["", [new Date(), value]];
  const result = await ref(command);
  return result;
};
const query = async <T>(ref: Endpoint<T>): Promise<T | null> => {
  const result = await ref(CRDT.Query);
  return result;
};
How the code works:
- First, the code defines a type `Timestamp`, a tuple of a `Date` object and a `string` holding the replica ID. This type is used to order updates from different replicas.
- The code then defines a type `LWWRegister<T>` that represents a Last-Write-Wins Register. It consists of a `Timestamp` and a `Value` of type `T | null`. The `Timestamp` records when the register was last updated, and the `Value` is the current value of the register.
- The code also defines a type `Operation<T>`, a tuple of a `Date` object and a value of type `T | null`. It represents an update operation on the register.
- The code defines a type `Endpoint<T>` that represents a CRDT endpoint for an `LWWRegister` of type `T`. It is built from the `CRDT.Endpoint` type provided by the CRDT library.
- The code then defines a constant `crdt` that is an instance of the CRDT type. It implements the `Default`, `Query`, `Prepare`, and `Effect` members of the CRDT interface.
- `Default` is the initial state of the register: a timestamp holding the earliest possible time and an empty replica ID, together with a `null` value.
- `Query` simply returns the current value of the register.
- `Prepare` receives the current state (which it ignores) and a value, and returns a tuple of the current time and that value, representing an update operation.
- `Effect` takes the existing state of the register and an update operation, and returns the new state of the register. It compares the timestamp of the existing state with that of the operation, using the replica ID to break ties, and keeps whichever timestamp and value are more recent.
- The `props` function takes a database, a replica ID, and an actor context, and returns a replicator for the `LWWRegister`.
- The `update` function takes a value and an endpoint reference, and returns a promise that resolves to the updated value of the register. It creates a command object that represents the update operation, sends the command to the endpoint, and waits for the response.
- The `query` function takes an endpoint reference, and returns a promise that resolves to the current value of the register. It sends a query command to the endpoint and waits for the response.
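The property that matters for `Effect` is that replicas converge no matter in which order operations arrive. The sketch below checks this on a small example. It does not use the CRDT library: `Stamp`, `Register`, `OpEvent`, `compareStamps` and `effect` are hypothetical stand-ins that only mirror the shapes described above.

```typescript
// Hypothetical stand-ins for the state and event shapes described above.
type Stamp = [at: Date, replicaId: string];
type Register<T> = { Timestamp: Stamp; Value: T | null };
type OpEvent<T> = { Origin: string; Data: [at: Date, value: T | null] };

// Negative, zero, or positive: time first, replica ID as the tie-breaker.
const compareStamps = (a: Stamp, b: Stamp): number =>
  a[0].getTime() - b[0].getTime() || (a[1] < b[1] ? -1 : a[1] > b[1] ? 1 : 0);

// Keep the incoming write only when its stamp is strictly newer.
const effect = <T>(existing: Register<T>, e: OpEvent<T>): Register<T> => {
  const incoming: Stamp = [e.Data[0], e.Origin];
  return compareStamps(existing.Timestamp, incoming) < 0
    ? { Timestamp: incoming, Value: e.Data[1] }
    : existing;
};

const emptyRegister: Register<string> = {
  Timestamp: [new Date(-8640000000000000), ""],
  Value: null,
};
const sameInstant = new Date("2024-01-04T12:00:00Z");
const events: OpEvent<string>[] = [
  { Origin: "a", Data: [new Date("2024-01-03T12:00:00Z"), "old"] },
  { Origin: "a", Data: [sameInstant, "tie, replica a"] },
  { Origin: "b", Data: [sameInstant, "tie, replica b"] },
];

// Apply the same operations in two different delivery orders.
const inOrder = events.reduce((s, e) => effect(s, e), emptyRegister);
const reversed = events
  .slice()
  .reverse()
  .reduce((s, e) => effect(s, e), emptyRegister);
console.log(inOrder.Value, reversed.Value);
```

Both delivery orders end with the write from replica `b`, because the two writes at the same instant are ordered by replica ID rather than by arrival.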
Sample scenario
Let's say you're building a real-time collaborative document editing application, where multiple users can edit the same document simultaneously. In this application, you need to maintain a Last-Write-Wins Register CRDT to keep track of the last modification made to each section of the document.
In this case, you can represent each section of the document as a key-value pair, where the key is the section ID and the value is a Last-Write-Wins Register that keeps track of the last modification made to that section. When a user wants to make a modification to a section of the document, you can use the Last-Write-Wins Register CRDT to ensure that the latest modification wins.
Here's a sample:
- Alice and Bob are working on the same document simultaneously. Alice is editing Section A, while Bob is editing Section B.
- Alice finishes editing Section A and saves her changes. The Last-Write-Wins Register for Section A is updated with Alice's changes.
- Bob finishes editing Section B and saves his changes. The Last-Write-Wins Register for Section B is updated with Bob's changes.
- Bob realizes that he needs to make some changes to Section A, which Alice had already edited earlier. Bob retrieves the current value of the Last-Write-Wins Register for Section A and sees Alice's changes.
- Bob makes his changes to Section A and attempts to save them. However, before Bob's changes are applied, Alice makes some additional changes to Section A and saves them with a later timestamp. The Last-Write-Wins Register for Section A is updated with Alice's latest changes.
- When Bob's save is finally applied, the Last-Write-Wins Register CRDT resolves the conflict by comparing timestamps. Because Bob's update carries an older timestamp than Alice's latest one, Alice's changes win: Bob's changes are discarded, and the Last-Write-Wins Register for Section A keeps Alice's latest changes.
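The scenario can be replayed with a small in-memory model: one register per section ID, and a save that is applied only if its timestamp is newer. This is a sketch under stated assumptions: `SectionWrite`, `sections` and `save` are hypothetical names, timestamps are plain numbers, and Bob's edit to Section A is assumed to be stamped before Alice's second edit but delivered after it.

```typescript
// Hypothetical in-memory model: section ID -> latest accepted write.
type SectionWrite = { text: string; at: number; author: string };
const sections = new Map<string, SectionWrite>();

// Apply a write only if it is newer than what the section already holds.
function save(sectionId: string, write: SectionWrite): boolean {
  const current = sections.get(sectionId);
  const wins =
    current === undefined ||
    write.at > current.at ||
    (write.at === current.at && write.author > current.author);
  if (wins) sections.set(sectionId, write);
  return wins;
}

// Alice and Bob each save their own section.
save("A", { text: "Alice v1", at: 1, author: "alice" });
save("B", { text: "Bob v1", at: 2, author: "bob" });

// Bob stamps his edit to Section A at time 3, but it is delivered late.
const bobsLateWrite: SectionWrite = { text: "Bob's edit", at: 3, author: "bob" };

// Alice's second edit, stamped at time 4, is applied first.
save("A", { text: "Alice v2", at: 4, author: "alice" });

// Bob's older write loses and is dropped.
const accepted = save("A", bobsLateWrite);
console.log(accepted, sections.get("A")?.text);
```

Had Bob's edit carried a timestamp later than 4, it would have replaced Alice's text instead, so the outcome depends entirely on the timestamps and not on who saved first.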
Multi Value Register (MV Register)
MV Register (state-based)
Ref: A comprehensive study of Convergent and Commutative Replicated Data Types
Example code
type Change = {
  user: string;
  text: string;
};
type DocVersion = {
  timestamp: Date;
  changes: Change[];
};
type MVRegister = DocVersion[];
// Assumed order of type parameters, inferred from the LWW example: state,
// query result, command payload, replicated operation.
type Endpoint = CRDT.Endpoint<MVRegister, Change[], Change, Change>;
const crdt: CRDT.Crdt<MVRegister, Change[], Change, Change> = {
  Default: [],
  // Every version repeats the changes of its predecessor, so reading the
  // latest version avoids returning duplicates.
  Query: (crdt) => (crdt.length > 0 ? crdt[crdt.length - 1].changes : []),
  // The change itself is the operation; Effect wraps it in a new version.
  Prepare: (_, value) => value,
  Effect: (existing, e) => {
    // The register starts empty, so there may be no previous version yet.
    const previous =
      existing.length > 0 ? existing[existing.length - 1].changes : [];
    const newVersion: DocVersion = {
      timestamp: new Date(),
      changes: [...previous, e.Data],
    };
    return [...existing, newVersion];
  },
};
const props = (db: CRDT.Database, replica: string, ctx: CRDT.ActorContext) =>
  CRDT.replicator(crdt, db, replica, ctx);
const update = async (change: Change, ref: Endpoint): Promise<Change[]> => {
  const command: CRDT.Command<Change> = ["", change];
  const result = await ref(command);
  return result;
};
const query = async (ref: Endpoint): Promise<Change[]> => {
  const result = await ref(CRDT.Query);
  return result;
};
How the code works:
- Define the `MVRegister` type alias, a list of versions in which each version holds a timestamp and the changes recorded at that version. This is the actual CRDT data structure that we'll be manipulating.
- Define the `Endpoint` type alias, which represents the client's view of the CRDT. It is essentially a wrapper around the CRDT operations, providing a simplified interface for clients to interact with.
- Define the `crdt` constant, which contains the implementation of the CRDT. It is an object literal that implements the `Crdt` interface. It defines the `Default` member, an empty `MVRegister`; the `Query` member, which returns the changes recorded in the register; the `Prepare` member, which turns the given change into the operation to replicate; and the `Effect` member, which appends a new version that adds the incoming change to the existing state.
- Define the `props` function, which creates a new CRDT endpoint using the given database, replica ID, and actor context. It does this by calling the `replicator` function with the `crdt` object and the given parameters.
- Define the `update` function, which updates the CRDT with the given change by creating a new command with the change and sending it to the endpoint. It returns a promise that resolves with the updated value.
- Define the `query` function, which retrieves the current state of the CRDT by sending a `Query` command to the endpoint. It returns a promise that resolves with the list of changes in the CRDT.
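For comparison, the multi-value register in the referenced paper tags each value with a version vector and keeps every concurrently written value instead of picking a winner. The sketch below illustrates that idea on its own. It is not the listing above and does not use the CRDT library; `VersionVector`, `Entry`, `MVReg`, `dominates`, `assign` and `mergeRegs` are hypothetical names chosen for this example.

```typescript
// Hypothetical types: a version vector maps replica IDs to update counters.
type VersionVector = Record<string, number>;
type Entry<T> = { value: T; version: VersionVector };
type MVReg<T> = Entry<T>[];

// True when `a` has seen everything `b` has seen (a >= b for every replica).
const dominates = (a: VersionVector, b: VersionVector): boolean =>
  Object.keys(b).every((r) => (a[r] ?? 0) >= (b[r] ?? 0));

// Local write: supersede every entry currently visible at this replica.
function assign<T>(reg: MVReg<T>, replica: string, value: T): MVReg<T> {
  const version: VersionVector = {};
  for (const e of reg) {
    for (const r of Object.keys(e.version)) {
      version[r] = Math.max(version[r] ?? 0, e.version[r] ?? 0);
    }
  }
  version[replica] = (version[replica] ?? 0) + 1;
  return [{ value, version }];
}

// Merge: drop entries strictly dominated by the other side, keep the rest.
function mergeRegs<T>(a: MVReg<T>, b: MVReg<T>): MVReg<T> {
  const strictlyBelow = (e: Entry<T>, others: MVReg<T>): boolean =>
    others.some(
      (o) => dominates(o.version, e.version) && !dominates(e.version, o.version)
    );
  const sameVersion = (x: Entry<T>, y: Entry<T>): boolean =>
    dominates(x.version, y.version) && dominates(y.version, x.version);
  const keptA = a.filter((e) => !strictlyBelow(e, b));
  const keptB = b.filter(
    (e) => !strictlyBelow(e, a) && !keptA.some((k) => sameVersion(k, e))
  );
  return [...keptA, ...keptB];
}

// Two replicas start from the same value and then write concurrently.
const base = assign<string>([], "a", "v0");
const atA = assign(base, "a", "from A");
const atB = assign(base, "b", "from B");

// Neither write has seen the other, so both values survive the merge.
const conflicted = mergeRegs(atA, atB);
console.log(conflicted.map((e) => e.value));

// A later write that has seen both values replaces them everywhere.
const resolved = assign(conflicted, "a", "resolved");
const afterSync = mergeRegs(resolved, atB);
console.log(afterSync.map((e) => e.value));
```

The design choice is the opposite of LWW: no write is dropped automatically, and the application decides how to reconcile the surviving values.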