Originally, I didn't want to make a separate blog post about design behind bounded counters, but since beside original paper and a very few implementation living in the wild, this CRDT is widely unknown, I've decided to give it a try.
NOTE: we'll be reusing a
PNCounter implementation described in previous blogs posts. So if you're not familiar with the way how they work, please read about them first. If you feel lost, you may also take a look at my repository with sample implementations written in F#.
Other blog posts from this series:
- An introduction to state-based CRDTs
- Optimizing state-based CRDTs (part 1)
- Optimizing state-based CRDTs (part 2)
- State-based CRDTs: Bounded Counter
- State-based CRDTs: Map
- Operation-based CRDTs: protocol
- Operation-based CRDTs: registers and sets
- Operation-based CRDTs: arrays (part 1)
- Operation-based CRDTs: arrays (part 2)
- Operation-based CRDTs: JSON document
- Pure operation-based CRDTs
- CRDT optimizations
We already introduced some of the CRDT counters in the past, namely G-Counter (grow-only counter) and PN-Counter (which gave us both increment and decrement operations). While they have proven to be great in many eventually consistent scenarios working on a massive scale - some proposed ones where web page visit counters, likes/dislikes or click counters. Now let's see how we could extend their design further.
A Bounded Counter variant, we're describing here, was build to address a specific problem: is it possible to build a highly available, eventually consistent counter with an upper/lower limit on its value? This is a useful property, which we could make use of in few scenarios, eg.:
- An primary scenario shown in the research paper: we have an advertisement company, showing ads over mobile devices. When a company buys X impressions (ad displays) on our platform, we want to track them even when devices are periodically offline, and eventually stop showing them when a total number of impressions displayed to our end users has been reached.
- We want to distribute selling event tickets. Once all tickets are sold, we want to block platform from overselling them. All of that with tolerance to network splits, in globally-distributed manner.
- A standard (maybe a bit far-fetched) example of money transfer, in which we want to minimize a risk of customer running into debit.
All of those rely on notion of some kind of boundary value, beyond which we should not allow operations to happen. However, it's not trivial in case of eventually consistent counters, as they don't provide us any "global" counter value, to which we can relate in order to check if our constraint was violated. We only know a "local" value, as observed from the point of view of specific replica.
This means, that we need to shift our approach a little to realize this goal.
The core behavior of our bounded counter (here called
BCounter) is very similar to
PNCounter described in previous posts. However here we'll add one extra constraint: our counter cannot be decremented to reach the value below zero. This concerns only decrements (increments have no bounds). With that:
- We can build a counter which has an upper bound simply by incrementing it ons start, and then reverting the operation (increment → decrement) to create the upper bound.
- We can easily build a counter with a value floating within a provided range of values by combining two bounded counters together and incrementing/decrementing them both.
The core concept behind
BCounter could be represented as purring water between different buckets (replicas): we treat our number as some amount of finite resource - water in this example - which we can pour out, refill or spill from one to another. For this implementation, I'll describe that amount as quota.
There's no magic in there. The rules are simple:
- Each replica has its own part of the total quota to consume.
- Replica can increment (
incoperation) its own quota as it sees fit, therefore increasing a total quota available. This is safe operation, as we don't risk our counter to run below 0 this way.
- Replica can decrement its counter, but only up to it's local quota limit. This mean that our
decmay fail, due to insufficient number of resources to spend. In that case we may need to try again on another replica.
- Replica can transfer part of its own quota (
transferoperation) to another replica. Again it cannot share more that it has, so this operation can also potentially fail.
In this post, we won't describe how replicas should negotiate how to split their quota between each other: IMHO this is very dependent on the particular traits of your system. We'll focus on building a foundation, that will make those three operations (
PNCounter as a base for our
BCounter to keep track of counter's value. Additionally we'll need a matrix, which we'll use to remember the amounts of quota send between each pair of replicas. Conceptually we can present this data structure in a following way:
In this implementation, I'll assume that transfer matrix is a sparse one and I'll represent it as a map of keys (sender → receiver) and values (all quota transferred so far between that pair).
type BCounter = BCounter of PNCounter * transfers:Map<(ReplicaId * ReplicaId), int64>
PNCounter tracks the current known value of the counter, which as we mentioned, should never drop below 0:
let zero = BCounter(PNCounter.zero, Map.empty) let value (BCounter(pn, _)) = PNCounter.value pn
Now, the most important thing there is to calculate the quota. Ultimately it's pretty simple: we pick current counter's value, add all quota transferred to it and subtract of of the quota transferred from it. We could illustrate this with our diagram as:
Or simply, in code:
let quota replica (BCounter(pn, transfers)) = transfers |> Map.fold (fun value (sender, receiver) transferred -> if sender = replica then value - transferred elif receiver = replica then value + transferred else value ) (PNCounter.value pn)
Just like in case of previous counters, remember that those operations are context-sensitive: it means that each replica should only "speak" by itself and never try to act on behalf of another replica. With this helper function, we're ready to implement our three operations. Let's start from increment, as it's the easiest one:
let inc replica value (BCounter(pn, transfers)) = BCounter(PNCounter.inc replica value, transfers)
As we said, we are free to increment the total value as we want to. Decrements are a little more tricky:
let dec replica value bcounter = let q = quota replica bcounter if q < value then Error q // cannot consume more that what we have else let (BCounter(pn, transfers)) = bcounter Ok (BCounter(PNCounter.dec replica value pn, transfers))
Here, a returned value is actually
Result<BCounter, int64> - on successful decrement we return updated
BCounter. However as mentioned, decrement may fail if local quota limit doesn't allow us to freely perform that operation. In that case we return an error result with the amount of quota available on local replica.
In failure scenario we may need to ask other remote replicas to share their quotas, which of course means, that we need at least a periodic connectivity to them and that our replica cannot perform operations safely fully on its own, even when a total quota allows to do that.
This is a known limitation, but as I've said at the beginning, there's no magic here. If two things that cannot communicate, they cannot communicate. It's science.
The last operation we have left, is our quota transfer:
let transfer sender receiver value bcounter = let q = quota sender bcounter if q < value then Error q // cannot send more than what we have else let (BCounter(pn, transfers)) = bcounter let pair = (sender,receiver) let updatedTransfers = // if entry for `pair` key doesn't exists insert `value` there // if it exists add `value` to a current entry Helpers.upsert pair value ((+) value) transfers Ok (BCounter (pn, updatedTransfers))
Of course, in real distributed environment, this means, that transfer request-response is performed asynchronously, and that remote replicas need to wait for a local state to propagate, before the receiver will be able to take advantage of newly assigned quota.
Since we're talking about state-based CRDTs, there's one thing missing - our glorified
merge function, which allows us to automatically resolve all potential conflicts between two replicas. Fortunately, we already know how to merge
PNCounters from previous blog posts (right?), so we only need to figure out, how to merge transfers matrix.
At the very beginning of this blog post series about CRDTs, I've talked about two core operations that maintain idempotence, associativity and commutativity rules, required by
merge function: union of sets and max value of two numbers (and by extension composition of those operations).
Since we can treat keys of the map as set, we only need to know, how to merge values of the map. We already did that when implementing
GCounter.merge operation. We only need to replace key type (from
replica to pair of
(sender,receiver)) since each entry describes transferred quota between that pair, we know that this value is only incremented - we cannot "untransfer" quota, we can only transfer it back, by incrementing value of reversed
(receiver, sender) entry. Because of that, we can safely apply our
max value in this case (as the greater value is always the one most up-to-date):
let merge (BCounter(c1, t1)) (BCounter(c2, t2)) = let c3 = PNCounter.merge c1 c2 let t3 = t2 |> Map.fold (fun acc pair v2 -> Helpers.upsert pair v2 (max v2) acc) t1 BCounter(c3, t3)
With that in place, we have a
BCounter with a complete set of operations necessary to perform its role.
In this blog posts, we covered how to build a basic bounded counters. We've also mentioned how to maintain upper and both upper-lower boundary (via
While we haven't mentioned how to implement delta-state on top of these, it's really not that hard: we covered composition of delta-state of G-Counters and PN-Counters previously, and exactly the same rules also apply here.