# Exploring distributed designs with Erlangen: Kademlia

Max Rottenkolber <max@mr.gy>

Common Lisp is known to lend itself to rapid prototyping, and Erlangen intends to extend these capabilities towards the domain of distributed systems. This article documents an exploration of distributed hash tables, specifically the Kademlia paper, using Erlangen.

In an attempt to “boldly go where no one has gone before” (not me at least), I present mesh-table, a distributed hash table design that retains the topology based on the XOR metric pioneered by Kademlia, but otherwise deviates in its goals and implementation. While Kademlia describes a homogeneous peer-to-peer overlay network for exchange of transient data between peers in an untrusted network, mesh-table aims to facilitate storage and retrieval of long-lived, immutable data in a network of dedicated storage and client nodes in a trusted network.

I cover what I perceive to be the core principles of the Kademlia paper (the XOR metric and k-buckets), but go astray otherwise, often for simplicity's sake. Don’t expect particularly good ideas that are my own, and by no means production quality code. I’ll just be toying around for fun and insight.

## Distance metric

Kademlia requires that nodes are assigned identifiers of the same size as the keys used to identify stored values, and relies on the notion of distance between two identifiers. The basic premise is that the value denoted by a key is stored on the node with the identifier closest to the key. The distance between two identifiers is defined as their bit-wise XOR.

The `distance` function has some useful properties as a topological metric. The distance between an identifier and itself is zero

`(distance` x x`)` = 0

and the distance between two distinct identifiers is positive:

`(distance` x y`)` > 0 if xy

Furthermore, the `distance` function is symmetric,

x, y : `(distance` x y`)` = `(distance` y x`)`

and unidirectional, meaning for any identifier there exists exactly one identifier at any distance:

x, y, z : `(distance` x y`)``(distance` x z`)` if xyz

You can imagine the distance metric as a circular railway with each identifier being a train stop, and trains traveling in only one direction. Using the `distance` function, a node can tell if another node is closer to or further away from a given key, and determine which node is responsible for the key.

## Routes and buckets

Each Kademlia node maintains a set of routes that contain the identifiers and contact details of other nodes, effectively forming an overlay network. Since our nodes are implemented as Erlangen agents, we don’t need to deal with network protocols such as UDP, on which Kademlia is traditionally layered upon. Instead, we can use the message passing functionality provided by Erlangen as the underlying network. Hence, our routes store a reference to an agent as the contact information for another node.

Finally, we want to replace routes to nodes that have become inactive with routes to nodes that have more recently contacted us, and so we keep a timestamp with each route, to track when we last heard from the node it points to.

These routes form the edges of a partial mesh network of nodes. Just as values are stored in the node closest to their key, the mesh is organized in terms of the distance metric as well. We strive for a network topology that enables efficient lookup and redundancy.

By comparing the identifier in a route to the identifier of the node we can determine the length of the route. What does that mean, though? After all, our distance metric is not related to geographic distance, or any other “real” metric. Quite the opposite is true: our metric allows us to create a virtual, spatial space in which we can arrange our nodes.

In this space, nodes keep many routes to nodes close to them, and few routes to distant nodes. If a node receives a request for a given key, and it has a route to a node closer to the key than itself, it forwards the request via the route to the node closest to the key. We can see that a request eventually reaches the closest node in the network, and that the path is optimal with respect to the number of hops.

We implement this by sorting the routes of a node into a fixed number of buckets—one for each bit of the identifier—of equally limited capacity, but assign to them exponentially growing ranges of the identifier space. For each 1 ≤ n ≤ keysize, where keysize is the number of bits in an identifier, the respective bucket holds routes to nodes of distance between 2 and 2⁻¹.

A bucket consists of a bound denoting its assigned range (previously n), a free-counter to indicate how much capacity is left, and a list of `*replication*` − free routes for 0 ≤ free ≤ `*replication*`. Also, let’s define some of the global parameters of our network:

• `*key-size*`—the number of bits that comprise an identifier
• `*replication*`—a positive integer denoting the replication level of our network (controls the number of routes kept by nodes, as well as how many copies of each stored value we maintain)

