This is one of the topics I've already talked about in several presentations, but never actually written about. Here I want to cover the topic of Conflict-free Replicated Data Types: what problems they aim to solve, along with some basic implementations (in F#) to help you understand how to build them.

A motivation

Conflict-free Replicated Data Types are an answer to a common problem of synchronizing data in distributed environments. While the issues in that field were well-known and there have been numerous attempts to solve them in the past, those were usually variants of decentralized 2-phase commit transactions. However, they all suffer from similar problems:

  • We assume that all participants are available for the duration of the transaction, which often (e.g. in the case of edge computing and mobile apps) is not true.
  • Multi-phase and quorum-based commits require numerous round trips between the communicating parties. That comes with a cost in terms of latency and throughput. Moreover, this approach tends to degrade at scale, when the number of machines or the distance between them increases. In many cases it's not feasible to use a distributed transaction across the boundaries of a single data center.
  • They often designate a single master node used for exclusive write access or as a single source of truth. In some cases - such as the ones mentioned above - that's not a feasible solution.

This doesn't mean that the existing approaches are bad. However, what we are striving for is a wider range of possible solutions, so we can better fit the problem under certain conditions. One of the examples I like to use to visualize the problem is this:

Imagine that you need to build a planet-scale video streaming service. Whenever a user uploads a video, we replicate it across different data centers located on different continents to maintain good throughput and latency, and as a result a better user experience. Additionally, we want to show users a view count for that video.

Video uploading is a good example of master-slave replication. However, things get complicated for such a small feature as the view count. With many concurrent users over the entire planet and write-heavy characteristics, using the same approach for counter increments is not a great idea, as it may end up with congestion for more popular videos. We don't need transactions, as the nature of the problem allows us to loosen the constraints of strong consistency in favor of higher availability of our application. However, as described earlier, most of the existing solutions are based on exclusive write access to a replicated resource. This is where CRDTs and multi-master scenarios come into play.

Use cases and implementations

While this was quite a simple example, there are many others we can find in the industry today:

  • Amazon uses CRDTs to keep their shopping cart in sync. They've also published their database, known as Dynamo, which lets the AWS audience make use of CRDTs.
  • Riak is one of the most popular solutions in this area. One of its well-known customers is Riot Games (the company behind League of Legends), which uses Riak to implement its in-game chat.
  • Rovio (the company behind the Angry Birds game series) uses conflict-free counters in their advertisement platform to make their impression counters work even in offline scenarios.
  • SoundCloud has their own implementation of a Last-Write-Wins Set, built in Go on top of Redis and known as Roshi, which they use for their observer management.
  • TomTom makes use of CRDTs to manage their navigation data.
  • CREAustralia uses them for their click stream analytics.

There are also other solutions out there:

  • AntidoteDB is another, pretty innovative approach to eventually consistent databases. One of its unique features is transaction support in an eventually consistent environment.
  • Akka.DistributedData is a plugin for Akka (and Akka.NET) distributed actor programming model, which exposes several CRDT types on top of Akka cluster.
  • Redislabs offers CRDB as part of their enterprise Redis database solution.
  • Cassandra and ScyllaDB allow their users to make use of eventually consistent counters in their databases.

What does it mean to be conflict-free?

Conflict-free is a vague description, but it boils down to a simple statement: we are operating on data structures that don't require exclusive write access and are able to detect concurrent updates and perform deterministic, automatic conflict resolution. This doesn't mean that conflicts never occur, but we are always able to determine the output up front, based on the metadata contained within the structure itself. The core structures here are counters, registers and sets, but from them we can compose more advanced ones like maps, graphs or even JSON documents.

Types of CRDTs

We can split CRDTs into two core categories: state-based (convergent) and operation-based (commutative) data types. No matter which one we talk about, they all consist of two parts: a replication protocol and state application algorithms.

In practice both versions differ heavily in implementation, and their "center of gravity" lies in a different place. Since we need some additional metadata to provide automatic conflict resolution, state-based CRDTs encapsulate it as part of the data structure, while operation-based ones tend to put more of it onto the replication protocol itself. Here, I'm going to cover one of the simpler state-based approaches.

Convergent replicated data types

The single most important operation of state-based CRDTs is the Merge method. Essentially, it takes two corresponding replicas of the same logical entity and produces an updated state as an output. If any conflicts occur, it's up to the merge operation to resolve them.

Moreover, the merge operation must conform to three properties, which give us great perks for using it:

  • Commutativity (x • y = y • x) and Associativity ((x • y) • z = x • (y • z)), which mean that we can perform merge operations out of order and still end up with the correct state.
  • Idempotency (x • x = x), so we don't need to care about potential duplicates sent from the replication layer.

Those properties are not easy to guarantee, but you're going to see how far we can go using only two basic operations that meet those criteria:

  • union of two sets
  • maximum of two values

With them in our hands, the only requirement on our replication layer is to eventually dispatch all state changes to all replicas. There is however a problem with this approach: to perform those merge operations we need to carry the whole data structure on every change. Imagine that we need to send an entire collection of 1000 elements over the wire, only because someone added one extra element. This problem can be solved by an approach known as delta-state CRDTs, which I'll discuss another time.
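Before we dive into concrete CRDTs, here's a tiny F# sanity check (my own addition, not part of the implementations below) of those three properties for max and set union:

// Quick sanity check: max and set union are commutative, associative and idempotent.
let x, y, z = 3L, 7L, 5L
printfn "%b" (max x y = max y x)                    // commutativity
printfn "%b" (max (max x y) z = max x (max y z))    // associativity
printfn "%b" (max x x = x)                          // idempotency

let s1, s2 = Set.ofList [1; 2], Set.ofList [2; 3]
printfn "%b" (Set.union s1 s2 = Set.union s2 s1)    // commutativity
printfn "%b" (Set.union s1 s1 = s1)                 // idempotency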

Now, let's cover some basic data structures and see how we can compose them into more advanced ones. Please keep in mind that those examples are meant to be simple and to present the approach to the problem.

Counters

Counters are the first type of CRDTs we'll cover here. Essentially, they allow us to read, increment and (optionally) decrement a counter value concurrently on many different machines, without worrying about locking.

Growing-only Counter

Also known as GCounter. It's a counter whose value can only ever increase. One of its use cases could be the page view counter I referred to in the motivation section. Simply speaking, it's a map of replica-id/partial-counter values.

In order to calculate the total value of the counter, we need to sum the partial values of all known replicas.

If we want to increment the counter's value, we simply increment the partial counter in the context of the replica we're working with at the moment. It's crucial that a replica never increments the value belonging to another replica.

type ReplicaId = string // replica identifier - a plain string is assumed here for simplicity

module GCounter =
    type GCounter = Map<ReplicaId, int64>
    let zero: GCounter = Map.empty
    let value (c: GCounter) =
        c |> Map.fold (fun acc _ v -> acc + v) 0L
    let inc r (c: GCounter) =
        match Map.tryFind r c with
        | Some x -> Map.add r (x + 1L) c
        | None   -> Map.add r 1L c
    let merge (a: GCounter) (b: GCounter): GCounter =
        a |> Map.fold (fun acc ka va ->
            match Map.tryFind ka acc with
            | Some vb -> Map.add ka (max va vb) acc
            | None    -> Map.add ka va acc) b

When it comes to the merge operation, we simply concatenate the key/value entries of both counters. When we detect that both counters have different values for the same replica, we take the max of both values. This is correct behavior, since we know that counter values can only be incremented. For the same reason, the decrement operation is not supported by this kind of CRDT.
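To see how this plays out, here's a short usage sketch (the replica ids "A" and "B" are made up for the example):

// Two replicas increment independently, then exchange and merge their states.
let a = GCounter.zero |> GCounter.inc "A" |> GCounter.inc "A"
let b = GCounter.zero |> GCounter.inc "B"
printfn "%d" (GCounter.value (GCounter.merge a b))       // 3
printfn "%b" (GCounter.merge a b = GCounter.merge b a)   // true - merge order doesn't matter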

Increment/decrement Counter

Now, let's talk about the so-called PNCounter, which is able to provide both increment and decrement operations. I think it's particularly useful, as it's a simple example of how we can compose simple CRDTs to build more advanced ones.

The crucial trick here is that a PNCounter consists of two GCounters - one of them used to count increments and the other used for decrements - so the decrement operation simply increments the GCounter part responsible for counting decrements. The output value is basically the difference between the two.

module PNCounter =
    type PNCounter = GCounter.GCounter * GCounter.GCounter
    let zero: PNCounter = (GCounter.zero, GCounter.zero)
    let value (inc, dec) = GCounter.value inc - GCounter.value dec
    let inc replica (inc, dec) = (GCounter.inc replica inc, dec)
    let dec replica (inc, dec) = (inc, GCounter.inc replica dec)
    let merge (inc1, dec1) (inc2, dec2) = 
        (GCounter.merge inc1 inc2, GCounter.merge dec1 dec2)

As you can see, the merge operation is again pretty trivial: a simple merge of the corresponding GCounter parts from both PNCounters.
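A quick usage sketch (replica ids are again made up):

// Replica "A" registers two increments and one decrement, replica "B" one decrement.
let a = PNCounter.zero |> PNCounter.inc "A" |> PNCounter.inc "A" |> PNCounter.dec "A"
let b = PNCounter.zero |> PNCounter.dec "B"
printfn "%d" (PNCounter.value (PNCounter.merge a b))   // 0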

There are also other types of counters, which I won't cover here. One particularly interesting case is the Bounded Counter, which allows you to put an arbitrary upper/lower bound on the counter, to determine if a target threshold has been reached.

Note about vector clocks

We've already covered counter implementations. Before we go further, I think it's a good point to talk about vector clocks and the notion of time.

In many systems, a standard way of defining causality (a happened-before relationship) is by using timestamps. There is however a problem with using them in high-frequency distributed systems:

  • Operating systems, especially ones executing on machines in different data centers, can be subject to clock skew, which can kick your butt in write-heavy scenarios. Other anomalies can occur as well: leap second bugs or even invalid time values observed between two threads.
  • While timestamps can potentially give us the information necessary to determine the most recent update (we'll use that in a minute), they won't tell us anything about the "state of the world" at the moment when the update happened. This means we cannot detect whether one update knew about another, or whether they happened concurrently.

This is where vector clocks come into play. They are a form of logical clock, represented by monotonically incremented values, specific to each replica. Sounds much like the GCounter we've seen above ;)

Why do we talk about them here? The internal implementation of a vector clock is very close to what a GCounter looks like. The major difference is the ability to partially compare two vector clocks.

Unlike standard comparison, partial comparison allows for a fourth possible result - an indecisive one, when we are no longer able to determine whether two values are in a lesser, greater or equal relationship. We can use this to recognize concurrent updates that happened between two clocks.

Here, we'll define vector clocks as:

type Ord =
    | Lt = -1  // lower
    | Eq = 0   // equal
    | Gt = 1   // greater
    | Cc = 2   // concurrent

type VTime = GCounter.GCounter
module VClock =
    let zero = GCounter.zero
    let inc = GCounter.inc
    let merge = GCounter.merge
    let compare (a: VTime) (b: VTime): Ord = 
        let valOrDefault k map =
            match Map.tryFind k map with
            | Some v -> v
            | None   -> 0L
        let akeys = a |> Map.toSeq |> Seq.map fst |> Set.ofSeq
        let bkeys = b |> Map.toSeq |> Seq.map fst |> Set.ofSeq
        (akeys + bkeys)
        |> Seq.fold (fun prev k ->
            let va = valOrDefault k a
            let vb = valOrDefault k b
            match prev with
            | Ord.Eq when va > vb -> Ord.Gt
            | Ord.Eq when va < vb -> Ord.Lt
            | Ord.Lt when va > vb -> Ord.Cc
            | Ord.Gt when va < vb -> Ord.Cc
            | _ -> prev ) Ord.Eq

The comparison function, even though long, is pretty simple - we compare pairwise entries of both VTime maps, counting a missing entry on either side as 0 (see the example after the list):

  • If all values of corresponding replicas are equal, the clocks are equal.
  • If all values on the left side are lower than or equal to their counterparts on the right side, the left side is lesser than the right one.
  • If all values on the left side are greater than or equal to their counterparts on the right side, the left side is greater than the right one.
  • Any mix of lesser/greater entries means that we have detected a concurrent update.
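For example (again with made-up replica ids):

// Both replicas descend from the same clock and then advance independently,
// so neither dominates the other - the updates are concurrent.
let init = VClock.zero |> VClock.inc "A"
let a = init |> VClock.inc "A"
let b = init |> VClock.inc "B"
printfn "%A" (VClock.compare init a)   // Lt
printfn "%A" (VClock.compare a b)      // Cc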

If you're more interested in the topic of time in distributed systems, I can recommend a great talk on the subject: Keeping Time in Real Systems by Kavya Joshi.

Registers

The next type of CRDTs are registers. You can think of them as value cells that are able to provide CRDT semantics over any defined type. Remember that we're still constrained by the commutativity/associativity/idempotency rules. For this reason we must attach additional metadata, which allows us to perform arbitrary conflict resolution when a conflict is detected.

Last Write Wins Register

The most obvious way to resolve conflicts, which we've already talked about earlier, is to use timestamps. This is exactly what our implementation of LWWReg does.

module LWWReg =
    open System // DateTime comes from the System namespace
    type LWWReg<'a> = 'a * DateTime
    let zero: LWWReg<'a> = (Unchecked.defaultof<'a>, DateTime.MinValue)
    let value (v, _) = v
    let set c2 v2 (v1, c1) = if c1 < c2 then (v2, c2) else (v1, c1)
    let merge (v1, c1) (v2, c2) = if c1 < c2 then (v2, c2) else (v1, c1)

It's quite obvious. Our set and merge operations simply compare two registers and pick the value with the higher timestamp.
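A minimal usage sketch (the dates are arbitrary):

open System // for DateTime in this snippet

// The write with the later timestamp wins, regardless of merge order.
let r1 = LWWReg.zero |> LWWReg.set (DateTime(2018, 1, 1)) "hello"
let r2 = LWWReg.zero |> LWWReg.set (DateTime(2018, 1, 2)) "world"
printfn "%s" (LWWReg.value (LWWReg.merge r1 r2))   // world
printfn "%s" (LWWReg.value (LWWReg.merge r2 r1))   // world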

Just like in previous cases, we'll be able to compose LWW registers with other CRDTs to provide more advanced operations.

Sets

Now that we've covered counters and registers, it's time to talk about collections. The most natural candidates are different variations of sets - simply because set union conforms to the associativity/commutativity/idempotency properties mentioned before. Later on, we could use them to define structures like maps, graphs or even indexed linear sequences (useful e.g. in collaborative text editing).

Growing-only Set

Just like in the case of counters, the most basic example here is a growing-only set, also known as GSet. One of its use cases could be e.g. a voting system, where we'd like to tell whether a person has participated in a vote, while still keeping his/her vote anonymous (in such a case the total voting result could be a GCounter itself).

module GSet =
    type GSet<'a when 'a: comparison> = Set<'a>
    let zero: GSet<'a> = Set.empty
    let value (s: GSet<'a>) = s
    let add v (s: GSet<'a>) = Set.add v s
    let merge (a: GSet<'a>) (b: GSet<'a>) = a + b

No, it's not trolling. It's just a standard set! :) The only difference here is that we constrain ourselves not to perform any removals on the set. The reason for that is the merge operator: since our merge is just a standard union, if we removed an element from one of the replicas, then after merging it with another replica (where that removal hasn't happened yet), the removed element would auto-magically reappear in the result set.

There's also a lesson here: by removing an element from the set, we lost some data. This is something we often cannot afford in the case of CRDTs, and it's the reason why we often must attach some additional metadata, even though it may not seem to be explicitly needed by the result value.
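To make the problem concrete, here's a small sketch where we deliberately break the grow-only rule with a plain Set.remove and watch the element come back after a merge:

// Replica B still remembers element 2, so the union resurrects it.
let a = GSet.zero |> GSet.add 1 |> GSet.add 2
let b = a                        // replica B received the same state
let a' = Set.remove 2 a          // an "illegal" removal on replica A
printfn "%A" (GSet.merge a' b)   // set [1; 2] - the removed element reappeared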

2-Phase Set

The next step is a two-phase set. Like in the case of PNCounter, we simply combine two GSets - one for added elements, and one for removed ones (often referred to as tombstones). Adding/removing an element and merging work pretty much like in the case of PNCounter/GCounter.

module PSet = 
    type PSet<'a when 'a: comparison> = GSet.GSet<'a> * GSet.GSet<'a>
    let zero: PSet<'a> = (GSet.zero, GSet.zero)
    // (add, rem) is a single PSet instance
    let value (add, rem) = add - rem  
    let add v (add, rem) = (GSet.add v add, rem)
    let rem v (add, rem) = (add, GSet.add v rem)
    let merge (add1, rem1) (add2, rem2) = 
        (GSet.merge add1 add2, GSet.merge rem1 rem2)

There are several problems with this implementation:

  • A common issue of tombstone-based sets is that the removed set can grow indefinitely, so that the final value set ends up being only a fraction of the size of the metadata actually needed to keep the sets consistent. There are extra algorithms - known as tombstone pruning - to mitigate that problem.
  • While we are able to remove an added element, a problem appears when we try to add a removed element again. Since we don't have any semantics for removing elements from either of the underlying GSets, once removed, an element will stay in the tombstone set forever and will keep being excluded from the final value set. So no re-adding the value for you, my friend (see the sketch after this list). Again, we need some extra metadata that allows us to track causality and determine when the add/remove happened.
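A short sketch of the re-add problem (this is the behavior of the PSet module above, not a fix for it):

// Once a value lands in the tombstone set, re-adding it has no visible effect.
let s = PSet.zero |> PSet.add 1 |> PSet.rem 1 |> PSet.add 1
printfn "%A" (PSet.value s)   // set [] - the tombstone still masks the re-added element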

Observed Remove Set

If you've kept up to this point, congratulations! We're actually going to make our first semi-advanced case here: an observed remove set (known as ORSet), which allows us to freely add/remove elements and still converge when merging replicas from different locations.

How does it work? We'll represent our ORSet as add/remove collections, but this time instead of sets, we'll use maps. The keys in those maps will be our elements, while the values will be (partially) comparable timestamps used to mark when the latest add/remove happened.

The actual specialization of an ORSet depends on the timestamp and conflict resolution algorithm used:

  • You could use DateTime for timestamps and prefer the latest value on conflict resolution. This would essentially give us Last Write Wins semantics (just like in LWWReg) over particular elements of the set. It would greatly simplify things, but we're going to do better than that :)
  • The other approach is to use the vector clocks we defined earlier in this post. This allows us to detect when two replicas have added/removed the same element without knowing about the other party trying to do the same. When such a case is detected, we need to arbitrarily decide what the outcome of our conflict resolution algorithm will be. The most common choice is to prefer additions over removals. This is known as an Add-Wins Observed Remove Set (AWORSet for short), and it's what we'll implement here.

module ORSet =
    type ORSet<'a when 'a: comparison> = Map<'a, VTime> * Map<'a, VTime>
    let zero: ORSet<'a> = (Map.empty, Map.empty)

To get the result set, our value function iterates over the add map and removes from it all entries whose add timestamp is lower than the corresponding timestamp in the removals map (this means that if both updates were concurrent, we keep the element).

    let value (add, rem) = 
        rem |> Map.fold(fun acc k vr ->
            match Map.tryFind k acc with
            | Some va when VClock.compare va vr = Ord.Lt -> Map.remove k acc
            | _ -> acc) add

Just like in the case of PNCounter, our add/remove operations need to work in the context of a particular replica r - this is a consequence of using vector clocks as timestamps. Here, we simply add the element to the corresponding map and increment its vector clock.

    let add r e (add, rem) =
        match Map.tryFind e add, Map.tryFind e rem with
        | Some v, _ -> (Map.add e (VClock.inc r v) add, Map.remove e rem)
        | _, Some v -> (Map.add e (VClock.inc r v) add, Map.remove e rem)
        | _, _ -> (Map.add e (VClock.inc r VClock.zero) add, rem)

    let remove r e (add, rem) =
        match Map.tryFind e add, Map.tryFind e rem with
        | Some v, _ -> (Map.remove e add, Map.add e (VClock.inc r v) rem)
        | _, Some v -> (Map.remove e add, Map.add e (VClock.inc r v) rem)
        | _, _ -> (add, Map.add e (VClock.inc r VClock.zero) rem)

One additional thing you may have noticed here is that we use Map.remove. It's safe to do so in this context, because at the same time we add a value-timestamp pair to the opposite map, still keeping the information about the element's presence inside the object.

The most complex part is the actual merge function. We start by squashing the corresponding add/remove maps (in case of conflicting timestamps, we simply merge them together). Then we converge the merged add/remove maps: from the add map we remove all entries with timestamps lower than the corresponding entry timestamps in the remove map (this already covers the concurrent update case, as we decided to favor additions over removals at the beginning). From the remove map, we drop all entries whose timestamps are lower than, equal to, or concurrent with the corresponding ones from the add map - just to keep things fairly compact.

    let merge (add1, rem1) (add2, rem2) =
        let mergeKeys a b =
            b |> Map.fold (fun acc k vb ->
                match Map.tryFind k acc with
                | Some va -> Map.add k (VClock.merge va vb) acc
                | None -> Map.add k vb acc ) a
        let addk = mergeKeys add1 add2
        let remk = mergeKeys rem1 rem2
        let add = remk |> Map.fold (fun acc k vr ->
            match Map.tryFind k acc with
            | Some va when VClock.compare va vr = Ord.Lt -> Map.remove k acc
            | _ -> acc ) addk
        let rem = addk |> Map.fold (fun acc k va ->
            match Map.tryFind k acc with
            | Some vr when VClock.compare va vr = Ord.Lt -> acc
            | _ -> Map.remove k acc ) remk
        (add, rem)
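Finally, a short end-to-end sketch (with made-up replica ids) showing the add-wins semantics under a concurrent add and remove of the same element:

// Both replicas start with "x" present. A removes it while B concurrently re-adds it.
let init = ORSet.zero |> ORSet.add "A" "x"
let a = init |> ORSet.remove "A" "x"
let b = init |> ORSet.add "B" "x"
let merged = ORSet.merge a b
printfn "%A" (ORSet.value merged |> Map.toList |> List.map fst)   // ["x"] - the addition won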

As you can see, we're using a map of vector clocks (which are themselves maps), which is a sub-optimal solution for this implementation. There are different ways to mitigate this problem:

  1. The simplest way is to compress the binary ORSet payload using, for example, LZ4 or GZip.
  2. A more advanced approach is to modify the existing implementation to use dotted version vectors.

But I hope to write about them another time.

What next?

With this set of data structures in our arsenal, we could build more advanced ones:

  • One example would be a Last-Write-Wins Map, which essentially looks somewhat like type LWWMap<'k,'v> = ORSet<('k * LWWReg<'v>)> (keep in mind that the comparison should depend only on the key component).
  • Another one would be a graph structure, which is composed of two sets: one for nodes and one for edges.

As you can see, this is quite a big topic and, IMHO, a pretty interesting one. There are many more things, like delta-state optimizations and operation-based CRDTs, which I won't cover here - simply not to turn this post into a book - but I hope to address them in upcoming posts.