In my previous post, I’ve shown how to create a simple hello world application using system leveraged with a F# API. Today, I’ll show a slightly more complex example. We’ll create a simple Map-Reduce system based on distributed word count algorithm.

Idea of our algorithm is quite simple – we want to group and count all repetitions of the same words (case-insensitive) in input text. This is a perfect subject to illustrate a map-reduce algorithm as well as task distribution mechanism. To realize our objective, we’ll need to define three actor types:

  • Master – it’s main job is to spread an input data equally among other actors and forward request for result data.
  • Mappers – they will tokenize their text fragments into specific words and send it to reducers.
  • Reducers – they are responsible for word grouping and counting.

At the beginning, lets define basic message types used by our system:

  • One to propagate chunks of data to mapping nodes.
  • Next one to forward processed data to reducers.
  • Last one to invoke collection mechanism, which will concatenate all reduced data to one peace and send it back to the originator.

All of them are illustrated by the code below:

type MRMsg =
    | Map of string
    | Reduce of (string * int) list
    | Collect

As you may have seen previously, instead of C#/Scala objective approach, I’ve used tail-recursive function to define an actor processing routine. Since a lot of it’s code would be repetitive among all of the actor types we want to define we can wrap actor behavior using actorOf and actorOf2 functions.

Lets start to defining a map-reduce logic into our application. Firstly we need to create a map actor. In this example, all what it needs to do, is to chop provided string into single words, and create from them a simple bag of words with their repetition. However we won’t sum them now – this is the job for reducer actors.

let mapWords (line:string) = seq { for word in line.Split() -> (word, 1) }

let map reducer (mailbox:Actor<MRMsg>) = function
    | Map line -> reducer <! Reduce (mapWords line |> List.ofSeq)
    | m -> mailbox.Unhandled m      // mapper won't handle any other messages

As you may see, after finishing it’s work, mapper will send list of words directly to reducers. To make this possible, we need to provide reducer reference as one of the function parameters.

Next type is the reduce actor. It’s task is to group all incoming words, and count their occurrences. In our example we use shared ConcurrentDictionary for gathering data from all of the reducer instances. For more real life example, we should probably use some more sophisticated mechanism, such as aggregate actor concatenating all results provided by specific reducers. Here for sake of simplicity, we omit it.

let reduceWords (dict:ConcurrentDictionary<string,int>) iter =
    |> List.iter (fun (k, v) -> dict.AddOrUpdate(k, v, System.Func<_,_,_>(fun key old -> old + v)) |> ignore)

let reduce (dict:ConcurrentDictionary<string,int>) (mailbox:Actor<MRMsg>) = function
    | Reduce l -> reduceWords dict l |> ignore
    | Collect -> mailbox.Sender() <! seq { for e in dict -> (e.Key, e.Value) }
    | m -> mailbox.Unhandled m

Since reducer actors have direct access to shared dictionary, they also are able to respond on Collect command, providing all reduced data back to message sender.

The last behavior, is the master actor. It’s role is to be a proxy between application’s in/out data, and the rest of the actors. We use it for two tasks: 1) sending text line by line to be processed by our Map-Reduce application, 2) forwarding request for result of MR operation.

let master mapper (reducer:InternalActorRef) (mailbox:Actor<MRMsg>) = function
    | Map str -> for line in str.Split '\n' do mapper <! Map line
    | Collect -> reducer.Tell(Collect, mailbox.Sender())    // forward message with info about it's originator
    | m -> mailbox.Unhandled m

Now when we have all of our actors defined, we may initialize an Akka framework to create our MR system. Now we can use a helper functions, we’ve defined before for fast and easy actor definition.

let system = System.create "MapReduceSystem" <| ConfigurationFactory.Default()
let dict = ConcurrentDictionary<string,int>()
let reducer = spawn system "reduce" <| actorOf2 (reduce dict)
let mapper = spawn system "map" <| actorOf2 (map reducer)
let master = spawn system "master" <| actorOf2 (master mapper reducer)

To finalize, we can pass some data to our system to see, if it returns an expected results. Until now, we’ve only used send operator <! to delegate fire-and-forget messages for specific actors, but we’ve actually never used any two way request-response mechanism. To do so, we’ll use Ask method (shortcut operator <?) to send a request message and receive a handler to be used when a response will be returned in asynchronous manner.

master <! Map "Orange orange apple"
master <! Map "cherry apple orange"

Threading.Thread.Sleep 500

// read the result
async {
    let! res = master <? Collect
    for (k, v) in res :?> (string*int) seq do
        printfn "%s\t%d" k v
} |> Async.RunSynchronously