You might notice that `bucket-key-p` doesn’t implement what I have described, it checks against the upper bound of the bucket range only. That’s because a node starts out with a single bucket initially, and then allocates further buckets on demand. It keeps a list of buckets sorted by increasing bound. The first bucket in that list keeps routes of distance between 0 and 2ⁿ⁻¹, with n being the bucket’s bound. Thus, the first bucket contains all routes that don’t fall into any other bucket. To search for a route’s bucket, we can iterate over the bucket list to find the first bucket with an upper bound greater than the distance of the route.

A node also keeps a local mapping from keys to values, as well as a ring of callbacks for outstanding requests, but more on that later.

When a new route is to be added to the node’s first bucket and that bucket is full, the next lower bucket is split off the first bucket, and put in front of the bucket list. This allocation strategy makes sense because—depending on the size of our mesh—the lower buckets are increasingly unlikely to be populated, since they hold decreasingly smaller ranges of the identifier space.

So, when is a bucket full? A bucket is full when its capacity is reached, and none of its routes are stale. We consider a route stale once its timestamp exceeds a network-global timeout parameter:

• `*timeout*`—the duration of route validity after the last contact with the route’s node

With that out of the way, we can formalize how routes are added. First, we find the bucket suitable for the new route, if it has spare capacity, or a stale route can be deleted, we simply add the route. If it doesn’t fit, we consider if the bucket in question is the first bucket, in which case it can be split. If it’s not, we simply discard the new route—we’re already well-connected at that distance. Otherwise, we split the first bucket to make room for those close routes, and repeat the process.

Whenever a node receives a message from another node it already has a route to, it updates that route’s contact information (in case it has changed) and timestamp (preventing it from becoming stale). If the message is from a previously uncontacted node, a new route is added instead. The `update-route` function below also acts as a predicate that tests whether a matching route exists, and only returns true when such a route was updated.

Finally, we need to be able to find routes to nodes closest to a given identifier. To that end, we sort our routes by distance to said identifier—in descending order because we will select the n best routes using last. I know, horribly inefficient, but it’s a simple and correct implementation, and that’s all we need right now.

## Callback rings

We previously defined a node to have a “ring” of callbacks—what’s that about? During their lifetime, nodes send requests to other nodes, and when they do, they include a unique sequence number with each request. When a node responds to a request, it includes the request’s sequence number in its reply. Via the sequence number, the requesting node can associate the reply with one of its previous requests.

While most of our protocol is stateless, some parts of it do require us to keep track. Specifically, we sometimes need to remember actions to take if one of our previous requests is eventually met with a reply. These actions are stored as closures in a callback ring, and are executed when a reply to the outstanding request associated with the callback is received.

A node’s request ring consists of a sequence counter, as well as a buffer of callbacks for a fixed number of requests.

• `*response-backlog*`—the number of callbacks for outstanding requests we keep track of

The request ring is implemented as a ring-buffer of virtually unlimited capacity, which overwrites old elements when it wraps around its actual size. Its access function makes sure to return `nil` instead of the elements that have been overwritten.

The `ring` data structure has some very desirable properties, as well as a few questionable trade-offs. The callback ring is of fixed size, thus we avoid the risk of overflow due to excessive outstanding requests. Additionally, we forgo complicated timeout handling, the timeout for outstanding requests is implicitly adjusted relative to the load of the system. The node discards a callback (effectively a timeout of the request) only when it needs to reclaim its buffer slot for a new request. Hence, the effective timeout duration of requests decreases with the request throughput of a node (high load).

One glaring trade-off is the node’s implied behavior on excessive load. When requests are issued so quickly in succession that the callback ring wraps around before a responding node can reply, the ensuing replies are ignored, and no progress is made. Thus, it is up to the client to detect backpressure, and throttle accordingly.

It is noteworthy how this scheme affects the generational garbage collector. On a system with low load, callbacks can be retained much longer than technically required, possibly causing them to be tenured into an old generation. While this increases GC pressure, I conjecture that this phenomenon is actually amortized, since it is unlikely to occur on medium to high loads—where the additional pressure would actually hurt us.

Importantly, the value of `*response-backlog*` must be chosen deliberately with these properties in mind, as it governs both the effective timeout duration and the peak congestion of the system.

## Protocol messages

Our nodes communicate by exchanging and routing messages through the mesh. Usually, this would be the time when we define a wire encoding, but Erlangen does that for us already, so we can simply define structure classes to represent our messages. Each message contains some metadata to facilitate routing, namely the sender’s node and agent identifiers, in addition to the parameters specific to a particular message type.

• `*node*`—special variable bound to the node’s state, a `node` structure

Furthermore, messages are divided into requests and replies. Using defstruct’s `:include` option—which amounts to single-inheritance—we define our different types of messages in a type hierarchy. All types of requests and replies are of type `message`, but the types `request` and `reply` are disjoint.

Requests are treated specially because they are subject to being forwarded to increasingly closer nodes before reaching their final destination. They feature two extra flags: forward-p and trace-p. The former is used for explicit replication by inhibiting forwarding, effectively preventing the receiver from delegating the request. The latter is a debugging feature that allows us to trace how requests travel through the mesh. Replies to requests are always sent directly to the node that originally issued it.

The semantics for each type of request are expressed as a `handle` method on the respective type. As mentioned before, nodes shall update or add routes to nodes they are contacted by. To implement this we use CLOS Method Combination to add a `:before` method to `handle` that does just this. Note that since this method is specified on the type `message`, it is also run when we handle replies.

The body of the method is straightforward: we update the route to the requesting node unless it doesn’t already exist, in which case we add a new route. There is one peculiarity though: some messages won’t have a node identifier, namely the ones issued by client nodes that don’t want to become part of the storage mesh. Client nodes send anonymous requests, and are not interned into the routing tables of other nodes.

To help nodes deal with replies we define a function, `reply-bind`, which assigns a unique sequence number to a request to be issued, and optionally stores a function in the node’s callback ring under that sequence number. The callback function can then call `finalize-request` to delete itself, signaling that the request is completed, and preventing further replies to the same request from being accepted. Finally, we define a `handle` method on the `reply` type to call a respective callback function if applicable.

Before a request is answered with a reply, it is usually routed forward through the mesh until it arrives at the node responsible for it. Forwarding also plays a role in replication, when a request is forwarded to a set of neighbors to solicit redundancy. For that purpose we define a function, `forward`, which sends a request via a list of routes, logging the event when tracing is enabled.

When a request has reached its final destination, the respective node responds with a message of type `reply`, which includes the request’s sequence number. The `respond` function takes a reply and a request, sets the sequence number of the reply accordingly, and sends it to the agent that initiated the request. When the trace-p flag of the request is true, the reply is logged.

We also define a function, `replicate-request`, which creates a copy (or replica) of a request, but overwrites its metadata. It also accepts two optional parameters: id and forward-p. The id parameter sets the node identifier slot of the created replica, and defaults to the calling node’s identifier. Client nodes use this parameter to create anonymous requests by supplying false. The forward-p parameter sets the forward-p flag in the replicated request, and defaults to true. It is used by storage nodes to explicitly solicit replication, and when they do so they ensure the redundant request isn’t routed by supplying false.

Finally, we define structures for our set of protocol messages. These are going to be instantiated, as opposed to the previous abstract structure types. Each of these structures is a subtype of either `request` or `reply`, and may contain one or more additional slots based on the message’s semantics. These messages include request/reply pairs for discovering new nodes, as well as retrieving, storing, and deleting key/value pairs from the mesh.

A request of type `discover-request` includes an extra slot, key, which holds an identifier close to the nodes to be discovered. It is answered with a reply of type `discover-reply`, which has no additional slots (the desired information, namely the identifier and agent of the replying node, is already inherited from the `message` type).

A request of type `get-request` includes an extra slot, key, which holds an identifier used as the key of a value to be retrieved. It is answered with a reply of type `get-reply`, which includes an extra slot value that holds the value associated with the specified key.

A request of type `put-request` includes two extra slots, key and value, which holds an identifier used as a key and a value to be associated with that key. It is answered with a reply of type `put-reply`, which merely signifies acknowledgment of the request, and thus has no additional slots.

Finally, a request of type `delete-request` includes an extra slot, key, which holds an identifier used as a key associated with the value to be deleted. It is answered with a reply of type `delete-reply`, which merely signifies acknowledgment of the request, and thus has no additional slots.

## Pluggable stores

By default we use Common Lisp’s built-in hash tables to store key/value pairs in nodes. This suffices for initial testing and experimentation, but eventually we want to use a persistent storage backend. To that end, we wrap our hash table accesses in methods, implicitly defining a set of generic functions that represent an abstract storage interface.

By implementing methods for the generic functions `values-get`, `values-put`, and `values-delete`, which specialize on the values parameter (the store object), we can plug-in alternative storage back-ends later on.

## Protocol logic

A central operation during routing is for one node to determine the next hop, if any, for a given request. We define a function, `routes`, to be called by a node to find any routes to nodes closer to a given identifier than itself. It accepts a node identifier of the node that initiated the request, and the target identifier of the request (which might be an identifier associated with a value or node), and returns no more than a specified number of relevant routes. By default, up to one route is returned, which will be the best route, if any.

It uses `find-routes` to get a list of all of the calling node’s routes sorted by distance to the target identifier, and removes from that list all routes that either lead to the node from which the request originated (to prevent routing cycles), are stale, or lead to nodes farther away from the target identifier than the calling node itself. When the calling node has no routes to closer nodes (meaning that it is the closest node), `routes` returns `nil`.

A similar function `neighbors` takes a target identifier, and returns n routes closest to the identifier, where n defaults to `*replication*`. An optional boolean parameter controls whether stale routes are excluded.

In order to discover routes to other nodes, and to optionally announce their own existence, nodes send messages of type `discover-request`. The `discover` function sends a discover request to its neighbors closest to the identifier key. It includes stale routes to eventually reestablish connectivity between temporarily partitioned nodes. The request includes the origin’s identity when announce-p is true, effectively announcing its existence to the receiving nodes.

A node receiving a discover request responds with a message including its identity of type `discover-reply`, thereby acknowledging its existence. It then forwards the request via up to `*replication*` routes to nodes closer to the request’s key identifier.

Discover requests are redundantly routed through the mesh until they reach the node closest to its target. In the process, the request initiator discovers routes to the closest node, and any node along the path the request was routed through. Storage nodes also announce themselves via discover requests, inserting themselves into the mesh by adding or updating routes to themselves. The level of redundancy is controlled by the network-global `*replication*` parameter.

The structure of the mesh (see “Routes and buckets”) implies that the paths taken by discover requests are likely to be the shortest in terms of the routes available to all nodes, and the number of hops needed to reach a destination does not increase significantly with the size of the identifier space or the size of the network.

To retrieve the value associated with an identifier, nodes send messages of type `get-request`. The handling node forwards the request to the node closest to the requested key it knows about unless it has no such route (meaning it is the closest node), or forwarding is explicitly forbidden (i.e. the forward-p flag is false). Once a node is unable to forward a get request, it attempts to retrieve the requested value, and, if successful, responds to the initiating node with a message of type `get-reply`.

If a node can’t satisfy the request because it doesn’t have the requested value, and forward-p is true, it replicates the request, sets its forward-p flag to false, and forwards it to its neighbors closest to the value’s key. When any of the neighbors replies with the value, the node copies it into its own store before replying to the original request with the retrieved value.

The last phase represents somewhat of a failover. If a node can’t satisfy a get request, which it presumably should be able to handle, it calls out to its peers for help as a last resort. Since it will copy any value retrieved this way into its own store, this behavior has a regenerating effect. When a new node joins the mesh, for instance, its store might be empty, but it will receive the subset of get requests closest to its identifier. By calling out to the closest neighbors, of which one must have been responsible for the key before the new node entered the mesh, it will over time accumulate all the values it is responsible for.

Because of how the mesh is structured, the handling node should be in a good position to coordinate with the next closest nodes. It is more likely than any other node to have routes to any nodes that store values for keys close to its identifier.

In order to store values in the mesh, nodes send messages of type `put-request` that contain the key identifier and the value to be associated with it. Put requests are forwarded through the mesh just like get requests are. Finally, the handling node records the key/value pair, and additionally replicates the requests to its neighbors closest to the key.

