This pattern is part of Patterns of Distributed Systems

Replicated Log

Keep the state of multiple nodes synchronized by using a write-ahead log that is replicated to all the cluster nodes.

11 January 2022

Problem

When multiple nodes share a state, the state needs to be synchronized. All cluster nodes need to agree on the same state, even when some nodes crash or get disconnected. This requires achieving consensus for each state change request.

But achieving consensus on individual requests is not enough. Each replica also needs to execute requests in the same order; otherwise, different replicas can end up in different final states, even if they have reached consensus on each individual request.
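A tiny example shows why order matters. In this sketch, two replicas apply the same two commands, "set x to 1" and "double x", in different orders and end up with different values of `x`. The `Command` interface, the `OrderingDemo` class and the starting value of 5 are hypothetical, chosen only for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A command mutates a replica's key-value state (hypothetical, for illustration).
interface Command {
    void apply(Map<String, Integer> state);
}

class OrderingDemo {
    static final Command SET_X_TO_1 = state -> state.put("x", 1);
    static final Command DOUBLE_X = state -> state.put("x", state.get("x") * 2);

    // Simulates one replica: starts with x = 5, applies the commands in the given order, returns x.
    static int run(List<Command> commands) {
        Map<String, Integer> state = new HashMap<>();
        state.put("x", 5);
        for (Command command : commands) {
            command.apply(state);
        }
        return state.get("x");
    }
}
```

Both replicas agree on which two commands exist, yet one ends with `x = 2` and the other with `x = 1`. Agreeing on a single order is what the log adds.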

Solution

Cluster nodes maintain a Write-Ahead Log. Each log entry stores the state required for consensus along with the user request. The nodes coordinate to build consensus over log entries, so that all cluster nodes have exactly the same Write-Ahead Log. The requests are then executed sequentially in log order. Because all cluster nodes agree on each log entry, they execute the same requests in the same order. This ensures that all the cluster nodes share the same state.

A fault-tolerant consensus-building mechanism using Quorum needs two phases.

  • A phase to establish a Generation Clock and to know about the log entries replicated in the previous Quorum.
  • A phase to replicate requests on all the cluster nodes.

Executing both phases for every state change request is not efficient, so cluster nodes elect a leader at startup. The leader election phase establishes the Generation Clock and detects all the log entries from the previous Quorum, that is, the entries the previous leader might have copied to a majority of the cluster nodes. Once there is a stable leader, only the leader coordinates the replication. Clients communicate with the leader. The leader adds each request to its log and makes sure that it's replicated on all the followers. Consensus is reached once a log entry is successfully replicated to a majority of the cluster nodes, counting the leader itself. This way, only one phase needs to be executed to reach consensus for each state change operation, as long as there is a stable leader.
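The majority rule behind "consensus is reached" fits in two small helpers. This is a minimal sketch; `Quorum`, `majority` and `isCommitted` are hypothetical names, and the count of nodes holding the entry is assumed to include the leader's own copy.

```java
class Quorum {
    // Smallest number of nodes that forms a majority of the cluster.
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // An entry is committed once a majority of nodes (leader included) store it.
    static boolean isCommitted(int nodesWithEntry, int clusterSize) {
        return nodesWithEntry >= majority(clusterSize);
    }
}
```

In a five-node cluster the leader plus two followers are enough, so the cluster keeps making progress with two nodes down. A four-node cluster also needs three nodes, which is why odd cluster sizes are the usual choice.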

Multi-Paxos and Raft

Multi-Paxos and Raft are the most popular algorithms for implementing a replicated log. Multi-Paxos is only vaguely described in academic papers. Cloud databases such as Spanner and Cosmos DB use Multi-Paxos, but their implementation details are not well documented. Raft documents all the implementation details very clearly, and is the preferred implementation choice in most open source systems, even though Paxos and its variants are discussed a lot more in academia.

The following sections describe how Raft implements a replicated log.

Replicating client requests

Figure 1: Replication

For each log entry, the leader appends it to its local Write-Ahead log and then sends it to all the followers.

leader (class ReplicatedLog...)

  private Long appendAndReplicate(byte[] data) {
      Long lastLogEntryIndex = appendToLocalLog(data);
      replicateOnFollowers(lastLogEntryIndex);
      return lastLogEntryIndex;
  }


  private void replicateOnFollowers(Long entryAtIndex) {
      for (final FollowerHandler follower : followers) {
          replicateOn(follower, entryAtIndex); //send replication requests to followers
      }
  }

The followers handle the replication request and append the log entries to their local logs. After successfully appending the log entries, they respond to the leader with the index of the latest log entry they have. The response also includes the current Generation Clock of the server.

Followers also check whether the incoming entries already exist in their log, or whether their log has entries beyond the ones being replicated. A follower ignores entries that are already present. But if an existing entry at the same index is from a different generation, the follower removes the conflicting entry along with all the entries that follow it.

follower (class ReplicatedLog...)

  void maybeTruncate(ReplicationRequest replicationRequest) {
      replicationRequest.getEntries().stream()
              .filter(entry -> wal.getLastLogIndex() >= entry.getEntryIndex() &&
                      entry.getGeneration() != wal.readAt(entry.getEntryIndex()).getGeneration())
              .forEach(entry -> wal.truncate(entry.getEntryIndex()));
  }

follower (class ReplicatedLog...)

  private ReplicationResponse appendEntries(ReplicationRequest replicationRequest) {
      List<WALEntry> entries = replicationRequest.getEntries();
      entries.stream()
              .filter(e -> !wal.exists(e))
              .forEach(e -> wal.writeEntry(e));
      return new ReplicationResponse(SUCCEEDED, serverId(), replicationState.getGeneration(), wal.getLastLogIndex());
  }
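The follower-side rules (skip entries already present, truncate from the first entry whose generation conflicts, then append the rest) can be exercised against an in-memory log. This is a sketch under stated assumptions: indexes are 1-based and contiguous, incoming entries continue the log without gaps, and the hypothetical `LogEntry` record stands in for the pattern's `WALEntry`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical entry: 1-based position in the log, generation it was created in, payload.
record LogEntry(long index, long generation, String data) {}

class FollowerLog {
    // entries.get(i) holds the entry with index i + 1.
    final List<LogEntry> entries = new ArrayList<>();

    long lastLogIndex() {
        return entries.size();
    }

    void appendEntries(List<LogEntry> incoming) {
        for (LogEntry e : incoming) {
            if (e.index() > lastLogIndex() + 1) {
                throw new IllegalStateException("gap before index " + e.index());
            }
            if (e.index() <= lastLogIndex()) {
                LogEntry existing = entries.get((int) e.index() - 1);
                if (existing.generation() == e.generation()) {
                    continue; // same index and generation: already present, ignore it
                }
                // Conflicting generation: remove this entry and everything after it.
                entries.subList((int) e.index() - 1, entries.size()).clear();
            }
            entries.add(e); // e.index() is now lastLogIndex() + 1
        }
    }
}
```

Note that the follower never truncates just because its log is longer than the request; only a generation conflict removes entries.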

The follower rejects the replication request when the generation number in the request is lower than the latest generation the follower knows about. The rejection tells the leader that it is out of date, so it steps down and becomes a follower.

follower (class ReplicatedLog...)

  Long currentGeneration = replicationState.getGeneration();
  if (currentGeneration > request.getGeneration()) {
      return new ReplicationResponse(FAILED, serverId(), currentGeneration, wal.getLastLogIndex());
  }

The leader keeps track of the log index replicated on each server as responses are received. It uses these indexes to find the log entries that are successfully copied to the Quorum, and tracks the highest such index as the commitIndex. The commitIndex is the High-Water Mark in the log.

leader (class ReplicatedLog...)

  logger.info("Updating matchIndex for " + response.getServerId() + " to " + response.getReplicatedLogIndex());
  updateMatchingLogIndex(response.getServerId(), response.getReplicatedLogIndex());
  var logIndexAtQuorum = computeHighwaterMark(logIndexesAtAllServers(), config.numberOfServers());
  var currentHighWaterMark = replicationState.getHighWaterMark();
  if (logIndexAtQuorum > currentHighWaterMark && logIndexAtQuorum != 0) {
      applyLogEntries(currentHighWaterMark, logIndexAtQuorum);
      replicationState.setHighWaterMark(logIndexAtQuorum);
  }

leader (class ReplicatedLog...)

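  Long computeHighwaterMark(List<Long> serverLogIndexes, int noOfServers) {
      // Sort in descending order: the index at position noOfServers / 2 is then
      // present on at least noOfServers / 2 + 1 servers, a majority (odd or even cluster size).
      serverLogIndexes.sort(Comparator.reverseOrder());
      return serverLogIndexes.get(noOfServers / 2);
  }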

leader (class ReplicatedLog...)

  private void updateMatchingLogIndex(int serverId, long replicatedLogIndex) {
      FollowerHandler follower = getFollowerHandler(serverId);
      follower.updateLastReplicationIndex(replicatedLogIndex);
  }

leader (class FollowerHandler...)

  public void updateLastReplicationIndex(long lastReplicatedLogIndex) {
      this.matchIndex = lastReplicatedLogIndex;
  }
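The high-water mark computation can be checked in isolation. This self-contained sketch (the `HighWaterMark` class is hypothetical) sorts the match indexes of all servers, the leader included, in descending order. The value at position `n / 2` is then stored on at least `n / 2 + 1` servers, which is a majority for both odd and even cluster sizes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class HighWaterMark {
    // Returns the highest log index that a majority of servers have replicated.
    static long compute(List<Long> matchIndexes) {
        List<Long> sorted = new ArrayList<>(matchIndexes); // leave the caller's list untouched
        sorted.sort(Comparator.reverseOrder());
        return sorted.get(sorted.size() / 2);
    }
}
```

With an ascending sort, position `n / 2` gives the same answer for odd cluster sizes, but for four servers it can pick an index held by only two of them, which is not a majority.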

Full replication

It is important to ensure that all the cluster nodes receive all the log entries from the leader, even when they are disconnected or they crash and come back up. Raft has a mechanism to make sure all the cluster nodes receive all the log entries from the leader.

With every replication request in Raft, the leader also sends the log index and generation of the entry that immediately precedes the new entries being replicated. If the follower's log does not contain an entry at that index with the same generation, the follower rejects the request. This tells the leader that the follower's log needs to be synced for some of the older entries.

follower (class ReplicatedLog...)

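  // Reject if the entry preceding the new entries is missing, or present but from a different generation.
  long prevLogIndex = request.getPrevLogIndex();
  if (prevLogIndex > wal.getLastLogIndex()
          || (!wal.isEmpty() && prevLogIndex >= wal.getLogStartIndex()
              && generationAt(prevLogIndex) != request.getPrevLogGeneration())) {
      return new ReplicationResponse(FAILED, serverId(), replicationState.getGeneration(), wal.getLastLogIndex());
  }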

follower (class ReplicatedLog...)

  private long generationAt(long prevLogIndex) {
      WALEntry walEntry = wal.readAt(prevLogIndex);
      // Return a primitive so callers compare generations by value, not by Long reference.
      return walEntry.getGeneration();
  }

The leader then decrements the nextIndex it tracks for that follower and retries, sending log entries starting from the lower index. This continues until the follower accepts the replication request.

leader (class ReplicatedLog...)

  //rejected because of conflicting or missing entries, decrement nextIndex and retry
  FollowerHandler peer = getFollowerHandler(response.getServerId());
  logger.info("decrementing nextIndex for peer " + peer.getId() + " from " + peer.getNextIndex());
  peer.decrementNextIndex();
  replicateOn(peer, peer.getNextIndex());

This check on the previous log index and generation allows the leader to detect two things.

  • Missing entries in the follower log. For example, if the follower log has only one entry and the leader starts replicating the third entry, the requests are rejected until the leader replicates the second entry.
  • Conflicting entries: previous entries in the follower log that are from a different generation, higher or lower than the corresponding entries in the leader log. The leader keeps retrying from lower indexes until a request is accepted, and the follower truncates the entries whose generation does not match.

This way, the leader tries to push its own log to all the followers continuously by using the previous index to detect missing entries or conflicting entries. This makes sure that all the cluster nodes eventually receive all the log entries from the leader even when they are disconnected for some time.
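The interplay between the follower's check on the previous entry and the leader's backoff can be simulated in memory. In this sketch, logs are plain lists of generation numbers (index `i + 1` is at position `i`), index 0 means "no previous entry", and `LogSync` with its methods is hypothetical. As a simplification, the follower here drops everything after the matching entry, whereas the follower logic described earlier only removes entries whose generation conflicts.

```java
import java.util.ArrayList;
import java.util.List;

class LogSync {
    // Follower side: accept only if the entry just before the new ones matches.
    static boolean prevEntryMatches(List<Long> followerLog, int prevLogIndex, long prevLogGeneration) {
        if (prevLogIndex == 0) return true;                  // nothing precedes the first entry
        if (prevLogIndex > followerLog.size()) return false; // follower is missing entries
        return followerLog.get(prevLogIndex - 1) == prevLogGeneration; // unboxed, compared by value
    }

    // Leader side: back off nextIndex until the follower accepts, then copy the rest of the log.
    // Returns the number of rejected attempts.
    static int syncFollower(List<Long> leaderLog, List<Long> followerLog) {
        int nextIndex = leaderLog.size() + 1; // optimistic start, just past the leader's last entry
        int rejections = 0;
        while (true) {
            int prevLogIndex = nextIndex - 1;
            long prevGeneration = prevLogIndex == 0 ? 0 : leaderLog.get(prevLogIndex - 1);
            if (prevEntryMatches(followerLog, prevLogIndex, prevGeneration)) {
                break;
            }
            rejections++;
            nextIndex--;
        }
        followerLog.subList(nextIndex - 1, followerLog.size()).clear();
        followerLog.addAll(leaderLog.subList(nextIndex - 1, leaderLog.size()));
        return rejections;
    }
}
```

The first scenario in the test mirrors the missing-entries case above (one entry on the follower, three on the leader); the second shows a conflicting generation at index 3.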

Raft does not have a separate commit message. Instead, it sends the commitIndex as part of the normal replication requests. Empty replication requests also serve as heartbeats, so the commitIndex reaches followers through the heartbeat requests as well.

Log entries are executed in the log order

Once the leader updates its commitIndex, it executes the log entries in order, starting from the entry after the previous commitIndex up to and including the entry at the new commitIndex. Client requests are completed, and responses returned to the clients, once their log entries are executed.

class ReplicatedLog…

  private void applyLogEntries(Long previousCommitIndex, Long commitIndex) {
      for (long index = previousCommitIndex + 1; index <= commitIndex; index++) {
          WALEntry walEntry = wal.readAt(index);
          var responses = stateMachine.applyEntries(Arrays.asList(walEntry));
          completeActiveProposals(index, responses);
      }
  }
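Applying committed entries in log order can be sketched with a tiny key-value state machine. The `SET key value` command format, the `KvStateMachine` class and its `lastApplied` field are assumptions for illustration; the loop mirrors `applyLogEntries`, applying every entry after the last applied one up to and including the new commit index.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KvStateMachine {
    final Map<String, String> store = new HashMap<>();
    long lastApplied = 0; // index of the last log entry applied to the store

    // log.get(i) holds the command at log index i + 1, e.g. "SET color red".
    void applyUpTo(List<String> log, long commitIndex) {
        for (long index = lastApplied + 1; index <= commitIndex; index++) {
            String[] parts = log.get((int) index - 1).split(" ", 3);
            if (parts[0].equals("SET")) {
                store.put(parts[1], parts[2]);
            }
            lastApplied = index;
        }
    }
}
```

Because every replica runs the same loop over the same log, a later `SET` for a key always wins on every replica.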

The leader also sends the commitIndex with the heartbeat requests it sends to the followers. The followers update the commitIndex and apply the entries the same way.

class ReplicatedLog…

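  private void updateHighWaterMark(ReplicationRequest request) {
      // Cap at the last entry this request covers: entries after it may not match the leader's log yet.
      long lastNewEntryIndex = request.getPrevLogIndex() + request.getEntries().size();
      long newHighWaterMark = Math.min(request.getHighWaterMark(), lastNewEntryIndex);
      long previousHighWaterMark = replicationState.getHighWaterMark();
      if (newHighWaterMark > previousHighWaterMark) {
          replicationState.setHighWaterMark(newHighWaterMark);
          applyLogEntries(previousHighWaterMark, newHighWaterMark);
      }
  }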
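A pure function makes the follower's commit rule easy to test on its own. This minimal sketch (hypothetical names) follows the rule from the Raft paper: the new commit index is the smaller of the leader's commit index and the index of the last entry covered by the request, and it never moves backwards. It assumes the request's entries start right after `prevLogIndex`.

```java
class FollowerCommit {
    // Returns the follower's commit index after it accepts a replication request.
    static long newCommitIndex(long currentCommitIndex, long leaderCommitIndex,
                               long prevLogIndex, int entriesInRequest) {
        long lastNewEntryIndex = prevLogIndex + entriesInRequest;
        long candidate = Math.min(leaderCommitIndex, lastNewEntryIndex);
        return Math.max(currentCommitIndex, candidate); // never move the commit index back
    }
}
```

The cap matters for heartbeats: an empty request that matched at index 4 says nothing about what the follower holds at index 5, even if the leader has already committed index 5.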

Leader Election

Leader election is the phase in which the log entries committed in the previous quorum are detected. Each cluster node is in one of three states: candidate, leader or follower. Cluster nodes start in the follower state, expecting a HeartBeat from an existing leader. If a follower doesn't hear from any leader within a predetermined time period, it moves to the candidate state and starts a leader election. The leader election algorithm establishes a new Generation Clock value. Raft refers to the Generation Clock as the term.

The leader election mechanism also makes sure that the elected leader's log is at least as up-to-date as the logs of a majority of the cluster nodes. This is an optimization in Raft: it avoids having to transfer log entries from the previous Quorum to the new leader.

A candidate starts a new leader election by incrementing its generation, voting for itself and sending each of the peer servers a message requesting a vote.

class ReplicatedLog…

  private void startLeaderElection() {
      replicationState.setGeneration(replicationState.getGeneration() + 1);
      registerSelfVote();
      requestVoteFrom(followers);
  }

Once a server has voted for a candidate in a given generation, it returns the same vote for any further requests in that generation. This ensures that no other server requesting a vote for the same generation gets elected when a successful election has already happened. A vote request is handled as follows:

class ReplicatedLog…

  VoteResponse handleVoteRequest(VoteRequest voteRequest) {
      // For a higher generation request, become a follower.
      // But we do not know who the leader is yet.
      if (voteRequest.getGeneration() > replicationState.getGeneration()) {
          becomeFollower(LEADER_NOT_KNOWN, voteRequest.getGeneration());
      }

      VoteTracker voteTracker = replicationState.getVoteTracker();
      if (voteRequest.getGeneration() == replicationState.getGeneration() && !replicationState.hasLeader()) {
          if (isUptoDate(voteRequest) && !voteTracker.alreadyVoted()) {
              voteTracker.registerVote(voteRequest.getServerId());
              return grantVote();
          }
          if (voteTracker.alreadyVoted()) {
              // A repeated request from the same candidate gets the same answer; others are rejected.
              return voteTracker.votedFor == voteRequest.getServerId() ?
                      grantVote() : rejectVote();
          }
      }
      return rejectVote();
  }

  private boolean isUptoDate(VoteRequest voteRequest) {
      boolean result = voteRequest.getLastLogEntryGeneration() > wal.getLastLogEntryGeneration()
              || (voteRequest.getLastLogEntryGeneration() == wal.getLastLogEntryGeneration() &&
              voteRequest.getLastLogEntryIndex() >= wal.getLastLogIndex());
      return result;
  }
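The voting rules can be condensed into a standalone sketch: a node grants at most one vote per generation, moves to a higher generation when it sees one, and votes only for a candidate whose log is at least as up-to-date as its own. `Voter` and its fields are hypothetical, and the sketch leaves out persisting the vote, which a real implementation needs so that a restarted node does not vote twice in the same generation.

```java
class Voter {
    long generation;
    Integer votedFor; // null if no vote has been cast in the current generation
    final long lastLogGeneration;
    final long lastLogIndex;

    Voter(long generation, long lastLogGeneration, long lastLogIndex) {
        this.generation = generation;
        this.lastLogGeneration = lastLogGeneration;
        this.lastLogIndex = lastLogIndex;
    }

    boolean handleVoteRequest(int candidateId, long candidateGeneration,
                              long candidateLastLogGeneration, long candidateLastLogIndex) {
        if (candidateGeneration < generation) {
            return false; // stale candidate
        }
        if (candidateGeneration > generation) {
            generation = candidateGeneration; // newer generation: any earlier vote no longer applies
            votedFor = null;
        }
        if (votedFor != null) {
            return votedFor == candidateId; // same answer for a repeated request
        }
        boolean upToDate = candidateLastLogGeneration > lastLogGeneration
                || (candidateLastLogGeneration == lastLogGeneration && candidateLastLogIndex >= lastLogIndex);
        if (upToDate) {
            votedFor = candidateId;
        }
        return upToDate;
    }
}
```

The test replays the election example that follows: cyrene grants byzantium's generation-2 request, while athens, which holds an extra entry, rejects it; later both outcomes change in generation 3.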

The server which receives votes from the majority of the servers transitions to the leader state. The majority is determined as discussed in Quorum. Once elected, the leader continuously sends a HeartBeat to all of the followers. If the followers don't receive a HeartBeat in a specified time interval, a new leader election is triggered.
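Counting votes uses the same majority check as replication. In this minimal sketch (hypothetical names) the candidate's own vote is included, and votes are kept in a set so that a duplicated response from the same server is not counted twice.

```java
import java.util.HashSet;
import java.util.Set;

class Election {
    final int clusterSize;
    final Set<Integer> votesReceived = new HashSet<>();

    Election(int clusterSize, int selfId) {
        this.clusterSize = clusterSize;
        votesReceived.add(selfId); // a candidate votes for itself
    }

    // Records a granted vote; returns true once the candidate has a majority.
    boolean onVoteGranted(int voterId) {
        votesReceived.add(voterId);
        return votesReceived.size() >= clusterSize / 2 + 1;
    }
}
```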

Log entries from previous generation

As discussed in the section above, the first phase of a consensus algorithm detects the existing values, which were copied on previous runs of the algorithm. The other key aspect is that these values are then proposed again with the latest generation of the leader. The second phase decides that a value is committed only if it was proposed in the current generation. Raft never updates the generation numbers of existing entries in the log. So if the leader has log entries from an older generation that are missing from some of the followers, it cannot mark those entries as committed based on the majority quorum alone. That is because some other server, which may not be available now, can have an entry at the same index with a higher generation. If the leader goes down without replicating an entry from its current generation, those entries can get overwritten by the new leader. So in Raft, the new leader must commit at least one entry in its own term; it can then safely commit all the previous entries. Most practical implementations of Raft try to commit a no-op entry immediately after a leader election, before the leader is considered ready to serve client requests. Refer to [raft-phd] section 3.6.1 for details.

An example leader-election

Consider five servers: athens, byzantium, cyrene, delphi and ephesus. ephesus is the leader for generation 1. It has replicated entries to itself, delphi and athens.

Figure 2: Lost heartbeat triggers an election

At this point, ephesus and delphi get disconnected from the rest of the cluster.

byzantium has the shortest election timeout, so it triggers the election by incrementing its Generation Clock to 2. cyrene's generation is lower than 2 and it has the same log entries as byzantium, so it grants the vote. But athens has an extra entry in its log, so it rejects the vote.

Because byzantium cannot get the three votes it needs for a majority, it loses the election and moves back to the follower state.

Figure 3: Lost election because log is not up-to-date

athens times out and triggers the next election. It increments the Generation Clock to 3 and sends vote requests to byzantium and cyrene. Because both byzantium and cyrene have a lower generation number and fewer log entries than athens, they both grant the vote to athens. Once athens gets a majority of the votes, it becomes the leader and starts sending HeartBeats to byzantium and cyrene. Once byzantium and cyrene receive a heartbeat from the leader at a higher generation, they increment their generation. This confirms the leadership of athens. athens then replicates its own log to byzantium and cyrene.

Figure 4: Node with up-to-date log wins election

athens now replicates Entry2 from generation 1 to byzantium and cyrene. But because it's an entry from a previous generation, athens does not update the commitIndex, even when Entry2 is successfully replicated on a majority quorum.

athens appends a no-op entry to its local log. After this new entry in generation 3 is successfully replicated on a majority quorum, athens updates the commitIndex, which also commits Entry2.
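The commit decision athens makes here can be reproduced with one extra check on top of the majority computation. In this sketch (hypothetical names, 1-based indexes, logs as lists of generation numbers, unknown followers reported as 0), the leader advances its commit index to the majority-replicated index only if the entry there belongs to its current generation; the earlier entries then commit implicitly because they precede it. Checking only that one index is enough because generations never decrease along the log.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class LeaderCommit {
    // leaderLog.get(i) is the generation of the entry at index i + 1.
    static long advanceCommitIndex(List<Long> leaderLog, List<Long> matchIndexes,
                                   long currentGeneration, long commitIndex) {
        List<Long> sorted = new ArrayList<>(matchIndexes);
        sorted.sort(Comparator.reverseOrder());
        long majorityIndex = sorted.get(sorted.size() / 2); // replicated on a majority
        if (majorityIndex > commitIndex
                && leaderLog.get((int) majorityIndex - 1) == currentGeneration) {
            return majorityIndex;
        }
        return commitIndex; // older-generation entries alone never advance the commit index
    }
}
```

In the test, Entry2 (generation 1) on athens, byzantium and cyrene leaves the commit index at 1; once the generation-3 no-op at index 3 reaches the same three servers, the commit index jumps to 3.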

Suppose ephesus comes back up or regains network connectivity and sends a request to cyrene. Because cyrene is now at generation 3, it rejects the request. ephesus learns the new term from the rejection response and steps down to become a follower.
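Stepping down is a single comparison that a node can apply to the generation carried by every request and response it sees. A minimal sketch with hypothetical names:

```java
enum Role { FOLLOWER, CANDIDATE, LEADER }

class NodeState {
    long generation;
    Role role;

    NodeState(long generation, Role role) {
        this.generation = generation;
        this.role = role;
    }

    // Called with the generation carried by any incoming request or response.
    void observeGeneration(long seenGeneration) {
        if (seenGeneration > generation) {
            generation = seenGeneration; // adopt the newer generation
            role = Role.FOLLOWER;        // a stale leader or candidate steps down
        }
    }
}
```

A lower or equal generation leaves the node unchanged, so the current leader is not disturbed by messages from a stale peer.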

Figure 7: Leader step-down

Technical Considerations

The following are some of the important technical considerations for any replicated log mechanism.

  • The first phase of any consensus-building mechanism needs to know about the log entries that might have been replicated on the previous Quorum. The leader needs to know about all such log entries and make sure that they are replicated on each cluster node.

    Raft makes sure that only a cluster node whose log is at least as up-to-date as the logs of a majority of the servers can be elected as leader, so log entries do not need to be passed from other cluster nodes to the new leader.

    It is possible that some entries are conflicting. In this case, the conflicting entries from the follower log are overwritten.

  • Some cluster nodes can lag behind, either because they crash and restart or because they get disconnected from the leader. The leader needs to track each cluster node and make sure that it sends all the missing entries.

    Raft maintains state per cluster node to track the log index up to which entries have been successfully copied to that node. The replication requests to each cluster node carry all the entries from that log index onwards, making sure that each cluster node gets all the log entries.

  • How clients interact with the replicated log to find the leader is discussed in Consistent Core. Detecting duplicate requests caused by client retries is handled by Idempotent Receiver.

  • The log is generally compacted by using a Low-Water Mark. A snapshot of the store backed by the replicated log is taken periodically, say after a few thousand entries are applied. The log is then discarded up to the index at which the snapshot was taken. Slow followers or newly added servers, which would need the full log, are sent the snapshot instead of individual log entries.
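Compaction with a snapshot can be sketched as follows. Under the assumptions of this sketch, the snapshot is just a copy of a key-value map, indexes are 1-based, and `lowWaterMark` is the index of the last entry covered by the snapshot; `CompactingLog` and its methods are hypothetical. A follower whose next needed index is at or below the low-water mark has to be sent the snapshot, because those entries no longer exist in the log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CompactingLog {
    final List<String> entries = new ArrayList<>(); // entries.get(i) has index lowWaterMark + i + 1
    long lowWaterMark = 0;                          // last index included in the snapshot
    Map<String, String> snapshot = new HashMap<>();

    long lastLogIndex() {
        return lowWaterMark + entries.size();
    }

    // Snapshots the state as of appliedIndex and discards the log entries up to it.
    void compact(Map<String, String> stateAtAppliedIndex, long appliedIndex) {
        snapshot = new HashMap<>(stateAtAppliedIndex);
        int toDrop = (int) (appliedIndex - lowWaterMark);
        entries.subList(0, toDrop).clear();
        lowWaterMark = appliedIndex;
    }

    // True if the follower needs entries that were discarded, so it must get the snapshot.
    boolean needsSnapshot(long followerNextIndex) {
        return followerNextIndex <= lowWaterMark;
    }
}
```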

  • One of the key assumptions here is that all the requests need to be strictly ordered. This is not always required. For example, a key-value store might not require ordering across requests for different keys. In such situations, it is possible to run a different consensus instance per key. This also removes the need to have a single leader for all the requests.

    EPaxos is an algorithm which does not rely on a single leader for ordering of requests.

    In partitioned databases like MongoDB, a replicated log is maintained per partition. So requests are ordered per partition, but not across partitions.

Push vs Pull

In the Raft replication mechanism explained here, the leader pushes all the log entries to followers. It is also possible to have followers pull the log entries. The Raft implementation in Kafka follows pull-based replication.

What goes in the log?

The replicated log mechanism is used for a wide variety of applications, from a key-value store to blockchain.

For a key-value store, the entries in the log are about setting key-values. For a Lease, the entries are about setting up named leases. For a blockchain, the entries are the blocks in a blockchain, which need to be served to all the peers in the same order. For databases like MongoDB, the entries are the data that needs to be consistently replicated.

Examples

Replicated log is the mechanism used by the Raft, Multi-Paxos, Zab and viewstamped replication protocols. The technique is called state machine replication, in which replicas execute the same commands in the same order. Consistent Core is often built with state machine replication.

Blockchain implementations like Hyperledger Fabric have an ordering component which is based on a replicated log mechanism. Previous versions of Hyperledger Fabric used Kafka for ordering the blocks in the blockchain. Recent versions use Raft for the same purpose.

Significant Revisions

11 January 2022: