Orleans, Distributed Virtual Actors for Programming and Scalability Comparison

03 May 2015

Orleans: Distributed Virtual Actors for Programmability and Scalability
Philip A. Bernstein, Sergey Bykov, Allan Geller, Cabriel Kliot, Jorgen Thelin
Microsoft Research; Technical Report MSR-TR-2014-41

Here's an side-by-side comparison of the Orleans work by Microsoft, a virtual actor system for large-scale distributed programming, with Erlang and Basho's distributed programming abstraction, Riak Core.

If I've got anything wrong, I kindly ask to you open a pull request. Thanks!

Orleans Paper, MSR-TR-2014-41

Comments and Comparisons to Erlang and Riak Core

Abstract

High-scale interactive services demand high throughput with low latency and high availability, difficult goals to meet with the traditional stateless 3-tier architecture. The actor model makes it natural to build a stateful middle tier and achieve the required performance. However, the popular actor model platforms still pass many distributed systems problems to the developers.

The Orleans programming model introduces the novel abstraction of virtual actors that solves a number of the complex distributed systems problems, such as reliability and distributed resource management, liberating the developers from dealing with those concerns. At the same time, the Orleans runtime enables applications to attain high performance, reliability and scalability.

The Erlang programming language is a dynamically-typed, functional, single-assignment, garbage-collected, language with eager evaluation. The concurrency model supported by Erlang is the actor model: concurrency is supported by lightweight processes that communicate through shared-nothing asynchronous message passing.

The programming model focused on in this paper features a novel abstraction called the "virtual actor"; this abstraction can be seen as an abstraction over the "simple_one_for_one" supervisor model where workers are dynamically created to perform a task with one main difference: in the Orleans model, actors are instantiated on demand and destroyed when they are not in use; instantiated actors, or activations, are responsible for processing work, as routed to them by the runtime system, in serial. These actors are known as stateless workers.

Orleans also supports a single activation worker that can only exist once.

This paper presents the design principles behind Orleans and demonstrates how Orleans achieves a simple programming model that meets these goals. We describe how Orleans simplified the development of several scalable production applications on Windows Azure, and report on the performance of those production systems.

1. Introduction

Building interactive services that are scalable and reliable is hard. Interactivity imposes strict constraints on availability and latency, as that directly impacts end-user experience. To support a large number of concurrent user sessions, high throughput is essential.

The traditional three-tier architecture with stateless front-ends, stateless middle tier and a storage layer has limited scalability due to latency and throughput limits of the storage layer that has to be consulted for every request. A caching layer is often added between the middle tier and storage to improve performance [9][14] [19]. However, a cache loses most of the concurrency and semantic guarantees of the underlying storage layer. To prevent inconsistencies caused by concurrent updates to a cached item, the application or cache manager has to implement a concurrency control protocol [11]. With or without cache, a stateless middle tier does not provide data locality since it uses the data shipping paradigm: for every request, data is sent from storage or cache to the middle tier server that is processing the request. The advent of social graphs where a single request may touch many entities connected dynamically with multi-hop relationships makes it even more challenging to satisfy required application-level semantics and consistency on a cache with fast response for interactive access.

The actor model offers an appealing solution to these challenges by relying on the function shipping paradigm. Actors allow building a stateful middle tier that has the performance benefits of a cache with data locality and the semantic and consistency benefits of encapsulated entities via application-specific operations. In addition, actors make it easy to implement horizontal, “social”, relations between entities in the middle tier.

Another view of distributed systems programmability is through the lens of the object-oriented programming (OOP) paradigm. While OOP is an intuitive way to model complex systems, it has been marginalized by the popular service-oriented architecture (SOA). One can still benefit from OOP when implementing service components. However, at the system level, developers have to think in terms of loosely-coupled partitioned services, which often do not match the application’s conceptual objects. This has contributed to the difficulty of building distributed systems by mainstream developers. The actor model brings OOP back to the system level with actors appearing to developers very much like the familiar model of interacting objects.

Actor platforms such as Erlang [3] and Akka [2] are a step forward in simplifying distributed system programming. However, they still burden developers with many distributed system complexities because of the relatively low level of provided abstractions and system services. The key challenges are the need to manage the lifecycle of actors in the application code and deal with inherent distributed races, the responsibility to handle failures and recovery of actors, the placement of actors, and thus distributed resource management. To build a correct solution to such problems in the application, the developer must be a distributed systems expert.

Orleans is specifically trying to provide higher-level abstractions than Erlang and Akka so application developers do not need to think about distribution, load-balancing, actor-placement, and cluster management.

Both Akka Cluster and Riak Core provide higher-level abstractions as well that solve similar problems: Riak Core and Akka Cluster both focus on distribution and cluster management but leave actor-placement and load-balancing across actors up to the developer.