By forwarding copies of the request, the handling node distributes the key/value pair redundantly to its neighbors closest to the key. If all goes well, a successful put requests leaves n+1 copies of the pair in the mesh, where n = `*replication*`. Just as with get requests, the handling node’s affinity to other nodes close to the key identifier ensures that it will be able to replicate the value to relevant nodes.

Finally, nodes can delete values associated with a key by issuing messages of type `delete-request`. It works almost exactly like a put request, except that it removes key/value pairs from a node’s records.

Since routes become stale over time, a period of low or no traffic might leave a node without fresh routes. To avoid this situation, nodes periodically refresh buckets that are not full or contain stale routes. A bucket is refreshed by performing discover requests on a random identifier that falls within that bucket.

The function `refresh-routes` performs the procedure outlined above. It accepts a boolean parameter announce-p that controls whether the calling node should announce itself to its peers.

The inner workings of `refresh-routes` are a bit tricky. Remember that the first bucket contains all routes that don’t fall into any other bucket, so the first bucket spans the range of identifiers starting at distance d = 1 (the identifier at distance d = 0 is the node’s own identifier). For each bucket to refresh, we generate a random distance within its range, and attempt to discover the node with the identifier at that distance.

Remember that our distance function is unidirectional: for any identifier there exists exactly one identifier at any distance. Since our `distance` function is defined as XOR, which is reversible, the following holds true

`(distance` x y`)` = d`(distance` x d`)` = y

and we can obtain an identifier within the bucket by computing the distance between the calling node’s identifier and the distance selected at random.

When a node is first started it is supplied a set of initial peers. The function `initialize-node` takes a list of identifier/agent pairs, and adds a new route for each entry. It then performs discover requests on the node’s own identifier using the initial routes. The optional parameter announce-p controls whether the calling node should announce itself to its peers.

In order to perform periodic actions (like refreshing routes in our case) we need to keep track of time and time intervals. For this we use get-internal-real-time¹ to get a monotonic time. Monotonicity is important in this context because we care about stable intervals specifically, and want them to be strictly dependent on elapsed time—as opposed to global time. `Get-internal-real-time` returns a time in “internal time units”, which can be converted to seconds using the constant internal-time-units-per-second.

The function `deadline` accepts a timeout in internal time units, and returns a deadline (a time in the future at which the timeout will be exceeded). Analogously, the function `deadline-exceeded-p` is a predicate that accepts a deadline, and tests whether it is exceeded.

The function `receive-until` accepts a deadline, and waits to receive a message until the deadline is exceeded. It returns either a message received by the calling agent, or signals an error of type `timeout` if the deadline was exceeded. It uses the function `seconds-until-deadline` which returns the seconds left until a time in internal time units will pass. `Receive-until` lets us multitask between handling incoming messages and performing our periodic duties on time by ensuring that we don’t miss a deadline while we wait for incoming messages.

• 1. At the time of writing CCL’s implementation of `get-internal-real-time` is broken. Erlangen includes a fixed version of this function for the meantime, at least for platforms other than MS Windows.

## Storage nodes and clients

Now we have assembled all the pieces needed to express the life cycle of a node. The functions `node` and `client`, that implement the behavior of a storage node and a client respectively, are intended to be used as top-level functions for agents (i.e. as the first argument to spawn).

A storage node is initialized with an identifier, a set of initial peers, and a store object that maps keys to values. First, it binds *random-state* to a fresh random state object that has been randomly initialized to get its own independently seeded random source to generate random identifiers. It then binds the special variable `*node*` to a fresh `node` structure which is initialized as follows: unless specified, the identifier is selected at random using `gen-id`, and an empty hash table is used for storing values.

Additionally, an interval at half of the duration of `*timeout*` is set, to be used as the refresh deadline. The deadline is selected at halfway to the expiry date of fresh routes, so that newly added routes can be pinged at least once before they become stale. The first deadline is set to expire after a random duration up to the interval in order to avoid a thundering herd situation.

The node then initializes routes to its initial peers, and announces itself before entering its event loop. If a message is received before the refresh deadline expires, it is processed according to protocol by `handle`. When the refresh deadline is met, it is reset, and the node’s routes are refreshed.

