Erlang pg2 Failure Semantics

03 Jun 2013

I recently spent some time writing an Erlang/OTP application where I needed to broadcast messages to a series of blocking webmachine resources, which would return multipart responses to the user derived from the messages each process received. I implemented this prototype using the built in pg2 distributed named process group registry provided by OTP, as the webmachine resources would be split up amongst a series of nodes.

However, unsure of how pg2 actually performs during node unavailability and cluster topology changes, as well as reports of problems due to race conditions and partitions, I set out to figure out exactly what the failure semantics of pg2 actually are.

What is pg2?

pg2 is a distributed named process group registry, which has replaced the experimental process group registry pg. One of the major departures from pg is that rather than being able to message a group of processes, pg2 will only return a process listing, and messaging is left up to the user. This drastically simplifies the domain, as the library no longer needs to worry about partial writes to the group, or failed message sends due to processes being located across a partition, before the Erlang distribution protocol determines the node is unreachable and times out the connection.

So, how does pg2 work?

First, we can register a process group on one node, and see the results on the clustered node.

We’ll run these operations in a riak_core application, with two clustered nodes riak_pg1, riak_pg2.

(riak_pg1@> pg2:start().
(riak_pg1@> pg2:create(group).

Getting the members, then returns the membership list containing no processes, rather than throwing {error, no_such_group}.

(riak_pg2@> pg2:start().
(riak_pg2@> pg2:get_members(group).

However, you can see interesting behaviour if you attempt to use pg2, prior to starting the pg2 application, as it’s started on demand.

(riak_pg1@> pg2:create(group).
(riak_pg2@> pg2:get_members(group).
(riak_pg2@> pg2:get_members(group).

pg2 also, maybe counterintuitively, allows processes to join the group multiple times, which is something to be aware of when leveraging pg2 as a publish/subscribe mechanism.

(riak_pg1@> pg2:create(group2).
(riak_pg1@> pg2:join(group2, self()).
(riak_pg1@> pg2:join(group2, self()).
(riak_pg1@> pg2:get_members(group2).

First, how does pg2 handle a node addition?

First, register a group and add a process to it.

(riak_pg1@> pg2:create(group).
(riak_pg1@> pg2:join(group, self()).
(riak_pg1@> pg2:get_members(group).
(riak_pg2@> pg2:get_members(group).

Then, start a third node and register two groups, one with the same name, and one with a different name.

(riak_pg3@> pg2:create(group).
(riak_pg3@> pg2:create(group2).
(riak_pg3@> pg2:join(group, self()).
(riak_pg3@> pg2:join(group2, self()).
(riak_pg3@> pg2:get_members(group).
(riak_pg3@> pg2:get_members(group2).

Now, join the node to the cluster.

(riak_pg1@> pg2:get_members(group).
(riak_pg1@> pg2:get_members(group2).
(riak_pg2@> pg2:get_members(group).
(riak_pg2@> pg2:get_members(group2).
(riak_pg3@> pg2:get_members(group).
(riak_pg3@> pg2:get_members(group2).

Everything resolves nicely.

So, how does pg2 handle a partition?

(riak_pg1@> pg2:join(group, self()).
(riak_pg1@> pg2:get_members(group).
(riak_pg2@> pg2:join(group, self()).
(riak_pg2@> pg2:get_members(group).
(riak_pg1@> pg2:get_members(group).

At this point, we suspend the [email protected] node, and wait the net tick time interval for the node to be detected as down.

[error] ** Node '[email protected]' not responding **
** Removing (timedout) connection **
(riak_pg2@> pg2:get_members(group).

And, once the node is restored the process returns.

(riak_pg2@> pg2:get_members(group).

So, how does this work?

When a remote pid is registered, and its state transferred to another node, the pg2 registration system sets up an Erlang monitor to that node. When that node happens to become unavailable, processes located on remote nodes are removed from the process group. When the node returns, it transfers its state, repopulating the process list with the processes which are now available.

What happens when you attempt to delete a group during a partition?

First, the delete request will block until the unavailable node times out. At that point the group will be considered deleted.

(riak_pg1@> pg2:delete(group).
21:39:09.898 [error] ** Node '[email protected]' not responding **
** Removing (timedout) connection **
(riak_pg1@> pg2:get_members(group).

However, when the partition heals, the second node will re-create the group on the primary.

(riak_pg1@> pg2:get_members(group).
(riak_pg2@> pg2:get_members(group).


We also observe the same timeout behaviour when performing leave and create operations, however, once the disconnected node is reconnected the correct state is propagated out.

When performing a create and join on either side of a partition, we can also observe that when the partition heals, the updates are performed correctly.

What about create with a three node cluster?

Here’s the state before the partition is healed.

(riak_pg3@> pg2:create(group3).
22:12:10.669 [error] ** Node '[email protected]' not responding **
** Removing (timedout) connection **
22:12:10.669 [error] ** Node '[email protected]' not responding **
** Removing (timedout) connection **
(riak_pg3@> pg2:join(group3, self()).
(riak_pg3@> pg2:get_members(group3).

At this point node 2 is completely disconnected from the rest of the nodes.

(riak_pg2@> pg2:create(group3).
(riak_pg2@> pg2:get_members(group3).
(riak_pg2@> pg2:join(group3, self()).
(riak_pg2@> pg2:get_members(group3).

Here’s the state after the partition is healed.

(riak_pg1@> pg2:get_members(group3).
(riak_pg2@> pg2:get_members(group3).
(riak_pg3@> pg2:get_members(group3).

What about leave with a three node cluster?

(riak_pg1@> pg2:join(group, self()).
(riak_pg1@> pg2:get_members(group).
(riak_pg2@> pg2:get_members(group).
(riak_pg3@> pg2:get_members(group).

Then we leave during the partition, which can only happen on the node running the process.

(riak_pg1@> pg2:leave(group, self()).
14:33:08.769 [error] ** Node '[email protected]' not responding **
** Removing (timedout) connection **
14:33:08.769 [error] ** Node '[email protected]' not responding **
** Removing (timedout) connection **
(riak_pg1@> pg2:get_members(group).

And, we see that the updated state is propagated.

(riak_pg2@> pg2:get_members(group).
(riak_pg3@> pg2:get_members(group).


While pg2 has some interesting semantics regarding processes able to register themselves in the process group, and with groups resurrecting themselves during the healing of a partition, the failure conditions during cluster membership and partitions appears to be straightforward, and deterministic.

I’m hopeful that I didn’t miss any obvious failure scenarios. Feedback welcome and greatly appreciated!

Updated 2013-06-04: Added details on how node addition is handled in pg2.

View this story on Hacker News.