To avoid these complexities, we built the Orleans programming model and runtime, which raises the level of the actor abstraction. Orleans targets developers who are not distributed system experts, although our expert customers have found it attractive too. It is actor-based, but differs from existing actor-based platforms by treating actors as virtual entities, not as physical ones. First, an Orleans actor always exists, virtually. It cannot be explicitly created or destroyed. Its existence transcends the lifetime of any of its in-memory instantiations, and thus transcends the lifetime of any particular server. Second, Orleans actors are automatically instantiated: if there is no in-memory instance of an actor, a message sent to the actor causes a new instance to be created on an available server. An unused actor instance is automatically reclaimed as part of runtime resource management. An actor never fails: if a server S crashes, the next message sent to an actor A that was running on S causes Orleans to automatically re-instantiate A on another server, eliminating the need for applications to supervise and explicitly re-create failed actors. Third, the location of the actor instance is transparent to the application code, which greatly simplifies programming. And fourth, Orleans can automatically create multiple instances of the same stateless actor, seamlessly scaling out hot actors.

Overall, Orleans gives developers a virtual “actor space” that, analogous to virtual memory, allows them to invoke any actor in the system, whether or not it is present in memory. Virtualization relies on indirection that maps from virtual actors to their physical instantiations that are currently running. This level of indirection provides the runtime with the opportunity to solve many hard distributed systems problems that must otherwise be addressed by the developer, such as actor placement and load balancing, deactivation of unused actors, and actor recovery after server failures, which are notoriously difficult for them to get right. Thus, the virtual actor approach significantly simplifies the programming model while allowing the runtime to balance load and recover from failures transparently.

The runtime supports indirection via a distributed directory. Orleans minimizes the runtime cost of indirection by using local caches that map from actor identity to its current physical location. This strategy has proven to be very efficient. We typically see cache hit rates of well over 90% in our production services.

Orleans specifically focuses on providing a runtime where developers do not have to think about load balancing, placement, and failure recovery.

While Erlang/OTP doesn't provide many of these things natively, and mostly provides lower-level building blocks, Riak Core provides a higher-level abstraction for distributing work over a cluster of nodes with support for load balancing and dynamic membership.

Riak Core uses an epidemic gossip protocol to distribute membership information across the cluster, allowing any node service any request and route it to the correct node in one hop.

Orleans has been used to build multiple production services currently running on the Microsoft Windows Azure cloud, including the back-end services for some popular games. This enabled us to validate the scalability and reliability of production applications written using Orleans, and adjust its model and implementation based on this feedback. It also enabled us to verify, at least anecdotally, that the Orleans programming model leads to significantly increased programmer productivity.

While the Orleans programming model is appropriate for many applications, certain patterns do not fit Orleans well. One such pattern is an application that intermixes frequent bulk operations on many entities with operations on individual entities. Isolation of actors makes such bulk operations more expensive than operations on shared memory data structures. The virtual actor model can degrade if the number of actors in the system is extremely large (billions) and there is no temporal locality. Orleans does not yet support cross-actor transactions, so applications that require this feature outside of the database system are not suitable.

In summary, the main contributions of this paper are (a) a novel virtual actor abstraction that enables a simplified programming model; (b) an efficient and scalable implementation of the distributed actor model that eliminates some programming complexities of traditional actor frameworks with a good level of performance and scalability; and (c) detailed measurements and a description of our production experience.

The outline of the paper is as follows. In Section 2, we introduce the Orleans programming model. Section 3 describes the runtime, with a focus on how the virtual actor model enables scalability and reliability. Section 4 discusses how Orleans is used in practice, and Section 5 presents measurements on both production and synthetic benchmarks. Section 6 compares Orleans to other actor frameworks and the early prototype of Orleans reported in [5]. Section 7 is the conclusion.

2. Programming Model

This section describes the Orleans programming model and provides code examples from the Halo 4 Presence service (described further in Section 4.1).

2.1 Virtual Actors

The Orleans programming model is based on the .NET Framework 4.5 [10]. Actors are the basic building blocks of Orleans applications and are the units of isolation and distribution. Every actor has a unique identity, composed of its type and primary key (a 128-bit GUID). An actor encapsulates behavior and mutable state, like any object. Its state can be stored using a built-in persistence facility. Actors are isolated, that is, they do not share memory. Thus, two actors can interact only by sending messages.

Virtualization of actors in Orleans has four facets:

1. Perpetual existence: actors are purely logical entities that always exist, virtually. An actor cannot be explicitly created or destroyed and its virtual existence is unaffected by the failure of a server that executes it. Since actors always exist, they are always addressable.

Virtual actors in Orleans exist at an abstraction layer over Erlang-style actors: in Erlang actors refer to a very specific instance of a process executing a function in the runtime system, in Orleans, they are a logical unit of computation, of which many instantiations can exist of.

For example, in Erlang we might create a gen_fsm for opening a door lock with a keycode. We would have a supervisor launch a process for each in-progress door-unlocking as requests came in, and this supervisor would launch these processes locally. It could also forward to a remote node's supervisor to have it started on a different node.

(To be clear: Erlang allows you to start a process on a remote node, but Erlang does not allow you supervise a process on another node; this is the reason we'd have to forward to a remote supervisor via message to have it properly started on the remote node.)

2. Automatic instantiation: Orleans’ runtime automatically creates in-memory instances of an actor called activations. At any point in time an actor may have zero or more activations. An actor will not be instantiated if there are no requests pending for it. When a new request is sent to an actor that is currently not instantiated, the Orleans runtime automatically creates an activation by picking a server, instantiating on that server the .NET object that implements the actor, and invoking its ActivateAsync method for initialization. If the server where an actor currently is instantiated fails, the runtime will automatically re-instantiate it on a new server on its next invocation. This means that Orleans has no need for supervision trees as in Erlang [3] and Akka [2], where the application is responsible for recreating a failed actor. An unused actor’s in-memory instance is automatically reclaimed as part of runtime resource management. When doing so Orleans invokes the DeactivateAsync method, which gives the actor an opportunity to perform a cleanup operation.

Automatic instantiation appears to be an abstraction that eliminates the need for instantiating actors through supervisors across a series of nodes and balancing requests across these instantiations.

The authors seem to indicate that supervision trees are not required with this model because the application is no longer responsible for recreating the failed actor. However, I feel this misses a couple critical points:

First, in the Orleans model a method invocation triggers processing by an actor. If an appropriate activation exists, it handles the request, if not, it instantiates a new activation. However, if that actor fails, the method invocation is provided a failure continuation that is responsible for error handling. This is different than the Erlang model of failure with supervision trees: under the one_for_one or simple_one_for_one strategy, the actor will be re-created and re-process with the arguments from the initial invocation. (Orleans does support retries on failed calls; this is configurable behaviour, but defaults to zero.)

Second, in the Erlang model supervision trees also serve to enforce a instantiation order for initial startup and when restarting processes during failure conditions. (Generally speaking, this order is topological sort of the process dependency tree.) It is unclear if forcing a particular instantiation order would ever be required in the Orleans model.

Given model Erlang actors exist for a brief period to perform processing of a defined unit of work, automatic reclamation of unused actors is not usually a concern. Some process pools, such as poolboy, provided as external libraries, provide a similar abstraction.

3. Location transparency: an actor may be instantiated in different locations at different times, and sometimes might not have a physical location at all. An application interacting with an actor or running within an actor does not know the actor’s physical location. This is similar to virtual memory, where a given logical memory page may be mapped to a variety of physical addresses over time, and may at times be “paged out” and not mapped to any physical address. Just as an operating system pages in a memory page from disk automatically, the Orleans runtime automatically instantiates a non-instantiated actor upon a new request.

While something like this does not exist in Erlang, Riak Core provides an abstraction over where resources are located in the network.

For example, in Riak Core you might specify you want to process data item X with module Y; module Y will spawn a process a a given node for processing this request if it's not already running (this is similar to the single activation actors in Orleans).

However, it's up to the user to provide the system a "partition key." for a given data item. This "partition key" is used to determine how to distribute load evenly for the number of nodes and predictably route all requests for a given item to the right node from any node by one hop. The reason for this abstraction is that Riak Core is data-centric: it assumes that data will be stored uniformly across the cluster and computation will be pushed to the data, using a technique such as function shipping.

Given many of the computations that occur in Orleans are stateless workers, it's of greater importance to either locate coordinating activations together and distribute across the cluster in a way to minimize hotspots.

In the comparison of Section 2.3, we discuss Riak Core's routing mechanism that provides an alternative form of location transparency.

4. Automatic scale out: Currently, Orleans supports two activation modes for actor types: single activation mode (default), in which only one simultaneous activation of an actor is allowed, and stateless worker mode, in which many independent activations of an actor are created automatically by Orleans on-demand (up to a limit) to increase throughput. “Independent” implies that there is no state reconciliation between different activations of the same actor. Therefore, the stateless worker mode is appropriate for actors with immutable or no state, such as an actor that acts as a read-only cache.

Making actors virtual entities, rather than physical ones, has a significant impact on the Orleans programming model and implementation. Automatic activation, location transparency and perpetual existence greatly simplify the programming model since they remove from the application the need to explicitly activate or deactivate an actor, as well as supervise its lifecycle, and re-create it on failures.

While Erlang actors can be viewed as single activation at some conceptual level, for instance, each process is uniquely started by the system, uniquely addressable by a given process identifier, and can each register uniquely locally or globally under a chose name, the "simple_one_for_one" pattern is a close analogy for the stateless worker pattern: a supervisor is responsible for dynamically starting a given number of processes that perform one task before terminating and do not register under a unique name with the process registry.

However, a difference in the Orleans model is that the stateless workers are long-lived and service multiple requests. For example, work from Basho on two Erlang-based libraries, sidejob and poolboy, has shown that pre-allocating a bounded number of processes that are long lived to serve requests, having any process be able to spawn these, and not route requests through a supervisor, result in better throughput and latency, given the removal of process management from the critical path.

A good example of the single activation pattern in Riak Core would be the claimant; one claimant exists at a given time for the cluster and is responsible for coordinating cluster changes, such as the addition and removal of a given node.

2.2 Actor Interfaces

Actors interact with each other through methods and properties declared as part of their strongly-typed interfaces. All methods and properties of an actor interface are required to be asynchronous; that is, their return types must be promises (see Section 2.4).

Erlang's processes execution functions that are dynamically typed.

Erlang currently has no mechanism for ensuring that a process will be able to respond to all messages it may receive; a static analysis tool is provided by Erlang, but only cursory analysis is supported using user-supplied annotations.

2.3 Actor References

An actor reference is a strongly-typed virtual actor proxy that allows other actors, as well as non-actor code, to invoke methods and properties on it. An actor reference can be obtained by calling the GetActor method of the factory class, which Orleans automatically generates at compile time, and specifying the actor’s primary key. A reference may also be received from a remote method or property return. An actor reference can be passed as an input argument to actor method calls.

Actor references are virtual. An actor reference does not expose to the programmer any location information of the target actor. It also does not have a notion of binding. In a traditional RPC model (such as Java RMI, CORBA, or WCF) the programmer needs to explicitly bind the virtual reference to the service, usually via an external registry or location service. In Orleans, actor references are created locally by the sender and can immediately be used without a bind or register step. This simplifies programming and maximizes throughput by allowing immediate pipelining of requests to actors without waiting to bind or to resolve a service endpoint.

Both Erlang and Riak Core provide the ability to to obtain a reference to a given instance of an actor. In Erlang, this is a process identifier that can be operated on as if it's local: while it's possible to idenitfy the node that a given process is executing on, it's not recomended to base any specific logic on this behavior.

In Riak Core, the design tries to enforce that all messages to a given actor are routed through the proxy process responsible for routing and remotely starting up an actor on a node when it's not initialized yet. This is to facilitate services such as dynamic membership: if the cluster topology is changing and actors are moving between nodes while the system is under operation, the proxy process is responsible for using the updated membership to route the request accordingly. This is an example of an instantiation of an actor transcending its process identifier as the system restructures itself.

2.4 Promises

Actors interact by sending asynchronous messages. As in most modern distributed systems programming models, these message exchanges are exposed as method calls. However, unlike traditional models, Orleans method calls return immediately with a promise for a future result, rather than blocking until the result is returned. Promises allow for concurrency without requiring explicit thread management.

Promises have a three-state lifecycle. Initially, a promise is unresolved; it represents the expectation of receiving a result at some future time. When the result is received, the promise becomes fulfilled and the result becomes the value of the promise. If an error occurs, either in the calculation of the result or in the communication, the promise becomes broken.

Promises are exposed as instances of the class System.Threading.Tasks.Task<T> that represents the eventual value of a specified type or of the class System.Threading.Tasks.Task that represents a completion promise corresponding to void methods.

The main way to use a promise is to schedule a closure (or continuation) to execute when the promise is resolved. Closures are usually implicitly scheduled by using the await C# keyword on a promise. In the example below the compiler does stack ripping and transforms the code after ‘await’ into a closure that executes after the promise is resolved. Thus, the developer writes code, including error handling, that executes asynchronously but looks sequential and hence more natural.

Erlang's processes communicate through message passing; processes can either send messages to another process by process identifier or name, if the destination process happens to be registered through the process registry. Asynchrounous message passing occurs whether the destination process is running locally on the same node, or running remotely.

If a process wishes to explicitly wait for a response to a message it has sent, it uses a receive operation to block on an incoming message that matches a given pattern (using functional pattern matching, seen in languages like Erlang and Haskell.)

In the event of a failure of the remote node, a process may wait forever for a message, so timeouts and monitors usually used to ensure processes terminate.

The Orleans model could be mimicked in Erlang through the combined use of sending a message and blocking on a receive condition representing the success or failure of the operation.

That said, the Erlang/OTP abstractions for concurrent programming: gen_server, gen_fsm, gen_event, and their distributed versions provided by Riak Core: riak_core_vnode, provide a solid foundation to build applications with. Each of these components can be instantiated a number of times, process messages as they arrive in serial, and can choose whether or not they want to send a message as a reply. These abstractions are modeled as tail-recursive processes that thread their state through each execution: messages arrive, modify local state, replies are possibly sent (now, or schedule for the future!), and new state is generated.

2.5 Turns

Actor activations are single threaded and do work in chunks, called turns. An activation executes one turn at a time. A turn can be a method invocation or a closure executed on resolution of a promise. While Orleans may execute turns of different activations in parallel, each activation always executes one turn at a time.

The turn-based asynchronous execution model allows for interleaving of turns for multiple requests to the same activation. Since reasoning about interleaved execution of multiple requests is challenging, Orleans by default avoids such interleaving by waiting for an activation to finish processing one request before dispatching the next one. Thus, an activation does not receive a new request until all promises created during the processing of the current request have been resolved and all of their associated closures executed. To override this default behavior, an actor class may be marked with the [Reentrant] attribute. This indicates that an activation of that class may be given another request to process in between turns of a previous request, e.g. while waiting for a pending IO operation. Execution of turns of both requests is still guaranteed to be single threaded, so the activation is still executing one turn at a time. But turns belonging to different requests of a reentrant actor may be freely interleaved.

Similarly, in Erlang each process, (which represents an actor in the system) executes in serial. This is the same for the abstractions that Erlang/OTP provides to simplify concurrent programs (gen_server, gen_fsm, gen_event, ...)

In a similar fashion, abstractions such as the gen_server, which is built on top of a tail-recursive Erlang process, supports interleaving. Given all messages are sent asynchronously, an Erlang process does not need to reply to each message, and if it decides to, it can at any point.

Given the properties of the Erlang programming language, single-assignment, functional, message passing by copying, and per-process heaps, it is not necessary to label code as reentrant.

2.6 Persistence

The execution of actor requests may modify the actor state, which may or may not be persistent. Orleans provides a facility to simplify persistence management. An actor class can declare a property bag interface that represents the actor state that should be persisted. The runtime then provides each actor of that type with a state object that implements that interface and exposes methods for persisting and refreshing the state.

It is up to the application logic when to checkpoint an actor’s persistent state. For example, it can do so when each application request is completed, or periodically based on a timer or based on the number of requests processed since the last checkpoint.

The interaction with the underlying storage is implemented via persistence providers, which serve as adaptors for specific storage systems: SQL database, column store, blob store, etc.

No such mechanism exists today in Riak Core or Erlang.

While Erlang has shared data structures that can be persisted, such as dets, these data structures are managed by a separate process responsible for processing messages related to the storage and retrieval of objects. All of this is left to the application developer.

2.7 Timers and Reminders

There are two kinds of timer facilities in Orleans. Transient timers closely mimic the .NET timer interface but provide single threading execution guarantees. They are created local to an actor activation, and disappear when the actor is deactivated.

A reminder is a timer that fires whether or not the actor is active. Thus, it transcends the actor activation that created it, and continues to operate until explicitly deleted. If a reminder fires when its actor is not activated, a new activation is automatically created to process the reminder message, just like any other message sent to that actor. Reminders are reliable persistent timers that produce messages for actors that created them while allowing the runtime to reclaim system resources by deactivating those actors, if necessary, in between reminder ticks. Reminders follow the conceptual model of virtual actors that eternally exist in the system and are activated in memory only as needed to process incoming requests. Reminders are a useful facility to execute infrequent periodic work despite failures and without the need to pin an actor’s activation in memory forever.

Erlang provides a way to schedule a message to be sent in the future, but this mechanism relies on a process explicitly performing the scheduling.

While this mechanism could be used to start another process, for example, another actor that should do periodic processing, the operation is much lower level than the operations provided by Orleans.

3. Runtime Implementation

In this section, we describe the general architecture of the runtime, highlight key design choices and their rationale. Our guiding principle is to enable a simple programming model without sacrificing performance.

 

3.1 Overview

Orleans runs on a cluster of servers in a datacenter, each running a container process that creates and hosts actor activations. A server has three key subsystems: Messaging, Hosting, and Execution. The messaging subsystem connects each pair of servers with a single TCP connection and uses a set of communication threads to multiplex messages between actors hosted on those servers over open connections. The hosting subsystem decides where to place activations and manages their lifecycle. The execution subsystem runs actors’ application code on a set of compute threads with the single-threaded and reentrancy guarantees.

When an actor calls another actor, Execution converts the method call into a message and passes it to Messaging along with the identity of the target actor. Messaging consults with Hosting to determine the target server to send the message to. Hosting maintains a distributed directory to keep track of all actor activations in the cluster. It either finds an existing activation of the target actor or picks a server to create a new activation of it. Messaging then serializes the message and sends it to the already opened socket to the destination server. On the receiving end, the call parameters are deserialized and marshaled into a set of strongly-typed objects and passed to Execution, which schedules it for invocation. If the actor is busy processing a previous invocation, the request is queued until that request’s execution is completed. If the receiving server is instructed to create a new activation, it registers the actor in the directory and then creates a local in-memory instance of it. The single-activation guarantees are provided by the directory.

Hosting is also responsible for locally managing resources in the server. If an actor is idle for a configurable time or the server experiences memory pressure, the runtime automatically deactivates it and reclaims its system resources. This simple strategy for local resource management is enabled by actor virtualization. An unused actor can be deactivated and reclaim- ed independently and locally on any server because it can later be transparently re-activated. This approach does not require complicated distributed garbage collection protocols which involve tracking all physical references to an actor before it can be reclaimed.

Both Erlang and Riak Core run in a cluster of nodes. Each share a similar topology: one TCP connection is established between all pairs of nodes in the system and traffic is multiplexed across that connection for all actors communicating between those nodes.

Placement of processes in Erlang is done manually. Distributed Erlang allows application developers to spawn a process on any node in the system. The process terminates once it has finished executing.

Riak Core provides an abstraction on top of Distributed Erlang: given some work that you'd like to have performed, referred to as a virtual node, use a user-specified key to route it to a given node in the cluster where it will be performed. In Riak Core, the runtime system takes care of partitioning this space evenly across the cluster and provides mechanisms for transitioning the system live when nodes are added or removed.

For instance, you could have a cluster of nodes responsible for storing data; this cluster ideally wants to evenly partition the storage across all nodes. In this case, actors on each node are used to service requests for data they locally manage and the runtime system routes a message to the appropriate actor given a route key.

This differs from the model in Orleans, because activations are automatically created and terminated as cluster demand varies over time.

3.2 Distributed Directory

Many distributed systems use deterministic placement to avoid maintaining an explicit directory of the location of each component, by consistent hashing or range-based partitioning. Orleans allows completely flexible placement, keeping the location of each actor in a distributed directory. This allows the runtime more freedom in managing system resources by placing and moving actors as the load on the system changes.

The Orleans directory is implemented as a one-hop distributed hash table (DHT) [17]. Each server in the cluster holds a partition of the directory, and actors are assigned to the partitions using consistent hashing. Each record in the directory maps an actor id to the location(s) of its activations. When a new activation is created, a registration request is sent to the appropriate directory partition. Similarly, when an activation is deactivated, a request is sent to the partition to unregister the activation. The single-activation constraint is enforced by the directory: if a registration request is received for a single- activation actor that already has an activation registered, the new registration is rejected, and the address of the existing activation is returned with the rejection.

Using a distributed directory for placement and routing implies an additional hop for every message, to find out the physical location of a target actor. Therefore, Orleans maintains a large local cache on every server with recently resolved actor-to-activation mappings. Each cache entry for a single-activation actor is about 80 bytes. This allows us to comfortably cache millions of entries on typical production servers. We have found in production that the cache has a very high hit ratio and is effective enough to eliminate almost completely the need for an extra hop on every message.

Process placement in Erlang is explicit.

Riak Core, which is heavily inspired by DeCandia et al. work on Amazon's Dynamo system, uses consistent hashing and hash-space partitioning. Cluster state and membership information is disseminated to all nodes in the cluster via an epidemic broadcast protocol. While Riak Core does not have direct support for replication of data, most systems implemented on Riak Core, such as Riak itelf, replicate data to a number of adjacent partitions using a majority quorum system approach.

In Riak Core, consistent hashing guarantees deterministic placement, so each node can independently know where to route information without consulting any other nodes. However, under failure conditions, stale membership information may cause a message to be routed to a node that has failed. This scenario can also occur with Orleans and a stale local cache.

3.3 Strong Isolation

Actors in Orleans do not share state and are isolated from each other. The only way that actors can communicate is by sending messages, which are exposed as method calls on an actor reference. In this respect, Orleans follows the standard actor paradigm. In addition, method-call arguments and the return value are deep copied synchronously between actor calls, even if the two actors happen to reside on the same machine, to guarantee immutability of the sent data.

To reduce the cost of deep copying, Orleans uses two complementary approaches. First, an application can specify that it will not mutate an argument by using a markup generic wrapping class Immutable<T> in the actor method signature. This tells the runtime it is safe not to copy the argument. This is very useful for pass-through functional style scenarios, when the actor code never mutates the arguments. An example of such functionality is the Router actor in the Halo 4 presence service (Section 4.1), which performs decompression of the passed data blob without storing or mutating it. For the cases where the actual copy has to happen, Orleans uses a highly optimized copying module that is part of the serialization subsystem (Section 3.7 below).

Erlang is a single-assignment, functional language with process isolation. Erlang processes have independent heaps and all message passing is performed by copying.

(Notable exceptions: Erlang uses a shared binary heap for binary objects that are over 64 bytes in size; when performing pattern matching on binaries, sub-binaries may contain references to their parent; the shared binary heap uses a GC mechanism that performs reference counting on the number of processes that have a reference to it on their local heap.)

3.4 Asynchrony

Orleans imposes an asynchronous programming style, using promises to represent future results. All calls to actor methods are asynchronous; the results must be of type Task or Task<T> to indicate that they will be resolved later. The asynchronous programming model introduced in .NET 4.5, based on the async and await keywords, greatly simplifies code to handle promises.

Orleans’ pervasive use of asynchrony is important for the simplicity and scalability of applications. Preventing application code from holding a thread while waiting for a result ensures that system throughput is minimally impacted by the cost of remote requests. In our tests, increased distribution leads to higher latency due to more off-box calls, but has almost no impact on throughput in a communication-intensive application.

Erlang uses asynchronous message passing: actors send other actors messages and can optionally wait for a response using the Erlang "receive" operation.

3.5 Single-Threading

Orleans ensures that at most one thread runs at a time within each activation. Thus, activation state is never accessed by multiple threads simultaneously, so race conditions are impossible and locks and other synchronization primitives are unnecessary. This guarantee is provided by the execution subsystem without creating per-activation threads. While single-threading does limit performance of individual activations, the parallelism across many activations handling different requests is more than sufficient to efficiently use the available CPU resources, and actually leads to better overall system responsiveness and throughput.

Erlang processes process each received message in serial, in the order that they are "received"; the "receive" primitive can select messages out of the process mailbox in order, or in a user specified order through the use of selective receive.

3.6 Cooperative Multitasking

Orleans schedules application turns using cooperative multitasking. That means that once started, an application turn runs to completion, without interruption. The Orleans scheduler uses a small number of compute threads that it controls, usually equal to the number of CPU cores, to execute all application actor code.

To support tens of thousands to millions of actors on a server, preemptive multitasking with a thread for each activation would require more threads than modern hardware and operating systems can sustain. Even if the number of threads did not exceed the practical limit, the performance of preemptive multitasking at thousands of threads is known to be bad due to the overhead of context switches and lost cache locality. By using only cooperative multitasking, Orleans can efficiently run a large number of activations on a small number of threads. Cooperative multitasking also allows Orleans applications to run at very high CPU utilization. We have run load tests with full saturation of 25 servers for many days at 90+% CPU utilization without any instability.

A weakness of cooperative multitasking is that a poorly behaved component can take up an entire processor, degrading the performance of other components. For Orleans, this is not a major concern since all of the actors are owned by the same developers. (Orleans is not currently intended for a multi-tenant environment.) Orleans does provide monitoring and notification for long-running turns to help troubleshooting, but we have generally not seen this as a problem in production.

The Erlang runtime system performs preemptive multitasking. Each operation in Erlang counts for a number of reductions and processes are descheduled after a given number of reductions. (The use of the term "reduction" here goes back to the original implementation of Erlang on Prolog.)

Erlang processes are executed on a number of schedulers: normally, one scheduler is allocated for each CPU core, but this is customizable at runtime.

3.7 Serialization

Marshaling complex objects into a byte stream and later recreating the objects is a core part of any distributed system. While this process is hidden from application developers, its efficiency can greatly affect overall system performance. Serialization packages such as Protocol Buffers [12] offer excellent performance at the cost of limiting the types of objects that may be passed. Many serializers do not support dynamic types or arbitrary polymorphism, and many do not support object identity (so that two pointers to the same object still point to the same object after deserialization). The standard .NET binary supports any type marked with the [Serializable] attribute, but is slow and may create very large representations.

For better programmability, Orleans allows any data type and maintains object identity through the serializer. Structs, arrays, fully polymorphic and generic objects can be used. We balance this flexibility with a highly-optimized serialization subsystem that is competitive with the best ones available on “standard” types. We achieve this by automatically generating custom serialization code at compile time, with hand-crafted code for common types such as .NET collections. The serialized representation is compact and carries a minimal amount of dynamic type information.

Distributed Erlang uses Erlang's External Term Format for data serialization between nodes. Given the fixed number of types in Erlang, it is not necessary to create a specification for external formats used by the application.

3.8 Reliability

Orleans manages all aspects of reliability automatically, relieving the programmer from the need to explicitly do so. The only aspect that is not managed by Orleans is an actor’s persistent state: this part is left for the developer.

The Orleans runtime has a built-in membership mechanism for managing servers. Servers automatically detect failures via periodic heartbeats and reach an agreement on the membership view. For a short period of time after a failure, membership views on different servers may diverge, but it is guaranteed that eventually all servers will learn about the failed server and have identical membership views. The convergence time depends on the failure detection settings. The production services that use Orleans are configured to detect failures and converge on cluster membership within 30 to 60 seconds. In addition, if a server was declared dead by the membership service, it will shut itself down even if the failure was just a temporary network issue.

When a server fails, all activations on that server are lost. The directory information on the failed server is lost if directory partitions are not replicated. Once the surviving servers learn about the failure, they scan their directory partitions and local directory caches and purge entries for activations located on the failed server. Since actors are virtual, no actor fails when a server fails. Instead, the next request to an actor whose activation was on the failed server causes a new activation to be created on a surviving server. The virtual nature of the actors allows the lifespan of an individual actor to be completely decoupled from the lifespan of the hosting server.

A server failure may or may not lose an actors’ state on that server. Orleans does not impose a checkpointing strategy. It is up to the application to decide what actor state needs to be checkpointed and how often. For example, an actor may perform a checkpoint after every update to its in-memory state, or may perform a check-point and wait for its acknowledgment before returning success to its caller. Such an actor never loses its state when a server fails and is rehydrated with its last checkpointed state when reactivated on a different server. However, such rigorous checkpointing may be too expensive, too slow or simply unnecessary for some actors. For example, an actor that represents a device, such as a cellphone, sensor, or game console, may be a mere cache of the device’s state that the device periodically updates by sending messages to its actor. There is no need to checkpoint such an actor. When a server fails, it will be reactivated on a different server and its state will be reconstructed from data sent later by the device. Another popular strategy, if the application can afford to infrequently lose small updates to the state, is to checkpoint actor state periodically at a fixed time interval. This flexibility in checkpointing policy, coupled with the ability to use different backend storage providers, allows developers to reach the desired tradeoff between reliability and performance of the application.

There are situations where the directory information used to route a message is incorrect. For instance, the local cache may be stale and have a record for an activation that no longer exists, or a request to unregister an activation may have failed. Orleans does not require the directory information used by message routing to be perfectly accurate. If a message is misdirected, the recipient either reroutes the message to the correct location or returns the message to the sender for rerouting. In either case, both the sender and receiver take steps to correct the inaccuracy by flushing a local cache entry or by updating the distributed directory entry for the actor. If the directory has lost track of an existing activation, new requests to that actor will result in a new activation being created, and the old activation will eventually be deactivated.

Failures in Erlang are also detected through a heartbeat mechanism; once a failure is detected, the node is considered no longer part of the cluster. Riak Core provides an abstraction on top of this to distinguish nodes that are down from nodes that are no longer members of the cluster, using the heartbeat message as a detector of temporarily failed nodes.

Cluster membership information in Riak is disseminated through a epidemic broadcast protocol. This protocol is responsible for broadcasting current cluster state periodically and membership changes which are performed by an elected leader, called the claimant.

When a server fails, any process executing there is lost. Subsequent messages, if specified, will be routed to a "fallback" node by Riak Core, which is the first online adjacent node in the cluster.

Typically with Riak Core, majority quorum systems are used to ensure that all updates and operations are replicated to adjacent partitions; this ensures that in the event of a tolerable level of failures the system is still able to make progress.

Neither Erlang or Riak Core contain checkpointing or persistence: this is left up to the application developer.

3.9 Eventual Consistency

In failure-free times, Orleans guarantees that an actor only has a single activation. However, when failures occur, this is only guaranteed eventually.

Membership is in flux after a server has failed but before its failure has been communicated to all survivors. During this period, a register-activation request may be misrouted if the sender has a stale membership view. The target of the register request will reroute the request if it is not the proper owner of the directory partition in its view. However, it may be that two activations of the same actor are registered in two different directory partitions, resulting in two activations of a single- activation actor. In this case, once the membership has settled, one of the activations is dropped from the directory and a message is sent to its server to deactivate it.

We made this tradeoff in favor of availability over consistency to ensure that applications can make progress even when membership is in flux. For most applications this “eventual single activation” semantics has been sufficient, as the situation is rare. If it is insufficient, the application can rely on external persistent storage to provide stronger data consistency. We have found that relying on recovery and reconciliation in this way is simpler, more robust, and performs better than trying to maintain absolute accuracy in the directory and strict coherence in the local directory caches.

In the event of network partitions, failures, single-activation actors may exist more than once, instantiated on each side of the partition. In Erlang, a similar problem is seen with facilities like pg2, global, gproc, and mnesia. Clustered instances of the Erlang runtime system assume a node is no longer online after the heartbeat message times out. This means that under partitions, groups of nodes can make progress independently; for instance, the global registry gproc will allow concurrent writes to the same data item on either side of the partition, and when the partition is healed, won't know automatically how to resolve the conflicts.

Orleans allows progress to be made independently and ensures that once partitions have healed, the system will return to the correct state; however, it's unclear what happens to any state maintained locally. Consider the case where two activations exist of a single-activation actor that progress independently during a partition: once the network stabilizes and Orleans deactivates one of the activation, how is possibly divergent state resolved?

These examples are all related to membership and visibility: under dynamic membership transitions, network partitions, and other network-related failures, knowledge regarding the correct location to route a request to or of failed nodes may be out of date.

The alternative solution they propose that relies on external storage seems to allude to an external coordination service such as Chubby (or Riak Ensemble). Alternative solutions could be to allow single activation actors to only exist on the majority side of a partition, or using convergent data structures to ensure deterministic resolution when the network partition heals.

For an interesting solution to the problem of resolving concurrent updates deterministically in an eventually consistent system, see my other post on CRDTs.

3.10 Messaging Guarantees

By default Orleans provides at-most-once message delivery. In addition, only if configured to do resends, Orleans will resend messages that were not acknowledged after a configurable timeout, thus providing at least once message delivery. (This section amended by the author after the paper was published. For more information, see this issue on GitHub.) Exactly-once semantics could be added by persisting the identifiers of delivered messages, but we felt that the cost would be prohibitive and most applications do not need it. This can still be implemented at the application level.

General wisdom in distributed systems says that maintaining a FIFO order between messages is cheap and highly desirable. The price is just a sequence number on the sender and in the message header and a queue on the receiver. Our original implementation followed that pattern, guaranteeing that messages sent from actor A to actor B were delivered to B in order, regardless of failures. This approach however does not scale well in applications with a large number of actors. The per-actor-pair state totals n2 sequence numbers and queues. This is too much state to maintain efficiently. Moreover, we found that FIFO message ordering is not required for most request-response applications. Developers can easily express logical data and control dependencies in code by a handshake, issuing a next call to an actor only after receiving a reply to the previous call. If the application does not care about the ordering of two calls, it issues them in parallel.

Distributed Erlang provides an at-most-once guarantee, wherein messages are delivered in order between pairwise processes. However, if the connection channel is re-established between two nodes, some prefix of messages that were acknowledged as sent by the sender may never be delivered to the recipient. This differs from the delivery guarantees that Orleans provides, where ordering of messages from actor A to actor B are not guaranteed in FIFO order.