Clients behave very similar to storage nodes, except they always use a randomly generated identifier, and do not use a store object. A client doesn’t announce itself, and proxies incoming requests instead of handling them. As such, it serves as a gateway into the mesh that can be used by one or more agents.

The function `proxy` takes a request, and forwards an anonymous replica of the request via its route to the node closest to the request key. It installs a callback to handle the eventual response, which causes the client to respond to the request issuer with the reply.

If you are wondering why `client` accepts messages of the form `(`agent `.` request`)` even though it never looks at the agent value, and sends replies back to the agent noted in the request structure, wonder no more. This way, clients implement the `erlangen-plaform.server` protocol, and can be queried using `cast` and `call`.

## Taking the mesh for a spin

Let’s go for a test drive. If you want to follow along, make sure to check out the `mesh-table` branch, and load `erlangen-platform`. First, we define a fresh package to experiment in, with all the goodies we need.

To bootstrap the mesh, we generate a random identifier, and use it to spawn our first node—the root node. So far it’s a lone wolf, but at least now we have an initial peer we can use to initialize further nodes with.

Now we can populate the mesh with a bunch of nodes. Initially, they will only have one peer (the root node), but it won’t be long until they discover each other.

Of course we want to interact with the mesh, so let’s spawn a client node, and put some data into the network. We generate a few random keys, and call the client with put requests to insert some values. Each put request yields a put reply acknowledging the operation.

To get more insight into what happens behind the scenes, we can trace requests through the mesh. But first, we need to spawn a logging agent to log the trace. We register the name `:log` for our logger, which is the default destination for `write-log` (on top of which our tracing is built on).

With a log present, we can trace our requests using the `:trace-p` flag. Let’s trace a get request, and see what happens. The client forwards the request to the node closest to the key. Since the client just handled a put request to the same key, it still has a direct route to the node closest to the key, and that node happens to be final destination for the request. The node responds with a get reply to the client, which then responds to the caller.

So far so good, let’s kill half of our nodes to see how the mesh deals with that. Imagine a whole data center burns down to the ground, and no backups can be found. In distributed speak this situation is called a partition, which implies that the nodes could come back up at a later time, but in our case they won’t.

We can still make requests, but we might not get back a reply—half of our mesh went dark after all. We can supply a timeout to `call` to give an upper bound to the time we wait to a reply. The request might be forwarded through a seemingly fine route, to a node that won’t reply no matter what. After a while though, the routes to dead nodes will become stale. A live node will take over, and chances are high that it will be able to yield the correct value.

To make up for our losses, we spawn ten fresh nodes to take over. New routes are established, and some stale routes are forgotten. Soon enough the mesh will look just as healthy as it did before.

Over time, the new nodes will even regenerate records previously owned by now dead nodes. When a node receives a get request for a key to which it is closest, but doesn’t have a value for the key, it forwards the request to its neighbors with the `forward-p` flag set to false. If it gets a reply it copies the value into its own store, and replies as if it had always been present.

That’s it for out little tour through the mesh. I hope I could show some weaknesses (like reduced availability right after partition), and some strengths (like effective durability) of the design. Most importantly, I hope I could convince you of Common Lisp and Erlangen as a good platform for developing distributed designs.

## Appendix: graphing the mesh

You might have guessed that the visual graphs of meshes found in this article were not created by hand. The `mesh-table` branch comes with some extra code for rendering live mesh networks using Graphviz, specifically a DOT emitter.

Identifiers are encoded in the color space, which can sometimes be misleading, but gives a fairly accurate sense of distance most of the time. Edge length and thickness also relate to the distance between two nodes connected by a route. Additionally, it adds visual cues for stale routes, dead nodes, and can even overlay request traces onto the graph.

Besides generating visual graphs for this article, the DOT emitter also proved to be an invaluable debugging tool. Bugs that might otherwise be subtle, become very prominent, if not obvious, when you can see them affect the live system. In fact, as the visualization improved, I found more and more bugs. I can’t stress enough, how (even more) embarrassingly bug ridden this article would’ve been, if the little `graphviz` function hadn’t saved the day.