Have you ever run into N+1 select problems with your GraphQL endpoint? If so, don't worry anymore. Today we're going to generalize and solve this kind of issue, as well as discuss and explain the mechanics behind it. As usual, we won't cover only theory but also an F# implementation, which - if you're impatient - can be found here.

What's the problem

N+1 Select is a common name for a problem most often seen in Object Relational Mapping libraries. It's usually a result of not prefetching all required data from the database before operating on it, causing our ORM to lazily fetch every missing piece when it's demanded. Since this kind of issue often surfaces when processing the (missing) data within loops, we easily run into a situation where instead of doing a single database query, we're doing one for every loop iteration, seriously crippling the performance of our system.
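To make that shape concrete, here's a minimal sketch of the N+1 pattern; getUsers and getOrdersByUserId are hypothetical single-purpose query functions, stubbed out purely for illustration:

type User = { Id: int; Name: string }
type Order = { OrderId: int; UserId: int }

// hypothetical stand-ins for real database queries, stubbed for illustration
let getUsers () : Async<User list> = async { return [] }
let getOrdersByUserId (userId: int) : Async<Order list> = async { return [] }

// the N+1 shape: 1 query for all users + N queries, one per loop iteration
let usersWithOrders () = async {
  let! users = getUsers ()                    // 1 query
  let mutable result = []
  for user in users do
    let! orders = getOrdersByUserId user.Id   // +N queries
    result <- (user, orders) :: result
  return List.rev result
}

The fix is always some form of batching - e.g. a single select * from orders where user_id in (...) - which is exactly what the rest of this post automates.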

This problem became even more apparent - and harder to solve - with the introduction of GraphQL, which enabled API consumers to send arbitrary requests. The dynamic nature of GraphQL queries makes it harder to predict what data needs to be prefetched in order to serve the query. We cannot hardcode it, as requirements may change from one query to another. Moreover, different segments of the query (called resolvers) are often isolated and don't exactly know which other parts of the query are about to be fetched - some libraries solve this by creating query execution plans - or how to synchronize data access to avoid fragmented retrieval of information.

The N+1 Select issue can also be described in more general terms - we are dealing with a multi-step tree of async computations, some of which depend on other computations being performed first, while others can run in parallel, isolated from each other. Example: given Users and Orders tables, if you want to serve data about users and their orders, orders depend on users, but one order request is isolated/independent from another, and the same goes for individual users.

(Diagram: sequential vs. parallel async computations.)
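In code, that structure looks like this (reusing the types and stubs from the snippet above, plus a hypothetical fetchUser): sibling computations can run in parallel, dependent ones must be sequenced:

// hypothetical per-entity fetch, stubbed for illustration
let fetchUser (id: int) : Async<User> = async { return { Id = id; Name = string id } }

// orders depend on their user -> sequenced; users (and their order
// sub-requests) are independent of each other -> parallel
let userWithOrders (id: int) = async {
  let! user = fetchUser id
  let! orders = getOrdersByUserId user.Id
  return user, orders
}

let loadAll (ids: int[]) =
  ids |> Array.map userWithOrders |> Async.Parallel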

Optimizations around these sorts of dependent/isolated and sequential/parallel relationships have been researched, and I think that for the purpose of this post we can divide them into two categories:

  1. The original approach, promoted by Facebook's Haxl library and adopted by others like ZQuery or FSharp.Data.DataLoader. These libraries focus on defining dedicated monads/applicatives capable of representing dependent/independent computations and deferring their execution in an introspective, "free" manner.
  2. Another way was introduced by dataloader.js and later spread to many other platforms, including C# and Rust. It focuses on intelligently managing and sidelining the task execution queue filled by the async/await mechanisms of these languages.

Even though these approaches use different techniques, they are based on similar foundations. They try to optimize parallel operations by introducing batching and substituting individual requests with an equivalent operation that works over batches. Imagine replacing N queries, each for a single ID, with one query passing a list of those N IDs. They usually also support caching of already retrieved values - which helps deduplicate requests and is especially useful when it comes to building data graphs containing cycles.
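In type terms, batching is just the following signature change; the hard part - recovering one batched call from many individual-looking calls issued concurrently - is what the rest of this post builds. A small generic sketch:

// non-batched: 'id -> Async<'value>                     (N calls, N round-trips)
// batched:     Set<'id> -> Async<Map<'id, 'value>>      (1 call per batch)

// the trivial direction: any batched fetch can serve single requests...
let fromBatch (fetchBatch: Set<'id> -> Async<Map<'id, 'value>>) : 'id -> Async<'value> =
  fun id -> async {
    let! results = fetchBatch (Set.singleton id)
    return Map.find id results
  }
// ...but used like this it still costs one round-trip per key; a data
// loader's job is to group those per-key calls back into real batches.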

Here we're going to cover the 2nd approach. The reason is simple - the 1st approach relies on building a custom effect system that needs to be propagated through the rest of the application, making it quite intrusive. It's highly unlikely that you'll find another system, framework or library integrated to use and propagate your custom effects onward... especially since we haven't defined them yet. This could be solved with Higher Kinded Types, but we don't have them in F#.

Making up promises

First, we need some building blocks to begin with. One of the components we're going to need is a value container that can be asynchronously awaited until the value is provided. If you're using the .NET TPL, then you know that we have TaskCompletionSource<T>, which serves this purpose. Same for Promises in JavaScript.

In F# Async however, we don't have such a thing provided as part of the standard library. This is quite important, as Async is not really an object representing a computation in progress (like .NET Tasks in most cases), but a description of a computation that needs to be started to do its job - the actual object representing the computation in progress is called AsyncActivation and is usually not visible to us. It also means that binding to an async computation multiple times will cause its code to run multiple times.

async {
  let sleep1s = Async.Sleep 1000
  do! sleep1s
  do! sleep1s
} // total time passed: 2 seconds

task {
  let sleep1s = Task.Delay 1000
  do! sleep1s
  do! sleep1s
} // total time passed: 1 second

This is a result of the lazy nature of F# Async and one of the properties of functional programming. That's what we want to avoid here - otherwise our fetching function would be called for each element in the processed batch instead of once, defeating its purpose. Fortunately, this is a pretty well known problem. Some variants of ML languages (which F# also inherits from), like Concurrent ML, define structures known as ivar or mvar, whose semantics we're going to use below:

  • IVar<'t> is a write-once cell, which reflects the behavior of TaskCompletionSource<'t>. It can be written to only once and asynchronously awaited until a value appears. If the value is already there, all future awaiters are completed immediately. This is the data structure we're going to use.
  • MVar<'t> is another variant, which allows swapping the value inside multiple times. We're not going to use it here - it can exhibit funky behavior in terms of data races - but it's worth knowing it exists. It has no equivalent in the .NET standard library.

We'll start by defining an internal state object for our IVar container.

type VarState<'t> =
  | Empty of ('t->unit) list
  | Full of 't

It can work in two modes:

  1. Empty means that a value was not yet provided. In that case we collect the completion callbacks of all pending awaiters.
  2. Full informs us that the cell is filled. In that case the awaiter list is no longer necessary, as we can resolve awaiters immediately as they arrive.

With this in mind, our IVar type looks like this:

type IVar<'t> () =
  let mutable state: VarState<'t> = Empty [] 
  let accept (success, _failure, _cancel) = ...
  let async = Async.FromContinuations accept
  member this.Value: Async<'t> = async
  member this.TryWrite (value: 't): bool = ...
  member this.Write (value: 't): unit =
    if not (this.TryWrite value) then
      raise (NonEmptyVarException "Cannot write value to non-empty IVar.")

We skipped some of the implementations to discuss them in a second. The first thing we're going to look at is the accept function. It's used to create a generic completable Async object using Async.FromContinuations. What you can see here is that it's a function that takes a triple of callbacks, used to denote one of the three possible final outcomes of our async - a successfully computed value, a failure with an exception, or a cancellation.
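As a tiny illustration of the mechanics (not part of the IVar implementation itself), here's an async built with Async.FromContinuations that completes immediately with a value, using only the success callback:

// completes right away; the failure/cancellation callbacks go unused
let fromValue (value: 't) : Async<'t> =
  Async.FromContinuations (fun (success, _failure, _cancel) -> success value)

Our accept function does the same thing, except that it may park the success callback in the Empty state instead of calling it right away.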

For updating the state when an async activation binds to our IVar.Value, we need to be sure this operation is thread safe, so we're going to use lockless updates to make it so.

let accept (success, _failure, _cancel) =
  let rec update success =
    let old = Volatile.Read &state
    match old with
    | Full value -> success value
    | Empty list ->
      let nval = Empty (success::list)
      if not (obj.ReferenceEquals(old, Interlocked.CompareExchange(&state, nval, old))) then
        update success // retry
  update success

We described this algorithm in greater detail in the past; here we'll only glimpse over it. What we're basically trying to do is accept incoming callbacks - if our IVar already has a value in it, we call the callback immediately. Otherwise we add it to the awaiters list and try to update the state using atomic hardware instructions. If another thread tried to do the same in the meantime, Interlocked.CompareExchange can return a state updated by that other thread. If that happens, we simply retry the operation - since our state is based on immutable data structures, it's safe to do so.

Another operation using atomic compare-and-swap semantics is writing a value into the cell. Here we're trying to replace our Empty state with a Full value. If that succeeds, we iterate over all awaiter callbacks, providing them with the value.

type IVar<'t> () =
  // ...continued
  member this.TryWrite (value: 't): bool =
    let rec update value =
      let old = Volatile.Read &state
      match old with
      | Full _ -> false // a value was already provided
      | Empty awaiters ->
        if obj.ReferenceEquals(Interlocked.CompareExchange(&state, Full value, old), old) then
          for resolve in awaiters do
            resolve value
          true
        else update value // retry
    update value

While the iteration itself happens in LIFO order (callbacks were prepended to the list), that doesn't really matter here.
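A quick smoke test of these semantics - together with a tiny IVar.empty helper that the data loader code below assumes exists:

module IVar =
  // helper constructor used by the DataLoader snippets below
  let empty<'t> () = IVar<'t>()

let ivar = IVar<int>()
// two awaiters bind before any value exists - both get parked as callbacks
let pending = [| ivar.Value; ivar.Value |] |> Async.Parallel |> Async.StartAsTask
ivar.Write 42                     // completes every parked awaiter with 42
printfn "%A" pending.Result       // [|42; 42|]
printfn "%b" (ivar.TryWrite 43)   // false - the cell is write-once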

Async batching

With our "promises" ready, we can now go to building actual solution. The core of the idea depends on cooperation of two different types:

  • A batching/caching object, which we supply with our function for resolving sequences of requests. It exposes a method to perform requests one by one and hides the complexity of putting them into batches.
  • A custom synchronization context - this class relates to the .NET threading model. When our async code is started (e.g. via operations like Async.Start), we can instruct the async activation, via a synchronization context, how it should be scheduled on the CPU. By default the global .NET thread pool is used, but we'll need to alter this behavior a little.

The core of our API in action will look like this:

let ctx = DataLoaderContext() // custom synchronization context

let loader = DataLoader(ctx, fun ids -> async {
	use! db = openDb connectionString
	let! users = db.QueryAsync<User>("select * from users where id in (@ids)", {| ids = ids |})
	return users |> Seq.map (fun u -> u.id, u) |> Map.ofSeq
})

// later in parallel code pieces
let! users = 
	userIds
	|> Array.map (loader.GetAsync)
	|> Async.Parallel // all parallel calls will be grouped into single batch call

How do we make that work? When we call loader.GetAsync, several things have to happen (a rough trace of the whole flow follows the list):

  1. (Optional) If we use caching, we can check whether we already have a value for the provided key. In that case we can short-circuit and return it immediately. Otherwise...
  2. Add the requested key to the batch.
  3. Push a notification to our synchronization context to enqueue the current data loader. It will wait there until a later async activation steps over the code binding to the loader.GetAsync reply and tries to execute it.
  4. Return a value from the loader's internal IVar (our promise). This value is async and will wait until we call our batching function. It's important that we schedule it on our custom synchronization context.
  5. Once loader.GetAsync gets executed, an async activation will try to schedule the next (synchronous) step using the SynchronizationContext.Post method. There we can override it and try to commit the data loaders we enqueued in point 3.
  6. We dequeue loaders one by one and let them work sequentially - the first needs to finish before we start the next one. We trigger each by calling a custom Commit method. It internally replaces the IVar and batch of the current data loader with fresh ones (so new requests count towards the next batch) and calls the fetch function over all the arguments requested so far.
  7. Once the fetch function returns, we complete our IVar (causing all async activations awaiting at point 4 to continue) and communicate to our synchronization context (via a custom Done method) that it's safe to pick the next loader from the queue and repeat the algorithm from point 6.
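To tie these steps to the code we're about to write, here's a rough trace of the earlier Async.Parallel example for two user ids (interleaving simplified):

// GetAsync 1  -> cache miss; batch = {1};   loader enqueued on ctx; returns promise-backed async
// GetAsync 2  -> cache miss; batch = {1,2}; loader enqueued again;  returns promise-backed async
// both asyncs -> SwitchToContext ctx; ctx.Post fires -> TryCommitNext
// Commit #1   -> swaps out batch/IVar; runs fetchFn {1;2} once; fills IVar; calls Done()
// Commit #2   -> its batch is empty by now, so it just calls Done()
// both awaiters resume, each doing Map.find over the single fetch result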

As you might have noticed, the step-by-step process is not easy to follow (e.g. point 7 wraps back into points 4 and 6), especially if we take into account that not everything here is called by our own code - some of it is driven by the F# async state machine. There are many things that can go wrong, and in that case all you'll see is an empty screen waiting in a deadlock. So better have a plan of action first!

We'll start by defining our DataLoader type:

type DataLoader<'id, 'value when 'id: comparison>(sync: DataLoaderContext, fetchFn: Set<'id> -> Async<Map<'id, 'value>>) as this =
  let syncRoot = obj()
  let mutable cache = Map.empty
  let mutable batch = Set.empty
  let mutable fetching = IVar.empty ()
  member this.GetAsync(key: 'id): Async<'value> =
    lock syncRoot (fun () ->
      match cache.TryGetValue key with
      | true, value -> value // 1. return cached async result if available
      | false, _ -> 
        batch <- Set.add key batch // 2. batch key
        sync.Enqueue this          // 3. enqueue this data loader
        let promise = fetching.Value
        let value = async {
            // 4. make this async work in our custom synchronization context
            do! Async.SwitchToContext sync
            // promise will return Map with `fetchFn` result for current `batch`
            match! promise with
            | Ok values -> return Map.find key values
            | Error err -> 
              // rethrow an exception preserving its stack trace
              ExceptionDispatchInfo.Capture(err).Throw()
              return Unchecked.defaultof<_> // never reached
        }
        cache <- Map.add key value cache
        value)

Just like we mentioned, within the GetAsync method we're executing steps 1-4 of our algorithm. What we didn't talk about is that we require this operation to be thread safe. This time we'll fall back to the good old lock function, since guaranteeing idempotency of this operation with compare-and-swap alone would be too expensive.
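One piece the snippets rely on but don't show is the ICommittable interface shared between the loaders and the context. Judging by how it's used, it presumably boils down to a single method:

// the contract between DataLoaderContext and DataLoader:
// "flush whatever batch you have accumulated so far"
type ICommittable =
  abstract Commit: unit -> unit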

Next, let's take a look at our modified synchronization context:

type DataLoaderContext() =
  inherit SynchronizationContext()
  let syncRoot = obj()
  // currently executing data loader
  let mutable atWork: ICommittable = null 
  let pending = Queue<ICommittable>()
  let rec next () =
    if isNull atWork && pending.TryDequeue(&atWork) then
      atWork.Commit()
      next ()   
  override this.Post(job: SendOrPostCallback, state: obj) =    
    job.Invoke(state)
    this.TryCommitNext()
  override this.Send(job, state) = this.Post(job, state)
  member internal this.Enqueue(commit: ICommittable) =
    // pending is also read/mutated by next() - guard it with the same lock
    lock syncRoot (fun () -> pending.Enqueue commit)
  member internal this.Done() = lock syncRoot (fun () ->
    // once current data loader is finished, we repeat the cycle 
    atWork <- null 
    next ())
  member internal this.TryCommitNext() = lock syncRoot next

Here we're adding some internal methods used to communicate with our data loaders. The Post method is called by the F# async infrastructure, and its role is to execute the synchronous steps of our async code (the discrete pieces in-between the let! bindings generated by the F# compiler). As part of this we check whether any data loader (ICommittable object) is currently executing, and if not, we pick the next loader from the queue and commit it to do its work. Since we don't have control over when Post is called, we also put this logic into a synchronized block under a lock guard.

So far we've skipped the DataLoader's implementation of ICommittable to discuss the rest of the infrastructure. It's really simple in its nature. What we do is swap out the contents of our DataLoader's IVar and batch, to be sure they won't be altered while we're working with them. Next we call our fetch function over the accumulated batch we retrieved and, once it's complete, we fulfill the IVar promise and inform the synchronization context that the current data loader has finished its work and the next one can be picked.

type DataLoader<'id, 'value when 'id: comparison> =
  interface ICommittable with
    member this.Commit() =
      // swap the contents of batch and promise
      let keys, ivar =
        lock syncRoot (fun () ->
          let ivar = fetching
          fetching <- IVar.empty()
          let keys = batch
          batch <-  Set.empty
          (keys, ivar))
      if Set.isEmpty keys then sync.Done()
      else
        Async.Start(async {
          try
            try
              let! batchResult = fetchFn keys
              ivar.Write (Ok batchResult)
            with e ->
              ivar.Write (Error e)
          finally
            // inform synchronization context, that current
            // data loader finished its work
            sync.Done ()
        })

With these in our toolbelt, we're now ready to batch multiple independent requests and substitute them with one. This design also works in a nested manner, when subsequent parallel requests have their own dependencies that can be executed in parallel - you can see that in action here.
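As a hedged sketch of that nesting - reusing ctx, loader and the hypothetical openDb from the snippets above, with a made-up Order type carrying a user_id field - a dependent level of the tree simply gets its own loader, yielding one batched query per level:

// second-level loader: orders batched by user id
let orderLoader = DataLoader(ctx, fun userIds -> async {
	use! db = openDb connectionString
	let! orders = db.QueryAsync<Order>("select * from orders where user_id in (@ids)", {| ids = userIds |})
	return
		orders
		|> Seq.groupBy (fun o -> o.user_id)
		|> Seq.map (fun (userId, os) -> userId, List.ofSeq os)
		|> Map.ofSeq
})

// later, inside async code:
let! users = userIds |> Array.map loader.GetAsync |> Async.Parallel           // one users query
let! orders = users |> Array.map (fun u -> orderLoader.GetAsync u.id) |> Async.Parallel // one orders query

One caveat: GetAsync resolves its result via Map.find, so the fetch function must return an entry for every requested key (e.g. an empty list for users without orders), otherwise the awaiting async will fail.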

Summary

We presented a technique that offers a way to mitigate the negative performance footprint of N+1 Select by substituting multiple parallelizable requests with one. This approach is not perfect - we may still need a number of requests that grows with the depth of our call tree. Using the example from above, we can optimize access to multiple users or orders, but since orders depend on users, we cannot parallelize both levels at the same time, even though we could write such a query by hand.

What's even more interesting, this approach is not restricted to database queries. It can be generalized to pretty much any sort of parallel workload, and it helps us write more optimal and more readable code by hiding the inner complexities of executing parallel code.