Today we're going to cover how to build a complex, JSON-like document CRDT. In the past, we focused on homogeneous data types like registers, sets, arrays or maps. This time we're going to combine them all and tackle some of the challenges that this approach presents.

Prerequisites

While this post reuses many concepts introduced earlier in this series, it covers their properties only briefly, to the extent required to implement a document CRDT. If you want a better intuition or find these explanations insufficient, you may want to read (at least parts of) the earlier posts on vector clocks, the reliable causal broadcast protocol, multi-value registers and linear sequences.

We'll use snippets from source code that can be found here. Feel free to use it to follow along.

Document representation

A conflict-free replicated JSON document can be viewed as a tree of nodes. We can distinguish two kinds of them:

  • Leaves, which store primitive values. JSON defines several of them; we'll use int, float, string, bool and null, all wrapped as cases of a discriminated union called Primitive.
  • Branches represent more complex containers, like maps and arrays.
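For reference, the Primitive union might look roughly like the sketch below. The exact payload types (int64 for integers, float for floating-point numbers) are assumptions, since the post doesn't spell them out, and toJson is a hypothetical helper added only to show the cases in use (it doesn't escape strings).

```fsharp
/// Primitive JSON values stored in Leaf entries.
/// Payload types are assumptions made for this sketch.
type Primitive =
  | Int of int64
  | Float of float
  | String of string
  | Bool of bool
  | Null

/// Hypothetical helper rendering a primitive as a JSON literal
/// (strings are not escaped, for brevity).
let toJson (value: Primitive) : string =
  match value with
  | Int i -> string i
  | Float f -> string f
  | String s -> "\"" + s + "\""
  | Bool b -> if b then "true" else "false"
  | Null -> "null"
```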

Since we're talking about a JSON-like structure, our data type won't have a fixed, static schema. In previous blog posts we constrained ourselves to conflict resolution procedures that worked over values of a single type. Here we can run into a situation where the same node is concurrently updated with values of different types, e.g. account.transfers[0].amount = 120 on one replica and account.transfers[0].amount = { value: 100, currency: "usd" } on another.

This is an expected problem. In reality our systems evolve over time, and so does the schema we're using. Since we operate in a decentralized, peer-to-peer fashion, we cannot stop others from submitting outdated updates. This means that a peer running version 2.0 with a new schema may still talk to a peer running v1.0 that was offline for a long time and built up its own state according to the old schema. We cannot simply ignore such updates, or we may end up with a corrupted state. We need to be ready to embrace them.

In the past we talked about two methods of updating primitive values in a conflict-free way: Last-Write-Wins and Multi-Value registers. Here, we're going with the latter and will extend it to values of different types living under the same node: in case of concurrent updates, we'll show all conflicting values and expect the user to override them manually. Some people may find last-write-wins a more convenient approach, but it comes with a risk of data loss, as older concurrent values are automatically discarded.

For arrays, we'll use the LSeq defined in the past and reuse its VPtr and generateSeq definitions, as they stay exactly the same. LSeq is easy to construct and its behavior maps well enough onto a JSON array. Finally, for objects/maps we're going with something inspired by an Observed-Remove Map. We'll try to maintain Add-Wins semantics over the entire document: when an entry is updated on one replica and deleted on another, we always resolve the conflict in favor of the update.

With these semantics and tradeoffs defined up front, let's start with the core type definitions:

type Vertex<'a> = (VPtr * 'a) // a single LSeq element

type Node = Entry list
and Entry =
  | Leaf of VTime * Primitive // Int, Float, String, Bool or Null
  | Array of VTime list * Vertex<Node>[]
  | Object of VTime list * Map<string,Node>

As you can see in the snippet above, a node is in fact a list of concurrently updated values. It can consist of multiple Leaf entries (as they may represent concurrent conflicts of a multi-value register), while branch entries (the Array and Object cases) exist at most once within a node. That's because they represent complex containers that define their own conflict resolution strategies.

We want to record when a particular branch was updated by many concurrent events, so that we can detect concurrent conflicts. Therefore each entry type has a vector clock (VTime) associated with it. For a Leaf it's a single value, representing when the corresponding value was last updated. Branch types (Array and Object) may contain multiple inner entries updated concurrently by different events, so for them we store the timestamps of concurrent (and only concurrent) updates as a VTime list.
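VTime, Version.compare and the Ord type used later in this post come from the earlier vector clock article and aren't repeated here. Below is a minimal sketch of what they could look like, assuming a vector clock is a map from replica id to a counter; Version.inc is a hypothetical helper. The declaration order of Ord matters: the snippets in this post test results with expressions like `Version.compare ts time < Ord.Eq`, and F# compares union cases by their declaration order, so Lt must come first.

```fsharp
type ReplicaId = string

/// Vector clock: per-replica event counters (assumed representation).
type VTime = Map<ReplicaId, int64>

/// Result of comparing two vector clocks. F# orders union cases by
/// declaration, so Lt < Eq < Gt < Cc, which `< Ord.Eq` relies on.
[<RequireQualifiedAccess>]
type Ord =
  | Lt // happened before
  | Eq // identical
  | Gt // happened after
  | Cc // concurrent

module Version =
  let zero : VTime = Map.empty

  /// Hypothetical helper: bump the counter of a given replica.
  let inc (replica: ReplicaId) (time: VTime) : VTime =
    let n = Map.tryFind replica time |> Option.defaultValue 0L
    Map.add replica (n + 1L) time

  /// Partial-order comparison of two vector clocks.
  let compare (a: VTime) (b: VTime) : Ord =
    let get key (time: VTime) = Map.tryFind key time |> Option.defaultValue 0L
    let keys =
      Seq.append (Map.toSeq a |> Seq.map fst) (Map.toSeq b |> Seq.map fst)
      |> Seq.distinct
      |> Seq.toList
    let less = keys |> List.exists (fun k -> get k a < get k b)
    let greater = keys |> List.exists (fun k -> get k a > get k b)
    match less, greater with
    | false, false -> Ord.Eq
    | true, false -> Ord.Lt
    | false, true -> Ord.Gt
    | true, true -> Ord.Cc

// Usage: A's first event, B's first event, and an event that has seen both.
let a = Version.zero |> Version.inc "A"
let b = Version.zero |> Version.inc "B"
let ab = a |> Version.inc "B"
// Version.compare a ab = Ord.Lt, Version.compare a b = Ord.Cc
```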

In terms of allowed operations we'll stick to the basics: adding, updating and removing a primitive value under a path that can consist of object fields and/or array indexes. We don't consider specialized operations like incrementing a counter or text editing. If you want to support them, keep in mind that in the context of CRDTs they require specialized node types and operation handling: intent is king here, and for concurrent/distributed operations there's a huge difference between a = a + 1 and increment(&a). We discussed these algorithms and data structures in earlier posts of this series, but here we'll skip them for the sake of simplicity.

type Command =
  | Assign   of Primitive   
  | Remove                          
  | Update   of key:string * Command
  | UpdateAt of index:int * Command 
  // append at the end by `InsertAt(list.Length, Assign (String "value"))`
  | InsertAt of index:int * Command 

One thing you may notice here is that while there's no semantic difference between inserting and updating an object field, the same isn't true for array indexes: inserting a new item at index 0 and updating the item at index 0 produce very different results, so we need separate commands for them.
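To see how a path turns into nested commands, here's a small sketch. PathSegment and toCommand are hypothetical helpers introduced for illustration, and Primitive is cut down to two cases to keep the block self-contained. Every index segment becomes an UpdateAt; an insert would use InsertAt for the last index instead.

```fsharp
type Primitive =
  | Int of int64
  | String of string

type Command =
  | Assign   of Primitive
  | Remove
  | Update   of key:string * Command
  | UpdateAt of index:int * Command
  | InsertAt of index:int * Command

/// Hypothetical path segment: an object field or an array index.
type PathSegment =
  | Key of string
  | Index of int

/// Wrap a leaf command (Assign/Remove) in navigation commands,
/// innermost path segment first; array indexes become updates.
let toCommand (path: PathSegment list) (leaf: Command) : Command =
  let wrap segment inner =
    match segment with
    | Key key -> Update(key, inner)
    | Index i -> UpdateAt(i, inner)
  List.foldBack wrap path leaf

// account.transfers[0].amount = 120 becomes
// Update("account", Update("transfers", UpdateAt(0, Update("amount", Assign(Int 120L)))))
let example =
  toCommand [ Key "account"; Key "transfers"; Index 0; Key "amount" ] (Assign(Int 120L))
```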

Now that we've covered user-facing commands, let's consider the events that will actually be stored and replicated between machines. These events must carry enough metadata to make concurrent operations commute, so that replicas converge no matter in which order concurrent events arrive; causally related events are still applied in causal order. Part of that metadata, like the operation's vector clock, is already carried by our generic reliable causal broadcast protocol. However, the document also requires some custom information.

type Event =
  | AtKey     of string * Event
  | AtIndex   of VPtr * Event
  | Assigned  of Primitive
  | Removed

Our event is a composite of navigation cases (AtKey and AtIndex) and actual operations (Assigned and Removed). An interesting thing to notice is that while on the command side we distinguished between InsertAt and UpdateAt, on the event side we have just a single AtIndex. That's because once we supply our event with a virtual pointer (the VPtr type borrowed from the LSeq article), we no longer risk mistaking an insert for an update: unlike standard array indexes, virtual pointers are immutable and unique position descriptors of stored elements.

We described the theory behind command→event conversion. Now let's look at a possible implementation:

let rec handle (replicaId: ReplicaId) (node: Node) (cmd: Command) =
  match cmd with
  | Assign value -> Assigned value
  
  | Remove -> Removed
  
  | Update(key, nested) ->
    // get Object component of the node or create it
    let map = Node.getObject node |> Option.defaultValue Map.empty
    let inner = Map.tryFind key map |> Option.defaultValue Node.empty
    AtKey(key, handle replicaId inner nested)
    
  | InsertAt(_, Remove) -> 
    failwith "cannot insert and remove element at the same time"
    
  | InsertAt(i, nested) ->
    // get Array component of the node or create it
    let array = Node.getArray node |> Option.defaultValue [||]
    // LSeq generates a new VPtr sequence between the left and right neighbours
    let left = if i = 0 then [||] else (fst array.[i-1]).Sequence  
    let right = if i = array.Length then [||] else (fst array.[i]).Sequence
    let ptr = { Sequence = generateSeq left right; Id = replicaId }
    AtIndex(ptr, handle replicaId Node.empty nested)
    
  | UpdateAt(i, nested) ->
    // we cannot update an index of an array which doesn't exist (Option.get throws)
    let array = Node.getArray node |> Option.get 
    let (ptr, inner) = array.[i]
    AtIndex(ptr, handle replicaId inner nested)

You may notice that, unlike in JavaScript, when we want to create a value deep in a path that didn't previously exist, e.g. {}.account.balance = 100, we dynamically create all intermediate objects/arrays required to fulfil the request. Otherwise we couldn't serve concurrent set/remove operations, e.g. when one replica deletes an entire branch higher up in the tree while another one changes a leaf value inside it.

I think the most confusing part of the code above is the InsertAt case, because it exposes behavior characteristic of the LSeq data type. A little reminder: an LSeq can be imagined as an array of elements sorted by their virtual pointers. We insert a new element at index n by generating a virtual pointer that's logically greater than the virtual pointer of the element at index n-1 and smaller than the virtual pointer of the element currently at index n (which moves to n+1 after the insert). At the array edges there is no neighbouring element, so the code passes an empty sequence ([||]) in its place, acting as a VPtr.MIN/VPtr.MAX substitute.
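Here's a simplified, deterministic stand-in for generateSeq, assuming sequences are byte arrays compared lexicographically and an empty right bound means "no upper bound". It's not necessarily the allocation strategy from the LSeq post; it only illustrates the idea of finding room between two neighbours and descending one level deeper when there is none. In the full VPtr the replica Id presumably acts as a tie-breaker when two replicas generate the same sequence concurrently.

```fsharp
/// Lexicographic comparison of position sequences (shorter prefix first).
let compareSeq (a: byte[]) (b: byte[]) : int =
  let n = min a.Length b.Length
  let rec go i =
    if i = n then compare a.Length b.Length
    elif a.[i] <> b.[i] then compare a.[i] b.[i]
    else go (i + 1)
  go 0

/// Simplified stand-in for generateSeq: returns a sequence strictly between
/// `left` and `right`. An empty `right` means "no upper bound". Assumes
/// left < right and that inputs never end with a 0 byte (true for outputs).
let generateSeq (left: byte[]) (right: byte[]) : byte[] =
  let result = ResizeArray<byte>()
  let mutable i = 0
  let mutable bounded = right.Length > 0 // prefix still equal to right's prefix
  let mutable found = false
  while not found do
    let lo = if i < left.Length then int left.[i] else 0
    let hi = if bounded && i < right.Length then int right.[i] else 256
    if hi - lo > 1 then
      // there is room between the digits: take the midpoint and stop
      result.Add(byte ((lo + hi) / 2))
      found <- true
    else
      // no room at this level: copy the lower digit and go one level deeper
      result.Add(byte lo)
      if hi > lo then bounded <- false // now strictly below `right`
      i <- i + 1
  result.ToArray()

let firstSeq = generateSeq [||] [||]               // [|128uy|]
let afterFirst = generateSeq firstSeq [||]         // [|192uy|]
let middle = generateSeq [| 128uy |] [| 129uy |]   // [|128uy; 128uy|]
```

Since every generated sequence ends with a midpoint byte of at least 1, there is always room to generate another sequence below any of them.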

As you may have noticed, each navigation case of our command/event carries just one nested operation, so a single command updates only one path at a time. In practice nothing stops you from changing the discriminated union definition to store multiple nested operations instead of one.

Value assignment

Now it's time to talk about commutative application of events to our document state. Let's start with the simplest one: assigning a primitive at a given node. Using multi-value register semantics, we just add a new leaf entry to our node, keeping all concurrent entries around and discarding the others.

module Node 

let assign (timestamp: VTime) (value: Primitive) (node: Node) : Node =
  let concurrent = 
    node |> List.filter (fun e -> not (Entry.isBefore timestamp e))
  Leaf(timestamp, value)::concurrent

We haven't defined Entry.isBefore yet. Its construction is fairly simple: an entry happened before a given time if ALL of the entry's timestamps happened before that time.

module Entry

let isBefore (time: VTime) (e: Entry) =
  match e with
  | Leaf(ts, _) -> Version.compare ts time < Ord.Eq
  | Array(ts, _)
  | Object(ts, _) -> 
    ts |> List.forall (fun ts -> Version.compare ts time < Ord.Eq)

You may say: "you've mentioned that we're concerned about concurrent operations, but here we're looking only at happened-before relations". Yes, but keep in mind that our event application function is backed by the properties of the reliable causal broadcast protocol we covered in the past. This protocol handles deduplication (the case when events have equal timestamps) and never applies an event that would violate the partial order (i.e. apply a successor event before its strict predecessor). So the only remaining possibilities are entries that are either concurrent with the current event or were updated strictly before it.
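To see these multi-value register semantics in isolation, here's a sketch that models only Leaf entries with string values. VTime is assumed to be a map of per-replica counters, and compareVersion is a stand-in for Version.compare from the vector clock post.

```fsharp
type VTime = Map<string, int64>

/// Declaration order gives Lt < Eq < Gt < Cc.
[<RequireQualifiedAccess>]
type Ord = Lt | Eq | Gt | Cc

/// Stand-in for Version.compare: partial order of vector clocks.
let compareVersion (a: VTime) (b: VTime) : Ord =
  let get key (time: VTime) = Map.tryFind key time |> Option.defaultValue 0L
  let keys = Seq.append (Map.toSeq a |> Seq.map fst) (Map.toSeq b |> Seq.map fst) |> Seq.distinct |> Seq.toList
  match List.exists (fun k -> get k a < get k b) keys, List.exists (fun k -> get k a > get k b) keys with
  | false, false -> Ord.Eq
  | true, false -> Ord.Lt
  | false, true -> Ord.Gt
  | true, true -> Ord.Cc

/// Only Leaf entries are modelled; values are plain strings here.
type Entry = Leaf of VTime * string
type Node = Entry list

let isBefore (time: VTime) (Leaf(ts, _)) = compareVersion ts time < Ord.Eq

/// Multi-value register assignment: keep concurrent leaves, drop older ones.
let assign (timestamp: VTime) (value: string) (node: Node) : Node =
  let concurrent = node |> List.filter (fun e -> not (isBefore timestamp e))
  Leaf(timestamp, value) :: concurrent

let t1 = Map.ofList [ "A", 1L ]          // replica A assigns "x"
let t2 = Map.ofList [ "B", 1L ]          // replica B concurrently assigns "y"
let t3 = Map.ofList [ "A", 2L; "B", 1L ] // A, having seen both, assigns "z"
let conflicted = [] |> assign t1 "x" |> assign t2 "y" // both values are kept
let resolved = conflicted |> assign t3 "z"             // only "z" remains
```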

Updating fields

Updating fields is very similar to value assignment. We take the Object entry and apply a nested event to the field's node. Then we check which timestamps of the current Object entry (its VTime list) are concurrent with the event and keep only those. We do the same with the other entries of the node itself. Finally, we return the updated Object entry, carrying the new timestamp, together with the remaining (concurrent) entries.
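The bookkeeping of a branch entry's VTime list can be isolated into a small helper. The sketch below uses the same assumed map-based vector clocks; updateTimestamps is a hypothetical name for the `timestamp :: List.filter ...` expression the event handler uses to keep concurrent timestamps.

```fsharp
type VTime = Map<string, int64>

[<RequireQualifiedAccess>]
type Ord = Lt | Eq | Gt | Cc // declaration order gives Lt < Eq < Gt < Cc

/// Stand-in for Version.compare: partial order of vector clocks.
let compareVersion (a: VTime) (b: VTime) : Ord =
  let get key (time: VTime) = Map.tryFind key time |> Option.defaultValue 0L
  let keys = Seq.append (Map.toSeq a |> Seq.map fst) (Map.toSeq b |> Seq.map fst) |> Seq.distinct |> Seq.toList
  match List.exists (fun k -> get k a < get k b) keys, List.exists (fun k -> get k a > get k b) keys with
  | false, false -> Ord.Eq
  | true, false -> Ord.Lt
  | false, true -> Ord.Gt
  | true, true -> Ord.Cc

/// Timestamps of a branch entry after applying an event at `timestamp`:
/// the new timestamp plus any old ones that are concurrent with it.
let updateTimestamps (timestamp: VTime) (timestamps: VTime list) : VTime list =
  timestamp :: (timestamps |> List.filter (fun ts -> compareVersion ts timestamp = Ord.Cc))

let a1 = Map.ofList [ "A", 1L ]          // A updates a field of the object
let b1 = Map.ofList [ "B", 1L ]          // B concurrently updates another field
let ab = Map.ofList [ "A", 2L; "B", 1L ] // A updates again after seeing B's change
let afterConcurrent = [] |> updateTimestamps a1 |> updateTimestamps b1 // [b1; a1]
let afterMerge = afterConcurrent |> updateTimestamps ab               // [ab]
```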

let rec apply replicaId (timestamp: VTime) (node: Node) (op: Event) : Node =
  match op with
  // ... other cases
  | AtKey(key, nested) ->
    // get Object component from the node or create it
    let timestamps, map =
      match List.tryFind (function Object _ -> true | _ -> false) node with
      | Some(Object(timestamps, map)) -> (timestamps, map)
      | _ -> ([], Map.empty)
    let innerNode = Map.tryFind key map |> Option.defaultValue Node.empty
    
    // apply inner event and update current object
    let map = Map.add key (apply replicaId timestamp innerNode nested) map
    // keep concurrent timestamps of a current object entry
    let timestamps = 
      timestamp::(List.filter (fun ts -> Version.compare ts timestamp = Ord.Cc) timestamps)
      
    // keep concurrent entries of a current node
    let concurrent =
      node |> List.choose (fun e ->
        match e with
        | Object _ -> None // we cover object update separately
        | outdated when Entry.isBefore timestamp outdated -> None
        | other -> Some other)
    // attach modified object entry to result node
    (Object(timestamps, map))::concurrent

Array insert/update

Since we handled the array insert/update distinction in the command handler, the event handler can treat both cases almost the same way. We simply use a binary search by virtual pointer to find the index at which to insert or update the node; that's how LSeq handles any potential concurrency conflicts. If the virtual pointer at that index is equal to the one passed in the event, it's an update. Otherwise it's an insert.
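FSharp.Core doesn't ship an Array.binarySearch, so the lookup step needs a helper. Here's a minimal lower-bound search sketch. The VPtr record shape (a byte-array Sequence plus a replica Id) mirrors the fields used in this post, but the comparison function, which orders sequences lexicographically and uses the Id as a tie-breaker, is an assumption.

```fsharp
/// Simplified virtual pointer: position sequence plus replica id.
type VPtr = { Sequence: byte[]; Id: string }

/// Assumed ordering: lexicographic by sequence, then by replica id.
let comparePtr (a: VPtr) (b: VPtr) : int =
  let n = min a.Sequence.Length b.Sequence.Length
  let rec go i =
    if i = n then compare a.Sequence.Length b.Sequence.Length
    elif a.Sequence.[i] <> b.Sequence.[i] then compare a.Sequence.[i] b.Sequence.[i]
    else go (i + 1)
  match go 0 with
  | 0 -> compare a.Id b.Id
  | c -> c

/// Lower bound: index of the first element whose pointer is >= ptr, or
/// items.Length if all pointers are smaller. If the pointer at the returned
/// index equals ptr, the event is an update; otherwise it's an insert there.
let lowerBound (ptr: VPtr) (items: (VPtr * 'a)[]) : int =
  let mutable lo = 0
  let mutable hi = items.Length
  while lo < hi do
    let mid = lo + (hi - lo) / 2
    if comparePtr (fst items.[mid]) ptr < 0 then lo <- mid + 1
    else hi <- mid
  lo

let ptr sequence id = { Sequence = sequence; Id = id }
let elements = [| ptr [| 64uy |] "A", "a"; ptr [| 128uy |] "A", "b"; ptr [| 192uy |] "B", "c" |]
let idxUpdate = lowerBound (ptr [| 128uy |] "A") elements // 1: same pointer, an update
let idxInsert = lowerBound (ptr [| 100uy |] "B") elements // 1: new pointer, insert before "b"
let idxAppend = lowerBound (ptr [| 200uy |] "A") elements // 3: append at the end
```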

let rec apply replicaId (timestamp: VTime) (node: Node) (op: Event) : Node =
  match op with
  // ... other cases
  | AtIndex(ptr, nested) ->
    // get Array component from the node or create it
    let mutable timestamps, array =
      match List.tryFind (function Array _ -> true | _ -> false) node with
      | Some(Array(timestamps, array)) -> (timestamps, array)
      | _ -> ([], [||])
      
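    // find index of a given VPtr or index where it should be inserted;
    // Array.binarySearch is not part of FSharp.Core, it's assumed to be
    // a helper from the accompanying source code
    let i = array |> Array.binarySearch (fun (x, _) -> ptr >= x)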
    if i < Array.length array && fst array.[i] = ptr then
      // we're updating existing node
      array <- Array.copy array // defensive copy for sake of update
      let (_, entry) = array.[i]
      array.[i] <- (ptr, apply replicaId timestamp entry nested)
    else
      // we're inserting new node
      let n = (ptr, apply replicaId timestamp Node.empty nested)
      array <- Array.insertAt i n array // Array.insertAt requires FSharp.Core 6.0+
      
    // keep concurrent timestamps of a current array entry
    let timestamps = 
      timestamp::(List.filter (fun ts -> Version.compare ts timestamp = Ord.Cc) timestamps)
    // keep concurrent entries of a current node
    let concurrent =
      node |> List.choose (fun e ->
        match e with
        | Array _ -> None // we cover array update separately
        | outdated when Entry.isBefore timestamp outdated -> None
        | other -> Some other)
    // attach modified array entry to result node
    (Array(timestamps, array))::concurrent

Updating LSeq array elements shares many similarities with updating an object. In both cases we override other non-concurrent entries within the same node and non-concurrent timestamps within the same entry.

Removing nodes

The last remaining operation is node removal. In principle, we can take advantage of the timestamps generated by our broadcast protocol: traverse the tree and remove entries whose timestamps all happened before the removal event.

Entries with concurrent updates are either preserved (for Leaf entries, which maps directly onto add-wins semantics) or have the removal procedure propagated recursively into them (for branch entries).

let rec removeNode (tombstone: VTime) (node: Node) : Node option =
  let node = node |> List.choose (fun e ->
    match e with
    | Leaf(timestamp, _) ->
      // remove node with timestamp lower than tombstone
      if Version.compare timestamp tombstone <= Ord.Eq then None else Some e
      
    | Array(timestamps, vertices) ->
      // check if all of array's timestamps are behind tombstone
      if timestamps |> List.forall (fun ts -> Version.compare ts tombstone < Ord.Eq) then
        None // remove node with lower timestamp
      else
        // recursively check if other array elements need removal
        let vertices =
          vertices
          |> Array.choose (fun (ptr, node) -> removeNode tombstone node |> Option.map (fun n -> (ptr, n)))
        Some(Array(timestamps, vertices))
        
    | Object(timestamps, fields) -> 
      // check if all of object's timestamps are behind tombstone
      if timestamps |> List.forall (fun ts -> Version.compare ts tombstone < Ord.Eq) then
        None // remove node with lower timestamp
      else
        // recursively check if other object fields need removal
        let fields = fields |> Map.fold (fun acc key value ->
          match removeNode tombstone value with
          | None -> acc
          | Some v -> Map.add key v acc) Map.empty 
        Some(Object(timestamps, fields))
  )
  // if all node entries were removed remove node itself
  if List.isEmpty node then None else Some node

Now, one thing we need to keep in mind is that concurrent removals and updates of nodes at different depths of the document tree may result in a state some find surprising. Imagine the following scenario: we start with an initial document state { parent: { name: "Alice" } } that's in sync between replicas A and B. Both replicas make concurrent updates: A does parent.surname = "Smith" while B calls delete("parent"). What should the state of the document be after both replicas synchronize again?

As we mentioned, we prefer add-wins semantics, so in our case the resulting object will be { parent: { surname: "Smith" } }. But wait, what happened to parent.name? Since it was assigned before the delete request was issued, its timestamp happened before the removal's timestamp, so it has been removed. As far as our document CRDT is concerned, parent.name and parent.surname are independent entries, and their updates and removals happen independently. If you want to prevent that, you'd need to refresh the assignment of parent = { name: "Alice", surname: "Smith" }, i.e. re-assign parent.name as well, so that the parent.name field gets a fresh timestamp.
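The scenario can be replayed with a simplified version of removeNode that handles only Leaf and Object entries (arrays are omitted and leaf values are plain strings). Vector clocks are again assumed to be maps of per-replica counters, with compareVersion standing in for Version.compare. The sketch applies B's removal to A's state after the surname update.

```fsharp
type VTime = Map<string, int64>

[<RequireQualifiedAccess>]
type Ord = Lt | Eq | Gt | Cc // declaration order gives Lt < Eq < Gt < Cc

/// Stand-in for Version.compare: partial order of vector clocks.
let compareVersion (a: VTime) (b: VTime) : Ord =
  let get key (time: VTime) = Map.tryFind key time |> Option.defaultValue 0L
  let keys = Seq.append (Map.toSeq a |> Seq.map fst) (Map.toSeq b |> Seq.map fst) |> Seq.distinct |> Seq.toList
  match List.exists (fun k -> get k a < get k b) keys, List.exists (fun k -> get k a > get k b) keys with
  | false, false -> Ord.Eq
  | true, false -> Ord.Lt
  | false, true -> Ord.Gt
  | true, true -> Ord.Cc

type Node = Entry list
and Entry =
  | Leaf of VTime * string
  | Object of VTime list * Map<string, Node>

/// removeNode restricted to Leaf and Object entries.
let rec removeNode (tombstone: VTime) (node: Node) : Node option =
  let pruneEntry entry =
    match entry with
    | Leaf(ts, _) ->
      // leaves updated before (or at) the tombstone are removed
      if compareVersion ts tombstone <= Ord.Eq then None else Some entry
    | Object(timestamps, fields) ->
      if timestamps |> List.forall (fun ts -> compareVersion ts tombstone < Ord.Eq) then
        None // every update of this object happened before the removal
      else
        // the object has concurrent updates: prune its fields recursively
        let pruneField acc key value =
          match removeNode tombstone value with
          | None -> acc
          | Some v -> Map.add key v acc
        Some(Object(timestamps, Map.fold pruneField Map.empty fields))
  let remaining = List.choose pruneEntry node
  if List.isEmpty remaining then None else Some remaining

let tA1 = Map.ofList [ "A", 1L ]         // A: parent.name = "Alice" (seen by B)
let tA2 = Map.ofList [ "A", 2L ]         // A: parent.surname = "Smith"
let tB = Map.ofList [ "A", 1L; "B", 1L ] // B: delete("parent"), concurrent with tA2

// A's `parent` node after the surname update: the object keeps only tA2,
// because tA1 happened before it.
let parentFields = Map.ofList [ "name", [ Leaf(tA1, "Alice") ]; "surname", [ Leaf(tA2, "Smith") ] ]
let parent : Node = [ Object([ tA2 ], parentFields) ]

// B's removal drops parent.name but keeps parent.surname.
let removed = removeNode tB parent
```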

Summary

We presented how to implement the core of what could be considered a minimal JSON document using CRDT semantics.

We didn't cover more advanced operations like multi-entry updates, or rich data types like counters and editable text fields. Another interesting area would be reducing the metadata footprint, or adapting our data type to work natively with database storage engines (like LSM trees) so that we could support documents larger than RAM. For now, let's save these topics for another time.