21. Distributed Algorithms

We define a distributed system as a collection of individual computing devices that can communicate with each other [2]. This definition is very broad: it includes anything from a VLSI chip, to a tightly coupled multiprocessor, to a local area cluster of workstations, to the Internet. Here we focus on more loosely coupled systems. In a distributed system as we view it, each processor has its own semi-independent agenda, but for various reasons, such as sharing of resources, availability, and fault tolerance, processors need to coordinate their actions.

Distributed systems are highly desirable, but it is notoriously difficult to construct efficient distributed algorithms that perform well in realistic system settings. These difficulties are not only of a practical nature; they are also fundamental. In particular, many of the difficulties are introduced by three factors: asynchrony, limited local knowledge, and failures. Asynchrony means that global time may not be available, and that both the absolute and the relative times at which events take place at individual computing devices often cannot be known precisely. Moreover, each computing device can only be aware of the information it receives; it therefore has an inherently local view of the global status of the system. Finally, computing devices and network components may fail independently, so that some remain functional while others do not.

We will begin by describing the models used to analyze distributed systems in the message-passing model of computation. We present and analyze selected distributed algorithms based on these models. We include a discussion of fault tolerance in distributed systems and consider several algorithms for reaching agreement in the message-passing model in settings prone to failures. Given that global time is often unavailable in distributed systems, we present approaches for providing logical time, which allows one to reason about causality and consistent states in distributed systems. Moving on to more advanced topics, we present a spectrum of broadcast services often considered in distributed systems, together with algorithms implementing these services. We also present advanced algorithms for rumor gathering. Finally, we consider the mutual exclusion problem in the shared-memory model of distributed computation.

We present our first model of distributed computation, for message passing systems without failures [1]. We consider both synchronous and asynchronous systems and present selected algorithms for message passing systems with arbitrary network topology in both settings.

21.1.1. Modeling Message Passing Systems

In a message passing system, processors communicate by sending messages over communication channels, where each channel provides a bidirectional connection between two specific processors. We call the pattern of connections described by the channels the topology of the system. This topology is represented by an undirected graph in which each node represents a processor, and an edge is present between two nodes if and only if there is a channel between the two processors represented by the nodes. The collection of channels is also called the network. An algorithm for such a message passing system with a specific topology consists of a local program for each processor in the system. This local program allows the processor to perform local computations and to send messages to and receive messages from each of its neighbors in the given topology.

Each processor in the system is modeled as a possibly infinite state machine [13]. A configuration is a vector \( C = (q_0, \ldots, q_{n-1}) \), where each \( q_i \) is the state of processor \( p_i \). Activities that can take place in the system are modeled as events (or actions) that describe indivisible system operations. Examples of events include local computation events and delivery events, in which a processor receives a message. The behavior of the system over time is modeled as an execution, a (finite or infinite) sequence of configurations \( (C_t) \) alternating with events \( (a_t) \): \( C_0, a_1, C_1, a_2, C_2, \ldots \) [13, 14, 15]. Executions must satisfy a variety of conditions that are used to represent the correctness properties, depending on the system being modeled. These conditions can be classified as either safety or liveness conditions. A safety condition for a system is a condition that must hold in every finite prefix of any execution of the system; informally, it states that nothing bad has happened yet. A liveness condition is a condition that must hold a certain (possibly infinite) number of times; informally, it states that eventually something good must happen. An important liveness condition is fairness, which requires that an (infinite) execution contain infinitely many actions by each processor, unless after some configuration no actions are enabled at that processor.
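The prefix-based definition of safety can be illustrated with a small Python sketch. It assumes a finite execution stored as the alternating list \( C_0, a_1, C_1, \ldots \), with each configuration a tuple of local-state labels; the names `first_violation` and `terminated_states_are_final` are hypothetical. As the example safety condition it uses the requirement, stated for terminated states later in this section, that a processor never leaves such a state. A liveness condition cannot be refuted by checking a finite prefix in this way, since the good event might still happen later.

```python
from typing import Callable, List, Sequence, Tuple, Union

Configuration = Tuple[str, ...]      # (q_0, ..., q_{n-1}): one local state per processor
Event = str                          # label of an indivisible event
Step = Union[Configuration, Event]   # executions alternate C_0, a_1, C_1, a_2, ...


def first_violation(execution: Sequence[Step],
                    safe: Callable[[List[Configuration]], bool]) -> int:
    """Return the number of configurations in the shortest prefix of a finite
    execution that violates the safety condition `safe`, or -1 if none does."""
    configs = [step for k, step in enumerate(execution) if k % 2 == 0]
    for length in range(1, len(configs) + 1):
        if not safe(configs[:length]):
            return length
    return -1


def terminated_states_are_final(prefix: List[Configuration]) -> bool:
    """Safety condition: once a processor is in state "done", it stays there."""
    for i in range(len(prefix[0])):
        seen_done = False
        for config in prefix:
            if seen_done and config[i] != "done":
                return False
            seen_done = seen_done or config[i] == "done"
    return True


good = [("idle", "idle"), "compute p0", ("done", "idle"), "compute p1", ("done", "done")]
bad = [("idle", "idle"), "compute p0", ("done", "idle"), "compute p0", ("idle", "idle")]
print(first_violation(good, terminated_states_are_final))  # -1
print(first_violation(bad, terminated_states_are_final))   # 3
```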

21.1.2. Asynchronous systems

We say that a system is asynchronous if there is no fixed upper bound on how long it takes for a message to be delivered or how much time elapses between consecutive steps of a processor [3, 17]. An obvious example of such an asynchronous system is the Internet. In an implementation of a distributed system there are often upper bounds on message delays and processor step times. But since these upper bounds are often very large and can change over time, it is often desirable to develop an algorithm that is independent of any timing parameters, that is, an asynchronous algorithm.

In the asynchronous model we say that an execution is admissible if each processor has an infinite number of computation events and every message sent is eventually delivered. The first of these requirements models the fact that processors do not fail. (It does not mean that a processor's local program contains an infinite loop. An algorithm can still terminate by having a transition function not change a processor's state after a certain point.)

We assume that each processor’s set of states includes a subset of terminated states. Once a processor enters such a state it remains in it. The algorithm has terminated if all processors are in terminated states and no messages are in transit.

The message complexity of an algorithm in the asynchronous model is the maximum, over all admissible executions of the algorithm, of the total number of (point-to-point) messages sent.

A timed execution is an execution that has a nonnegative real number associated with each event, the time at which the event occurs. To measure the time complexity of an asynchronous algorithm, we first assume that the maximum message delay in any execution is one unit of time. The time complexity is then the maximum time until termination among all timed admissible executions in which every message delay is at most one. Intuitively, this can be viewed as taking any execution of the algorithm and normalizing it so that the longest message delay becomes one unit of time.
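The normalization can be sketched in a few lines of Python. This is only an illustration under our own assumptions: each recorded timed execution is summarized by a hypothetical pair (termination time, list of message delays). Because it examines only a finite sample of executions, the value it returns is a lower bound on the true time complexity, which is a maximum over all admissible executions.

```python
from typing import Iterable, List, Tuple

# (termination time, delays of all messages sent) for one timed execution
TimedExecution = Tuple[float, List[float]]


def normalized_time(executions: Iterable[TimedExecution]) -> float:
    """Rescale each execution so that its longest message delay is one time
    unit, then return the largest rescaled termination time."""
    worst = 0.0
    for termination_time, delays in executions:
        longest = max(delays)        # assumes at least one message was sent
        worst = max(worst, termination_time / longest)
    return worst


sample = [(6.0, [0.5, 2.0, 1.5]), (2.0, [0.25, 0.5])]
print(normalized_time(sample))  # 4.0
```

Rescaling by the longest delay preserves the order of all events, so each rescaled execution is still a valid timed execution in which every delay is at most one.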

21.1.3. Synchronous systems

In the synchronous model processors execute in lock-step. The execution is partitioned into rounds so that every processor can send a message to each neighbor, the messages are delivered, and every processor computes based on the messages just received. This model is very convenient for designing algorithms. Algorithms designed in this model can in many cases be automatically simulated to work in other, more realistic timing models.

In the synchronous model we say that an execution is admissible if it is infinite. From the round structure it then follows that every processor takes an infinite number of computation steps and that every message sent is eventually delivered. Hence in a synchronous system with no failures, once a (deterministic) algorithm has been fixed, the only aspect that can vary from one execution to another is the initial configuration. In an asynchronous system, on the other hand, there can be many different executions of the same algorithm, even with the same initial configuration and no failures, since the interleaving of processor steps and the message delays are not fixed.

The notion of terminated states and the termination of the algorithm is defined in the same way as in the asynchronous model.

The message complexity of an algorithm in the synchronous model is the maximum, over all admissible executions of the algorithm, of the total number of messages sent.

To measure time in a synchronous system we simply count the number of rounds until termination. Hence the time complexity of an algorithm in the synchronous model is the maximum number of rounds in any admissible execution of the algorithm until the algorithm has terminated.

21.2. Basic algorithms

We begin with some simple examples of algorithms in the message passing model.

21.2.1. Broadcast

We start with a simple algorithm [?] for the (single message) broadcast problem, assuming that a spanning tree of the network graph with \( n \) nodes (processors) is already given. Later, we will remove this assumption. A processor \( p_r \) wishes to send a message \( M \) to all other processors. The spanning tree rooted at \( p_r \) is maintained in a distributed fashion: each processor has a distinguished channel that leads to its parent in the tree as well as a set of channels that lead to its children in the tree. The root \( p_r \) sends the message \( M \) on all channels leading to its children. When a processor receives the message on a channel from its parent, it sends \( M \) on all channels leading to its children.

**Spanning tree broadcast algorithm for \( n \) processors.**

Initially \( M \) is in transit from \( p_r \) to all its children in the spanning tree.

Code for \( p_r \):

1. upon receiving no message: // first computation event by \( p_r \)
2. terminate

Code for \( p_j \), \( 0 \leq j \leq n - 1 \), \( j \neq r \):

3. upon receiving \( M \) from parent:
4. send \( M \) to all children
5. terminate

The Spanning tree broadcast algorithm is correct whether the system is synchronous or asynchronous. Moreover, the message and time complexities are the same in both models.
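The round structure can be made concrete with a short Python simulation. It is a sketch, not part of the original presentation: the tree is given as a hypothetical `children` dictionary (processor to list of children), and each loop iteration stands for one round in which the messages in transit are delivered and the receivers forward \( M \) to their children. For the example tree, each processor receives \( M \) in the round equal to its distance from the root, and \( n - 1 = 6 \) messages are sent.

```python
from typing import Dict, List, Tuple


def synchronous_broadcast(children: Dict[int, List[int]], root: int) -> Tuple[Dict[int, int], int]:
    """Simulate the spanning tree broadcast in lock-step rounds.

    children[p] lists the children of processor p in the rooted spanning tree.
    Returns the round in which each non-root processor receives M, and the
    total number of messages sent."""
    in_transit = list(children.get(root, []))   # initially M is in transit to the root's children
    messages = len(in_transit)
    received_in: Dict[int, int] = {}
    round_no = 0
    while in_transit:
        round_no += 1
        next_transit: List[int] = []
        for p in in_transit:                    # M is delivered to p in this round
            received_in[p] = round_no
            kids = children.get(p, [])
            messages += len(kids)               # p sends M to all its children and terminates
            next_transit.extend(kids)
        in_transit = next_transit
    return received_in, messages


tree = {0: [1, 2], 1: [3, 4], 2: [5], 3: [6]}   # n = 7 processors, depth 3, root 0
rounds, sent = synchronous_broadcast(tree, 0)
print(rounds)  # {1: 1, 2: 1, 3: 2, 4: 2, 5: 2, 6: 3}
print(sent)    # 6
```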

Using simple inductive arguments, we first prove a lemma showing that by the end of round \( t \), the message \( M \) reaches all processors at distance \( t \) (or less) from \( p_r \) in the spanning tree.

**Lemma 21.1** In every admissible execution of the broadcast algorithm in the synchronous model, every processor at distance \( t \) from \( p_r \) in the spanning tree receives the message \( M \) in round \( t \).

**Proof.** We proceed by induction on the distance \( t \) of a processor from \( p_r \). First let \( t = 1 \). It follows from the algorithm that each child of \( p_r \) receives the message in round 1.
Assume that each processor at distance $t-1$ received the message $M$ in round $t-1$. We need to show that each processor $p_t$ at distance $t$ receives the message in round $t$. Let $p_a$ be the parent of $p_t$ in the spanning tree. Since $p_a$ is at distance $t-1$ from $p_r$, by the induction hypothesis, $p_a$ received $M$ in round $t-1$. By the algorithm, $p_a$ then sends $M$ to all its children in round $t$; hence $p_t$ receives $M$ in round $t$.

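By Lemma 21.1 the time complexity of the broadcast algorithm is $d$, where $d$ is the depth of the spanning tree; in particular, since $d$ is at most $n - 1$ (with equality when the spanning tree is a chain), the time complexity is at most $n - 1$. Moreover, $M$ is sent exactly once over each of the $n - 1$ edges of the spanning tree, so the message complexity is $n - 1$. Hence we have: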

**Theorem 21.2** There is a synchronous broadcast algorithm for $n$ processors with message complexity $n - 1$ and time complexity $d$, when a rooted spanning tree with depth $d$ is known in advance.

We now move to an asynchronous system and apply a similar analysis.

**Lemma 21.3** In every admissible execution of the broadcast algorithm in the asynchronous model, every processor at distance $t$ from $p_r$ in the spanning tree receives the message $M$ by time $t$.

**Proof:** We proceed by induction on the distance $t$ of a processor from $p_r$. First let $t = 1$. It follows from the algorithm that $M$ is initially in transit to each processor $p_t$ at distance 1 from $p_r$. By the definition of time complexity for the asynchronous model, $p_t$ receives $M$ by time 1.

Assume that each processor at distance $t - 1$ received the message $M$ by time $t - 1$. We need to show that each processor $p_t$ at distance $t$ receives the message by time $t$. Let $p_a$ be the parent of $p_t$ in the spanning tree. Since $p_a$ is at distance $t - 1$ from $p_r$, by the induction hypothesis, $p_a$ receives $M$ by time $t - 1$ and, by the algorithm, sends $M$ to $p_t$ at that moment. Since every message delay is at most one unit of time, $p_t$ receives $M$ by time $t$.

We immediately obtain:

**Theorem 21.4** There is an asynchronous broadcast algorithm for $n$ processors with message complexity $n - 1$ and time complexity $d$, when a rooted spanning tree with depth $d$ is known in advance.
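Lemma 21.3 can be checked empirically with a small event-driven Python simulation, offered as a sketch under assumptions of our own: every message gets a random delay of at most one time unit, local computation takes no time, and the tree is a hypothetical `children` dictionary chosen for illustration. Random delays explore only some executions, so the check illustrates the lemma rather than proving it.

```python
import heapq
import random
from typing import Dict, List


def asynchronous_broadcast(children: Dict[int, List[int]], root: int, seed: int = 0) -> Dict[int, float]:
    """Spanning tree broadcast where every message has a random delay of at most
    one time unit and local computation is instantaneous.
    Returns the time at which each non-root processor receives M."""
    rng = random.Random(seed)
    events: List[tuple] = []   # min-heap of (delivery time, receiving processor)
    for c in children.get(root, []):            # M is in transit at time 0
        heapq.heappush(events, (1.0 - rng.random(), c))
    received_at: Dict[int, float] = {}
    while events:
        t, p = heapq.heappop(events)
        received_at[p] = t
        for c in children.get(p, []):           # forward M to every child at time t
            heapq.heappush(events, (t + (1.0 - rng.random()), c))
    return received_at


def depths(children: Dict[int, List[int]], root: int) -> Dict[int, int]:
    """Distance of every processor from the root in the spanning tree."""
    dist, stack = {root: 0}, [root]
    while stack:
        p = stack.pop()
        for c in children.get(p, []):
            dist[c] = dist[p] + 1
            stack.append(c)
    return dist


tree = {0: [1, 2], 1: [3, 4], 2: [5], 3: [6]}
dist = depths(tree, 0)
for seed in range(100):
    times = asynchronous_broadcast(tree, 0, seed)
    assert all(times[p] <= dist[p] for p in times)   # distance t: received by time t
```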

21.2.2. Construction of a spanning tree

The asynchronous algorithm called FLOOD, discussed next, constructs a spanning tree rooted at a designated processor $p_r$. The algorithm is similar to the Depth First Search (DFS) algorithm. However, unlike DFS, where there is just one processor with "global knowledge" about the graph, in the FLOOD algorithm each processor has only "local knowledge" about the graph, processors coordinate their work by exchanging messages, and processors and messages may be delayed arbitrarily. This makes the design and analysis of the FLOOD algorithm challenging, because we need to show that the algorithm indeed constructs a spanning tree despite an adversarial choice of these delays.

**Algorithm description**

Each processor has four local variables. The links adjacent to a processor are identified with distinct numbers starting from 1 and stored in a local variable called neighbors. We will say that the spanning tree has been constructed when the variable parent stores the identifier of the link leading to the parent of the processor in the spanning tree, except that this variable is NONE for the designated processor $p_r$; children is a set of identifiers of the links leading to the children processors in the tree; and other is a set of identifiers of all other links. Thus the knowledge about the spanning tree is "distributed" across processors.

The code of each processor is composed of segments. There is a segment (lines 1 to 4) that describes how the local variables of a processor are initialized. Recall that the local variables are initialized in this way before time 0. The next three segments (lines 5–11, 12–15 and 16–19) describe the instructions that any processor executes in response to having received a message: \( \langle\text{adopt}\rangle \), \( \langle\text{approved}\rangle \) or \( \langle\text{rejected}\rangle \). The last segment (lines 20 to 22) is only included in the code of processor $p_r$. This segment is executed only when the local variable parent of processor $p_r$ is NIL. At some point in time, it may happen that more than one segment can be executed by a processor (e.g., because the processor received \( \langle\text{adopt}\rangle \) messages from two processors). Then the processor executes the segments serially, one by one (segments of any given processor are never executed concurrently). However, instructions of different processors may be arbitrarily interleaved during an execution. Every message that can be processed is eventually processed and every segment that can be executed is eventually executed (fairness).
FLOOD

Code for any processor \( p_k \), \( 1 \leq k \leq n \)

1. initialization
2. parent := NIL
3. children := \( \emptyset \)
4. other := \( \emptyset \)

5. process message \( \langle\text{adopt}\rangle \) that has arrived on link \( j \)
6. if parent = NIL then
7. parent := \( j \)
8. send \( \langle\text{approved}\rangle \) to link \( j \)
9. send \( \langle\text{adopt}\rangle \) to all links in neighbors \( \setminus \{j\} \); if neighbors \( \setminus \{j\} = \emptyset \) then terminate
10. else
11. send \( \langle\text{rejected}\rangle \) to link \( j \)

12. process message \( \langle\text{approved}\rangle \) that has arrived on link \( j \)
13. children := children \( \cup \{j\} \)
14. if children \( \cup \) other = neighbors \( \setminus \{\text{parent}\} \) then
15. terminate

16. process message \( \langle\text{rejected}\rangle \) that has arrived on link \( j \)
17. other := other \( \cup \{j\} \)
18. if children \( \cup \) other = neighbors \( \setminus \{\text{parent}\} \) then
19. terminate

Extra code for the designated processor \( p_r \)

20. if parent = NIL then
21. parent := NONE
22. send \( \langle\text{adopt}\rangle \) to all links in neighbors

Let us outline how the algorithm works. The designated processor sends an \( \langle\text{adopt}\rangle \) message to all its neighbors and assigns NONE to its parent variable (NIL and NONE are two distinguished values, different from any natural number), so that it never again sends the message to any neighbor.

When a processor processes an \( \langle\text{adopt}\rangle \) message for the first time, the processor assigns to its own parent variable the identifier of the link on which the message has arrived, responds with an \( \langle\text{approved}\rangle \) message on that link, and forwards an \( \langle\text{adopt}\rangle \) message on every other link. However, when a processor processes an \( \langle\text{adopt}\rangle \) message again, it responds with a \( \langle\text{rejected}\rangle \) message, because its parent variable is no longer NIL.

When a processor processes an \( \langle\text{approved}\rangle \) message, it adds the identifier of the link on which the message has arrived to the set children. It may turn out that the sets children and other together contain the identifiers of all links adjacent to the processor except for the identifier stored in the parent variable. In this case the processor enters a terminating state.
When a processor processes a \( \langle\text{rejected}\rangle \) message, the identifier of the link is added to the set other. Again, when the union of children and other contains the identifiers of all links other than the parent link, the processor enters a terminating state.
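A compact Python simulation can help in following the outline above and the correctness proof. It is a sketch with assumptions of its own: links are identified by the processor at the other end rather than by numbers starting from 1, and asynchrony is modeled by repeatedly processing a randomly chosen message in transit, so each seed explores one possible interleaving. The sketch also lets a processor whose only link is its parent link terminate right after it adopts its parent: such a processor never receives an \( \langle\text{approved}\rangle \) or \( \langle\text{rejected}\rangle \) message, so the tests of lines 14 and 18 alone would never be reached. The names `flood` and `forms_spanning_tree` are hypothetical.

```python
import random
from typing import Dict, List, Set

NIL, NONE = "NIL", "NONE"   # distinguished values, different from any link identifier


def flood(neighbors: Dict[int, List[int]], root: int, seed: int = 0):
    """Simulate FLOOD on a connected undirected graph given by adjacency lists.

    A link is identified by the processor at its other end.  Asynchrony is
    modeled by always processing a randomly chosen message in transit."""
    rng = random.Random(seed)
    parent = {p: NIL for p in neighbors}                       # lines 1-4
    children: Dict[int, Set[int]] = {p: set() for p in neighbors}
    other: Dict[int, Set[int]] = {p: set() for p in neighbors}
    terminated: Set[int] = set()
    in_transit: List[tuple] = []                               # (sender, receiver, kind)
    sent = 0

    def send(src: int, dst: int, kind: str) -> None:
        nonlocal sent
        in_transit.append((src, dst, kind))
        sent += 1

    def maybe_terminate(p: int) -> None:                       # lines 14-15 and 18-19
        if children[p] | other[p] == set(neighbors[p]) - {parent[p]}:
            terminated.add(p)

    parent[root] = NONE                                        # lines 20-22
    for q in neighbors[root]:
        send(root, q, "adopt")

    while in_transit:
        src, dst, kind = in_transit.pop(rng.randrange(len(in_transit)))
        if kind == "adopt":                                    # lines 5-11
            if parent[dst] == NIL:
                parent[dst] = src
                send(dst, src, "approved")
                for q in neighbors[dst]:
                    if q != src:
                        send(dst, q, "adopt")
                maybe_terminate(dst)   # succeeds only if the parent link is the only link
            else:
                send(dst, src, "rejected")
        elif kind == "approved":                               # lines 12-15
            children[dst].add(src)
            maybe_terminate(dst)
        else:                                                  # lines 16-19
            other[dst].add(src)
            maybe_terminate(dst)
    return parent, children, terminated, sent


def forms_spanning_tree(parent, children, root) -> bool:
    """Every parent chain reaches the root, and parent and children agree."""
    for p in parent:
        if p != root and p not in children.get(parent[p], set()):
            return False
        q, hops = p, 0
        while q != root:
            q, hops = parent[q], hops + 1
            if q == NIL or hops > len(parent):
                return False
    return True


graph = {0: [1, 2, 3], 1: [0, 2], 2: [0, 1, 4], 3: [0, 4], 4: [2, 3, 5], 5: [4]}
for seed in range(50):
    parent, children, terminated, sent = flood(graph, 0, seed)
    assert forms_spanning_tree(parent, children, 0) and terminated == set(graph)
```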

**Correctness proof**

We now argue that the FLOOD algorithm constructs a spanning tree. The key moments in the execution of the algorithm are when any processor assigns a value to its parent variable. These assignments determine the “shape” of the spanning tree. The facts that any processor eventually executes an instruction, any message is eventually delivered, and any message is eventually processed, ensure that the knowledge about these assignments spreads to neighbors. Thus the algorithm is expanding a subtree of the graph, albeit the expansion may be slow. Eventually, a spanning tree is formed. Once a spanning tree has been constructed, eventually every processor will terminate, even though some processors may have terminated even before the spanning tree has been constructed.

**Lemma 21.5** For any $1 \leq k \leq n$, there is time $t_k$ which is the first moment when there are exactly $k$ processors whose parent variables are not NIL, and these processors and their parent variables form a tree rooted at $p_r$.

**Proof:** We prove the statement of the lemma by induction on $k$. For the base case, assume that $k = 1$. Observe that processor $p_r$ eventually assigns NONE to its parent variable. Let $t_1$ be the moment when this assignment happens. At that time, the parent variable of any processor other than $p_r$ is still NIL, because no $\text{<adopt>}$ messages have been sent so far. Processor $p_r$ and its parent variable form a tree with a single node and no arcs. Hence they form a rooted tree. Thus the inductive hypothesis holds for $k = 1$.

For the inductive step, suppose that $1 \leq k < n$ and that the inductive hypothesis holds for $k$. Consider the time $t_k$, which is the first moment when there are exactly $k$ processors whose parent variables are not NIL; we call these $k$ processors tree processors and the remaining ones non-tree processors. Because $k < n$, there is a non-tree processor. But the graph $G$ is connected, so there is a non-tree processor adjacent to the tree. (For any subset $T$ of processors, a processor $p_i$ is adjacent to $T$ if and only if there is an edge in the graph $G$ from $p_i$ to a processor in $T$.) Recall that, by definition, the parent variable of such a processor is NIL. By the inductive hypothesis, each of the $k$ tree processors has executed line 7 of its code (line 21 in the case of $p_r$), and so each either has already sent or will eventually send an $\langle\text{adopt}\rangle$ message to all its neighbors on links other than its parent link. So the non-tree processors adjacent to the tree have already received or will eventually receive $\langle\text{adopt}\rangle$ messages. Eventually, each of these adjacent processors will therefore assign a value other than NIL to its parent variable. Let $t_{k+1} > t_k$ be the first moment when any processor performs such an assignment, and let us denote this processor by $p_i$. This cannot be a tree processor, because such a processor never again assigns any value to its parent variable. Could $p_i$ be a non-tree processor that is not adjacent to the tree? It could not: such a processor does not have a direct link to a tree processor, so it cannot receive $\langle\text{adopt}\rangle$ directly from the tree. This would mean that at some time $t'$ between $t_k$ and $t_{k+1}$ some other non-tree processor $p_j$ must have sent an $\langle\text{adopt}\rangle$ message to $p_i$, and so $p_j$ would have had to assign a value other than NIL to its parent variable some time after $t_k$ but before $t_{k+1}$, contradicting the fact that $t_{k+1}$ is the first such moment. Consequently, \( p_i \) is a non-tree processor adjacent to the tree, such that, at time \( t_{k+1} \), \( p_i \) assigns to its parent variable the index of a link leading to a tree processor. Therefore, time \( t_{k+1} \) is the first moment when there are exactly \( k + 1 \) processors whose parent variables are not NIL, and, at that time, these processors and their parent variables form a tree rooted at \( p_r \). This completes the inductive step, and the proof of the lemma.

**Theorem 21.6** Eventually each processor terminates, and when every processor has terminated, the subgraph induced by the parent variables forms a spanning tree rooted at \( p_r \).

**Proof.** By Lemma 21.5 we know that there is a moment \( t_n \), which is the first moment when all processors and their parent variables form a spanning tree.

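Is it possible that every processor has terminated before time \( t_n \)? By inspecting the code, we see that a processor terminates only after it has received \( \langle\text{rejected}\rangle \) or \( \langle\text{approved}\rangle \) messages on all links other than its parent link, and it receives such messages only in response to \( \langle\text{adopt}\rangle \) messages that it has sent. Consider the processor that assigns a value to its parent variable at time \( t_n \), and let \( q \) be its parent. Processor \( q \) has sent an \( \langle\text{adopt}\rangle \) message on this link, and the response to it is sent no earlier than time \( t_n \). Hence \( q \) has not terminated before time \( t_n \), and so not every processor has terminated before time \( t_n \).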

Will every processor eventually terminate? We notice that by time \( t_n \), each processor either has already sent or will eventually send an \( \langle\text{adopt}\rangle \) message on all links other than its parent link. Whenever a processor receives an \( \langle\text{adopt}\rangle \) message, it responds with \( \langle\text{rejected}\rangle \) or \( \langle\text{approved}\rangle \), even if it has already terminated. Hence, eventually, each processor will receive either a \( \langle\text{rejected}\rangle \) or an \( \langle\text{approved}\rangle \) message on each link on which it has sent an \( \langle\text{adopt}\rangle \) message. Thus, eventually, each processor terminates. ■

We note that the fact that a processor has terminated does not mean that a spanning tree has already been constructed. In fact, it may happen that processors in a different part of the network have not even received any message, let alone terminated.

**Theorem 21.7** Message complexity of the FLOOD algorithm is \( O(e) \), where \( e \) is the number of edges in the graph \( G \).

The proof of this theorem is left as an exercise.

**Exercises**

21.2-1 It may happen that a processor has terminated even though another processor has not yet received any message. Show a simple network and how to delay message delivery and processor computation to demonstrate that this can indeed happen.

21.2-2 It may happen that a processor has terminated but may still respond to a message. Show a simple network and how to delay message delivery and processor computation to demonstrate that this can indeed happen.

21.2-3 Prove that the FLOOD algorithm sends \( O(e) \) messages in any execution, given a graph \( G \) with \( n \) nodes and \( e \) links. What is the exact number of messages as a function of the number of nodes and edges in the graph?

21.3. Ring algorithms

One often needs to coordinate the activities of processors in a distributed system. This can frequently be simplified when there is a single processor that acts as a coordinator. Initially, the system may not have any coordinator, or an existing coordinator may fail and so another may need to be elected. This creates the problem where processors must elect exactly one among them, a leader. In this section we study the problem for special types of networks — rings. We will develop an asynchronous algorithm for the problem. As we shall demonstrate, the algorithm has asymptotically optimal message complexity. In the current section, we will see a distributed analogue of the well-known divide-and-conquer technique often used in sequential algorithms to keep their time complexity low. The technique used in distributed systems helps reduce the message complexity.

21.3.1. The leader election problem

The leader election problem is to elect exactly one leader among a set of processors. Formally, each processor has a local variable leader, initially equal to NIL. An algorithm is said to solve the leader election problem if it satisfies the following conditions:

1. in any execution, exactly one processor eventually assigns TRUE to its leader variable, all other processors eventually assign FALSE to their leader variables, and

2. in any execution, once a processor has assigned a value to its leader variable, the variable remains unchanged.
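The two conditions can be phrased as a check on a finished, finite execution. The Python sketch below is only an illustration: representing an execution as a hypothetical list of per-processor histories of the leader variable (with `None` standing for NIL) is our assumption, and a finite record can only approximate the "eventually" in condition 1.

```python
from typing import List, Optional


def solves_leader_election(histories: List[List[Optional[bool]]]) -> bool:
    """histories[p] lists the successive values of processor p's leader variable
    in a finished execution, starting with None (standing for NIL)."""
    decisions = []
    for h in histories:
        first = next((k for k, v in enumerate(h) if v is not None), None)
        if first is None:                          # condition 1: every processor decides
            return False
        if any(v != h[first] for v in h[first:]):  # condition 2: the decision never changes
            return False
        decisions.append(h[first])
    return decisions.count(True) == 1             # condition 1: exactly one leader


print(solves_leader_election([[None, True], [None, False], [None, None, False]]))  # True
print(solves_leader_election([[None, True], [None, True]]))                        # False
print(solves_leader_election([[None, False, True], [None, True]]))                 # False
```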

**Ring model**

We study the leader election problem on a special type of network, the ring. Formally, the graph \( G \) that models a distributed system consists of \( n \) nodes that form a simple cycle; no other edges exist in the graph. The two links adjacent to a processor are labeled CW (clockwise) and CCW (counterclockwise). Processors agree on the orientation of the ring, i.e., if a message is passed on in the CW direction \( n \) times, then it visits all \( n \) processors and comes back to the one that initially sent the message; the same holds for the CCW direction. Each processor has a unique identifier that is a natural number, i.e., the identifier of each processor is different from the identifier of any other processor; the identifiers do not have to be consecutive numbers \( 1, \ldots, n \). Initially, no processor knows the identifier of any other processor. Also, processors do not know the size \( n \) of the ring.

21.3.2. The Leader Election Algorithm

The algorithm BULLY elects a leader among asynchronous processors \( p_1, \ldots, p_n \). Identifiers of processors are used by the algorithm in a crucial way. Briefly speaking, each processor tries to become the leader; the processor that has the largest identifier among all processors blocks the attempts of the other processors, declares itself to be the leader, and forces the others to declare themselves not to be leaders.

Let us begin with a simpler version of the algorithm to exemplify some of its ideas. Suppose that each processor sends a message around the ring containing the identifier of the processor. A processor passes on such a message only if the identifier that the message carries is strictly larger than the identifier of the processor. Thus the message sent by the processor that has the largest identifier among the processors of the ring will always be passed on, and so it will eventually travel around the ring and come back to the processor that initially sent it. The processor can detect that such a message has come back, because no other processor sends a message with this identifier (identifiers are distinct). We observe that no other message will make it all around the ring, because the processor with the largest identifier will not pass it on. We could say that the processor with the largest identifier "swallows" the messages that carry smaller identifiers. Then the processor becomes the leader and sends a special message around the ring forcing all others to decide not to be leaders. The algorithm has \( \Theta(n^2) \) message complexity: each processor induces at most \( n \) messages and the leader induces \( n \) extra messages, which gives the upper bound; and one can assign identifiers to processors and delay processors and messages in such a way that the messages sent by a constant fraction of \( n \) processors are passed on around the ring for a constant fraction of \( n \) hops, which gives the matching lower bound. The algorithm can be improved so as to reduce the message complexity to \( O(n \log n) \), and such an improved algorithm will be presented in the remainder of the section.
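For the simpler algorithm, how far each message travels depends only on the identifiers, not on the delays, so its message count can be computed directly. The Python sketch below assumes, for illustration, that the processor at position \( i \) has the processor at position \( (i + 1) \bmod n \) as its clockwise neighbor, and that the leader's final announcement costs exactly \( n \) messages. With identifiers decreasing in the clockwise direction, the count is \( n(n-1)/2 + 2n \), which is quadratic; with identifiers increasing clockwise, it is only \( 3n - 1 \).

```python
from typing import List, Tuple


def simple_ring_election(ids: List[int]) -> Tuple[int, int]:
    """Simpler Theta(n^2) algorithm: each processor sends its identifier clockwise,
    and a processor passes a message on only if it carries a larger identifier.
    Returns (position of the leader, total number of messages)."""
    n = len(ids)
    leader, messages = -1, 0
    for start in range(n):
        pos = start
        while True:
            pos = (pos + 1) % n                 # one clockwise hop
            messages += 1
            if pos == start:                    # message came back: largest identifier
                leader = start
                break
            if ids[pos] > ids[start]:           # swallowed by a larger identifier
                break
    messages += n                               # leader's message forcing the others to decide
    return leader, messages


print(simple_ring_election([4, 3, 2, 1, 0]))  # (0, 20)
print(simple_ring_election([0, 1, 2, 3, 4]))  # (4, 14)
```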

The key idea of the BULLY algorithm is to make sure that not too many messages travel far, which will ensure \( O(n \log n) \) message complexity. Specifically, the activity of any processor is divided into phases. At the beginning of a phase, a processor sends "probe" messages in both directions: CW and CCW. These messages carry the identifier of the sender and a certain "time-to-live" value that limits the number of hops that each message can make. A probe message may be passed on by a processor provided that the identifier carried by the message is larger than the identifier of the processor. When the message reaches the limit and has not been swallowed, it is "bounced back". Hence when the initial sender receives two bounced-back messages, one from each direction, the processor is certain that there is no processor with a larger identifier within the limit in either the CW or the CCW direction, because otherwise such a processor would have swallowed a probe message. Only then does the processor enter the next phase by sending probe messages again, this time with a time-to-live value that lets them travel twice as far, in an attempt to find out whether there is a processor with a larger identifier in a neighborhood twice as large. As a result, a probe message that the processor sends will make many hops only when there is no processor with a larger identifier in a large neighborhood of the processor. Therefore, fewer and fewer processors send messages that can travel longer and longer distances. Consequently, as we will soon argue in detail, the message complexity of the algorithm is \( O(n \log n) \).
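Because a probe is swallowed exactly by the first processor with a larger identifier, the number of probe and reply messages depends only on the arrangement of identifiers, not on the delays. The Python sketch below uses this to count them. It assumes, for illustration, that the CW neighbor of the processor at position \( i \) is at position \( (i + 1) \bmod n \), and that a probe of phase \( k \) can make up to \( 2^k \) hops (with time-to-live \( 2^k - 1 \), the \( 2^k \)-th processor it reaches sees time-to-live zero and bounces it); terminate messages are left out of the count. The bound checked at the end, \( 8n(\lceil \log_2 n \rceil + 1) \), is our own loose estimate consistent with the \( O(n \log n) \) claim, not a figure from the text.

```python
import math
import random
from typing import List, Tuple


def probe(ids: List[int], i: int, step: int, limit: int) -> Tuple[int, str]:
    """Follow one probe of processor i in direction step (+1 = CW, -1 = CCW)
    that may make at most `limit` hops.  Returns (messages, outcome)."""
    n = len(ids)
    for hop in range(1, min(limit, n) + 1):
        j = (i + step * hop) % n
        if j == i:
            return hop, "returned"        # went around the ring: i has the largest id
        if ids[j] > ids[i]:
            return hop, "swallowed"       # a larger identifier blocks the probe
    return 2 * limit, "replied"           # limit hops out, limit hops back as a reply


def bully_message_count(ids: List[int]) -> Tuple[int, List[int], int]:
    """Returns (leader position, processors entering each phase, probe+reply messages)."""
    n = len(ids)
    active, phase, total, leader = list(range(n)), 0, 0, -1
    entering: List[int] = []
    while leader < 0:
        entering.append(len(active))
        survivors = []
        for i in active:
            outcomes = []
            for step in (1, -1):
                cost, outcome = probe(ids, i, step, 2 ** phase)
                total += cost
                outcomes.append(outcome)
            if outcomes == ["returned", "returned"]:
                leader = i
            elif outcomes == ["replied", "replied"]:
                survivors.append(i)          # enters the next phase
        active, phase = survivors, phase + 1
    return leader, entering, total


print(bully_message_count([3, 1, 2]))  # (0, [3, 1, 1], 23)

rng = random.Random(1)
for n in (8, 64, 500):
    ring = rng.sample(range(10 * n), n)          # distinct identifiers
    leader, entering, total = bully_message_count(ring)
    assert ring[leader] == max(ring)
    assert total <= 8 * n * (math.ceil(math.log2(n)) + 1)
```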

We detail the BULLY algorithm. Each processor has five local variables. The variable \( id \) stores the unique identifier of the processor. The variable \( leader \) stores TRUE when the processor decides to be the leader, and FALSE when it decides not to be the leader. The remaining three variables are used for bookkeeping: \( asleep \) determines whether the processor has ever sent a \( \langle \text{probe}, id, 0, 0 \rangle \) message that carries the identifier \( id \) of the processor. Any processor may send a \( \langle \text{probe}, id, \text{phase}, 2^{\text{phase}} - 1 \rangle \) message in both directions (CW and CCW) for different values of \( \text{phase} \). Each time such a message is sent, a \( \langle \text{reply}, id, \text{phase} \rangle \) message may be sent back to the processor. The variables \( CWreplied \) and \( CCWreplied \) are used to remember whether the replies have already been processed by the processor.

The code of each processor is composed of five segments. The first segment (lines 1 to 5) initializes the local variables of the processor. The second segment (lines 6 to 8) can only be executed when the local variable \( asleep \) is TRUE. The remaining three segments (lines 9 to 17, 18 to 26, and 27 to 31) describe the actions that the processor takes when it processes each of the three types of messages: \( \langle \text{probe}, id_s, \text{phase}, \text{ttl} \rangle \), \( \langle \text{reply}, id_s, \text{phase} \rangle \) and \( \langle \text{terminate} \rangle \), respectively. The messages carry parameters \( id_s \), \( \text{phase} \) and \( \text{ttl} \) that are natural numbers.

We now describe how the algorithm works. Recall that we assume that the local variables of each processor have been initialized before time 0 of the global clock. Each processor eventually sends a \( \langle \text{probe}, id, 0, 0 \rangle \) message carrying the identifier \textit{id} of the processor. At that time we say that the processor \textit{enters} phase number zero. In general, when a processor sends a message \( \langle \text{probe}, id, \text{phase}, 2^{\text{phase}} - 1 \rangle \), we say that the processor enters phase number \textit{phase}. The message \( \langle \text{probe}, id, 0, 0 \rangle \) is never sent again, because FALSE is assigned to \textit{asleep} in line 7. It may happen that by the time this message is sent, the processor has already processed some other messages.

When a processor processes a message \( \langle \text{probe}, ids, \text{phase}, \text{ttl} \rangle \) that has arrived on link \textit{CW} (the link leading in the clockwise direction), its actions depend on the relationship between the parameter \textit{ids} and the identifier \textit{id} of the processor. If \textit{ids} is smaller than \textit{id}, then the processor does nothing else (the processor swallows the message). If \textit{ids} is equal to \textit{id} and the processor has not yet decided, then, as we shall see, the probe message that the processor sent has circulated around the entire ring. The processor then sends a \( \langle \text{terminate} \rangle \) message, decides to be the leader, and terminates (the processor may still process messages after termination). If \textit{ids} is larger than \textit{id}, then the actions of the processor depend on the value of the parameter \textit{ttl} (time-to-live). When the value is strictly larger than zero, the processor passes on the probe message with \textit{ttl} decreased by one. If, however, the value of \textit{ttl} is already zero, then the processor sends a reply message back (in the \textit{CW} direction). Symmetric actions are executed when the \( \langle \text{probe}, ids, \text{phase}, \text{ttl} \rangle \) message has arrived on link \textit{CCW}, in the sense that the directions of sending messages are reversed; see the code for details.
BULLY

Code for any processor $p_k$, $1 \leq k \leq n$
1 **initialization**
2     asleep := TRUE
3     CWreplied := FALSE
4     CCWreplied := FALSE
5     leader := NIL
6 if asleep then
7     asleep := FALSE
8     send <probe, id, 0, 0> to links CW and CCW
9 **process message** <probe, ids, phase, ttl> that has arrived on link CW (resp. CCW)
10     if id = ids and leader = NIL then
11         send <terminate> to link CCW
12         leader := TRUE
13         terminate
14     if ids > id and ttl > 0 then
15         send <probe, ids, phase, ttl − 1> to link CCW (resp. CW)
16     if ids > id and ttl = 0 then
17         send <reply, ids, phase> to link CW (resp. CCW)
18 **process message** <reply, ids, phase> that has arrived on link CW (resp. CCW)
19     if id ≠ ids then
20         send <reply, ids, phase> to link CCW (resp. CW)
21     else
22         CWreplied := TRUE (resp. CCWreplied)
23         if CWreplied and CCWreplied then
24             CWreplied := FALSE
25             CCWreplied := FALSE
26             send <probe, id, phase + 1, 2^{phase+1} − 1> to links CW and CCW
27 **process message** <terminate> that has arrived on link CW
28     if leader = NIL then
29         send <terminate> to link CCW
30         leader := FALSE
31         terminate

When a processor processes a message <reply, ids, phase> that has arrived on link CW, the processor first checks whether ids is different from the identifier id of the processor. If so, the processor merely passes on the message. However, if ids = id, then the processor records the fact that a reply has been received from direction CW by assigning TRUE to CWreplied. Next the processor checks whether both CWreplied and CCWreplied are TRUE. If so, the processor has received replies from both directions. Then the processor assigns FALSE to both variables. Next the processor sends a probe message. This message carries the identifier id of the processor, the next phase number phase + 1, and an increased time-to-live parameter \(2^{\text{phase}+1} - 1\). Symmetric actions are executed when <reply, ids, phase> has arrived on link CCW.

The last type of message that a processor can process is <terminate>. The processor checks if it has already decided to be or not to be the leader. When no decision has been made so far, the processor passes on the <terminate> message and decides not to be the leader. This message eventually reaches a processor that has already decided, and then the message is no longer passed on.
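To make the message flow concrete, here is a minimal Python sketch that simulates the BULLY code above on a single machine. It assumes that the CW link of \(p_k\) leads to \(p_{k+1}\) and its CCW link to \(p_{k-1}\) (indices mod \(n\)), models asynchrony by delivering an arbitrary pending message next (links are not assumed to be FIFO), and treats each processor's spontaneous execution of lines 6 to 8 as a pending "wake" event. The function name `bully` and its data structures are hypothetical; this is an illustration, not a reference implementation.

```python
import random

def bully(ids, seed=0):
    """Simulate BULLY on a ring where ids[k] is the identifier of p_k.

    Assumption: p_k's CW link leads to p_{k+1} and its CCW link to p_{k-1}
    (mod n), so a message sent on CW arrives on the receiver's CCW link.
    Returns (leader flags, total number of messages sent).
    """
    n = len(ids)
    rng = random.Random(seed)
    other = {"CW": "CCW", "CCW": "CW"}
    leader = [None] * n                           # None plays the role of NIL
    replied = [{"CW": False, "CCW": False} for _ in range(n)]
    pending = [("wake", k) for k in range(n)]     # lines 6-8, once per processor
    sent = 0

    def send(k, link, msg):
        nonlocal sent
        sent += 1
        dest = (k + 1) % n if link == "CW" else (k - 1) % n
        pending.append(("msg", dest, other[link], msg))  # arrival link at dest

    while pending:
        # Asynchrony: deliver an arbitrary pending event next.
        event = pending.pop(rng.randrange(len(pending)))
        if event[0] == "wake":
            k = event[1]
            for out in ("CW", "CCW"):
                send(k, out, ("probe", ids[k], 0, 0))
            continue
        _, k, link, msg = event
        if msg[0] == "probe":                     # lines 9-17
            _, sender, phase, ttl = msg
            if sender == ids[k] and leader[k] is None:
                send(k, "CCW", ("terminate",))
                leader[k] = True
            elif sender > ids[k] and ttl > 0:
                send(k, other[link], ("probe", sender, phase, ttl - 1))
            elif sender > ids[k] and ttl == 0:
                send(k, link, ("reply", sender, phase))
            # any other probe is dropped (swallowed)
        elif msg[0] == "reply":                   # lines 18-26
            _, sender, phase = msg
            if sender != ids[k]:
                send(k, other[link], msg)
            else:
                replied[k][link] = True
                if replied[k]["CW"] and replied[k]["CCW"]:
                    replied[k] = {"CW": False, "CCW": False}
                    for out in ("CW", "CCW"):
                        send(k, out, ("probe", ids[k], phase + 1, 2 ** (phase + 1) - 1))
        elif leader[k] is None:                   # lines 27-31: terminate
            send(k, "CCW", ("terminate",))
            leader[k] = False
    return leader, sent
```

For example, `bully([3, 7, 1, 5])[0]` is `[False, True, False, False]` for every seed, since only the processor with the largest identifier can see its own probe return. The returned message count can be compared against the bound derived in the analysis below.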

### 21.3.3. Analysis of the Leader Election Algorithm

We begin the analysis by showing that the BULLY algorithm solves the leader election problem.

Correctness proof

**Theorem 21.8** The BULLY algorithm solves the leader election problem on any ring with asynchronous processors.

**Proof.** We need to show that the two conditions listed at the beginning of the section are satisfied. The key idea that simplifies the argument is to focus on one processor. Consider the processor \(p_i\) with the maximum id among all processors in the ring. This processor eventually executes lines 6 to 8, and so it sends \( \langle \text{probe}, id, 0, 0 \rangle \) messages in the CW and CCW directions. Note that whenever the processor sends \( \langle \text{probe}, id, \text{phase}, 2^{\text{phase}} - 1 \rangle \) messages, each such message is always passed on by other processors, until the ttl parameter of the message drops to zero or the message travels around the entire ring and arrives at \(p_i\). If the message never arrives at \(p_i\), then some processor eventually receives the probe message with ttl equal to zero, and that processor sends a reply back to \(p_i\). Then \(p_i\) eventually receives messages \( \langle \text{reply}, id, \text{phase} \rangle \) from each direction, and enters phase number phase + 1 by sending probe messages \( \langle \text{probe}, id, \text{phase} + 1, 2^{\text{phase}+1} - 1 \rangle \) in both directions. These messages carry a larger time-to-live value than those of phase number phase. Since the ring is finite, ttl eventually becomes so large that processor \(p_i\) receives a probe message that carries the identifier of \(p_i\). Note that \(p_i\) will eventually receive two such messages. The first time \(p_i\) processes such a message, the processor sends a <terminate> message and terminates as the leader. The second time \(p_i\) processes such a message, lines 11 to 13 are not executed, because the variable leader is no longer NIL. Note that no other processor \(p_j\) can execute lines 11 to 13: a probe message originating at \(p_j\) cannot travel around the entire ring, since \(p_i\) is on the way and would swallow the message, and, since identifiers are distinct, no other processor sends a probe message that carries the identifier of processor \(p_j\).
Thus no processor other than \(p_i\) can assign TRUE to its leader variable. Any processor other than \(p_i\) will receive the <terminate> message, assign FALSE to its leader variable, and pass on the message. Finally, the <terminate> message will arrive at \(p_i\), and \(p_i\) will not pass it on. The argument presented thus far ensures that eventually exactly one processor assigns TRUE to its leader variable, all other processors assign FALSE to their leader variables, and once a processor has assigned a value to its leader variable, the variable remains unchanged. \(\blacksquare\)

Our next task is to give an upper bound on the number of messages sent by the algorithm. The subsequent lemma shows that the number of processors that can enter a phase decays exponentially as the phase number increases.

Lemma 21.9  Given a ring of size $n$, the number $k$ of processors that enter phase number $i \geq 0$ is at most $n/2^{i-1}$.

Proof. There are exactly $n$ processors that enter phase number $i = 0$, because each processor eventually sends a $\langle \text{probe}, \text{id}, 0, 0 \rangle$ message. The bound stated in the lemma says that the number of processors that enter phase 0 is at most $2n$, so the bound evidently holds for $i = 0$. Let us consider the remaining cases, i.e., let us assume that $i \geq 1$. Suppose that a processor $p_j$ enters phase number $i$, and so by definition it sends the message $\langle \text{probe}, \text{id}, i, 2^i - 1 \rangle$. In order for a processor to send such a message, each of the two probe messages $\langle \text{probe}, \text{id}, i - 1, 2^{i-1} - 1 \rangle$ that the processor sent in the previous phase, one in each direction, must have made $2^{i-1}$ hops, always arriving at a processor with a strictly lower identifier than the identifier of $p_j$ (otherwise, if a probe message arrives at a processor with a strictly larger or the same identifier, then no reply message is generated, because the message is swallowed or has returned to $p_j$ itself, and consequently $p_j$ cannot enter phase number $i$). As a result, if a processor enters phase number $i$, then no other processor within $2^{i-1}$ hops of it, in either direction, can enter the phase. Suppose that there are $k \geq 1$ processors that enter phase $i$. We can associate with each such processor $p_j$ the $2^{i-1}$ consecutive processors that follow $p_j$ in the CW direction. This association assigns $2^{i-1}$ distinct processors to each of the $k$ processors, no processor is assigned twice, and no assigned processor is one of the $k$ processors. So there must be at least $k + k \cdot 2^{i-1}$ distinct processors in the ring. Hence $k(1 + 2^{i-1}) \leq n$, and we can weaken this bound by dropping the 1 and conclude that $k \cdot 2^{i-1} \leq n$, as desired.
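The counting argument can also be checked empirically. The Python sketch below counts, for a given assignment of identifiers, the processors that satisfy the necessary condition derived in the proof (for \(i \geq 1\), the identifier of \(p_j\) exceeds every identifier within \(2^{i-1}\) hops in both directions) and compares the count with \(n/2^{i-1}\). The helper names are hypothetical, and since the condition is only necessary, the count is an upper bound on the number of processors that actually enter phase \(i\).

```python
import random

def meets_phase_condition(ids, j, i):
    """Necessary condition (proof of Lemma 21.9) for p_j to enter phase i.

    For i >= 1, ids[j] must exceed every identifier within 2**(i-1) hops in
    both directions. A hop count of n wraps around to p_j itself, where the
    strict comparison fails; this mirrors the leader's probe returning home
    instead of producing replies.
    """
    if i == 0:
        return True                      # every processor enters phase 0
    n = len(ids)
    reach = 2 ** (i - 1)
    for dist in range(1, min(reach, n) + 1):
        cw, ccw = ids[(j + dist) % n], ids[(j - dist) % n]
        if ids[j] <= cw or ids[j] <= ccw:
            return False                 # no reply would come back
    return True

def count_phase(ids, i):
    """Number of processors meeting the condition for phase i."""
    return sum(meets_phase_condition(ids, j, i) for j in range(len(ids)))

# Compare against the bound n / 2**(i-1) on random rings.
for n in (1, 2, 7, 16, 33, 100):
    ids = random.Random(n).sample(range(10 * n), n)   # distinct identifiers
    for i in range(12):
        assert count_phase(ids, i) <= n / 2 ** (i - 1)
```

On the ring with identifiers `[3, 1, 4, 0, 2]`, the counts for phases 0 to 4 are 5, 2, 1, 1, and 0.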

Theorem 21.10  The BULLY algorithm has $O(n \log n)$ message complexity, where $n$ is the size of the ring.

Proof. Note that any processor in phase $i$ sends messages that are intended to travel $2^i$ hops away and back in each direction (CW and CCW). This contributes at most $4 \cdot 2^i$ messages per processor that enters phase number $i$. The contribution may be smaller than $4 \cdot 2^i$ if a probe message gets swallowed on the way away from the processor. Lemma 21.9 provides an upper bound on the number of processors that enter phase number $i$. What is the highest phase that a processor can ever enter? The number $k$ of processors that can be in phase $i$ is at most $n/2^{i-1}$. So when $n/2^{i-1} < 1$, no processor can ever enter phase $i$. Thus no processor can enter any phase beyond phase number $h = 1 + \lceil \log_2 n \rceil$, because $n < 2^{(h+1)-1}$. Finally, a single processor sends one termination message that travels around the ring once. So the total number of messages sent by the algorithm is at most

\[ n + \sum_{i=0}^{1 + \lceil \log_2 n \rceil} \left( \frac{n}{2^{i-1}} \cdot 4 \cdot 2^i \right) = n + \sum_{i=0}^{1 + \lceil \log_2 n \rceil} 8n = O(n \log n) \]
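Since every term of the sum equals \(8n\) and there are \(h + 1 = 2 + \lceil \log_2 n \rceil\) terms, the bound can be written in closed form:

\[ n + 8n\bigl(2 + \lceil \log_2 n \rceil\bigr) \]

For instance, for \(n = 1024\) we have \(\lceil \log_2 n \rceil = 10\), and the bound evaluates to \(1024 + 8 \cdot 1024 \cdot 12 = 99\,328\) messages.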

Burns [?] furthermore showed that this asynchronous leader election algorithm is asymptotically optimal: any uniform algorithm for electing a leader in an asynchronous ring sends at least \( \Omega(n \log n) \) messages.

**Theorem 21.11** Any uniform algorithm for electing a leader in an asynchronous ring sends at least \( \Omega(n \log n) \) messages.

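The proof builds wasteful executions of any algorithm on rings of size \( n/2 \). Then two rings of size \( n/2 \) are pasted together in such a way that the wasteful executions of the smaller rings can be combined and \( \Theta(n) \) additional messages are received. Writing \( M(n) \) for the number of messages forced on a ring of size \( n \), this yields the recurrence \( M(n) \geq 2M(n/2) + \Theta(n) \), which solves to \( M(n) = \Omega(n \log n) \).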

**Exercises**

21.3-1 Show that the simplified BULLY algorithm has \( \Omega(n^2) \) message complexity, by appropriately assigning identifiers to processors on a ring of size \( n \), and by determining how to delay processors and messages.

21.3-2 Show that the BULLY algorithm has \( \Omega(n \log n) \) message complexity.

21.3-3 Assume that messages can only be sent in CW direction, and design an asynchronous algorithm for leader election on a ring that has \( O(n \log n) \) message complexity. **Hint.** Let processors work in phases. Each processor begins in the active mode with a value equal to the identifier of the processor, and under certain conditions can enter the relay mode, where it just relays messages. An active processor waits for messages from two active processors, and then inspects the values sent by the processors, and decides whether to become the leader, remain active and adopt one of the values, or start relaying. Determine how the decisions should be made so as to ensure that if there are three or more active processors, then at least one will remain active; and no matter what values active processors have in a phase, at most half of them will still be active in the next phase.

### 21.4. Fault-Tolerant Consensus

The algorithms presented so far are based on the assumption that the system on which they run is reliable. Here we present selected algorithms for unreliable distributed systems, where the active (or correct) processors need to coordinate their activities based on common decisions.

It is inherently difficult for processors to reach agreement in a distributed setting prone to failures. Consider the deceptively simple problem of two failure-free processors attempting to agree on a common bit using a communication medium where messages may be lost. This problem is known as the two generals problem [9]. Here two generals must coordinate an attack using couriers that may be destroyed by the enemy. It turns out that it is not possible to solve this problem using a finite number of messages. We prove this fact by contradiction. Assume that there is a protocol used by processors \( A \) and \( B \) involving a finite number of messages. Let us consider such a protocol that uses the smallest number of messages, say \( k \) messages. Assume without loss of generality that the last, \( k \)-th, message is sent from \( A \) to \( B \).
Since this final message is not acknowledged by $B$, $A$ must determine the decision value whether or not $B$ receives this message. Since the message may be lost, $B$ must determine the decision value without receiving this final message. But now both $A$ and $B$ decide on a common value without needing the $k^{th}$ message. In other words, there is a protocol that uses only $k - 1$ messages for the problem. But this contradicts the assumption that $k$ is the smallest number of messages needed to solve the problem.

In the rest of this section we consider agreement problems where the communication medium is reliable, but where the processors are subject to two types of failures: crash failures, where a processor stops and does not perform any further actions, and Byzantine failures, where a processor may exhibit arbitrary, or even malicious, behavior as the result of the failure.

The algorithms presented deal with the so called consensus problem, first introduced by Lamport, Pease, and Shostak [12][16]. The consensus problem is a fundamental coordination problem that requires processors to agree on a common output, based on their possibly conflicting inputs.

### 21.4.1. The Consensus Problem

We consider a system in which each processor $p_i$ has a special state component $x_i$, called the input, and $y_i$, called the output (also called the decision). The variable $x_i$ initially holds a value from some well-ordered set of possible inputs, and $y_i$ is undefined. Once an assignment to $y_i$ has been made, it is irreversible. Any solution to the consensus problem must guarantee:

- **Termination:** In every admissible execution, $y_i$ is eventually assigned a value, for every nonfaulty processor $p_i$.
- **Agreement:** In every execution, if $y_i$ and $y_j$ are assigned, then $y_i = y_j$, for all nonfaulty processors $p_i$ and $p_j$. That is, nonfaulty processors do not decide on conflicting values.
- **Validity:** In every execution, if for some value $v$, $x_i = v$ for all processors $p_i$, and if $y_i$ is assigned for some nonfaulty processor $p_i$, then $y_i = v$. That is, if all processors have the same input value, then any value decided upon must be that common input.
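The three conditions can be restated as a check on the outcome of a single finished execution. The following Python sketch is one such restatement, assuming that the inputs, the (possibly missing) decisions, and the set of faulty processors are known; the name `check_consensus` is hypothetical. Termination refers to eventual behavior in admissible executions, so on a finite run it is approximated here by requiring that every nonfaulty processor has decided by the end.

```python
def check_consensus(inputs, decisions, faulty):
    """Return (termination, agreement, validity) for one finished execution.

    inputs[i] is x_i, decisions[i] is y_i (None if unassigned), and faulty is
    the set of indices of faulty processors, whose decisions are ignored.
    """
    correct = [i for i in range(len(inputs)) if i not in faulty]
    # Termination (finite approximation): every nonfaulty processor decided.
    termination = all(decisions[i] is not None for i in correct)
    decided = {decisions[i] for i in correct if decisions[i] is not None}
    # Agreement: nonfaulty processors do not decide on conflicting values.
    agreement = len(decided) <= 1
    # Validity: if all inputs equal some v, every nonfaulty decision is v.
    validity = len(set(inputs)) != 1 or decided <= set(inputs)
    return termination, agreement, validity
```

For example, `check_consensus([1, 1, 1], [0, 0, 0], set())` reports a validity violation, while a faulty processor that never decides does not affect any of the three conditions.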

Note that in the case of crash failures this validity condition is equivalent to requiring that every nonfaulty decision value is the input of some processor. Once a processor crashes it is of no interest to the algorithm, and no requirements are put on its decision.

We begin by presenting a simple algorithm for consensus in a synchronous message passing system with crash failures.

### 21.4.2. Consensus with Crash Failures

Since the system is synchronous, an execution of the system consists of a series of rounds. Each round consists of the delivery of all messages, followed by one computation event for every processor. The set of faulty processors can be different in different executions, that is, it is not known in advance. Let $F$ be a subset of at most $f$ processors, the faulty processors. Each round contains exactly one computation event for the processors not in $F$ and at most one computation event for every processor in $F$. Moreover, if a processor in $F$ does not have a computation event in some round, it does not have such an event in any further round. In the last round in which a faulty processor has a computation event, an arbitrary subset of its outgoing messages is delivered.

**Consensus in the presence of crash failures:**

Code for processor $p_i$, $0 \leq i \leq n - 1$.
Initially $V = \{x_i\}$
round $k$, $1 \leq k \leq f + 1$
1. send $\{v \in V : p_i \text{ has not already sent } v\}$ to all processors
2. receive $S_j$ from $p_j$, $0 \leq j \leq n - 1$, $j \neq i$
3. $V := V \cup \bigcup_{j=0}^{n-1} S_j$
4. if $k = f + 1$ then $y_i = \min(V)$

In the previous algorithm, which is based on an algorithm by Dolev and Strong [3], each processor maintains a set of the values it knows to exist in the system. Initially, the set contains only its own input. In later rounds the processor updates its set by joining it with the sets received from other processors, and it broadcasts any new additions to its set to all processors. This continues for $f + 1$ rounds, where $f$ is the maximum number of processors that can fail. At this point, the processor decides on the smallest value in its set of values.
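The following Python sketch simulates this algorithm round by round. It assumes a crash schedule given in advance (which processor crashes in which round) and, to model the arbitrary subset of messages delivered in a crash round, drops each such message with probability 1/2; the function name `flood_consensus` and its parameters are hypothetical.

```python
import random

def flood_consensus(inputs, f, crash_round, seed=0):
    """Simulate the (f+1)-round crash-tolerant consensus algorithm.

    crash_round maps a faulty processor to the round (1..f+1) in which it
    crashes; in that round only some of its messages are delivered, and it
    takes no later steps. Returns the decisions of processors that never crash.
    """
    n = len(inputs)
    assert len(crash_round) <= f, "at most f processors may crash"
    rng = random.Random(seed)
    V = [{x} for x in inputs]            # V_i starts as {x_i}
    sent = [set() for _ in range(n)]     # values p_i has already sent
    alive = set(range(n))
    for k in range(1, f + 2):            # rounds 1, ..., f + 1
        inbox = [set() for _ in range(n)]
        for i in sorted(alive):
            new = V[i] - sent[i]         # line 1: only values not yet sent
            sent[i] |= new
            for j in range(n):
                if j == i:
                    continue
                # In its crash round a processor reaches only some recipients.
                if crash_round.get(i) == k and rng.random() < 0.5:
                    continue
                inbox[j] |= new
        alive -= {i for i, r in crash_round.items() if r == k}
        for i in alive:                  # lines 2-3: merge received sets
            V[i] |= inbox[i]
    return {i: min(V[i]) for i in alive}  # line 4: decide the minimum
```

Across seeds and crash schedules with at most \(f\) crashes, the surviving processors should always decide the same value, as the lemma below predicts.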

To prove the correctness of this algorithm we first notice that the algorithm requires exactly $f + 1$ rounds. This implies termination. Moreover the validity condition is clearly satisfied since the decision value is the input of some processor. It remains to show that the agreement condition holds. We prove the following lemma:

**Lemma 21.12** In every execution at the end of round $f + 1$, $V_i = V_j$, for every two non-faulty processors $p_i$ and $p_j$.

**Proof.** We prove the claim by showing that if $x \in V_i$ at the end of round $f + 1$ then $x \in V_j$ at the end of round $f + 1$.

Let $r$ be the first round in which $x$ is added to $V_i$ for any nonfaulty processor $p_i$. If $x$ is initially in $V_i$, let $r = 0$. If $r \leq f$, then in round $r + 1 \leq f + 1$, $p_i$ sends $x$ to each $p_j$, causing $p_j$ to add $x$ to $V_j$, if not already present.

Otherwise, suppose $r = f + 1$ and let $p_j$ be a nonfaulty processor that receives $x$ for the first time in round $f + 1$. Then there must be a chain of $f + 1$ processors $p_{i_1}, \ldots, p_{i_{f+1}}$ that transfers the value $x$ to $p_j$: $p_{i_1}$ sends $x$ to $p_{i_2}$ in round one, and so on, until $p_{i_{f+1}}$ sends $x$ to $p_j$ in round $f + 1$. Since a processor sends a value only in the round right after it first adds the value to its set, each $p_{i_k}$ adds $x$ to its set in round $k - 1$, so these $f + 1$ processors are distinct. As at most $f$ processors are faulty, at least one of them, say $p_{i_k}$, must be nonfaulty. Hence the nonfaulty processor $p_{i_k}$ adds $x$ to its set in round $k - 1 < f + 1 = r$, contradicting the minimality of $r$. $\blacksquare$

This lemma, together with the observations above, implies the following theorem.
Theorem 21.13  The previous consensus algorithm solves the consensus problem in the presence of $f$ crash failures in a message passing system in $f + 1$ rounds.

The following theorem was first proved by Fischer and Lynch [7] for Byzantine failures. Dolev and Strong [6] later extended it to crash failures. The theorem shows that, in the given model, the previous algorithm is optimal.

Theorem 21.14  There is no algorithm which solves the consensus problem in fewer than $f + 1$ rounds in the presence of $f$ crash failures, if $n \geq f + 2$.

What if failures are not benign? That is, can the consensus problem be solved in the presence of Byzantine failures? And if so, how?

### 21.4.3. Consensus with Byzantine Failures

In a computation step of a faulty processor in the Byzantine model, the new state of the processor and the messages it sends are completely unconstrained. As in the reliable case, every processor takes a computation step in every round, and every message sent is delivered in that round. Hence a faulty processor can behave arbitrarily and even maliciously. For example, it could send different messages to different processors. It can even appear that the faulty processors coordinate with each other. A faulty processor can also mimic the behavior of a crashed processor by failing to send any messages from some point on.

In this case, the definition of the consensus problem is the same as in the message passing model with crash failures. The validity condition in this model, however, is not equivalent to requiring that every nonfaulty decision value is the input of some processor. As in the crash case, no conditions are put on the output of faulty processors.

### 21.4.4. Lower Bound on the Ratio of Faulty Processors

Pease, Shostak and Lamport [16] first proved the following theorem.

Theorem 21.15  In a system with $n$ processors and $f$ Byzantine processors, there is no algorithm which solves the consensus problem if $n \leq 3f$.

### 21.4.5. A Polynomial Algorithm

The following algorithm uses messages of constant size, takes $2(f + 1)$ rounds, and assumes that $n > 4f$. It was presented by Berman and Garay [4].

This consensus algorithm for Byzantine failures contains $f + 1$ phases, each taking two rounds. Each processor has a preferred decision for each phase, initially its input value. In the first round of each phase, processors send their preferences to each other. Let $v_i^k$ be the majority value in the set of values received by processor $p_i$ at the end of the first round of phase $k$. If no majority exists, a default value $v_\perp$ is used. In the second round of the phase, processor $p_k$, called the king of the phase, sends its majority value $v_k^k$ to all processors. If $p_i$ receives more than $n/2 + f$ copies of $v_i^k$ (in the first round of the phase), then it sets its preference for the next phase to be $v_i^k$; otherwise it sets its preference to the phase king's majority value $v_k^k$, received in the second round of the phase. After $f + 1$ phases, the processor decides on its preference. Each processor maintains a local array \(\text{pref}\) with \(n\) entries.

We prove correctness using the following lemmas. Termination is immediate. We next note the persistence of agreement:

**Lemma 21.16** If all nonfaulty processors prefer \(v\) at the beginning of phase \(k\), then they all prefer \(v\) at the end of phase \(k\), for all \(k, 1 \leq k \leq f + 1\).

**Proof.** Since all nonfaulty processors prefer \(v\) at the beginning of phase \(k\), they all receive at least \(n - f\) copies of \(v\) (including their own) in the first round of phase \(k\). Since \(n > 4f\), \(n - f > n/2 + f\), implying that all nonfaulty processors will prefer \(v\) at the end of phase \(k\). \(\blacksquare\)
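The inequality used in the last step can be spelled out; it is exactly where the assumption \(n > 4f\) enters:

\[ n - f > \frac{n}{2} + f \iff \frac{n}{2} > 2f \iff n > 4f . \]

For example, with \(f = 2\) and \(n = 9\), every nonfaulty processor receives at least \(7\) copies of \(v\), which exceeds the threshold \(9/2 + 2 = 6.5\).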

**Consensus in the presence of Byzantine failures:**

Code for processor \(p_i, 0 \leq i \leq n - 1\).
- Initially \(\text{pref}[i] = x_i\) and \(\text{pref}[j] = v_\perp\), for any \(j \neq i\)
- Round \(2k - 1, 1 \leq k \leq f + 1\)
  1. Send \(\langle\text{pref}[i]\rangle\) to all processors
  2. Receive \(\langle u_j\rangle\) from \(p_j\) and assign it to \(\text{pref}[j]\), for all \(0 \leq j \leq n - 1, j \neq i\)
  3. Let \(\text{maj}\) be the majority value of \(\text{pref}[0], \ldots, \text{pref}[n - 1]\) (\(v_\perp\) if none)
  4. Let \(\text{mult}\) be the multiplicity of \(\text{maj}\)
- Round \(2k, 1 \leq k \leq f + 1\)
  5. If \(i = k\) then send \(\langle\text{maj}\rangle\) to all processors
  6. Receive \(\langle\text{king-maj}\rangle\) from \(p_k\) (\(v_\perp\) if none)
  7. If \(\text{mult} > \frac{n}{2} + f\)
  8. then \(\text{pref}[i] := \text{maj}\)
  9. else \(\text{pref}[i] := \text{king-maj}\)
  10. If \(k = f + 1\) then \(y_i := \text{pref}[i]\)
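The listing can be exercised with a small Python simulation. It is a sketch under assumptions that are not part of the algorithm: inputs are bits, faulty processors are modeled by one particular adversary that sends an independent random bit to each recipient in every round, and `None` stands for \(v_\perp\). The names `phase_king` and `BOTTOM` are hypothetical.

```python
import random
from collections import Counter

BOTTOM = None  # stands for the default value v_bottom

def phase_king(inputs, faulty, seed=0):
    """Simulate the listing above with f = len(faulty) Byzantine processors.

    Kings are p_1, ..., p_{f+1}, as in step 5. Returns the decisions of the
    nonfaulty processors.
    """
    n, f = len(inputs), len(faulty)
    assert n > 4 * f and n >= f + 2, "needs n > 4f and kings p_1..p_{f+1}"
    rng = random.Random(seed)
    pref = list(inputs)                  # pref[i] of each nonfaulty p_i
    for k in range(1, f + 2):            # phases 1, ..., f + 1
        maj, mult = {}, {}
        for i in range(n):               # round 2k - 1 (steps 1-4)
            if i in faulty:
                continue
            received = [rng.randint(0, 1) if j in faulty else pref[j]
                        for j in range(n)]
            value, count = Counter(received).most_common(1)[0]
            if count > n / 2:
                maj[i], mult[i] = value, count
            else:                        # no majority: v_bottom; mult only
                maj[i], mult[i] = BOTTOM, 0   # matters above n/2 + f
        for i in range(n):               # round 2k (steps 5-9)
            if i in faulty:
                continue
            king_maj = rng.randint(0, 1) if k in faulty else maj[k]
            pref[i] = maj[i] if mult[i] > n / 2 + f else king_maj
    return {i: pref[i] for i in range(n) if i not in faulty}
```

With \(n = 9\) and the faulty set `{1, 4}`, the king \(p_1\) is faulty but \(p_2\) is not, so the nonfaulty processors should end with a common decision for every seed. The assertion at the top rejects configurations with \(n \le 4f\), for which the correctness argument does not apply.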

Lemma 21.16 implies the validity condition: if all processors start with the same input \(v\), the nonfaulty processors will continue to prefer \(v\) and finally decide on \(v\) in phase \(f + 1\). Agreement is achieved by the king breaking ties. Since each phase has a different king and there are \(f + 1\) phases, at least one phase has a nonfaulty king.

**Lemma 21.17** Let \(g\) be a phase whose king \(p_g\) is nonfaulty. Then all nonfaulty processors finish phase \(g\) with the same preference.

**Proof.** Suppose all nonfaulty processors use the majority value received from the king for their preference. Since the king is nonfaulty, it sends the same message to all processors, and hence all the nonfaulty preferences are the same.

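Suppose a nonfaulty processor \( p_i \) uses its own majority value \( v \) for its preference. Thus \( p_i \) receives more than \( n/2 + f \) messages for \( v \) in the first round of phase \( g \). At most \( f \) of these messages come from faulty processors, so more than \( n/2 \) nonfaulty processors prefer \( v \), and each of them sends \( v \) to every processor. Hence every nonfaulty processor, including \( p_g \), receives more than \( n/2 \) messages for \( v \) in the first round of phase \( g \) and sets its majority value to \( v \). The king \( p_g \) therefore sends \( v \), and every nonfaulty processor ends the phase with \( v \) as its preference, whether it uses its own majority value or the king's. \(\blacksquare\)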

Hence at the beginning of phase \( g+1 \) all nonfaulty processors have the same preference, and by Lemma 21.16 they will decide on the same value at the end of the algorithm. Hence the algorithm has the agreement property and solves consensus.

**Theorem 21.18** There exists an algorithm for \( n \) processors which solves the consensus problem in the presence of \( f \) Byzantine failures within \( 2(f + 1) \) rounds using constant size messages, if \( n > 4f \).

### 21.4.6. Impossibility in Asynchronous Systems

As shown before, the consensus problem can be solved in synchronous systems in the presence of both crash (benign) and Byzantine (severe) failures. What about asynchronous systems? Under the assumption that the communication system is completely reliable, and the only possible failures are caused by unreliable processors, it can be shown that if the system is completely asynchronous, then there is no consensus algorithm even in the presence of only a single processor failure. The result holds even if the processors only fail by crashing. The impossibility proof relies heavily on the system being asynchronous. This result was first shown in a breakthrough paper by Fischer, Lynch, and Paterson [8]. It is one of the most influential results in distributed computing.

The impossibility holds both for shared memory systems in which only read/write registers are used and for message passing systems. The proof first establishes it for shared memory systems; the result for message passing systems can then be obtained through simulation.

**Theorem 21.19** There is no consensus algorithm for a read/write asynchronous shared memory system that can tolerate even a single crash failure.

And through simulation it can be shown:

**Theorem 21.20** There is no algorithm for solving the consensus problem in an asynchronous message passing system with \( n \) processors, one of which may fail by crashing.

Note that these results do not mean that consensus can never be solved in asynchronous systems. Rather, the results mean that there are no algorithms that guarantee termination, agreement, and validity in all executions. It is reasonable to assume that agreement and validity are essential, that is, if a consensus algorithm terminates, then agreement and validity are guaranteed. In fact, there are efficient and useful algorithms for the consensus problem that are not guaranteed to terminate in all executions. In practice this is often sufficient, because the special conditions that cause non-termination may be quite rare. Additionally, since in many real systems one can make some timing assumptions, it may not be necessary to solve consensus in a completely asynchronous system.

**Exercises**

21.4-1 Show that the validity condition is equivalent to requiring that every non-faulty processor decision be the input of some processor.

21.4-2 Prove the correctness of Algorithm consensus-crash.

21.4-3 Prove the correctness of the consensus algorithm in the presence of Byzantine Failures.

21.4-4 Prove Theorem 21.20

21.4-5 An alternative version of the consensus problem requires that the input value of one distinguished processor (the general) be distributed to all the other processors (the lieutenants). This problem is also called single source consensus. The conditions that need to be satisfied are:

- **Termination:** Every nonfaulty lieutenant must eventually decide.

- **Agreement:** All the nonfaulty lieutenants must have the same decision.

- **Validity:** If the general is nonfaulty, then the common decision value is the general’s input.

So if the general is faulty, then the nonfaulty processors need not decide on the general’s input, but they must still agree with each other. Consider the synchronous message passing system with Byzantine faults. Show how to transform a solution to the consensus problem (in Subsection 21.4.5) into a solution to the general’s problem and vice versa. What are the message and round overheads of your transformation?

### 21.5. Logical Time, Causality, and Consistent State

In a distributed system it is often useful to compute a global state that consists of the states of all processors. Having access to the global state allows us to reason about system properties that depend on all processors, for example to detect a deadlock. One may attempt to compute the global state by stopping all processors and then gathering their states at a central location. Such a method is ill-suited for many distributed systems that must continue computation at all times. This section discusses how one can compute a global state that is quite intuitive, yet consistent in a precise sense. We first discuss a distributed algorithm that imposes a global order on the instructions of processors. This algorithm creates the illusion of a global clock available to processors. Then we introduce the notion of one instruction causally affecting another instruction, and an algorithm for computing which instruction affects which. The notion turns out to be very useful in defining a consistent global state of a distributed system. We close the section with distributed algorithms that compute a consistent global state of a distributed system.
### 21.5.1. Logical Time

The design of distributed algorithms is easier when processors have access to a (Newtonian) global clock, because then each event that occurs in the distributed system can be labeled with the reading of the clock, processors agree on the ordering of any events, and this agreement can be used by algorithms to make decisions. However, constructing a global clock is difficult. There exist algorithms that approximate the ideal global clock by periodically synchronizing drifting local hardware clocks. However, it is also possible to totally order events without using hardware clocks. This idea is called the logical clock.

Recall that an execution is an interleaving of instructions of the $n$ programs. Each instruction can be either a computational step of a processor, or sending a message, or receiving a message. Any instruction is performed at a distinct point of global time. However, the reading of the global clock is not available to processors. Our goal is to assign values of the logical clock to each instruction, so that these values appear to be readings of the global clock. That is, it is possible to postpone or advance the instants at which instructions are executed in such a way that each instruction $x$ that has been assigned a value $t_x$ of the logical clock is executed exactly at the instant $t_x$ of the global clock, and that the resulting execution is valid, in the sense that it can actually occur when the algorithm is run with the modified delays.

The Logical Clock algorithm assigns logical time to each instruction. Each processor has a local variable called counter. This variable is initially zero and it gets incremented every time the processor executes an instruction. Specifically, when a processor executes any instruction other than sending or receiving a message, the variable counter gets incremented by one. When a processor sends a message, it increments the variable by one and attaches the resulting value to the message. When a processor receives a message, the processor retrieves the value attached to the message, calculates the maximum of this value and the current value of counter, increments the maximum by one, and assigns the result to the counter variable. Note that every time an instruction is executed, the value of counter is incremented by at least one, and so it grows as the processor keeps on executing instructions. The value of logical time assigned to instruction $x$ is defined as the pair $(counter, id)$, where counter is the value of the variable counter right after the instruction has been executed, and id is the identifier of the processor. The values of logical time form a total order, where pairs are compared lexicographically. This logical time is also called Lamport time. We define $t_x$ to be the number $counter + 1/(id + 1)$, which encodes the pair as a single real number: distinct pairs yield distinct values, and a smaller counter always yields a smaller value.
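The counter rules above can be sketched as a small Python class. The class and method names are hypothetical; each method models one kind of instruction and returns its logical time as the pair $(counter, id)$, and `send` additionally returns the value to attach to the message. The helper `as_real` computes $t_x$.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class LamportClock:
    """Logical clock of one processor (hypothetical names)."""

    pid: int          # the processor identifier id
    counter: int = 0  # initially zero

    def local_step(self) -> tuple[int, int]:
        """Any instruction other than send or receive: add one."""
        self.counter += 1
        return (self.counter, self.pid)

    def send(self) -> tuple[tuple[int, int], int]:
        """Add one, then attach the new counter value to the message.

        Returns the logical time of the send and the value to attach."""
        self.counter += 1
        return (self.counter, self.pid), self.counter

    def receive(self, attached: int) -> tuple[int, int]:
        """Maximum of the attached value and counter, plus one."""
        self.counter = max(self.counter, attached) + 1
        return (self.counter, self.pid)


def as_real(stamp: tuple[int, int]) -> float:
    """Encode the pair (counter, id) as the number counter + 1/(id + 1)."""
    counter, pid = stamp
    return counter + 1 / (pid + 1)
```

Python compares tuples lexicographically, so comparing the returned pairs directly gives the total order on logical times described above.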

**Observation 21.1** For any execution, logical time satisfies three conditions:

(i) if an instruction $x$ is performed by a processor before an instruction $y$ is performed by the same processor, then the logical time of $x$ is strictly smaller than that of $y$,

(ii) any two distinct instructions of any two processors get assigned different logical times,

(iii) if instruction $x$ sends a message and instruction $y$ receives this message, then
the logical time of \( x \) is strictly smaller than that of \( y \).

Our goal now is to argue that the logical clock provides processors with the illusion of a global clock. Intuitively, such an illusion can be created because we can take any execution of a deterministic algorithm, compute the logical time \( t_x \) of each instruction \( x \), and run the execution again, delaying or speeding up processors and messages in such a way that each instruction \( x \) is executed at the instant \( t_x \) of the global clock. Thus, without access to a hardware clock or other external measurements not captured in our model, the processors cannot distinguish the reading of the logical clock from the reading of a real global clock. Formally, the reason why the re-timed sequence is a valid execution that is indistinguishable from the original execution is summarized in the following corollary, which follows directly from Observation 21.1.

**Corollary 21.2** For any execution \( \alpha \), let \( T \) be the assignment of logical time to instructions, and let \( \beta \) be the sequence of instructions ordered by their logical time in \( \alpha \). Then for each processor, the subsequence of instructions executed by the processor in \( \alpha \) is the same as the subsequence in \( \beta \). Moreover, each message is received in \( \beta \) after it is sent in \( \beta \).
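Corollary 21.2 can be checked mechanically on a recorded execution. The Python sketch below uses a hypothetical encoding of instructions as `(pid, kind, msg)` triples listed in global-clock order. It recomputes the logical times, sorts the instructions into $\beta$, and verifies both claims of the corollary.

```python
from __future__ import annotations

# An execution alpha is a list of instructions in global-clock order.
# Each instruction is (pid, kind, msg): kind is "local", "send" or "recv",
# and msg is a unique message identifier for sends and receives, else None.


def lamport_stamps(alpha: list[tuple]) -> list[tuple[int, int]]:
    """Logical time (counter, pid) of every instruction of alpha."""
    counter: dict[int, int] = {}
    attached: dict[str, int] = {}  # message id -> value attached by sender
    stamps = []
    for pid, kind, msg in alpha:
        c = counter.get(pid, 0)
        if kind == "recv":
            c = max(c, attached[msg]) + 1
        else:
            c += 1
            if kind == "send":
                attached[msg] = c
        counter[pid] = c
        stamps.append((c, pid))
    return stamps


def retime(alpha: list[tuple]) -> list[tuple]:
    """beta: the instructions of alpha sorted by their logical time."""
    stamps = lamport_stamps(alpha)
    order = sorted(range(len(alpha)), key=lambda k: stamps[k])
    return [alpha[k] for k in order]


def satisfies_corollary(alpha: list[tuple], beta: list[tuple]) -> bool:
    """Same per-processor subsequences, and every receive follows its send."""
    for p in {pid for pid, _, _ in alpha}:
        if [e for e in alpha if e[0] == p] != [e for e in beta if e[0] == p]:
            return False
    sent = set()
    for _, kind, msg in beta:
        if kind == "send":
            sent.add(msg)
        elif kind == "recv" and msg not in sent:
            return False
    return True
```

For instance, if $p_1$ executes three local instructions before $p_0$ sends it a message, the send gets logical time $(1, 0)$ and moves to the front of $\beta$, while the receive, with logical time $(4, 1)$, stays after it.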

### 21.5.2. Causality

In a system execution, an instruction can affect another instruction by altering the state of the computation in which the second instruction executes. We say that one instruction can *causally* affect (or influence) another if the information that one instruction produces can be passed on to the other instruction. Recall that in our model of a distributed system, each instruction is executed at a distinct instant of global time, but processors do not have access to the reading of the global clock. Let us illustrate causality. If two instructions are executed by the same processor, then we could say that the instruction executed earlier can causally affect the instruction executed later, because it is possible that the result of executing the former instruction was used when the latter instruction was executed. We stress the word *possible*, because in fact the later instruction may not use any information produced by the former. However, when defining causality, we simplify the problem of capturing how processors influence other processors, and focus on what is possible. If two instructions \( x \) and \( y \) are executed by two different processors, then we could say that instruction \( x \) can causally affect instruction \( y \) when the processor that executes \( x \) sends a message when or after executing \( x \), and the message is delivered before or during the execution of \( y \) at the other processor. It may also be the case that influence is passed on through intermediate processors or multiple instructions executed by processors before reaching the second processor.

We will formally define the intuition that one instruction can causally affect another in terms of a relation called *happens before* that relates pairs of instructions. The relation is defined for a given execution, i.e., we fix a sequence of instructions executed by the algorithm and the instants of the global clock when the instructions were executed, and define which pairs of instructions are related by the happens before relation. The relation is introduced in two steps. If instructions \( x \) and \( y \) are executed by the same processor, then we say that $x$ happens before $y$ if and only if $x$ is executed before $y$. When $x$ and $y$ are executed by two different processors, then we say that $x$ happens before $y$ if and only if there is a chain of instructions and messages

$$
\begin{array}{c}
  \text{snd}_1 \\
  \text{rcv}_2 \ldots \text{snd}_2 \\
  \vdots \\
  \text{rcv}_{k-1} \ldots \text{snd}_{k-1} \\
  \text{rcv}_k
\end{array}
$$

for $k \geq 2$, such that $\text{snd}_1$ is either equal to $x$ or is executed after $x$ by the same processor that executes $x$; $\text{rcv}_k$ is either equal to $y$ or is executed before $y$ by the same processor that executes $y$; $\text{rcv}_h$ is executed before $\text{snd}_h$ by the same processor, $2 \leq h < k$; and $\text{snd}_h$ sends a message that is received by $\text{rcv}_{h+1}$, $1 \leq h < k$. Note that no instruction happens before itself. We write $x <_{HB} y$ when $x$ happens before $y$. We omit the reference to the execution for which the relation is defined, because it will be clear from the context which execution we mean. We say that two instructions $x$ and $y$ are concurrent when neither $x <_{HB} y$ nor $y <_{HB} x$. The question remains how processors can determine whether one instruction happens before another in a given execution according to our definition. This question can be answered through a generalization of the Logical Clock algorithm presented earlier, called vector clocks.
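For an observer who knows the whole execution, the definition is a reachability question: edges lead from each instruction to the next instruction of the same processor and from each send to the matching receive, and $x <_{HB} y$ holds exactly when a nonempty path leads from $x$ to $y$. The following Python sketch (hypothetical names) assumes such a global view of a valid execution, which individual processors do not have; this is what motivates vector clocks.

```python
from __future__ import annotations

from collections import deque


def happens_before(num_instr: dict[int, int],
                   messages: list[tuple[tuple[int, int], tuple[int, int]]],
                   x: tuple[int, int], y: tuple[int, int]) -> bool:
    """Decide x <_HB y by searching for a chain as in the definition.

    num_instr: processor id -> number of instructions it executed.
    messages:  pairs (snd, rcv); each instruction is written (pid, index),
               with indices starting at 1.
    """
    receivers: dict[tuple[int, int], list[tuple[int, int]]] = {}
    for snd, rcv in messages:
        receivers.setdefault(snd, []).append(rcv)

    def successors(node):
        pid, idx = node
        if idx < num_instr[pid]:
            yield (pid, idx + 1)            # next instruction, same processor
        yield from receivers.get(node, [])  # receive of a message sent here

    # Start from the successors of x, so no instruction precedes itself.
    seen = set(successors(x))
    frontier = deque(seen)
    while frontier:
        node = frontier.popleft()
        if node == y:
            return True
        for nxt in successors(node):
            if nxt not in seen:
                seen.add(nxt)
                frontier.append(nxt)
    return False
```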

The Vector Clocks algorithm allows processors to relate instructions, and this relation is exactly the happens before relation. Each processor $p_i$ maintains a vector $V_i$ of $n$ integers. The $j$-th coordinate of the vector is denoted by $V_i[j]$. The vector is initialized to the zero vector $(0, \ldots, 0)$. The vector is modified each time a processor executes an instruction, in a way similar to the way counter was modified in the Logical Clock algorithm. Specifically, when a processor $p_i$ executes any instruction other than sending or receiving a message, the coordinate $V_i[i]$ gets incremented by one, and the other coordinates remain intact. When a processor sends a message, it increments $V_i[i]$ by one and attaches the resulting vector $V_i$ to the message. When a processor $p_j$ receives a message, the processor retrieves the vector $V$ attached to the message, calculates the coordinate-wise maximum of the current vector $V_j$ and the vector $V$, except for the coordinate $V_j[j]$, which gets incremented by one, and assigns the result to the variable $V_j$:

$$
\begin{aligned}
V_j[j] &:= V_j[j] + 1, \\
V_j[k] &:= \max\{V_j[k], V[k]\} \quad \text{for all } k \in [n] \setminus \{j\}.
\end{aligned}
$$

We label each instruction $x$ executed by processor $p_i$ with the value of the vector $V_i$ right after the instruction has been executed. The label is denoted by $VT(x)$ and
is called vector timestamp of instruction \( x \). Intuitively, \( VT(x) \) represents the knowledge of processor \( p_i \) about how many instructions each processor has executed at the moment when \( p_i \) has executed instruction \( x \). This knowledge may be obsolete.
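The update rules can be sketched as a small Python class with hypothetical names. Following the text, each method returns the vector timestamp $VT(x)$ of the instruction it models, that is, a copy of $V_i$ right after the instruction; `send` also returns the vector to attach to the message.

```python
from __future__ import annotations


class VectorClock:
    """Vector V_i of processor p_i in a system of n processors (a sketch)."""

    def __init__(self, i: int, n: int) -> None:
        self.i = i
        self.v = [0] * n  # initialized to the zero vector

    def local_step(self) -> tuple[int, ...]:
        """Other instruction: increment V_i[i]; return VT of the instruction."""
        self.v[self.i] += 1
        return tuple(self.v)

    def send(self) -> tuple[tuple[int, ...], list[int]]:
        """Increment V_i[i]; return VT and a copy of V_i to attach."""
        self.v[self.i] += 1
        return tuple(self.v), list(self.v)

    def receive(self, attached: list[int]) -> tuple[int, ...]:
        """Coordinate-wise maximum, except the own coordinate, which gets +1."""
        for k, value in enumerate(attached):
            if k != self.i:
                self.v[k] = max(self.v[k], value)
        self.v[self.i] += 1
        return tuple(self.v)
```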

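Vector timestamps can be used to order instructions that have been executed. Specifically, given two instructions \( x \) and \( y \) and their vector timestamps \( VT(x) \) and \( VT(y) \), we write \( x \leq_{VT} y \) when the vector \( VT(x) \) is majorized by the vector \( VT(y) \), that is, when every coordinate of \( VT(x) \) is at most the corresponding coordinate of \( VT(y) \). We write \( x <_{VT} y \) when, in addition, \( VT(x) \neq VT(y) \).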

The next theorem shows that the Vector Clocks algorithm indeed implements the happens before relation: we can decide whether one instruction happens before another just by comparing the vector timestamps of the two instructions.

**Theorem 21.21** For any execution and any two instructions \( x \) and \( y \), \( x <_{HB} y \) if and only if \( x <_{VT} y \).

**Proof.** We first show the forward implication. Suppose that \( x <_{HB} y \). Hence \( x \) and \( y \) are two different instructions. If the two instructions are executed on the same processor \( p_i \), then \( x \) must be executed before \( y \). Only a finite number of instructions have been executed by the time \( y \) has been executed. The Vector Clocks algorithm increments coordinate \( i \) by one for each instruction of \( p_i \) after \( x \) up to and including \( y \), and no coordinate is ever decreased. Thus \( x <_{VT} y \). If \( x \) and \( y \) were executed on different processors, then by the definition of the happens before relation, there must be a finite chain of instructions and messages leading from \( x \) to \( y \). By the Vector Clocks algorithm, no coordinate of the vector timestamp decreases as we move along the chain, and the coordinate of the processor executing the next instruction increases at each step, so again \( x <_{VT} y \).

Now we show the reverse implication. Suppose that it is not the case that \( x <_{HB} y \). We consider a few subcases, always concluding that it is not the case that \( x <_{VT} y \). First, it could be the case that \( x \) and \( y \) are the same instruction. But then obviously the vector timestamps assigned to \( x \) and \( y \) are the same, and so it cannot be the case that \( x <_{VT} y \). Let us, therefore, assume that \( x \) and \( y \) are different instructions. If they are executed by the same processor, then \( x \) cannot be executed before \( y \), and so \( x \) is executed after \( y \). Thus, by monotonicity of vector timestamps, \( y <_{VT} x \), and so it is not the case that \( x <_{VT} y \). The final subcase is when \( x \) and \( y \) are executed by two distinct processors \( p_i \) and \( p_j \). Let us focus on the component \( i \) of the vector clock \( V_i \) of processor \( p_i \) right after \( x \) was executed. Let its value be \( k \). Recall that other processors can only increase the value of their component \( i \) by adopting the value sent by other processors. Hence, in order for the value of component \( i \) of processor \( p_j \) to be \( k \) or more at the moment \( y \) is executed, there must be a chain of instructions and messages that passes a value of at least \( k \), originating at processor \( p_i \). This chain starts at \( x \) or at an instruction executed by \( p_i \) subsequent to \( x \). But the existence of such a chain would imply that \( x \) happens before \( y \), which we assumed was not the case. So the component \( i \) of \( VT(y) \) is strictly smaller than \( k \), the component \( i \) of \( VT(x) \). Thus it cannot be the case that \( x <_{VT} y \). ■

This theorem tells us that we can decide whether two distinct instructions \( x \) and \( y \) are concurrent by checking that neither \( x <_{VT} y \) nor \( y <_{VT} x \) holds.
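By Theorem 21.21, comparisons of vector timestamps alone decide the happens before relation and concurrency. A minimal Python sketch with hypothetical function names, assuming timestamps are stored as equal-length tuples:

```python
from __future__ import annotations


def vt_leq(a: tuple[int, ...], b: tuple[int, ...]) -> bool:
    """x <=_VT y: VT(x) is majorized by VT(y), coordinate by coordinate."""
    return all(p <= q for p, q in zip(a, b))


def vt_less(a: tuple[int, ...], b: tuple[int, ...]) -> bool:
    """x <_VT y: majorized and not equal."""
    return vt_leq(a, b) and a != b


def concurrent(a: tuple[int, ...], b: tuple[int, ...]) -> bool:
    """For distinct instructions x, y: neither x <_VT y nor y <_VT x."""
    return not vt_less(a, b) and not vt_less(b, a)
```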

### 21.5.3. Consistent state

The happens before relation can be used to compute a global state of a distributed system such that this state is consistent in a sense that we define formally below. Each processor executes instructions. A cut $K$ is defined as a vector $K = (k_1, \ldots, k_n)$ of non-negative integers. Intuitively, the vector $K$ denotes the states of processors. Formally, $k_i$ denotes the number of instructions that processor $p_i$ has executed. Not all cuts correspond to collections of states of distributed processors that could be considered natural or consistent. For example, if a processor $p_i$ has received a message from $p_j$ and we record the state of $p_i$ in the cut by making $k_i$ appropriately large, but make $k_j$ so small that the cut contains the state of the sender before the moment when the message was sent, then we could say that such a cut is not natural: there are instructions recorded in the cut that are causally affected by instructions that are not recorded in the cut. We consider such cuts inconsistent and therefore undesirable. Formally, a cut $K = (k_1, \ldots, k_n)$ is inconsistent when there are processors $p_i$ and $p_j$ such that the instruction number $k_i$ of processor $p_i$ is causally affected by an instruction subsequent to instruction number $k_j$ of processor $p_j$. So in an inconsistent cut there is a message that “crosses” the cut in a backward direction. Any cut that is not inconsistent is called a consistent cut.
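The last observation gives a direct test for an observer who knows every message of the execution (a hypothetical global view; the processors themselves do not have it). A causal chain that starts after instruction $k_j$ of $p_j$ and ends at or before instruction $k_i$ of $p_i$ contains a first message whose receive lies inside the cut; the send of that message lies outside, so it suffices to inspect individual messages. A Python sketch with hypothetical names:

```python
from __future__ import annotations


def is_consistent(cut: list[int],
                  messages: list[tuple[tuple[int, int], tuple[int, int]]]) -> bool:
    """Check a cut K = (k_1, ..., k_n) for a backward-crossing message.

    cut[p] is the number of instructions of processor p in the cut.
    messages: pairs ((pj, s), (pi, r)) meaning that instruction s of pj sends
    a message that instruction r of pi receives (indices start at 1).
    """
    for (pj, s), (pi, r) in messages:
        sent_outside = s > cut[pj]      # sent after instruction k_j
        received_inside = r <= cut[pi]  # received at or before instruction k_i
        if sent_outside and received_inside:
            return False
    return True
```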

The Consistent Cut algorithm uses vector timestamps to find a consistent cut. We assume that each processor is given the same cut $K = (k_1, \ldots, k_n)$ as an input. Then processors must determine a consistent cut $K'$ that is majorized by $K$. Each processor $p_i$ has an infinite table $VT_i[0, 1, 2, \ldots]$ of vectors. Processor $p_i$ executes instructions, and stores vector timestamps in consecutive entries of the table. Specifically, entry $m$ of the table is the vector timestamp $VT_i[m]$ of the $m$-th instruction executed by the processor; we define $VT_i[0]$ to be the zero vector. Processor $p_i$ begins calculating a cut right after the moment when the processor has executed instruction number $k_i$. The processor determines the largest number $k_i' \geq 0$ that is at most $k_i$, such that the vector $VT_i[k_i']$ is majorized by $K$. The vector $K' = (k_1', \ldots, k_n')$ that processors collectively find turns out to be a consistent cut.
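Each processor's part of the Consistent Cut algorithm is a backward scan of its own table. The Python sketch below uses hypothetical names and, for illustration only, computes all coordinates in one function; in the algorithm each processor $p_i$ computes only $k_i'$, once it has executed instruction number $k_i$.

```python
from __future__ import annotations


def local_cut_index(i: int, vt_table: list[tuple[int, ...]],
                    cut: tuple[int, ...]) -> int:
    """k_i': the largest m <= k_i such that VT_i[m] is majorized by K.

    vt_table[m] is the vector timestamp of the m-th instruction of p_i,
    and vt_table[0] is the zero vector.
    """
    for m in range(min(cut[i], len(vt_table) - 1), -1, -1):
        if all(a <= b for a, b in zip(vt_table[m], cut)):
            return m
    raise ValueError("vt_table[0] must be the zero vector")


def consistent_cut(vt_tables: list[list[tuple[int, ...]]],
                   cut: tuple[int, ...]) -> tuple[int, ...]:
    """K' computed coordinate by coordinate from the same input K."""
    return tuple(local_cut_index(i, table, cut)
                 for i, table in enumerate(vt_tables))
```

For example, with two processors where $p_0$ executes a local instruction and then sends a message, and $p_1$ executes a local instruction and then receives that message, the tables are $[(0,0), (1,0), (2,0)]$ and $[(0,0), (0,1), (2,2)]$. The input cut $(1, 2)$ is inconsistent, and the scan lowers it to $(1, 1)$.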

**Theorem 21.22** For any cut $K$, the cut $K'$ computed by the Consistent Cut algorithm is a consistent cut majorized by $K$.

**Proof.** First observe that there is no need to consider entries of $VT_i$ beyond entry $k_i$. None of these entries is majorized by $K$, because the $i$-th coordinate of $VT_i[m]$ equals $m$, so for $m > k_i$ it is strictly larger than $k_i$. So we can indeed restrict the search to the entries $VT_i[0], \ldots, VT_i[k_i]$. Let $k_i' \geq 0$ be the largest index such that the vector $VT_i[k_i']$ is majorized by the vector $K$. We know that such an index exists, because $VT_i[0]$ is the zero vector, which is majorized by any cut $K$.

We argue that $(k_1', \ldots, k_n')$ is a consistent cut by way of contradiction. Suppose that the vector $(k_1', \ldots, k_n')$ is an inconsistent cut. Then, by definition, there are processors $p_i$ and $p_j$ such that there is an instruction $x$ of processor $p_i$ subsequent to instruction number $k_i'$, such that $x$ happens before instruction number $k_j'$ of processor $p_j$. Recall that $k_i'$ is the furthest entry of $VT_i$ majorized by $K$. So entry $k_i'+1$ is not majorized by $K$, and since all subsequent entries, including the one for instruction $x$, have coordinates at least as large, these entries are not majorized by $K$ either. But $x$ happens before instruction number $k_j'$, so by Theorem 21.21 every coordinate of entry $k_j'$ is at least as large as the respective coordinate of the entry corresponding to $x$, and so $VT_j[k_j']$ cannot be majorized by $K$ either. This contradicts the choice of $k_j'$, for which $VT_j[k_j']$ is majorized by $K$. Therefore, $(k_1', \ldots, k_n')$ must be a consistent cut. ■

There is a trivial algorithm for finding a consistent cut: pick $K' = (0, \ldots, 0)$. However, the Consistent Cut algorithm is better in the sense that the consistent cut it finds is maximal. Proving this is left as an exercise.

There is an alternative way to find a consistent cut. The Consistent Cut algorithm requires that we attach vector timestamps to messages and remember the vector timestamps of all instructions executed so far by the algorithm $A$ whose consistent cut we want to compute. This may be too costly. The algorithm called Distributed Snapshot avoids this cost. In this algorithm, a processor initiates the calculation of a consistent cut by flooding the network with a special message that acts like a sword that cuts the execution of algorithm $A$ consistently. In order to prove that the cut is indeed consistent, we require that messages are received by the recipient in the order they were sent by the sender. Such ordering can be implemented using sequence numbers.

In the Distributed Snapshot algorithm, each processor $p_i$ has a variable called counter that counts the number of instructions of algorithm $A$ executed by the processor so far. In addition, the processor has a variable $k_i$ that will store the $i$-th coordinate of the cut. This variable is initialized to ⊥. Since the counter variables count only the instructions of algorithm $A$, the instructions of the Distributed Snapshot algorithm do not affect them. In some sense the snapshot algorithm runs in the “background”. Suppose that there is exactly one processor that can decide to take a snapshot of the distributed system. Upon deciding, the processor “floods” the network with a special message $<\text{Snapshot}>$. Specifically, the processor $p_i$ sends the message to all its neighbors and assigns counter to $k_i$. Whenever a processor $p_j$ receives the message and the variable $k_j$ is still ⊥, the processor sends the $<\text{Snapshot}>$ message to all its neighbors and assigns counter to $k_j$. The sending of $<\text{Snapshot}>$ messages and the assignment are done by the processor without executing any instruction of $A$ (we can think of the Distributed Snapshot algorithm as an “interrupt”). The algorithm calculates a consistent cut.
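The Distributed Snapshot algorithm can be exercised in a small sequential simulation. In the Python sketch below, every name and the scheduling model are hypothetical: there is one FIFO queue per directed link, a schedule lists which instruction of $A$ runs or which message arrives next, and the snapshot handler leaves the counter of algorithm $A$ untouched. The function returns the computed cut together with the messages of $A$, so that the cut can be checked against the backward-crossing condition.

```python
from collections import deque


def run_snapshot(n, edges, schedule):
    """Simulate Distributed Snapshot on top of an algorithm A (a sketch).

    n: number of processors; edges: undirected pairs (p, q) of neighbors.
    schedule: actions of the form
        ("step", p)        p executes a local instruction of A
        ("send", p, q)     p executes an instruction of A that sends to q
        ("deliver", p, q)  the oldest message on the FIFO link p->q arrives
        ("snap", p)        p decides to take the snapshot (one initiator)
    Remaining messages are delivered after the schedule ends.
    Returns the cut k (None stands for ⊥) and the messages of A as
    pairs ((p, s), (q, r)) of instruction numbers.
    """
    nbrs = {p: set() for p in range(n)}
    for p, q in edges:
        nbrs[p].add(q)
        nbrs[q].add(p)
    links = {(p, q): deque() for p in range(n) for q in nbrs[p]}
    counter = [0] * n  # instructions of A executed so far
    k = [None] * n
    app_messages = []

    def record_and_flood(p):
        # Not an instruction of A, so counter[p] does not change.
        k[p] = counter[p]
        for q in nbrs[p]:
            links[(p, q)].append(("SNAPSHOT", None))

    def deliver(p, q):
        tag, sent_at = links[(p, q)].popleft()
        if tag == "SNAPSHOT":
            if k[q] is None:
                record_and_flood(q)
        else:
            counter[q] += 1  # receiving is an instruction of A
            app_messages.append(((p, sent_at), (q, counter[q])))

    for action in schedule:
        kind, p = action[0], action[1]
        if kind == "step":
            counter[p] += 1
        elif kind == "send":
            counter[p] += 1
            links[(p, action[2])].append(("APP", counter[p]))
        elif kind == "snap":
            record_and_flood(p)
        elif kind == "deliver":
            deliver(p, action[2])

    while any(links.values()):
        for p, q in list(links):
            if links[(p, q)]:
                deliver(p, q)
    return k, app_messages
```

Because the links are FIFO, an application message sent after a processor has recorded its coordinate always arrives behind the $<\text{Snapshot}>$ message on the same link, which is the property the proof of Theorem 21.23 relies on.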

**Theorem 21.23** Suppose that for any processors $p_i$ and $p_j$, the messages sent from $p_i$ to $p_j$ are received in the order in which they are sent. Then the Distributed Snapshot algorithm eventually finds a consistent cut $(k_1, \ldots, k_n)$. The algorithm sends $O(e)$ messages, where $e$ is the number of edges in the graph.

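**Proof.** The fact that each variable $k_i$ eventually becomes different from ⊥ follows from our model, because we assumed that instructions are eventually executed and messages are eventually received, so the $<\text{Snapshot}>$ messages will eventually reach all nodes. Moreover, each processor sends $<\text{Snapshot}>$ to each of its neighbors exactly once, namely when its variable changes from ⊥, so $2e = O(e)$ such messages are sent in total.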

Suppose that $(k_1, \ldots, k_n)$ is not a consistent cut. Then there are processors $p_j$ and $p_i$ such that an instruction of $p_j$ numbered $k_j + 1$ or later sends a message $<M>$ other than $<\text{Snapshot}>$ to $p_i$, and $p_i$ receives the message at or before its instruction number $k_i$. Processor $p_j$ sends $<\text{Snapshot}>$ to all its neighbors at the moment it assigns $k_j$, that is, before it executes instruction number $k_j + 1$, so the message $<M>$ must have been sent after the message $<\text{Snapshot}>$ was sent from $p_j$ to $p_i$. But messages are received in the order they are sent, so \( p_i \) processes \(<\text{Snapshot}>\) before it processes \(<M>\). By then \( k_i \) has already been assigned, so \(<M>\) is received by an instruction numbered \( k_i + 1 \) or later. This is the desired contradiction. ■

**Exercises**

21.5-1 Show that logical time preserves the happens before relation. That is, show that if \( x <_{HB} y \), then \( LT(x) < LT(y) \), where \( LT(x) \) denotes the logical time of instruction \( x \).

21.5-2 Show that any vector clock that captures concurrency between \( n \) processors must have at least \( n \) coordinates.

21.5-3 Show that the vector \( K' \) calculated by the Consistent Cut algorithm is in fact a maximal consistent cut majorized by \( K \). That is, show that there is no consistent cut \( K'' \) different from \( K' \) that majorizes \( K' \) and is majorized by \( K \).

21.5-4 Imagine that there are \( n \) banks that are interconnected. Each bank \( i \) starts with an amount of money \( m_i \). Banks do not remember the initial amount of money. Banks keep on transferring money among themselves by sending messages such as \(<10>\) that represent the value of a transfer. At some point in time a bank decides to find the total amount of money in the system. Design an algorithm for calculating \( m_1 + \ldots + m_n \) that does not stop monetary transactions.

### 21.6. Communication services

Among the fundamental problems in distributed systems where processors communicate by message passing are the tasks of spreading and gathering information. Many distributed algorithms for communication networks can be constructed using building blocks that implement various broadcast and multicast services. In this section we present some basic communication services in the message-passing model. Such services typically need to satisfy some quality of service requirements dealing with ordering of messages and reliability. We first focus on broadcast services, then we discuss more general multicast services.

#### 21.6.1. Properties of Broadcast Services

In the broadcast problem, a selected processor \( p_s \), called a source or a sender, has the message \( m \), which must be delivered to all processors in the system (including the source). The interface of the broadcast service is specified as follows:

- \( \text{bc-send}_i(m, \text{qos}) \) : an event of processor \( p_i \) that sends a message \( m \) to all processors.
- \( \text{bc-recv}_i(m, j, \text{qos}) \) : an event of processor \( p_i \) that receives a message \( m \) sent by processor \( p_j \).

In the above definitions, \( \text{qos} \) denotes the quality of service provided by the system. We consider two kinds of quality of service:

- **Ordering**: how does the order of received messages depend on the order of messages sent by the source?
- **Reliability**: how does the set of received messages depend on the failures in the system?

The basic model of a message-passing distributed system normally does not guarantee any ordering or reliability of messaging operations. In the basic model we only assume that each pair of processors is connected by a link, and message delivery is independent on each link — the order of received messages may not be related to the order of the sent messages, and messages may be lost in the case of crashes of senders or receivers.

We present some of the most useful requirements for ordering and reliability of broadcast services. The main question we address is how to implement a stronger service on top of the weaker service, starting with the basic system model.

Variants of ordering requirements

Applying the definition of happens before to messages, we say that message $m$ happens before message $m'$ if either $m$ and $m'$ are sent by the same processor and $m$ is sent before $m'$, or the bc-recv event for $m$ happens before the bc-send event for $m'$.

We identify four common broadcast services with respect to the message ordering properties:

Basic Broadcast: no order of messages is guaranteed.

Single-Source FIFO (first-in-first-out): messages sent by one processor are received by each processor in the same order as sent; more precisely, for all processors $p_i, p_j$ and messages $m, m'$, if processor $p_i$ sends $m$ before it sends $m'$ then processor $p_j$ does not receive message $m'$ before message $m$.

Causal Order: messages are received in an order consistent with the happens before relation; more precisely, for all messages $m, m'$ and every processor $p_i$, if $m$ happens before $m'$ then $p_i$ does not receive $m'$ before $m$.

Total Order: the same order of received messages is preserved in each processor; more precisely, for all processors $p_i, p_j$ and messages $m, m'$, if processor $p_i$ receives $m$ before it receives $m'$ then processor $p_j$ does not receive message $m'$ before message $m$.

It is easy to see that Causal Order implies Single-Source FIFO (since the happens before relation for messages includes the order of messages sent by one processor), and each of the given services trivially implies Basic Broadcast. There are no additional relations between these four services. For example, there are executions that satisfy the Single-Source FIFO property but not Causal Order. Consider two processors $p_0$ and $p_1$. In the first event $p_0$ broadcasts message $m$, next processor $p_1$ receives $m$, and then $p_1$ broadcasts message $m'$. It follows that $m$ happens before $m'$. But if processor $p_0$ receives $m'$ before $m$, which may happen, then this execution violates Causal Order. Note that the Single-Source FIFO requirement holds trivially, since each processor broadcasts only one message.
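The counterexample can be checked mechanically. The Python sketch below (hypothetical names) encodes an execution as a list of `bcast` and `recv` events in global order, assuming that broadcast messages are the only messages exchanged. It computes the happens before pairs of messages by propagating each processor's causal past, and then tests whether some processor receives the later message of a pair first.

```python
def message_happens_before(trace):
    """Pairs (m, m2) such that m happens before m2.

    trace: events in global order, ("bcast", p, m) or ("recv", p, m).
    Each processor accumulates the messages in its causal past; when it
    broadcasts m2, every message in that past happens before m2.
    """
    past = {}     # processor -> messages in its causal past
    past_of = {}  # message -> causal past of its bc-send event
    pairs = set()
    for kind, p, m in trace:
        mine = past.setdefault(p, set())
        if kind == "bcast":
            pairs.update((earlier, m) for earlier in mine)
            past_of[m] = set(mine)
            mine.add(m)
        else:  # "recv"
            mine |= past_of[m] | {m}
    return pairs


def fifo_pairs(trace):
    """Pairs (m, m2) broadcast by the same processor, m first."""
    sent = {}
    pairs = set()
    for kind, p, m in trace:
        if kind == "bcast":
            pairs.update((earlier, m) for earlier in sent.get(p, []))
            sent.setdefault(p, []).append(m)
    return pairs


def violates(pairs, trace):
    """True if some processor receives m2 before m for a pair (m, m2)."""
    order = {}
    for kind, p, m in trace:
        if kind == "recv":
            order.setdefault(p, []).append(m)
    for received in order.values():
        pos = {m: idx for idx, m in enumerate(received)}
        if any(a in pos and b in pos and pos[b] < pos[a] for a, b in pairs):
            return True
    return False


# The execution from the text, writing m2 for m': p0 receives m2 before m.
trace = [
    ("bcast", 0, "m"), ("recv", 1, "m"), ("bcast", 1, "m2"),
    ("recv", 0, "m2"), ("recv", 0, "m"), ("recv", 1, "m2"),
]
```

On this trace, `violates(message_happens_before(trace), trace)` is true, so Causal Order fails, while `violates(fifo_pairs(trace), trace)` is false, so Single-Source FIFO holds.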

We denote by $bb$ the Basic Broadcast service, by $ssf$ the Single-Source FIFO, by $co$ the Causal Order and by $to$ the Total Order service.

Reliability requirements

In the model without failures we would like to guarantee the following properties of broadcast services:

**Integrity:** each message $m$ received in a bc-recv event has been sent in some bc-send event.

**No-Duplicates:** each processor receives a message not more than once.

**Liveness:** each message sent is received by all processors.

In the model with failures we define the notion of reliable broadcast service, which satisfies Integrity, No-Duplicates and two kinds of Liveness properties:

Nonfaulty Liveness: each message $m$ sent by non-faulty processor $p_i$ must be received by every non-faulty processor.

Faulty Liveness: each message sent by a faulty processor is either received by all non-faulty processors or by none of them.

We denote by $rbb$ the Reliable Basic Broadcast service, by $rssf$ the Reliable Single-Source FIFO, by $rco$ the Reliable Causal Order, and by $rto$ the Reliable Total Order service.

#### 21.6.2. Ordered Broadcast Services

We now describe algorithms that implement the various broadcast services.

Implementing Basic Broadcast on top of asynchronous point-to-point messaging

The $bb$ service is implemented as follows. If event bc-send$_i(m, bb)$ occurs, then processor $p_i$ sends message $m$ via every link from $p_i$ to $p_j$, where $0 \leq j \leq n - 1$. If a message $m$ sent by $p_i$ arrives at processor $p_j$, then $p_j$ enables event bc-recv$_j(m, i, bb)$.

To provide reliability we do the following. We build the reliable broadcast on top of the Basic Broadcast service. When bc-send$_i(m, rbb)$ occurs, processor $p_i$ enables event bc-send$_i((m, i), bb)$. If event bc-recv$_j((m, i), k, bb)$ occurs and the message-coordinate $m$ appears for the first time, then processor $p_j$ first enables event bc-send$_j((m, i), bb)$ (to inform other non-faulty processors about message $m$ in case processor $p_i$ is faulty), and next enables event bc-recv$_j(m, i, rbb)$.

We prove that the above algorithm provides reliability for the Basic Broadcast service. First observe that the Integrity and No-Duplicates properties follow directly from the fact that each processor $p_j$ enables bc-recv$_j(m, i, rbb)$ only if the message-coordinate $m$ is received for the first time. Nonfaulty Liveness is preserved since links between non-faulty processors enable events bc-recv$_j(\cdot, \cdot, bb)$ correctly. Faulty Liveness is guaranteed by the fact that if there is a non-faulty processor $p_j$ which receives message $m$ from the faulty source $p_i$, then before enabling bc-recv$_j(m, i, rbb)$, processor $p_j$ sends message $m$ using a bc-send$_j$ event. Since $p_j$ is non-faulty, each non-faulty processor $p_k$ gets message $m$ in some bc-recv$_k((m, i), \cdot, bb)$ event, and then accepts it (enabling event bc-recv$_k(m, i, rbb)$) during the first such event.
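The relay-on-first-receipt rule can be sketched as an event handler. In the Python sketch below, `bb_send` and `deliver` are hypothetical callbacks standing in for the $bb$ layer and for enabling bc-recv$_j(m, i, rbb)$, and messages are assumed to be distinguishable, so that the pair $(m, i)$ identifies one broadcast.

```python
class ReliableBroadcastNode:
    """Processor p_j of the reliable broadcast built on bb (a sketch).

    bb_send(src, packet) stands in for bc-send_src(packet, bb), and
    deliver(m, origin) for enabling bc-recv_j(m, origin, rbb); both are
    hypothetical callbacks supplied by the caller.
    """

    def __init__(self, pid, bb_send, deliver):
        self.pid = pid
        self.bb_send = bb_send
        self.deliver = deliver
        self.accepted = set()  # pairs (m, origin) seen so far

    def rbb_send(self, m):
        self.bb_send(self.pid, (m, self.pid))

    def on_bb_recv(self, packet, from_pid):
        # from_pid is the bb sender k; the rule does not need it.
        if packet in self.accepted:
            return                      # not the first time: ignore
        self.accepted.add(packet)
        self.bb_send(self.pid, packet)  # relay first, in case the origin failed
        m, origin = packet
        self.deliver(m, origin)
```

Relaying before delivering is what yields Faulty Liveness: if a source crashes after its packet has reached a single non-faulty processor, that processor's relay still reaches every other non-faulty processor.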

Implementing Single-Source FIFO on top of Basic Broadcast service

Each processor $p_i$ has its own counter (timestamp), initialized to 0. If event bc-send$_i(m, ssf)$ occurs, then processor $p_i$ sends message $m$ with its current timestamp attached, using bc-send$_i(<m, timestamp>, bb)$, and then increments its timestamp by one. If an event bc-recv$_j(<m, t>, i, bb)$ occurs, then processor $p_j$ enables event bc-recv$_j(m, i, ssf)$ just after events bc-recv$_j(m_0, i, ssf), \ldots,$ bc-recv$_j(m_{t-1}, i, ssf)$ have been enabled, where $m_0, \ldots, m_{t-1}$ are the messages such that events bc-recv$_j(<m_0, 0>, i, bb), \ldots,$ bc-recv$_j(<m_{t-1}, t-1>, i, bb)$ have been enabled.
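The hold-back rule above can be sketched in Python, with hypothetical names: the sender numbers its messages $0, 1, 2, \ldots$, and the receiver buffers early arrivals per sender and releases them once the gap before them is filled.

```python
class FifoSender:
    """Sender side of ssf: attach the current timestamp, then increment."""

    def __init__(self) -> None:
        self.timestamp = 0

    def stamp(self, m):
        packet = (m, self.timestamp)  # <m, timestamp> handed to bb
        self.timestamp += 1
        return packet


class FifoReceiver:
    """Receiver side of ssf at processor p_j (hypothetical names)."""

    def __init__(self) -> None:
        self.next_ts = {}  # sender -> timestamp of the next message to enable
        self.held = {}     # sender -> {timestamp: message} that arrived early

    def on_bb_recv(self, m, t, sender):
        """Handle bc-recv_j(<m, t>, sender, bb); return the messages whose
        bc-recv_j(., sender, ssf) events become enabled, in order."""
        buffer = self.held.setdefault(sender, {})
        buffer[t] = m
        nxt = self.next_ts.get(sender, 0)
        ready = []
        while nxt in buffer:
            ready.append(buffer.pop(nxt))
            nxt += 1
        self.next_ts[sender] = nxt
        return ready
```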

Note that if we use reliable Basic Broadcast instead of Basic Broadcast as the background service, the above implementation of Single-Source FIFO becomes Reliable Single-Source FIFO service. We leave the proof to the reader as an exercise.

Implementing Causal Order and Total Order on top of the Single-Source FIFO service

We present an Ordered Broadcast Algorithm which works in an asynchronous message-passing system providing the Single-Source FIFO broadcast service. It uses the idea of timestamps, but in a more advanced way than the implementation of $ssf$. We denote by $cto$ the service satisfying both the Causal Order and the Total Order requirements.

Each processor $p_i$ maintains in a local array $T$ its own increasing counter (timestamp) $T[i]$ and its estimates $T[j]$ of the timestamps of the other processors. Timestamps are used to mark messages before sending: if $p_i$ is going to broadcast a message, it increases its timestamp and uses it to tag this message (lines 11-13). During the execution, processor $p_i$ updates its estimates of the other processors' timestamps: if processor $p_i$ receives a message from processor $p_j$ with a tag $t$ (the timestamp of $p_j$), it puts $t$ into $T[j]$ (lines 23 and 32). If the received timestamp $t$ is larger than its own timestamp, processor $p_i$ raises its own timestamp to $t$ (lines 24-25) and then broadcasts an update message carrying the new value (line 26). Processor $p_i$ accepts a message $m$ with associated timestamp $t$ from processor $p_j$ if the pair $(t, j)$ is the smallest among all pending triples (line 42), and every processor is known to have a timestamp of at least $t$, that is, $t \leq T[k]$ for every $k$ (line 43). The details are given in the code below.
**Ordered Broadcast Algorithm**

Code for any processor $p_i$, $0 \leq i \leq n - 1$

01 **initialization**
02 $T[j] := 0$ for every $0 \leq j \leq n - 1$
03 pending $:= \emptyset$

11 if bc-send$_i(m, cto)$ occurs then
12 $T[i] := T[i] + 1$
13 enable bc-send$_i(< m, T[i] >, ssf)$

21 if bc-recv$_i(< m, t >, j, ssf)$ occurs then
22 add triple $(m, t, j)$ to pending
23 $T[j] := t$
24 if $t > T[i]$ then
25 $T[i] := t$
26 enable bc-send$_i(< update, T[i] >, ssf)$

31 if bc-recv$_i(< update, t >, j, ssf)$ occurs then
32 $T[j] := t$

41 if
42 $(m, t, j)$ is the pending triple with the smallest $(t, j)$, and
43 $t \leq T[k]$ for every $0 \leq k \leq n - 1$
44 then
45 enable bc-recv$_i(m, j, cto)$
46 remove triple $(m, t, j)$ from pending
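The pseudocode can be transcribed into an event-driven Python class; the comments point back to its line numbers. The names and callbacks are hypothetical, and the sketch relies on an $ssf$ layer (not shown) that also hands each processor its own messages. One assumption departs from the letter of lines 23 and 32: the sketch updates $T[j]$ with a maximum, so that no entry of $T$ ever decreases. This makes a difference only when a processor's own earlier broadcast is delivered back to it after its timestamp has already grown.

```python
class OrderedBroadcast:
    """Processor p_i of the Ordered Broadcast Algorithm (a sketch).

    ssf_send(payload) stands in for enable bc-send_i(payload, ssf) and
    deliver(m, j) for enable bc-recv_i(m, j, cto); both are hypothetical
    callbacks. The ssf layer is assumed to deliver p_i's own messages too.
    """

    def __init__(self, i: int, n: int, ssf_send, deliver) -> None:
        self.i = i
        self.T = [0] * n       # lines 01-02
        self.pending = set()   # triples (m, t, j)
        self.ssf_send = ssf_send
        self.deliver = deliver

    def cto_send(self, m) -> None:              # lines 11-13
        self.T[self.i] += 1
        self.ssf_send(("msg", m, self.T[self.i]))

    def on_ssf_recv(self, payload, j: int) -> None:
        kind, m, t = payload
        # Lines 23 and 32, taken as a maximum so that no entry of T decreases.
        self.T[j] = max(self.T[j], t)
        if kind == "msg":                       # lines 21-26
            self.pending.add((m, t, j))
            if t > self.T[self.i]:
                self.T[self.i] = t
                self.ssf_send(("update", None, t))
        self._accept()

    def _accept(self) -> None:                  # lines 41-46
        while self.pending:
            m, t, j = min(self.pending, key=lambda triple: (triple[1], triple[2]))
            if any(t > tk for tk in self.T):    # line 43 fails: wait
                return
            self.pending.remove((m, t, j))
            self.deliver(m, j)
```

Checking the accept condition after every received message, rather than only on a timer, is a design choice of the sketch; since $T$ and the pending set change only on receipt, no acceptance opportunity is missed.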

Algorithm Ordered Broadcast satisfies the Causal Order requirement. We leave the proof to the reader as an exercise (later we show how to achieve the stronger Reliable Causal Order service and provide the proof for that stronger case).

**Theorem 21.24** Algorithm Ordered Broadcast satisfies the Total Order requirement.

**Proof.** Integrity follows from the fact that each processor can enable event bc-recv$_i(m, j, cto)$ only if the triple $(m, t, j)$ is pending (lines 41-45), which can happen only after receiving a message $m$ from processor $j$ (lines 21-22). The No-Duplicates property is guaranteed by the fact that there is at most one pending triple containing message $m$ sent by processor $j$ (lines 13 and 21-22), and that this triple is removed once it is accepted (line 46).

Liveness follows from the fact that each pending triple satisfies the conditions in lines 42-43 at some moment of the execution. The proof of this fact is by induction on the events in the execution. Suppose, to the contrary, that $(m, t, j)$ is the triple with the smallest $(t, j)$ which does not satisfy the conditions in lines 42-43 at any moment of the execution. It follows that there is a moment from which the triple $(m, t, j)$ has the smallest $(t, j)$ coordinates among the pending triples in processor $p_i$. Hence, starting from this moment, it must violate the condition in line 43 for some $k$. Note that $k \neq i, j$, by the updating rules in lines 23-25. It follows that processor $p_i$ never receives a message from $p_k$ with a timestamp greater than $t - 1$, which, by the updating rules in lines 24-26, means that processor \( p_k \) never receives the message \(< m, t >\) from \( j \). This contradicts the liveness property of the \( ssf \) broadcast service.

To prove the Total Order property it is sufficient to prove that, for every processor $p_i$ and messages $m, m'$ sent by processors $p_k, p_l$ with timestamps $t, t'$ respectively, the triples $(m, t, k), (m', t', l)$ are accepted by $p_i$ in the lexicographic order of $(t, k), (t', l)$. There are two cases.

**Case 1.** Both triples are pending in processor \( p_i \) at some moment of the execution. Then condition in line 42 guarantees acceptance in order of \((t, k), (t', l)\).

**Case 2.** Without loss of generality, triple $(m, t, k)$ is accepted by processor $p_i$ before triple $(m', t', l)$ becomes pending. If $(t, k) < (t', l)$, then the acceptance is still in the order of $(t, k), (t', l)$. Otherwise $(t, k) > (t', l)$, which implies $t \geq t'$. By the condition in line 43 we get in particular that $t \leq T[l]$, and consequently $t' \leq T[l]$. This cannot happen because of the $ssf$ requirement and the assumption that processor $p_i$ has not yet received message $< m', t' >$ from $p_l$ via the $ssf$ broadcast service.

Now we address reliable versions of the Causal Order and Total Order services. The Reliable Causal Order requirements can be implemented on top of the Reliable Basic Broadcast service in an asynchronous message-passing system with processor crashes using the following algorithm. It uses the same data structures as the previous Ordered Broadcast Algorithm. The main differences between the Reliable Causally Ordered Broadcast Algorithm and the Ordered Broadcast Algorithm are as follows: instead of integer timestamps, processors use vector timestamps $T$, and they do not estimate the timestamps of other processors; they only compare their own (vector) timestamps with the received ones, coordinate by coordinate. The intuition behind the vector timestamp of processor $p_i$ is that it records how many messages have been sent by $p_i$ and how many have been accepted by $p_i$ from every $p_k$, where $k \neq i$.

In the course of the algorithm, processor $p_i$ increases position $i$ of its vector timestamp $T$ before sending a new message (line 12), and increases the $j$th position of its vector timestamp after accepting a new message from processor $p_j$ (line 38). After receiving a new message $m$ from processor $p_j$ together with its vector timestamp $\hat{T}$, processor $p_i$ adds triple $(m, \hat{T}, j)$ to pending and accepts this triple if it is the next message from $p_j$ that $p_i$ has not yet accepted (condition in line 33), and if, for each processor $p_k$ with $k \neq i, j$, the number of messages from $p_k$ accepted by $p_j$ at the moment of sending $m$ is not bigger than the number of messages from $p_k$ accepted by $p_i$ now (condition in line 34). The detailed code of the algorithm follows.
**Reliable Causally Ordered Broadcast Algorithm**

Code for any processor $p_i$, $0 \leq i \leq n - 1$

01 **initialization**
02 $T[j] := 0$ for every $0 \leq j \leq n - 1$
03 pending list is empty

11 **if** bc-send$_i(m, rco)$ occurs **then**
12 $T[i] := T[i] + 1$
13 **enable** bc-send$_i(< m, T >, rbb)$

21 **if** bc-recv$_i(< m, \hat{T} >, j, rbb)$ occurs **then**
22 **add** triple $(m, \hat{T}, j)$ to pending

31 **if**
32 $(m, \hat{T}, j)$ is a pending triple, **and**
33 $\hat{T}[j] = T[j] + 1$, **and**
34 $\hat{T}[k] \leq T[k]$ for every $k \neq i, j$
35 **then**
36 **enable** bc-recv$_i(m, j, rco)$
37 **remove** triple $(m, \hat{T}, j)$ from pending
38 $T[j] := T[j] + 1$

We argue that the Reliable Causally Ordered Broadcast algorithm provides the Reliable Causal Order broadcast service on top of a system equipped with the Reliable Basic Broadcast service. The Integrity and No-Duplicates properties are guaranteed by the $rbb$ broadcast service and by the facts that each message is added to pending at most once and that a message that has not been received is never added to pending. Nonfaulty and Faulty Liveness can be proved by a single induction on the execution, using the fact that non-faulty processors receive all messages sent, which guarantees that the conditions in lines 33-34 are eventually satisfied. The Causal Order requirement holds since, if message $m$ happens before message $m'$, then each processor $p_i$ accepts messages $m, m'$ according to the lexicographic order of their vector timestamps $T, T'$, and these vectors are comparable in this case. Details are left to the reader.
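A Python sketch of a single processor of the Reliable Causally Ordered Broadcast algorithm follows; the $rbb$ service itself is not modeled, and the caller hands each $< m, T >$ payload to the receivers in any order. Two details are assumptions of the sketch rather than statements of the pseudocode: a processor accepts its own message immediately when sending it and ignores its own copy from $rbb$, and the condition of line 34 is checked for every $k$ other than $i$ and $j$ (for $k = j$ it would contradict line 33). `CausalProcess` is a hypothetical name.

```python
class CausalProcess:
    """Processor p_i of Reliable Causally Ordered Broadcast (hypothetical sketch)."""

    def __init__(self, i, n):
        self.i, self.n = i, n
        self.T = [0] * n        # T[i]: messages sent; T[k]: messages accepted from p_k
        self.pending = []       # triples (m, T_hat, j)
        self.delivered = []     # accepted (m, j) pairs, i.e. bc-recv_i(m, j, rco) events

    def send(self, m):
        """Lines 11-13; returns the <m, T> payload handed to the rbb service."""
        self.T[self.i] += 1
        # Assumption: p_i accepts its own message at once and ignores its rbb copy.
        self.delivered.append((m, self.i))
        return m, tuple(self.T)

    def on_rbb_recv(self, m, T_hat, j):
        """Lines 21-22, followed by as many acceptances (lines 31-38) as possible."""
        if j == self.i:
            return
        self.pending.append((m, T_hat, j))
        progress = True
        while progress:
            progress = False
            for entry in list(self.pending):
                m2, T2, j2 = entry
                if T2[j2] == self.T[j2] + 1 and all(          # line 33
                    T2[k] <= self.T[k]                         # line 34
                    for k in range(self.n) if k not in (self.i, j2)
                ):
                    self.pending.remove(entry)                 # lines 36-37
                    self.delivered.append((m2, j2))
                    self.T[j2] += 1                            # line 38
                    progress = True
```

For instance, if $p_1$ accepts $p_0$'s message $a$ and then broadcasts $b$, the vector timestamp of $b$ records that dependency; a processor $p_2$ that receives $b$ first keeps it in pending until $a$ has been accepted.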

Note that the Reliable Total Order broadcast service cannot be implemented in the general asynchronous setting with processor crashes, since it would solve consensus in this model: the first accepted message would determine the agreement value, contradicting the fact that consensus is not solvable in this model.

### 21.6.3. Multicast services

Multicast services are similar to the broadcast services, except each multicast message is destined for a specified subset of all processors. In the multicast service we provide two types of events, where $qos$ denotes a quality of service required:

**mc-send$_i(m, D, qos)$**: an event of processor $p_i$ which sends a message $m$ together with its id to all processors in a destination set $D \subseteq \{0, \ldots, n - 1\}$.

**mc-recv$_i(m, j, qos)$**: an event of processor $p_i$ which receives a message $m$ sent by processor $p_j$.

Note that the event mc-recv is similar to bc-recv.

As in the case of a broadcast service, we would like the multicast services to provide useful ordering and reliability properties. We can adapt the ordering requirements from the broadcast services. Basic Multicast does not require any ordering properties. Single-Source FIFO requires that if one processor multicasts messages (possibly to different destination sets), then the messages received by each processor (if any) must be received in the same order as sent by the source. The definition of Causal Order remains the same. Instead of Total Order, which is difficult to achieve since destination sets may be different, we define another ordering property:

**Sub-Total Order:** the orders of received messages in all processors may be extended to a total order of messages; more precisely, for any messages $m, m'$ and processors $p_i, p_j$, if both $p_i$ and $p_j$ receive both messages $m, m'$, then they receive them in the same order.
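The pairwise condition stated above can be checked mechanically on finite receipt logs. The following Python sketch assumes each log lists the messages a processor received, in receipt order, with no duplicates; the function name is hypothetical.

```python
from itertools import combinations


def satisfies_sub_total_order(logs):
    """logs: dict mapping a processor id to its list of received messages, in order."""
    position = {p: {m: k for k, m in enumerate(log)} for p, log in logs.items()}
    for p, q in combinations(logs, 2):
        common = set(position[p]) & set(position[q])
        for m1, m2 in combinations(common, 2):
            # Both processors received m1 and m2, so they must agree on their order.
            if (position[p][m1] < position[p][m2]) != (position[q][m1] < position[q][m2]):
                return False
    return True
```

Processors with different destination sets may receive different subsets of messages; only the messages they have in common are compared.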

The reliability conditions for multicast are somewhat different from the conditions for reliable broadcast.

**Integrity:** each message $m$ received in event mc-recv$_i$ was sent in some mc-send event with a destination set containing processor $p_i$.

**No Duplicates:** each processor receives a message at most once.

**Nonfaulty Liveness:** each message $m$ sent by a non-faulty processor $p_i$ must be received by every non-faulty processor in the destination set.

**Faulty Liveness:** each message sent by a faulty processor is either received by all non-faulty processors in the destination set or by none of them.

One way of implementing ordered and reliable multicast services is to use the corresponding broadcast services (for Sub-Total Order the corresponding broadcast requirement is Total Order). More precisely, if event mc-send$_i(m, D, qos)$ occurs, processor $p_i$ enables event bc-send$_i(< m, D >, qos)$. When an event bc-recv$_j(< m, D >, i, qos)$ occurs, processor $p_j$ enables event mc-recv$_j(m, i, qos)$ if $p_j \in D$; otherwise it ignores this event. The proof that this method provides the required multicast quality of service is left as an exercise.
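The reduction just described amounts to a thin filtering layer. The Python sketch below assumes the broadcast service is given as a callable `bc_send(payload, qos)` and calls back `on_bc_recv(payload, j, qos)`; these names, and the toy broadcast used in the usage lines, are hypothetical and provide no failure or ordering guarantees of their own.

```python
class MulticastLayer:
    """Multicast for processor p_i built on a broadcast service (hypothetical sketch)."""

    def __init__(self, i, bc_send):
        self.i = i
        self.bc_send = bc_send   # assumed callable bc_send(payload, qos) of the broadcast layer
        self.received = []       # mc-recv_i(m, j, qos) events, in order

    def mc_send(self, m, D, qos):
        # mc-send_i(m, D, qos) -> bc-send_i(<m, D>, qos)
        self.bc_send((m, frozenset(D)), qos)

    def on_bc_recv(self, payload, j, qos):
        # bc-recv_i(<m, D>, j, qos) -> mc-recv_i(m, j, qos) if i is in D, else ignore
        m, D = payload
        if self.i in D:
            self.received.append((m, j, qos))


# Usage with a toy broadcast that immediately hands the payload to every layer.
layers = []


def make_bc_send(src):
    def bc_send(payload, qos):
        for layer in layers:
            layer.on_bc_recv(payload, src, qos)
    return bc_send


layers.extend(MulticastLayer(i, make_bc_send(i)) for i in range(3))
layers[0].mc_send("x", {1, 2}, "basic")
```

Every processor still receives every broadcast; processors outside $D$ simply discard it, so the ordering guarantees of the underlying broadcast carry over to the multicast deliveries.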

## 21.7. Rumor Collection Algorithms

Reliable multicast services can be used as building blocks in constructing algorithms for more advanced communication problems. In this section we illustrate this method for the problem of collecting rumors by synchronous processors prone to crashes. (Since we consider only fair executions, we assume that at least one processor remains operational to the end of the computation.)

### 21.7.1. Rumor Collection (Gossip) Problem and Requirements

The classic problem of collecting rumors, or gossip, is defined as follows:

At the beginning, each processor has its distinct piece of information, called a rumor; the goal is to make every processor know all the rumors.

However, in the model with processor crashes we need to redefine the gossip problem to take crash failures of processors into account. The Integrity and No-Duplicates properties are the same as in the reliable broadcast service; the only difference (which follows from the specification of the gossip problem) is in the Liveness requirements:

**Non-faulty Liveness:** the rumor of every non-faulty processor must be known by each non-faulty processor.

**Faulty Liveness:** if processor $p_i$ has crashed during the execution, then each non-faulty processor either knows the rumor of $p_i$ or knows that $p_i$ has crashed.

The efficiency of gossip algorithms is measured in terms of time and message complexity. Time complexity measures the number of (synchronous) steps from the beginning to termination. Message complexity measures the total number of point-to-point messages sent (more precisely, if a processor sends a message to three other processors in one synchronous step, it contributes three to the message complexity).

The following simple algorithm completes gossip in just one synchronous step: each processor broadcasts its rumor to all processors. The algorithm is correct, because each message received contains a rumor, and a message not received means the failure of its sender. A drawback of this solution is that a quadratic number of messages, up to $n(n - 1)$, may be sent, which is quite inefficient.
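The one-step algorithm is easy to simulate while counting point-to-point messages, which makes the quadratic cost visible. In this Python sketch the crash model is an assumption for illustration: processors in `crashed` fail before the step and send nothing, and a processor listed in `partial` crashes during the step after reaching only the given recipients. The function name and the rumor strings are hypothetical.

```python
def one_step_gossip(n, crashed=frozenset(), partial=None):
    """Simulate one-step gossip; return (views, number of point-to-point messages).

    views[i][j] is p_j's rumor or "failed", for every processor i surviving the step.
    """
    partial = partial or {}
    survivors = [i for i in range(n) if i not in crashed and i not in partial]
    inbox = {i: set() for i in range(n)}
    messages = 0
    for j in range(n):
        if j in crashed:
            continue                                   # crashed before sending anything
        targets = partial.get(j, set(range(n)) - {j})  # all others, unless crashing mid-step
        for i in targets:
            inbox[i].add(j)
            messages += 1
    views = {}
    for i in survivors:
        # An expected message that did not arrive means that its sender failed.
        views[i] = {j: f"rumor{j}" if j == i or j in inbox[i] else "failed"
                    for j in range(n)}
    return views, messages
```

With no crashes, `one_step_gossip(4)` sends $4 \cdot 3 = 12$ messages, and every surviving view is complete: each entry holds either a rumor or the knowledge that its owner failed.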

We would like to perform gossip not only quickly, but also with fewer point-to-point messages. There is a natural trade-off between time and communication. Note that in a system without processor crashes such a trade-off may be achieved, e.g., by sending messages over an (almost) complete binary tree; then the time complexity is $O(\log n)$, while the message complexity is $O(n \log n)$. Hence, by slightly increasing the time complexity, we may achieve an almost linear improvement in message complexity. However, if the underlying communication network is prone to failures of components, then irregular failure patterns disturb the flow of information and make gossiping last longer. The question we address in this section is: what is the best trade-off between time and message complexity in the model with processor crashes?

### 21.7.2. Efficient Gossip Algorithms

In this part we describe a family of gossip algorithms, among which we can find some efficient ones. They are all based on the same generic code, and their efficiency depends on the quality of two data structures used in the generic algorithm. Our goal is to show that these data structures can be chosen so that the resulting algorithm is always correct, and is efficient if the number of crashes in the execution is at most $f$, where $f \leq n - 1$ is a parameter.

We start with a description of these structures: communication graphs and communication schedules.

Communication graph
A graph $G = (V, E)$ consists of a set $V$ of vertices and a set $E$ of edges. Graphs in this chapter are always simple, which means that edges are pairs of vertices, with no direction associated with them. Graphs are used to describe communication patterns. The set $V$ of vertices of a graph consists of the processors of the underlying distributed system. Edges in $E$ determine the pairs of processors that communicate directly by exchanging messages, but this does not necessarily mean the existence of a physical link between them. We abstract from the communication mechanism: messages that are exchanged between two vertices connected by an edge in $E$ may need to be routed and traverse a possibly long path in the underlying physical communication network. The graph topologies we use, for a given number $n$ of processors, vary depending on an upper bound $f$ on the number of crashes we would like to tolerate in an execution. The graph that matters, at a given point in an execution, is the one induced by the processors that have not crashed by this step of the execution.

To obtain an efficient gossip algorithm, communication graphs should satisfy some suitable properties, for example the following $\mathcal{R}(n, f)$ property:

**Definition 21.3** Let $f < n$ be a pair of positive integers. Graph $G$ is said to satisfy property $\mathcal{R}(n, f)$, if $G$ has $n$ vertices, and if, for each subgraph $R \subseteq G$ of size at least $n - f$, there is a subgraph $P(R)$ of $G$, such that the following hold:

1. $P(R) \subseteq R$ - inheritance
2. $|P(R)| \geq |R|/7$ - large size
3. The diameter of $P(R)$ is at most $2 + 30 \ln n$ - logarithmic communication
4. If $R_1 \subseteq R_2$, then $P(R_1) \subseteq P(R_2)$ - monotonicity.

Observe that graph $P(R)$ is connected, even if $R$ is not, since its diameter is finite. The following result justifies that graphs satisfying property $\mathcal{R}(n, f)$ can be constructed.

**Theorem 21.25** For each $f < n$, there exists a graph $G(n, f)$ satisfying property $\mathcal{R}(n, f)$. The maximum degree $\Delta$ of graph $G(n, f)$ is $O\left(\left(\frac{n}{n - f}\right)^{1.837}\right)$.

Communication schedules
A local permutation is a permutation of all the integers in the range $[0..n - 1]$. We assume that a set $\Pi$ of $n$ local permutations is given prior to the computation. Each processor $p_i$ has such a permutation $\pi_i$ from $\Pi$. For simplicity we assume that $\pi_i(0) = p_i$. Local permutations are used to collect rumors in a systematic way, according to the order given by the permutation, while communication graphs are used to exchange already collected rumors within a large and compact non-faulty graph component.
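A set $\Pi$ with $\pi_i(0) = p_i$ can be generated as below, identifying processor $p_i$ with index $i$. The efficiency results later in this section use random permutations; this Python sketch draws the remaining positions uniformly at random with a seeded generator, which is an assumption of the illustration, and the function name is hypothetical.

```python
import random


def local_permutations(n, seed=None):
    """Return pi where pi[i] is a permutation of 0..n-1 that starts with i."""
    rng = random.Random(seed)
    perms = []
    for i in range(n):
        others = [k for k in range(n) if k != i]
        rng.shuffle(others)          # uniformly random order of the remaining processors
        perms.append([i] + others)   # pi_i(0) = i
    return perms
```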

Generic algorithm
We start by specifying the goal that gossiping algorithms need to achieve. We say that processor $p_i$ has heard about processor $p_j$ if either $p_i$ knows the original input rumor of $p_j$ or $p_i$ knows that $p_j$ has already failed. We may reformulate correctness of a gossiping algorithm in terms of hearing about other processors: an algorithm is correct if the Integrity and No-Duplicates properties are satisfied and if each processor has heard about every other processor by the termination of the algorithm.

The code of a gossiping algorithm includes objects that depend on the number $n$ of processors in the system, and also on the bound $f < n$ on the number of failures which are "efficiently tolerated" (if the number of failures is at most $f$, then the message complexity of the designed algorithm is small). The additional parameter is a termination threshold $\tau$, which influences the time complexity of a specific implementation of the generic gossip scheme. Our goal is to construct a generic gossip algorithm which is correct for any additional parameters $f, \tau$ and any communication graph and set of schedules, while being efficient for some values of $f, \tau$ and structures $G(n, f)$ and $\Pi$.

Each processor starts gossiping as a collector. Collectors actively seek information about the rumors of the other processors by sending direct inquiries to some of them. A collector becomes a disseminator after it has heard about all the processors. Processors with this status disseminate their knowledge by sending their local views to selected other processors.

**Local views.** Each processor $p_i$ starts with knowing only its ID and its input information $\text{rumor}_i$. To store incoming data, processor $p_i$ maintains the following arrays:

\[
\text{Rumors}_i, \text{Active}_i, \text{Pending}_i,
\]
each of size $n$. All these arrays are initialized to store the value $\text{nil}$. For an array $X_i$ of processor $p_i$, we denote its $j$th entry by $X_i[j]$; intuitively, this entry contains some information about processor $p_j$. The array $\text{Rumors}$ is used to store all the rumors that a processor knows. At the start, processor $p_i$ sets $\text{Rumors}_i[i]$ to its own input $\text{rumor}_i$. Each time processor $p_i$ learns some $\text{rumor}_j$, it immediately sets $\text{Rumors}_i[j]$ to this value. The array $\text{Active}$ is used to store the set of all the processors that the owner of the array knows to have crashed. Once processor $p_i$ learns that some processor $p_j$ has failed, it immediately sets $\text{Active}_i[j]$ to $\text{failed}$. Notice that processor $p_i$ has heard about processor $p_j$ if at least one of the values $\text{Rumors}_i[j]$ and $\text{Active}_i[j]$ is not equal to $\text{nil}$.

The purpose of using the array $\text{Pending}$ is to facilitate dissemination. Each time processor $p_i$ learns that some other processor $p_j$ is fully informed, that is, it is either a disseminator itself or has been notified by a disseminator, then it marks this information in $\text{Pending}_i[j]$. Processor $p_i$ uses the array $\text{Pending}_i$ to send dissemination messages in a systematic way, by scanning $\text{Pending}_i$ to find those processors that possibly still have not heard about some processor.

The following is a useful terminology about the current contents of the arrays $\text{Active}$ and $\text{Pending}$. Processor $p_j$ is said to be active according to $p_i$, if $p_i$ has not yet received any information implying that $p_j$ crashed, which is the same as having $\text{nil}$ in $\text{Active}_i[j]$. Processor $p_j$ is said to need to be notified by $p_i$, if it is active according to $p_i$ and $\text{Pending}_i[j]$ is equal to $\text{nil}$.
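The predicates defined in the last paragraphs translate directly into code. In this Python sketch, `None` plays the role of $\text{nil}$ and the strings `"failed"` and `"done"` stand for the marks stored in $\text{Active}$ and $\text{Pending}$; these encodings and the class name `LocalView` are assumptions of the illustration.

```python
NIL, FAILED, DONE = None, "failed", "done"


class LocalView:
    """Arrays Rumors_i, Active_i, Pending_i of processor p_i (hypothetical sketch)."""

    def __init__(self, i, n, rumor):
        self.n = n
        self.rumors = [NIL] * n
        self.active = [NIL] * n
        self.pending = [NIL] * n
        self.rumors[i] = rumor             # p_i starts knowing only its own rumor

    def heard_about(self, j):
        return self.rumors[j] is not NIL or self.active[j] is not NIL

    def is_active(self, j):
        return self.active[j] is NIL       # no information that p_j has crashed

    def needs_notification(self, j):
        return self.is_active(j) and self.pending[j] is NIL

    def heard_about_all(self):
        return all(self.heard_about(j) for j in range(self.n))
```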

**Phases.** An execution of a gossiping algorithm starts with the processors initializing all the local objects. Processor $p_i$ initializes its list $\text{Rumors}_i$ with $\text{nil}$ at all the locations, except for the $i$th one, which is set equal to $\text{rumor}_i$. The remaining part of the execution is structured as a loop, in which phases are iterated. Each phase consists of three parts: receiving messages, local computation, and multicasting messages. Phases are of two kinds: regular phases and ending phases. During a regular phase a processor receives messages, updates its local knowledge, checks its status, sends its knowledge to its neighbors in the communication graph, and sends inquiries about rumors and replies about its own rumor. During an ending phase a processor receives messages, sends inquiries to all processors from which it has not heard yet (or, as a disseminator, notifying messages), and replies about its own rumor. The regular phases are performed $\tau$ times; the number $\tau$ is a termination threshold. After this, the ending phase is performed four times. This defines a generic gossiping algorithm.

**Generic Gossip Algorithm**

Code for any processor \( p_i, 0 \leq i \leq n - 1 \)

01 **initialization**
02 processor \( p_i \) becomes a collector
03 initialization of arrays \( \text{Rumors}_i, \text{Active}_i \) and \( \text{Pending}_i \),

11 **repeat** \( \tau \) times
12 **perform** regular phase

20 **repeat** 4 times
21 **perform** ending phase

Now we describe communication and kinds of messages used in regular and ending phases.

**Graph and range messages used during regular phases.** A processor $p_i$ may send a message to its neighbor in the graph $G(n, f)$, provided that the neighbor is still active according to $p_i$. Such a message is called a graph message. Sending only these messages is not sufficient to complete gossiping, because the communication graph may become disconnected as a result of node crashes. Hence other messages are also sent, to cover all the processors in a systematic way. In this kind of communication processor $p_i$ considers the processors as ordered by its local permutation $\pi_i$, that is, in the order $\pi_i(0), \pi_i(1), \ldots, \pi_i(n - 1)$. Some of the additional messages sent in this process are called range messages.

During regular phases processors send the following kinds of range messages: inquiring, reply and notifying messages. A collector $p_i$ sends an inquiring message to the first processor about which $p_i$ has not heard yet. Each recipient of such a message sends back a range message that is called a reply message.

Disseminators also send range messages to subsets of processors. Such messages are called notifying messages. The target processor selected by disseminator $p_i$ is the first one that still needs to be notified by $p_i$. Notifying messages need not be replied to: the sender already knows the rumors of all the processors that are active according to it, and the purpose of the message is to disseminate this knowledge.

**Regular Phase**

Code for any processor \( p_i, 0 \leq i \leq n - 1 \)

01 receive messages

11 perform local computation
12 update the local arrays
13 if \( p_i \) is a collector, that has already heard about all the processors
14 then \( p_i \) becomes a disseminator
15 compute set of destination processors: for each processor \( p_j \)
16 if \( p_j \) is active according to \( p_i \) and \( p_j \) is a neighbor of \( p_i \) in graph \( G(n, f) \)
17 then add \( p_j \) to destination set for a graph message
18 if \( p_i \) is a collector and \( p_j \) is the first processor
19 about which \( p_i \) has not heard yet
20 then send an inquiring message to \( p_j \)
21 if \( p_i \) is a disseminator and \( p_j \) is the first processor
22 that needs to be notified by \( p_i \)
23 then send a notifying message to \( p_j \)
24 if \( p_j \) is a collector, from which an inquiring message was received
25 in the receiving step of this phase
26 then send a reply message to \( p_j \)

30 send graph/inquiring/notifying/reply messages to corresponding destination sets
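The destination sets of lines 13-30 can be sketched in Python as one function that takes the view after the receiving step has been merged. Several points are assumptions of the sketch: the view is a dict of three lists in which `None` stands for $\text{nil}$, a disseminator never selects itself as a notification target, and the rule that a disseminator marks every range-message target as done right after sending is applied inside the function. The name `regular_phase` is hypothetical.

```python
def regular_phase(i, perm, neighbors, view, collector, inquirers):
    """Destination sets of one regular phase of p_i (lines 13-30).

    perm: local permutation pi_i; neighbors: p_i's neighbors in G(n, f);
    view: dict with lists "rumors", "active", "pending" (None plays the role of nil);
    inquirers: processors whose inquiring messages arrived in this phase.
    Returns (collector, dest), with one destination set per message kind.
    """
    rumors, active, pending = view["rumors"], view["active"], view["pending"]
    n = len(perm)

    def heard(j):
        return rumors[j] is not None or active[j] is not None

    def needs_notification(j):
        return active[j] is None and pending[j] is None

    if collector and all(heard(j) for j in range(n)):          # lines 13-14
        collector = False
    dest = {"graph": {j for j in neighbors if active[j] is None},   # lines 16-17
            "inquiry": set(), "notify": set(),
            "reply": set(inquirers)}                              # lines 24-26
    if collector:                                                 # lines 18-20
        first = next((j for j in perm if not heard(j)), None)
        if first is not None:
            dest["inquiry"].add(first)
    else:                                                         # lines 21-23
        first = next((j for j in perm if j != i and needs_notification(j)), None)
        if first is not None:
            dest["notify"].add(first)
        for j in dest["notify"] | dest["reply"]:
            pending[j] = "done"   # disseminator marks range-message targets as done
    return collector, dest
```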

**Last-resort messages used during ending phases.** Messages sent during the ending phases are called last-resort messages. These messages are categorized into inquiring, reply, and notifying messages, similarly to the corresponding range ones, because they serve similar purposes. Collectors that have not heard about some processors yet send direct inquiries to all of these processors simultaneously. Such messages are called inquiring messages. They are replied to by the non-faulty recipients in the next step, by way of sending reply messages. This phase converts all the collectors into disseminators. In the next phase, each disseminator sends a message to all the processors that need to be notified by it. Such messages are called notifying messages.

The number of graph messages sent by a processor at a step of the regular phase is at most as large as the maximum node degree in the communication graph. The number of range messages sent by a processor in a step of the regular phase is at most as large as the number of inquiries received plus a constant; hence the global number of point-to-point range messages sent by all processors during regular phases may be accounted as a constant times the number of inquiries sent (which is one per processor per phase). In contrast, there is no *a priori* upper bound on the number of messages sent during the ending phases. By choosing the termination threshold $\tau$ to be large enough, one may control how many rumors still need to be collected during the ending phases.

**Updating local view.** A message sent by a processor carries its current local knowledge. More precisely, a message sent by processor $p_i$ brings the following: the ID of $p_i$, the arrays $\text{Rumors}_i$, $\text{Active}_i$, and $\text{Pending}_i$, and a label to notify the recipient about the character of the message. A label is selected from the following: graph\_message, inquiry\_from\_collector, notification\_from\_disseminator, this\_is\_a\_reply; their meaning is self-explanatory. A processor $p_i$ scans a newly received message from some processor $p_j$ to learn about rumors, failures, and the current status of other processors. It copies each rumor from the received copy of $\text{Rumors}_j$ into $\text{Rumors}_i$, unless it is already there. It sets $\text{Active}_i[k]$ to failed if this value is at $\text{Active}_j[k]$. It sets $\text{Pending}_i[k]$ to done if this value is at $\text{Pending}_j[k]$. It sets $\text{Pending}_i[j]$ to done if $p_j$ is a disseminator and the received message is a range one. If $p_i$ is itself a disseminator, then it sets $\text{Pending}_i[j]$ to done immediately after sending a range message to $p_j$. If a processor $p_i$ expects a message to come from processor $p_j$, for instance a graph message from a neighbor in the communication graph, or a reply message, and the message does not arrive, then $p_i$ knows that processor $p_j$ has failed, and it immediately sets $\text{Active}_i[j]$ to failed.
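The receive-side update rules can be sketched in Python as follows. Views are dicts of three lists with `None` for $\text{nil}$; whether the sender is a disseminator and whether the message is a range one are passed in as flags, since the labels alone do not say whether a reply comes from a disseminator. The function names and encodings are hypothetical.

```python
NIL, FAILED, DONE = None, "failed", "done"


def merge_view(mine, received, j, sender_is_disseminator, is_range_message):
    """Merge the view carried by a message from p_j into p_i's view `mine`.

    Both views are dicts with lists "rumors", "active" and "pending".
    """
    for k, rumor in enumerate(received["rumors"]):
        if mine["rumors"][k] is NIL and rumor is not NIL:
            mine["rumors"][k] = rumor          # copy rumors not yet known
    for k, status in enumerate(received["active"]):
        if status == FAILED:
            mine["active"][k] = FAILED         # adopt known failures
    for k, status in enumerate(received["pending"]):
        if status == DONE:
            mine["pending"][k] = DONE          # adopt "fully informed" marks
    if sender_is_disseminator and is_range_message:
        mine["pending"][j] = DONE              # the sender itself is fully informed


def on_missing_message(mine, j):
    """An expected graph or reply message from p_j did not arrive: p_j has failed."""
    mine["active"][j] = FAILED
```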

**Ending Phase**

Code for any processor $p_i$, $0 \leq i \leq n - 1$

01 receive messages

11 perform local computation
12 update the local arrays
13 if $p_i$ is a collector, that has already heard about all the processors
14 then $p_i$ becomes a disseminator
15 compute set of destination processors: for each processor $p_j$
16 if $p_i$ is a collector and it has not heard about $p_j$ yet
17 then send an inquiring message to $p_j$
18 if $p_i$ is a disseminator and $p_j$ needs to be notified by $p_i$
19 then send a notifying message to $p_j$
20 if an inquiring message was received from $p_j$
21 in the receiving step of this phase
22 then send a reply message to $p_j$

30 send inquiring/notifying/reply messages to corresponding destination sets
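Compared with a regular phase, an ending phase drops graph messages and contacts all relevant processors at once. A Python sketch of lines 13-30, with the same assumed view encoding as a dict of three lists where `None` stands for $\text{nil}$, and again assuming that a disseminator does not notify itself; the function name is hypothetical.

```python
def ending_phase(i, view, collector, inquirers):
    """Destination sets of one ending phase of p_i (lines 13-30)."""
    rumors, active, pending = view["rumors"], view["active"], view["pending"]
    n = len(rumors)

    def heard(j):
        return rumors[j] is not None or active[j] is not None

    if collector and all(heard(j) for j in range(n)):       # lines 13-14
        collector = False
    dest = {"inquiry": set(), "notify": set(), "reply": set(inquirers)}  # lines 20-22
    for j in range(n):
        if collector and not heard(j):                       # lines 16-17
            dest["inquiry"].add(j)
        if not collector and j != i and active[j] is None and pending[j] is None:
            dest["notify"].add(j)                            # lines 18-19
    return collector, dest
```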

**Correctness.** Ending phases guarantee correctness, as is stated in the next fact.

**Lemma 21.26** Generic Gossip Algorithm is correct for every communication graph $G(n, f)$ and set of schedules $\Pi$.

**Proof:** The Integrity and No-Duplicates properties follow directly from the code and the multicast service in the synchronous message-passing system. It remains to prove that each processor has heard about all processors. Consider the step just before the first ending phase. If a processor $p_i$ has not heard about some other processor $p_j$ yet, then it sends a last-resort message to $p_j$ in the first ending phase. It is replied to in the second ending phase, unless processor $p_j$ has crashed already. In any case, in the third ending phase, processor $p_i$ either learns the input rumor of $p_j$ or it gets to know that $p_j$ has failed. The fourth ending phase gives every processor to which $p_i$ sent a notifying message the opportunity to receive it.


The choice of the communication graph $G(n, f)$, the set of schedules $\Pi$ and the termination threshold $\tau$ does, however, influence the time and message complexities of the specific implementation of the Generic Gossip Algorithm. First consider the case when $G(n, f)$ is a communication graph satisfying property $\mathcal{R}(n, f)$ from Definition 21.3, $\Pi$ contains $n$ random permutations, and $\tau = c \log^2 n$ for a sufficiently large positive constant $c$. Using Theorem 21.25 we get the following result.

**Theorem 21.4** For every $n$ and $f \leq c \cdot n$, for some constant $0 \leq c < 1$, there is a graph $G(n, f)$ such that the implementation of the generic gossip scheme with $G(n, f)$ as a communication graph and a set $\Pi$ of random permutations completes gossip in expected time $O(\log^2 n)$ and with expected message complexity $O(n \log^2 n)$, if the number of crashes is at most $f$.

Consider a small modification of the Generic Gossip scheme: during a regular phase every processor $p_i$ sends an inquiring message to the first $\Delta$ (instead of one) processors according to permutation $\pi_i$, where $\Delta$ is the maximum degree of the communication graph $G(n, f)$ used. Note that this does not influence the asymptotic message complexity, since besides inquiring messages, in every regular phase each processor $p_i$ sends $\Delta$ graph messages.

**Theorem 21.5** For every $n$ there are parameters $f \leq n - 1$ and $\tau = O(\log^2 n)$ and there is a graph $G(n, f)$ such that the implementation of the modified Generic Gossip scheme with $G(n, f)$ as a communication graph and a set $\Pi$ of random permutations completes gossip in expected time $O(\log^2 n)$ and with expected message complexity $O(n^{1.838})$, for any number of crashes.

Since in the above theorem the set $\Pi$ is selected prior to the computation, we obtain the following existential deterministic result.

**Theorem 21.6** For every $n$ there are parameters $f \leq n - 1$ and $\tau = O(\log^2 n)$, and there are a graph $G(n, f)$ and a set of schedules $\Pi$ such that the implementation of the modified Generic Gossip scheme with $G(n, f)$ as a communication graph and schedules $\Pi$ completes gossip in time $O(\log^2 n)$ and with message complexity $O(n^{1.838})$, for any number of crashes.

**Exercises**

21.7-1 Design executions showing that there is no relation between Causal Order and Total Order and between Single-Source FIFO and Total Order broadcast services. For simplicity consider two processors and two messages sent.

21.7-2 Does broadcast service satisfying Single-Source FIFO and Causal Order requirements satisfy a Total Order property? Does broadcast service satisfying Single-Source FIFO and Total Order requirements satisfy a Causal Order property? If yes provide a proof, if not show a counterexample.

21.7-3 Show that if reliable Basic Broadcast is used instead of Basic Broadcast in the implementation of the Single-Source FIFO service, then we obtain reliable Single-Source FIFO broadcast.

21.7-4 Prove that the Ordered Broadcast algorithm implements the Causal Order service on top of a Single-Source FIFO one.

21.7-5 What is the total number of point-to-point messages sent in the algorithm Ordered Broadcast in case of $k$ broadcasts?

21.7-6 Estimate the total number of point-to-point messages sent during the execution of Reliable Causally Ordered Broadcast, if it performs $k$ broadcasts and there are $f < n$ processor crashes during the execution.

21.7-7 Show an execution of the Reliable Causally Ordered Broadcast Algorithm which violates the Total Order requirement.

21.7-8 Write the code of an implementation of the reliable Sub-Total Order multicast service.

21.7-9 Show that the described method of implementing multicast services on top of the corresponding broadcast services is correct.

21.7-10 Show that the random graph $G(n, f)$, in which each node independently selects at random $\frac{n}{n-f} \log n$ edges from itself to other processors, satisfies property $\mathcal{R}(n, f)$ from Definition 21.3 and has degree $O\left(\frac{n}{n-f} \log^2 n \right)$ with probability at least $1 - O(1/n)$.

21.7-11 The leader election problem is as follows: all non-faulty processors must elect one non-faulty processor in the same synchronous step. Show that leader election cannot be solved faster than the gossip problem in a synchronous message-passing system with processor crashes.

## 21.8. Mutual Exclusion in Shared Memory

We now describe the second main model used to describe distributed systems, the shared memory model. To illustrate algorithmic issues in this model, we discuss solutions for the mutual exclusion problem.

### 21.8.1. Shared Memory Systems

We assume the system contains \( n \) processors, \( p_0, \ldots, p_{n-1} \), and \( m \) registers \( R_0, \ldots, R_{m-1} \). Each processor is modeled as a state machine. Each register has a type, which specifies:

1. the values it can hold,
2. the operations that can be performed on it,
3. the value (if any) to be returned by each operation, and
4. the new register value resulting from each operation.

Each register can have an initial value.

For example, an integer-valued read/write register \( R \) can take on all integer values and has operations \( \text{read}(R, v) \) and \( \text{write}(R, v) \). The read operation returns the value \( v \) of the last preceding write, leaving \( R \) unchanged. The \( \text{write}(R, v) \) operation has an integer parameter \( v \), returns no value and changes \( R \)'s value to \( v \). A configuration is a vector \( C = (q_0, \ldots, q_{n-1}, r_0, \ldots, r_{m-1}) \), where \( q_i \) is a state of \( p_i \) and \( r_j \) is a value of register \( R_j \). The events are computation steps at the processors where the following happens atomically (indivisibly):

1. $p_i$ chooses a shared variable to access with a specific operation, based on $p_i$'s current state,
2. the specified operation is performed on the shared variable,
3. $p_i$'s state changes according to its transition function, based on its current state and the value returned by the shared memory operation performed.

A sequence of alternating configurations and events, finite or infinite, that begins with an initial configuration is called an execution. In the asynchronous shared memory system, an infinite execution is admissible if every processor takes an infinite number of computation steps.
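The step structure above can be made concrete with a small simulation. The following Python sketch is an illustration under our own assumptions, not part of the model's definition. Each processor is a generator that yields one operation request per computation step: ("read", name) or ("write", name, value). The hypothetical helper `run` applies the request atomically to the named register and sends the returned value back, so one iteration of its loop is one event. A schedule is simply the list of processor indices that take the successive steps.

```python
from typing import Generator, Iterable, Optional

# An operation request is ("read", name) or ("write", name, value).
Op = tuple
Processor = Generator[Op, Optional[int], None]


def _advance(proc: Processor, value: Optional[int]) -> Optional[Op]:
    """Resume a processor with the value returned by its last operation; None means it halted."""
    try:
        return proc.send(value)
    except StopIteration:
        return None


def run(procs: list[Processor], registers: dict[str, int],
        schedule: Iterable[int]) -> dict[str, int]:
    """Execute one computation step (event) per entry of `schedule`."""
    # Starting each generator lets the processor choose its first operation.
    pending = [_advance(p, None) for p in procs]
    for i in schedule:
        op = pending[i]
        if op is None:          # p_i has halted; this event does nothing
            continue
        if op[0] == "read":     # return the register's value, leave it unchanged
            result = registers[op[1]]
        else:                   # "write": change the value, return nothing
            registers[op[1]] = op[2]
            result = None
        pending[i] = _advance(procs[i], result)  # p_i's state transition
    return registers


def increment(name: str) -> Processor:
    """Read R, then write R + 1: two separate atomic steps."""
    value = yield ("read", name)
    yield ("write", name, value + 1)
```

With the schedule [0, 0, 1, 1] the two increments run one after the other, and R ends at 2. With the interleaving [0, 1, 0, 1] both processors read 0, and R ends at 1. This illustrates why processors sharing a resource need to coordinate.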

21.8.2. The Mutual Exclusion Problem

In this problem a group of processors need to access a shared resource that cannot be used simultaneously by more than a single processor. The solution needs to have the following two properties. (1) Mutual exclusion: Each processor needs to execute a code segment called a critical section so that at any given time at most one processor is executing it (i.e., is in the critical section). (2) Deadlock freedom: If one or more processors attempt to enter the critical section, then one of them eventually succeeds as long as no processor stays in the critical section forever. These two properties do not provide any individual guarantees to any processor. A stronger property is (3) No lockout: A processor that wishes to enter the critical section eventually succeeds as long as no processor stays in the critical section forever. Original solutions to this problem relied on special synchronization support such as semaphores and monitors. We will present some of the distributed solutions using only ordinary shared variables.

We assume the program of a processor is partitioned into the following sections:

- **Entry / Try**: the code executed in preparation for entering the critical section.
- **Critical**: the code to be protected from concurrent execution.
- **Exit**: the code executed when leaving the critical section.
- **Remainder**: the rest of the code.

A processor cycles through these sections in the order: remainder, entry, critical and exit. A processor that wants to enter the critical section first executes the entry section. After that, if successful, it enters the critical section. The processor releases the critical section by executing the exit section and returning to the remainder section. We assume that a processor may transition any number of times from the remainder to the entry section. Moreover, variables, both shared and local, accessed in the entry and exit sections are not accessed in the critical and remainder sections.

Finally, no processor stays in the critical section forever. An algorithm for a shared memory system solves the mutual exclusion problem with no deadlock (or no lockout) if the following hold:

- **Mutual Exclusion**: In every configuration of every execution at most one processor is in the critical section.

- **No deadlock:** In every admissible execution, if some processor is in the entry section in a configuration, then there is a later configuration in which *some* processor is in the critical section.

- **No lockout:** In every admissible execution, if some processor is in the entry section in a configuration, then there is a later configuration in which *that same* processor is in the critical section.

In the context of mutual exclusion, an execution is *admissible* if for every processor $p_i$, $p_i$ either takes an infinite number of steps or $p_i$ ends in the remainder section. Moreover, no processor is ever stuck in the exit section (unobstructed exit condition).

21.8.3. Mutual Exclusion Using Powerful Primitives

A single bit suffices to guarantee mutual exclusion with no deadlock if a powerful test&set register is used. A test&set variable $V$ is a binary variable which supports two atomic operations, test&set and reset, defined as follows:

\[
\begin{array}{l}
\text{test}\&\text{set}(V\text{: memory address}) \text{ returns binary value:} \\
\quad \mathit{temp} := V \\
\quad V := 1 \\
\quad \text{return}(\mathit{temp})
\end{array}
\]

\[
\begin{array}{l}
\text{reset}(V\text{: memory address}): \\
\quad V := 0
\end{array}
\]

The test&set operation atomically reads and updates the variable. The reset operation is merely a write. There is a simple mutual exclusion algorithm with no deadlock, which uses one test&set register.

**Mutual Exclusion using one test&set register**

Initially $V$ equals 0

\[
\begin{array}{rl}
 & \langle \text{Entry} \rangle: \\
1 & \text{wait until test}\&\text{set}(V) = 0 \\
 & \langle \text{Critical Section} \rangle \\
 & \langle \text{Exit} \rangle: \\
2 & \text{reset}(V) \\
 & \langle \text{Remainder} \rangle
\end{array}
\]

Assume that the initial value of $V$ is 0. In the entry section, processor $p_i$ repeatedly tests $V$ until it returns 0. The last such test will assign 1 to $V$, causing any following test by other processors to return 1, prohibiting any other processor from entering the critical section. In the exit section $p_i$ resets $V$ to 0; another processor waiting in the entry section can now enter the critical section.
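As an illustration, the Python sketch below runs the one-register algorithm under an asynchronous scheduler that picks a random processor for every step. The scheduler applies test&set atomically, standing in for the hardware primitive. The critical section is modeled as a single local step, so that other processors can be scheduled while one is inside. The helper names (`simulate`, `tas_mutex`, `max_occupancy`), the seed and the step count are assumptions of the sketch. A random run only exercises some executions; it does not replace a proof.

```python
import random
from typing import Generator

Op = tuple  # ("test&set", "V"), ("reset", "V") or ("cs", None)
Log = list  # ("enter", i) / ("leave", i) entries, in global step order


def simulate(procs: list, regs: dict, steps: int, seed: int) -> None:
    """At each step a randomly chosen processor performs its pending operation atomically."""
    rng = random.Random(seed)
    pending = [next(p) for p in procs]
    for _ in range(steps):
        i = rng.randrange(len(procs))
        kind, name = pending[i]
        result = None
        if kind == "test&set":      # atomically: temp := V; V := 1; return temp
            result, regs[name] = regs[name], 1
        elif kind == "reset":       # a plain write: V := 0
            regs[name] = 0
        # "cs" is a local step inside the critical section; no register is touched
        pending[i] = procs[i].send(result)


def tas_mutex(i: int, log: Log) -> Generator[Op, object, None]:
    """Processor p_i running the one-register test&set algorithm forever."""
    while True:
        # <Entry>, line 1: wait until test&set(V) = 0
        while (yield ("test&set", "V")) != 0:
            pass
        # <Critical Section>: one local step, so others may be scheduled meanwhile
        log.append(("enter", i))
        yield ("cs", None)
        log.append(("leave", i))
        # <Exit>, line 2: reset(V); the remainder section is empty here
        yield ("reset", "V")


def max_occupancy(log: Log) -> int:
    """Largest number of processors simultaneously inside the critical section."""
    inside = worst = 0
    for event, _ in log:
        inside += 1 if event == "enter" else -1
        worst = max(worst, inside)
    return worst


log: Log = []
simulate([tas_mutex(i, log) for i in range(3)], {"V": 0}, steps=5000, seed=1)
print(max_occupancy(log), sum(1 for e in log if e[0] == "enter"))
```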

**Theorem 21.27** *The algorithm using one test&set register provides mutual exclusion without deadlock.*

21.8.4. Mutual Exclusion Using Read/Write Registers

If a powerful primitive such as test&set is not available, then mutual exclusion must be implemented using only read/write operations.

The Bakery Algorithm

Lamport’s Bakery Algorithm [10] for mutual exclusion is an early, classical example of such an algorithm that uses only shared read/write registers. The algorithm guarantees mutual exclusion and no lockout for \( n \) processors using \( O(n) \) registers (but the registers may need to store integer values that cannot be bounded ahead of time).

Processors wishing to enter the critical section behave like customers in a bakery. They all get a number and the one with the smallest number in hand is the next one to be “served”. Any processor not standing in line has number 0, which is not counted as the smallest number.

The algorithm uses the following shared data structures: \textit{Number} is an array of \( n \) integers, holding in its \( i \)-th entry the current number of processor \( p_i \). \textit{Choosing} is an array of \( n \) boolean values such that \( \text{Choosing}[i] \) is true while \( p_i \) is in the process of obtaining its number. Any processor \( p_i \) that wants to enter the critical section attempts to choose a number greater than any number of any other processor and writes it into \textit{Number}[i]. To do so, it reads the array \textit{Number} and picks the greatest number read plus 1 as its own number. Since several processors may read the array concurrently and obtain the same number, ties are broken by using the pair \((\text{Number}[i], i)\) as \( p_i \)'s ticket. Tickets are ordered lexicographically. After choosing its ticket, \( p_i \) waits until its ticket is minimal: for every other \( p_j \), \( p_i \) waits until \( p_j \) is not in the process of choosing a number and then compares their tickets. If \( p_j \)'s ticket is smaller, \( p_i \) waits until \( p_j \) executes the critical section and leaves it.
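The ticket ordering can be illustrated with Python tuples, which compare lexicographically. The hypothetical helper `ticket_order` below lists the processors holding a non-zero number in the order in which they would be served. Ties between equal numbers are broken by processor index.

```python
def ticket_order(numbers: list[int]) -> list[int]:
    """Ids of processors standing in line (non-zero number), smallest ticket first."""
    # A ticket is the pair (Number[i], i); tuple comparison is lexicographic.
    waiting = [(num, i) for i, num in enumerate(numbers) if num != 0]
    return [i for _, i in sorted(waiting)]


# p_1 and p_3 read the array concurrently and both picked 5; p_2 is not in line.
print(ticket_order([4, 5, 0, 5]))  # [0, 1, 3]
```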

The bakery algorithm:

Code for processor $p_i$, $0 \leq i \leq n - 1$.
Initially $Number[i] = 0$ and
$Choosing[i] = false$, for $0 \leq i \leq n - 1$

<Entry>:
1 $Choosing[i] := true$
2 $Number[i] := \max(Number[0], \ldots, Number[n - 1]) + 1$
3 $Choosing[i] := false$
4 for $j := 0$ to $n - 1$ ($j \neq i$) do
5 wait until $Choosing[j] = false$
6 wait until $Number[j] = 0$ or $(Number[j], j) > (Number[i], i)$
<Critical Section>
<Exit>:
7 $Number[i] := 0$
<Remainder>
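The Python sketch below simulates the bakery algorithm rather than implementing a real concurrent lock. Each shared read or write is one atomic step chosen by a random scheduler. Line 2 is therefore executed as $n$ separate reads followed by a write, which is the non-atomic behaviour that the Choosing flags guard against. The names `simulate`, `bakery` and `max_occupancy`, the seed and the step count are illustrative assumptions.

```python
import random
from collections import defaultdict


def simulate(procs, regs, steps, seed):
    """Random asynchronous scheduler over single atomic reads and writes."""
    rng = random.Random(seed)
    pending = [next(p) for p in procs]
    for _ in range(steps):
        i = rng.randrange(len(procs))
        op, result = pending[i], None
        if op[0] == "read":
            result = regs[op[1]]
        elif op[0] == "write":
            regs[op[1]] = op[2]
        # a ("cs", None) step is local to the critical section and touches no register
        pending[i] = procs[i].send(result)


def bakery(i, n, log):
    """Processor p_i, 0 <= i < n, running the bakery algorithm forever."""
    while True:
        yield ("write", ("Choosing", i), True)                      # line 1
        largest = 0
        for j in range(n):                                          # line 2: n separate reads,
            largest = max(largest, (yield ("read", ("Number", j)))) # not an atomic max
        mine = largest + 1
        yield ("write", ("Number", i), mine)
        yield ("write", ("Choosing", i), False)                     # line 3
        for j in range(n):                                          # line 4
            if j == i:
                continue
            while (yield ("read", ("Choosing", j))):                # line 5
                pass
            while True:                                             # line 6
                other = yield ("read", ("Number", j))
                if other == 0 or (other, j) > (mine, i):
                    break
        log.append(("enter", i))                                    # <Critical Section>
        yield ("cs", None)
        log.append(("leave", i))
        yield ("write", ("Number", i), 0)                           # line 7, <Exit>


def max_occupancy(log):
    """Largest number of processors inside the critical section at the same time."""
    inside = worst = 0
    for event, _ in log:
        inside += 1 if event == "enter" else -1
        worst = max(worst, inside)
    return worst


n, log = 3, []
regs = defaultdict(int)  # Number[j] = 0 and Choosing[j] = false (0) initially
simulate([bakery(i, n, log) for i in range(n)], regs, steps=20000, seed=7)
print(max_occupancy(log), {i for kind, i in log if kind == "enter"})
```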

We leave the proofs of the following theorems as exercises.

Theorem 21.28 The bakery algorithm guarantees mutual exclusion.

Theorem 21.29 The bakery algorithm guarantees no lockout.

A bounded mutual exclusion algorithm for $n$ processors

Lamport's Bakery Algorithm [10] requires the use of unbounded values. We next
present an algorithm that removes this requirement. In this algorithm, first presented
by Peterson and Fischer [17], processors compete pairwise using a two-processor
algorithm in a tournament tree arrangement. All pairwise competitions are arranged
in a complete binary tree. Each processor is assigned to a specific leaf of the tree.
At each level, the winner in a given node is allowed to proceed to the next higher
level, where it will compete with the winner moving up from the other child of this
node (if such a winner exists). The processor that finally wins the competition at
the root node is allowed to enter the critical section.

Let $k = \lceil \log n \rceil - 1$. Consider a complete binary tree with $2^k$ leaves and a total
of $2^{k+1} - 1$ nodes. The nodes of the tree are numbered inductively in the following
manner: The root is numbered 1; the left child of node numbered $m$ is numbered $2m$
and the right child is numbered $2m + 1$. Hence the leaves of the tree are numbered
$2^k, 2^k + 1, \ldots, 2^{k+1} - 1$.

With each node $m$, three binary shared variables are associated: $Want^m[0]$, $Want^m[1]$, and $Priority^m$. All variables have an initial value of 0. The algorithm
is recursive. The code of the algorithm consists of a procedure $Node(m, side)$ which
is executed when a processor accesses node $m$, while assuming the role of processor
side. Each node has a critical section. It includes the entry section at all the nodes
on the path from the node's parent to the root, the original critical section and the
exit code on all nodes from the root to the node's parent. To begin, processor $p_i$
calls $Node(2^k + \lfloor i/2 \rfloor, i \bmod 2)$.
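The numbering scheme and the starting call can be checked with a few lines of Python. The hypothetical helper `tree_path` lists the pairs $(m, side)$ with which $p_i$ calls Node on its way from its leaf to the root. It assumes $n \geq 2$ and reads $\log$ as the logarithm base 2, computing $\lceil \log n \rceil$ with integer arithmetic.

```python
def tree_path(i: int, n: int) -> list[tuple[int, int]]:
    """Pairs (m, side) that p_i passes to Node, from its leaf up to the root (n >= 2)."""
    k = (n - 1).bit_length() - 1          # k = ceil(log2 n) - 1, without floating point
    m, side = 2 ** k + i // 2, i % 2      # starting call Node(2^k + floor(i/2), i mod 2)
    path = [(m, side)]
    while m != 1:                         # line 9: Node(floor(m/2), m mod 2)
        m, side = m // 2, m % 2
        path.append((m, side))
    return path


# n = 8: k = 2, the leaves are 4..7, and p_5 starts at leaf 6 as side 1.
print(tree_path(5, 8))  # [(6, 1), (3, 0), (1, 1)]
```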


The tournament tree algorithm: A bounded mutual exclusion algorithm for \(n\) processors.

```plaintext
procedure Node(m: integer; side: 0..1)
1   Want^m[side] := 0
2   wait until (Want^m[1 - side] = 0 or Priority^m = side)
3   Want^m[side] := 1
4   if Priority^m = 1 - side then
5       if Want^m[1 - side] = 1 then goto line 1
6   else wait until Want^m[1 - side] = 0
7   if m = 1 then
8       <Critical Section>
9   else Node(⌊m/2⌋, m mod 2)
10  Priority^m := 1 - side
11  Want^m[side] := 0
end procedure
```
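The following Python sketch simulates the tournament tree algorithm under a random asynchronous scheduler. It is an illustration, not a production lock. Each generator step is one atomic read or write of a shared variable, and the condition in line 2 is evaluated with two separate reads. The recursive call of line 9 is expressed with `yield from`. The names `simulate`, `node`, `processor` and `max_occupancy`, the choice $n = 4$ (so $k = 1$), the seed and the step count are assumptions of the sketch.

```python
import random
from collections import defaultdict


def simulate(procs, regs, steps, seed):
    """Random asynchronous scheduler: one atomic read or write per step."""
    rng = random.Random(seed)
    pending = [next(p) for p in procs]
    for _ in range(steps):
        i = rng.randrange(len(procs))
        op, result = pending[i], None
        if op[0] == "read":
            result = regs[op[1]]
        elif op[0] == "write":
            regs[op[1]] = op[2]
        # a ("cs", None) step is local to the critical section and touches no register
        pending[i] = procs[i].send(result)


def node(m, side, i, log):
    """Node(m, side) executed by p_i; comments give the pseudocode line numbers."""
    while True:
        yield ("write", ("Want", m, side), 0)                    # line 1
        while True:                                               # line 2, as two reads
            if (yield ("read", ("Want", m, 1 - side))) == 0:
                break
            if (yield ("read", ("Priority", m))) == side:
                break
        yield ("write", ("Want", m, side), 1)                    # line 3
        if (yield ("read", ("Priority", m))) == 1 - side:        # line 4
            if (yield ("read", ("Want", m, 1 - side))) == 1:     # line 5
                continue                                          # goto line 1
        else:
            while (yield ("read", ("Want", m, 1 - side))) != 0:  # line 6
                pass
        break
    if m == 1:                                                    # line 7
        log.append(("enter", i))                                  # line 8
        yield ("cs", None)
        log.append(("leave", i))
    else:
        yield from node(m // 2, m % 2, i, log)                    # line 9
    yield ("write", ("Priority", m), 1 - side)                   # line 10
    yield ("write", ("Want", m, side), 0)                        # line 11


def processor(i, k, log):
    """p_i starts at Node(2^k + i // 2, i mod 2); the remainder section is empty."""
    while True:
        yield from node(2 ** k + i // 2, i % 2, i, log)


def max_occupancy(log):
    """Largest number of processors inside the critical section at the same time."""
    inside = worst = 0
    for event, _ in log:
        inside += 1 if event == "enter" else -1
        worst = max(worst, inside)
    return worst


n, k, log = 4, 1, []
regs = defaultdict(int)  # every Want^m[side] and Priority^m is initially 0
simulate([processor(i, k, log) for i in range(n)], regs, steps=40000, seed=3)
entered = {i for kind, i in log if kind == "enter"}
print(max_occupancy(log), sorted(entered))
```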

This algorithm uses bounded values and, as the next theorems show, satisfies the mutual exclusion and no lockout properties:

**Theorem 21.30**  The tournament tree algorithm guarantees mutual exclusion.

**Proof.** Consider any execution. We begin at the nodes closest to the leaves of the tree. A processor enters the critical section of such a node if it reaches line 9 (it moves up to the next node). Assume we are at a node \(m\) that connects to the leaves where \(p_i\) and \(p_j\) start, with \(p_i\) playing side 0 and \(p_j\) playing side 1. Assume that two processors are in the critical section of \(m\) at some point. It follows from the code that then \(Want^m[0] = Want^m[1] = 1\) at this point. Assume, without loss of generality, that \(p_i\)'s last write to \(Want^m[0]\) before entering the critical section follows \(p_j\)'s last write to \(Want^m[1]\) before entering the critical section. Note that \(p_i\) can enter the critical section (of \(m\)) either through line 5 or line 6. In both cases \(p_i\) reads \(Want^m[1] = 0\). However, \(p_i\)'s read of \(Want^m[1]\) follows \(p_i\)'s own write to \(Want^m[0]\), which by assumption follows \(p_j\)'s write to \(Want^m[1]\). Since \(p_j\) does not reset \(Want^m[1]\) before leaving the critical section of \(m\), \(p_i\)'s read of \(Want^m[1]\) should return 1, a contradiction.

The claim follows by induction on the levels of the tree.

**Theorem 21.31**  The tournament tree algorithm guarantees no lockout.

**Proof.** Consider any admissible execution. Assume that some processor \(p_i\) is starved. Hence from some point on \(p_i\) is forever in the entry section. We now show that \(p_i\) cannot be stuck forever in the entry section of a node \(m\), where, without loss of generality, \(p_i\) plays side 0 and \(p_j\) denotes the processor playing side 1. The claim then follows by induction.

*Case 1:* Suppose \(p_j\) executes line 10, setting \(Priority^m\) to 0, at some later point. Then \(Priority^m\) equals 0 forever after. Thus \(p_i\) passes the test in line 2 and skips line 5. Hence \(p_i\) must be waiting in line 6, waiting for \(Want^m[1]\) to be 0, which never occurs. Thus \(p_j\) is always executing between lines 3 and 11. But since \(p_j\) does not stay in the critical section forever, this would mean that \(p_j\) is stuck in the entry section forever, which is impossible since \(p_j\) will execute line 5 and return to line 1, resetting \(Want^m[1]\) to 0.

*Case 2:* Suppose \(p_j\) never executes line 10 after some point. Hence \(p_j\) must
be waiting in line 6 or be in the remainder section. If it is in the entry section, $p_j$
passes the test in line 2 ($Priority^m$ is 1). Hence $p_i$ does not reach line 6. Therefore $p_i$
waits in line 2 with $Want^m[0] = 0$. Hence $p_j$ passes the test in line 6. So $p_j$ cannot
be forever in the entry section. If $p_j$ is forever in the remainder section, $Want^m[1]$ equals 0 henceforth. So $p_i$ cannot be stuck at line 2, 5 or 6, a contradiction.

The claim follows by induction on the levels of the tree.

**Lower Bound on the Number of Read/Write Registers**

So far, all deadlock-free mutual exclusion algorithms presented that use only read/write registers require at
least $n$ shared variables, where $n$ is the number of processors. Since it was possible to
develop an algorithm that uses only bounded values [17], the question arises whether
there is a way of reducing the number of shared variables used. Burns and Lynch [3]
first showed that any deadlock-free mutual exclusion algorithm using only shared
read/write registers must use at least $n$ shared variables, regardless of their size. The
proof of this theorem allows the variables to be multi-writer variables. This means
that each processor is allowed to write to each variable. Note that if the variables
are single-writer, then the theorem is obvious, since each processor needs to write
something to a (separate) variable before entering the critical section. Otherwise
a processor could enter the critical section without any other processor knowing,
allowing another processor to enter the critical section concurrently, a contradiction
to the mutual exclusion property.

The proof by Burns and Lynch [3] introduces a new proof technique, a covering
argument: Given any no deadlock mutual exclusion algorithm $A$, it shows that there
is some reachable configuration of $A$ in which each of the $n$ processors is about to
write to a distinct shared variable. This is called a covering of the shared variables.
The existence of such a configuration can be shown using induction and it exploits
the fact that any processor before entering the critical section, must write to at
least one shared variable. The proof constructs a covering of all shared variables. A
processor then enters the critical section. Immediately thereafter the covering writes
are released so that no processor can detect the processor in the critical section.
Another processor now concurrently enters the critical section, a contradiction.

**Theorem 21.32** Any no deadlock mutual exclusion algorithm using only
read/write registers must use at least $n$ shared variables.

21.8.5. Lamport’s Fast Mutual Exclusion Algorithm

In all mutual exclusion algorithms presented so far, the number of steps taken by
processors before entering the critical section depends on $n$, the number of processors,
even in the absence of contention, that is, when a single processor is the only processor in the entry
section. In most real systems, however, the expected contention (the number of processors
attempting to enter the critical section concurrently) is usually much smaller than $n$.

A mutual exclusion algorithm is said to be fast if a processor enters the critical
section within a constant number of steps when it is the only processor trying to
enter the critical section. Note that a fast algorithm requires the use of multi-writer,
multi-reader shared variables. If only single writer variables are used, a processor
would have to read at least \( n \) variables.

Such a Fast Mutual Exclusion Algorithm is presented by Lamport [11].

**Lamport’s Fast Mutual Exclusion Algorithm:**

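Code for processor \( p_i \), \( 1 \leq i \leq n \). In this algorithm the processors are numbered from 1, since the value 0 of *Slow-Lock* means that no processor holds it. Initially *Fast-Lock* and *Slow-Lock* are 0, and *Want[i]* is false for all \( i, \ 1 \leq i \leq n \).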

\[
\begin{array}{rl}
 & \langle \text{Entry} \rangle: \\
1 & \text{Want}[i] := \text{true} \\
2 & \text{Fast-Lock} := i \\
3 & \text{if Slow-Lock} \neq 0 \text{ then} \\
4 & \quad \text{Want}[i] := \text{false} \\
5 & \quad \text{wait until Slow-Lock} = 0 \\
6 & \quad \text{goto } 1 \\
7 & \text{Slow-Lock} := i \\
8 & \text{if Fast-Lock} \neq i \text{ then} \\
9 & \quad \text{Want}[i] := \text{false} \\
10 & \quad \text{for all } j, \text{ wait until Want}[j] = \text{false} \\
11 & \quad \text{if Slow-Lock} \neq i \text{ then} \\
12 & \quad \quad \text{wait until Slow-Lock} = 0 \\
13 & \quad \quad \text{goto } 1 \\
 & \langle \text{Critical Section} \rangle \\
 & \langle \text{Exit} \rangle: \\
14 & \text{Slow-Lock} := 0 \\
15 & \text{Want}[i] := \text{false} \\
 & \langle \text{Remainder} \rangle
\end{array}
\]

Lamport’s algorithm is based on the correct combination of two mechanisms, one for allowing fast entry when no contention is detected, and the other for providing deadlock freedom in the case of contention. Two variables, *Fast-Lock* and *Slow-Lock*, are used for controlling access when there is no contention. In addition, each processor \( p_i \) has a boolean variable *Want[i]* whose value is true if \( p_i \) is interested in entering the critical section and false otherwise. A processor can enter the critical section either by finding *Fast-Lock* = \( i \), in which case it enters the critical section on the fast path, or by finding *Slow-Lock* = \( i \), in which case it enters the critical section along the slow path.

Consider the case where no processor is in the critical section or in the entry section. In this case, *Slow-Lock* is 0 and all *Want* entries are false. Once \( p_i \) now enters the entry section, it sets *Want[i]* to true and *Fast-Lock* to \( i \). Then it checks *Slow-Lock*, which is 0, sets *Slow-Lock* to \( i \), and checks *Fast-Lock* again. Since no other processor is in the entry section, it reads \( i \) and enters the critical section along the fast path with three writes and two reads.

If *Fast-Lock* \( \neq i \), then \( p_i \) waits until all *Want* flags are reset. After some processor completes the for loop in line 10, the value of *Slow-Lock* remains unchanged until
some processor leaving the critical section resets it. Hence at most one processor \( p_j \) may find \( \text{Slow-Lock} = j \), and this processor enters the critical section along the slow path. Note that Lamport’s Fast Mutual Exclusion algorithm does not guarantee lockout freedom.
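The Python sketch below simulates Lamport's algorithm; every shared read or write is one atomic step. Processor ids run from 1 to $n$, so that the value 0 of Slow-Lock can mean "free". The first run lets a single processor move alone. It is in the critical section after exactly five operations (three writes and two reads). The second run uses a random schedule. The helper names (`step`, `fast_mutex`, `run`, `max_occupancy`) and the seed are illustrative assumptions.

```python
import random
from collections import defaultdict


def step(regs, op):
    """Apply one atomic read or write; a ("cs", None) step touches no register."""
    if op[0] == "read":
        return regs[op[1]]
    if op[0] == "write":
        regs[op[1]] = op[2]
    return None


def fast_mutex(i, n, log):
    """Processor with id i in 1..n; comments give the pseudocode line numbers."""
    while True:
        yield ("write", ("Want", i), True)                 # 1
        yield ("write", "Fast-Lock", i)                    # 2
        if (yield ("read", "Slow-Lock")) != 0:             # 3
            yield ("write", ("Want", i), False)            # 4
            while (yield ("read", "Slow-Lock")) != 0:      # 5
                pass
            continue                                        # 6: goto 1
        yield ("write", "Slow-Lock", i)                    # 7
        if (yield ("read", "Fast-Lock")) != i:             # 8
            yield ("write", ("Want", i), False)            # 9
            for j in range(1, n + 1):                       # 10
                while (yield ("read", ("Want", j))):
                    pass
            if (yield ("read", "Slow-Lock")) != i:         # 11
                while (yield ("read", "Slow-Lock")) != 0:  # 12
                    pass
                continue                                    # 13: goto 1
        log.append(("enter", i))                           # critical section
        yield ("cs", None)
        log.append(("leave", i))
        yield ("write", "Slow-Lock", 0)                    # 14
        yield ("write", ("Want", i), False)                # 15


def run(n, schedule):
    """Run processors 1..n under a schedule of ids; return the enter/leave log."""
    regs, log = defaultdict(int), []
    procs = {i: fast_mutex(i, n, log) for i in range(1, n + 1)}
    pending = {i: next(p) for i, p in procs.items()}
    for i in schedule:
        pending[i] = procs[i].send(step(regs, pending[i]))
    return log


def max_occupancy(log):
    """Largest number of processors inside the critical section at the same time."""
    inside = worst = 0
    for event, _ in log:
        inside += 1 if event == "enter" else -1
        worst = max(worst, inside)
    return worst


solo = run(3, [1] * 5)  # p_1 alone: after 3 writes and 2 reads it is in the critical section
rng = random.Random(11)
log = run(3, [rng.randint(1, 3) for _ in range(30000)])
print(solo, max_occupancy(log))
```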

**Theorem 21.33** Lamport’s Fast Mutual Exclusion algorithm guarantees mutual exclusion without deadlock.

**Exercises**

21.8-1 An algorithm solves the 2-mutual exclusion problem if at any time at most two processors are in the critical section. Present an algorithm for solving the 2-mutual exclusion problem using test & set registers.

21.8-2 Prove that the bakery algorithm satisfies the mutual exclusion property.

21.8-3 Prove that the bakery algorithm provides no lockout.

21.8-4 Isolate a bounded mutual exclusion algorithm with no lockout for two processors from the tournament tree algorithm. Show that your algorithm has the mutual exclusion property. Show that it has the no lockout property.

21.8-5 Prove that Lamport’s Fast Mutual Exclusion Algorithm has the mutual exclusion property.

21.8-6 Prove that Lamport’s Fast Mutual Exclusion Algorithm has the no deadlock property.

21.8-7 Show that Lamport’s Fast Mutual Exclusion Algorithm does not satisfy the no lockout property, i.e. construct an execution in which a processor is locked out of the critical section.

21.8-8 Construct an execution of Lamport’s Fast Mutual Exclusion Algorithm in which two processors are in the entry section and both read at least \( \Omega(n) \) variables before entering the critical section.

**Chapter notes**
Bibliography


Contents

21. Distributed Algorithms (Burkhard Englert, Dariusz Kowalski, Grzegorz Malewicz, Alexander Shvartsman)
    21.1.3. Synchronous systems
  21.2. Basic algorithms
    21.2.1. Broadcast
    21.2.2. Construction of a spanning tree
  21.3. Ring Algorithms
    21.3.1. The leader election problem
    21.3.2. The Leader Election Algorithm
    21.3.3. Analysis of the Leader Election Algorithm
  21.4. Fault-Tolerant Consensus
    21.4.1. The Consensus Problem
    21.4.2. Consensus with Crash Failures
    21.4.3. Consensus with Byzantine Failures
    21.4.4. Lower Bound on the Ratio of Faulty Processors
    21.4.5. A Polynomial Algorithm
    21.4.6. Impossibility in Asynchronous Systems
  21.5. Logical Time, Causality, and Consistent State
    21.5.1. Logical time
    21.5.2. Causality
    21.5.3. Consistent state
  21.6. Communication services
    21.6.1. Properties of Broadcast Services
      Variants of ordering requirements
      Reliability requirements
    21.6.2. Ordered Broadcast Services
      Implementing Basic Broadcast on top of asynchronous point-to-point messaging
      Implementing Single-Source FIFO on top of Basic Broadcast service
      Implementing Causal Order and Total Order on the top of Single-Source FIFO service
    21.6.3. Multicast services
  21.7. Rumor Collection Algorithms
    21.7.1. Rumor Collection (Gossip) Problem and Requirements
    21.7.2. Efficient Gossip Algorithms
      Communication graph
      Communication schedules
      Generic algorithm
  21.8. Mutual Exclusion in Shared Memory
    21.8.1. Shared Memory Systems
    21.8.2. The Mutual Exclusion Problem
    21.8.3. Mutual Exclusion Using Powerful Primitives
    21.8.4. Mutual Exclusion Using Read/Write Registers
      The Bakery Algorithm
      A bounded mutual exclusion algorithm for $n$ processors
      Lower Bound on the Number of Read/Write Registers
    21.8.5. Lamport's Fast Mutual Exclusion Algorithm
  Bibliography