Project 6: Distributed Key-Value Database

Achieve consistent replication in a hostile network

Description

In this project, you will build a (relatively) simple, distributed, replicated key-value datastore. A key-value datastore is a very simple type of database that supports two API calls from clients: put(key, value) and get(key). The former allows a client application to store a key-value pair in the database, while the latter allows a client to retrieve a previously stored value by supplying its key. Real-world examples of distributed key-value datastores include memcached, Redis, and DynamoDB.

Of course, it would be simple to build a key-value store if it were a single process. However, your system must be replicated and must provide strong consistency guarantees. Thus, you will implement a simplified version of the Raft consensus protocol. Your datastore will be run as multiple processes in parallel, and it will use the Raft protocol to maintain consensus among the replicas.

Your datastore will be tested for both correctness and performance. We will provide a simulator for your datastore that will simulate clients who execute put() and get() commands, as well as an unreliable network that can drop packets or make hosts unavailable. Part of your grade will come from the overhead your system incurs (i.e., fewer packets will result in a higher score), while another part will depend on the speed at which your datastore answers client queries (i.e., the query latency).

Your Program

For this project, you will submit source code for one program named 3700kvstore that implements your replicated datastore. You may use any language of your choice, and we will give you basic starter code in Python. Keep in mind that you are writing a program that will be run multiple times, in parallel, to form a distributed system.

If you use C or any other compiled language, your executable should be named 3700kvstore. If you use an interpreted language, your script must be called 3700kvstore, must include an appropriate shebang, and be marked as executable. If you use a virtual machine-based language (like Java or C#), you may need to write a brief Bash shell script, named 3700kvstore, that conforms to the input syntax given below and then launches your program using whatever incantations are necessary. For example, if you write your solution in Java, your Bash script might resemble

#!/bin/bash
exec java -jar 3700kvstore.jar "$@"

Or, if you use Python, your script might start with

#!/usr/bin/env python3
from bar import foo

and should be marked executable.

Language and Libraries

You can write your code in whatever language you choose, as long as it compiles and runs on Gradescope. Do not use libraries that are disallowed for this project. Similarly, your code must compile and run on the command line. You may use IDEs (e.g., Eclipse) during development, but do not turn in your project without a Makefile, and make sure your code has no dependencies on your IDE. We provide starter code in Python; you are welcome to use this, but if you decide to use another language, you will need to port the starter code to that language yourself.

You may not use libraries or modules that implement consensus protocols. This includes any library that implements Raft, Paxos, Viewstamped Replication, or similar protocols. Obviously, you also cannot use any libraries or software packages that implement a replicated key-value datastore; for example, your program cannot be a thin wrapper around memcached. You may use libraries or modules that implement local database storage (e.g., SQLite, BerkeleyDB, LevelDB) if you want to use them for persistent storage within each replica. If you have any questions about whether a particular library or module is allowed, post on Piazza.

Starter Code

Very basic starter code for the assignment is available on the Khoury GitHub server. The included starter code and simulator are written in Python 3 and should run on any system with a relatively modern version of Python installed (i.e., Python 3.6 or greater). To get started, you should create a copy of the starter code on your local machine (or on the Khoury Linux machines if you wish):

git clone https://github.khoury.northeastern.edu/cs3700/raft-starter-code.git

The starter code provides a bare-bones implementation of a datastore that simply connects to the LAN and broadcasts a “no-op” message once every second. You may use this code as a basis for your project if you wish, but it is strongly recommended that you do not do so unless you are comfortable with Python.

Testing Your Code

To evaluate your replicated datastore, we have provided a simulated test environment. The simulator will create an emulated network and all necessary sockets, execute several copies of your datastore with the appropriate command line arguments, route messages between the datastore replicas, and generate requests from clients. You will not need to modify the simulator during this project, although you may look at its source code if you wish.

The capabilities of the run script are as follows:

$ ./run --help
usage: run [-h] [--replica REPLICA] [--silence] [--config_directory CONFIG_DIRECTORY] test

positional arguments:
  test                  Path to a test config file, or "all" to run all tests.

optional arguments:
  -h, --help            show this help message and exit
  --replica REPLICA, -r REPLICA
                        Fully qualified path to your replica program (Default: ./3700kvstore)
  --silence, -s         Pipe stdout and stderr of replicas to /dev/null. (Default: False)
  --config_directory CONFIG_DIRECTORY, -c CONFIG_DIRECTORY
                        Path to a directory containing test configs. Only used when running "all" tests. (Default: ./configs/)

By default, the simulator expects your replica program to be named 3700kvstore and for it to be in the same directory as the simulator. This default can be overridden using the --replica command line argument, which we have provided as a convenience to aid you in testing. Note that when we grade, your replica program must be named 3700kvstore and we will not use the --replica option.

To execute a single test in the simulator, invoke run with a single command line argument: the path to the desired JSON test configuration.

$ ./run <path-to-test>

As a convenience, if you want the simulator to run all test cases, the last argument may be replaced with “all” (no quotes). In this case, the simulator will assume that the test configuration files are available in a folder named ./configs/, although this default may be overridden using the optional --config_directory command line parameter.

$ ./run all

The simulator has one additional, optional command line parameter. The simulator executes copies of your replica during each test; if you want to suppress the console output of your replicas, i.e., what they print to STDOUT and STDERR, pass the --silence option to the simulator.

Config File Format

Each configuration file that you pass to run contains a number of parameters that control the simulation. We have provided a number of test configurations in the ./configs/ folder. Your replicas do not need to read, modify, or parse these configurations; all configuration is handled by the simulator.

Each configuration file is formatted in JSON and has the following elements:

  • lifetime (Required): The number of seconds the simulation should run for. Must be at least 5.

  • replicas (Required): The number of replicas to execute, i.e., copies of your 3700kvstore program. Must be at least 3.

  • requests (Required): The number of get() and put() requests to randomly generate from clients.

  • mix (Optional): Float between 0 and 1 representing the fraction of client queries that are get()s. Defaults to 0.8.

  • wait (Optional): The number of seconds to wait before sending any client requests. Defaults to 2 seconds.

  • end_wait (Optional): The number of seconds to wait at the end of the simulation before measuring performance. Defaults to 2 seconds.

  • seed (Optional): The random seed to use. If not specified, a random value is chosen. Setting this value allows for a semi-reproducible set of clients and requests.

  • drops (Optional): Float between 0 and 1 representing the fraction of messages between replicas to drop. Defaults to 0.

  • events (Optional): A list of events that will occur during the simulation. Each event has a type and a time when it will trigger:

    • type (Required): The type of event. Valid types are:

      • kill_non_leader: crash-fail a random non-leader replica
      • kill_leader: crash-fail the current leader
      • part_easy: partition the network such that the leader retains a quorum
      • part_hard: partition the network such that the leader does not have a quorum
      • part_end: remove all network partitions
    • time (Required): The timestamp, in seconds, when the event should occur.

  • tests (Required): Information about how to test the replicas for performance and correctness:

    • maximum_get_fail_fraction (Optional): Float between 0 and 1 specifying the maximum fraction of get() requests that may fail. Defaults to 0.5.

    • maximum_put_fail_fraction (Optional): Float between 0 and 1 specifying the maximum fraction of put() requests that may fail. Defaults to 0.5.

    • maximum_get_generation_fail_fraction (Optional): Float between 0 and 1 specifying the maximum fraction of get() requests that may fail to be generated (due to insufficient put() requests succeeding). Defaults to 0.1.

    • maximum_appends_batched_fraction (Optional): Float between 0 and 1 specifying the maximum fraction of append messages that may be batched together. Defaults to 0.5.

    • benchmarks (Required): Sets thresholds across four categories for assessing the performance of the replicas. In each category, three thresholds are specified, which separate the extra credit, full credit, partial credit, and no credit performance ranges (see the sketch after this list):

      • total_msgs: the total number of messages sent between the replicas (lower is better)
      • failures: the number of fail messages sent to clients plus the number of client queries left unanswered (lower is better)
      • duplicates: the number of duplicate responses returned to clients (e.g., the same get() or put() answered more than once; lower is better)
      • median_latency: the median latency of answering client requests (lower is better)
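
To make the tiering concrete, the following hypothetical helper shows how three thresholds carve a score into the four credit tiers. The function name and the non-strict comparisons are assumptions for illustration only; the simulator defines the actual cutoff logic.

```
def performance_tier(score, thresholds):
    # thresholds = [bonus, full, partial]; lower scores are better for
    # all four benchmark categories. (Illustrative only -- the
    # simulator's exact comparison operators may differ.)
    bonus, full, partial = thresholds
    if score <= bonus:
        return "extra credit"
    if score <= full:
        return "full credit"
    if score <= partial:
        return "partial credit"
    return "no credit"

performance_tier(3, [0, 2, 5])   # -> "partial credit"
```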

For example, a simple configuration with no events and a read-heavy workload might look like the following

{
  "lifetime": 30,
  "replicas": 5,
  "requests": 500,
  "mix": 0.9,
  "tests" : {
    "benchmarks" : {
      "total_msgs"     : [1200, 3000, 5000],
      "failures"       : [0, 1, 2],
      "duplicates"     : [0, 2, 5],
      "median_latency" : [0.0004, 0.002, 0.05]
    }
  }
}

and a more complex configuration with events and a lossy network might be

{
  "lifetime": 30,
  "replicas": 5,
  "requests": 300,
  "mix" : 0.2,
  "drops" : 0.15,
  "end_wait" : 5,
  "events" : [{"type": "kill_leader", "time": 8},
              {"type": "kill_leader", "time": 16}],
  "tests" : {
    "benchmarks" : {
      "total_msgs"     : [1000, 3000, 4000],
      "failures"       : [1, 10, 100],
      "duplicates"     : [0, 2, 10],
      "median_latency" : [0.00015, 0.005, 0.05]
    }
  }
}

run Output, Single Test Configuration

The run script will output any errors it encounters during the simulation, including malformed messages, messages to unknown destinations, replicas that unexpectedly quit, etc. Once the simulation completes, run prints (1) some statistics about your datastore’s performance and behavior, (2) whether your datastore passed the correctness checks, and if not, why not, and (3) how your datastore fared on the performance benchmarks. Note that performance is assessed only if the datastore passes the correctness checks.

Here is an example of the simulator’s output when a datastore fails the correctness checks:

$ ./run config.json
...
# Simulation Finished

## Useful Information and Statistics
Leaders: FFFF 0001 FFFF 0003
Replicas that died/were killed: 0/2
Total messages sent: 6370
Total messages dropped: 183
Total client get()/put() requests: 60/40
Total duplicate responses: 3
Total unanswered get()/put() requests: 33/3
Total redirects: 19
Total get()/put() failures: 15/31
Total get() with incorrect response: 7

## Correctness Checks
Error: >0 incorrect responses to get()
Error: insufficient get() requests answered (33 > 60 * 0.50)

## Correctness Checks Failed, Skipping Performance Tests

Ideally, all get() and put() requests would succeed, and they would be answered with low latency. Obviously, if your system is returning incorrect values to get() requests, then your datastore has consistency issues. Furthermore, the total number of packets should be as low as possible, i.e., the overhead of your datastore on the network should be low.

Here is another example when the correctness checks pass; notice the performance results are now printed:

$ ./run config.json
...
# Simulation Finished
        
## Useful Information and Statistics
Leaders: FFFF 0001 FFFF 0003
Replicas that died/were killed: 0/2
Total messages sent: 6370
Total messages dropped: 183
Total client get()/put() requests: 60/40
Total duplicate responses: 3
Total unanswered get()/put() requests: 0/3
Total redirects: 19
Total get()/put() failures: 15/31
Total get() with incorrect response: 0
        
## Correctness Checks
All correctness tests passed

## Performance Tests
## <test metric>: <your score> <benchmark score>, <test result>
Total Messages Between Replicas: 6370 >= 1000, Failed
Total Failures and Unanswered Requests: 49 < 60, Passed
Duplicate Responses to Clients: 3 < 4, Partial credit, needs improvement
Median Response Latency to Clients: 0.0001 < 0.0002, Bonus!

In this case, the performance results of the datastore are mixed. This implementation has extremely low median latency and is earning bonus points, and the number of failures/unanswered requests is acceptable, but the datastore could be improved by returning fewer duplicate responses and sending many fewer messages overall.

run Output, All Test Configurations

Additionally, run can be executed in “all” mode, which tests your replica with all available test cases. “all” mode is equivalent to what we will use when we grade your project submission. If your replica fails tests when the simulator is run in “all” mode, you can be assured that it will fare poorly under the grading script. To run the simulator in “all” mode, simply execute:

$ ./run all
Basic tests (5 replicas, 30 seconds, 100 requests):
No drops, no failures, 80% read		[PASS]     Performance Tiers: 3 1 2 0
No drops, no failures, 60% read		[PASS]     Performance Tiers: 2 1 2 0
No drops, no failures, 40% read		[PASS]     Performance Tiers: 2 1 1 0
No drops, no failures, 20% read		[PASS]     Performance Tiers: 3 2 2 1
Unreliable network tests (5 replicas, 30 seconds, 150 requests):
10% drops, no failures, 80% read	[FAIL]
...

This will run your replica on a number of test configurations, and will output whether your program performs sufficiently in each case. Note that the performance information is only printed if your replica passes the correctness checks for a given test. The performance tier numbers correspond to Bonus (0), Passed (1), Needs Improvement (2), and Failed (3) with respect to total messages, failures/unanswered, duplicates, and median latency, respectively.

Message Format

To simplify this project, instead of using real packet formats, we will be sending our data across the wire in JSON (many languages have utilities to encode and decode JSON, and you are welcome to use these libraries). All messages must be encoded as a dictionary and they must include the following four keys (at a minimum):

  • src: The ID of the source of the message.
  • dst: The ID of the destination of the message.
  • leader: The ID of the leader, or "FFFF" if the leader’s ID is unknown.
  • type: The type of the message.

The simulator uses src and dst instead of IP addresses in order to route and deliver messages. Furthermore, the simulator supports multicast: if dst is set to "FFFF", the message will be delivered to all replicas (use multicast sparingly, since it is expensive). leader is the ID of the replica that the sender of the message believes is the leader. All messages must include the leader so that the simulator can learn which replica is the leader (otherwise, the simulator would have no way of determining this information).
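
For example, in Python a replica might build and serialize a message with the standard json module. This is a minimal sketch: the "noop" type mirrors the starter code's once-a-second broadcast, and the IDs are placeholders.

```
import json

# A message with the four required fields; "noop" is a custom type.
msg = {"src": "0001", "dst": "FFFF", "leader": "FFFF", "type": "noop"}
wire = json.dumps(msg).encode("utf-8")    # bytes to write to the UDP socket

# Decoding an incoming datagram back into a dictionary:
received = json.loads(wire.decode("utf-8"))
assert received["type"] == "noop"
```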

type describes the type of the message. You may define custom types in order to implement your datastore (and you may add custom keys to the message dictionary in these cases). However, there are several message types that your replicas must support in order to handle requests from clients.

  • get: get() messages are read requests from clients. They have the following format:

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "get", "MID": "<a unique string>",
    "key": "<some key>"}
    

    Your replicas may respond with an ok message, which includes the corresponding value:

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "ok", "MID": "<a unique string>",
    "value": "<value of the key>"}
    

    Or your replicas may respond with a fail message, in which case the client should retry the get():

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "fail", "MID": "<a unique string>"}
    

    If the client issues a get() for a key that does not exist (i.e., it was never put()), your datastore should return an ok message with an empty value (i.e., an empty string).

  • put: put() messages are write requests from clients. They have the following format:

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "put", "MID": "<a unique string>",
    "key": "<some key>", "value": "<value of the key>"}
    

    Your replicas may respond with an ok message if the write was successful:

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "ok", "MID": "<a unique string>"}
    

    Or your replicas may respond with a fail message, in which case the client should retry the put():

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "fail", "MID": "<a unique string>"}
    
  • redirect: If the client sends any message (get() or put()) to a replica that is not the leader, the replica should respond with a redirect:

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "redirect", "MID": "<a unique string>"}
    

    In this case, the client will retry the request by sending it to the specified leader.

Note that in all of the above cases, the MID in a request must match the MID in the response. For example, the following would be a legal series of requests and responses, where 001A is a client and 0000 and 0001 are replicas:

Request 1     {"src": "001A", "dst": "0001", "leader": "FFFF",
               "type": "get", "MID": "4D61ACF83027", "key": "name"}
Response 1    {"src": "0001", "dst": "001A", "leader": "0000",
               "type": "redirect", "MID": "4D61ACF83027"}

Request 2     {"src": "001A", "dst": "0000", "leader": "0000",
               "type": "get", "MID": "9AB4CE50023", "key": "name"}
Response 2    {"src": "0000", "dst": "001A", "leader": "0000", "type": "ok",
               "MID": "9AB4CE50023", "value": "Christo Wilson"}

Again, you will need to develop additional, custom message types in order to implement the Raft consensus protocol. As long as your messages include the four minimum required fields (src, dst, leader, type) the simulator will ensure that your messages are delivered.
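
To make the request/response flow concrete, here is a sketch of how a replica might answer a client message while preserving the MID. This is a hedged illustration: is_leader, leader_id, and store are hypothetical names, and a real implementation must run put()s (and, for strong consistency, get()s) through the Raft log rather than answering directly.

```
def handle_client_message(msg, my_id, is_leader, leader_id, store):
    # Build a response whose MID matches the request's MID.
    resp = {"src": my_id, "dst": msg["src"], "leader": leader_id,
            "MID": msg["MID"]}
    if not is_leader:
        resp["type"] = "redirect"                  # client retries at leader_id
    elif msg["type"] == "get":
        resp["type"] = "ok"
        resp["value"] = store.get(msg["key"], "")  # missing key -> empty string
    elif msg["type"] == "put":
        store[msg["key"]] = msg["value"]           # real code commits via Raft first
        resp["type"] = "ok"
    else:
        resp["type"] = "fail"                      # client will retry
    return resp
```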

Command Line Specification

The command line syntax for your 3700kvstore program is given below. The simulator will pass parameters to each replica representing (1) the UDP port number the replica should connect to, (2) the ID of the replica, and (3) the IDs of all other replicas in the system. The syntax for launching your datastore is therefore:

./3700kvstore <UDP port> <your ID> <ID of second replica> [<ID of third replica> ...]

The UDP port is the port number on the localhost that you should send UDP packets to in order to communicate with your replicas. For simplicity, all replica IDs are unique four-digit hexadecimal numbers (e.g., 0AA1 or F29A). You will use these IDs as the src and dst in your messages. Clients will also be assigned unique IDs by the simulator.

Connecting to the LAN

You will be using local UDP sockets to emulate a LAN. Each of your replicas will connect to a single UDP socket (the way a server would connect to a single Ethernet cable). You do not need to be intimately familiar with how UDP sockets work, but essentially they are objects that you can read or write. However, rather than sending and receiving packets over the internet, the packets are instead passed between programs on the local machine. In other words, this is how your program will send and receive data from our simulator, which is just another program running locally on the machine. You should constantly be reading from your sockets to make sure you receive all messages (they will be buffered if you don’t read immediately).

In order to let the simulator know your replica is alive, upon startup you should send a special hello message to the broadcast address ("FFFF"). The starter code does this:

```
{"src": "<ID>", "dst": "FFFF", "leader": "FFFF", "type": "hello", "MID": "<a unique string>"}
```
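
Putting the command line arguments and the hello message together, a replica's startup might look like the following sketch (modeled loosely on the starter code; the exact socket setup there may differ):

```
import json, socket, sys

port = int(sys.argv[1])        # UDP port of the simulator's LAN
my_id = sys.argv[2]            # this replica's ID, e.g., "0001"
others = sys.argv[3:]          # IDs of the other replicas

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("localhost", 0))    # any free local port for receiving

hello = {"src": my_id, "dst": "FFFF", "leader": "FFFF",
         "type": "hello", "MID": "hello-" + my_id}
sock.sendto(json.dumps(hello).encode("utf-8"), ("localhost", port))
```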

We encourage you to write your code in an event-driven style using select() or poll(). This will keep your code single-threaded and will make debugging your code significantly easier. Alternatively, you can implement your datastore in a threaded or asynchronous model, but expect it to be significantly more difficult to debug.
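
For instance, a select()-based loop might look like the following sketch, where the select() timeout doubles as the hook for checking election and heartbeat timers (handle_message and check_timers are hypothetical functions):

```
import json, select

def event_loop(sock):
    while True:
        # Wait up to 100 ms for an incoming datagram.
        readable, _, _ = select.select([sock], [], [], 0.1)
        if readable:
            data, _addr = sock.recvfrom(65535)
            handle_message(json.loads(data.decode("utf-8")))
        # Runs every iteration, message or not: has the election
        # timeout expired? Is a heartbeat due from the leader?
        check_timers()
```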

Datastore Requirements and Assumptions

The goal of your system is to accept put()s from clients and retrieve the corresponding data when a get() is issued. To ensure that data is not lost when a process crashes, all data from clients must be replicated, which raises the dueling issues of how to maintain consistency and how to achieve high availability. To meet these goals, your datastore will implement the Raft consensus protocol. Ultimately, your datastore should achieve the following two goals:

  1. Consistency - clients should always receive correct answers to get() requests.
  2. Availability - clients should be able to execute put() and get() requests at any time with low latency (i.e., your system should execute requests quickly).

Raft is a complicated protocol, and real-world datastores are extremely complicated artifacts. To simplify this project, there are several things you do not need to implement:

  • True persistence - you do not need to write client updates to disk, or worry about committing data to permanent storage. All of the data from clients and the log of updates can live in memory.
  • Garbage collection - Raft maintains a log of all updates. In a real system, this log periodically needs to be garbage collected, since it cannot grow infinitely long. However, your system will not be running for long periods of time, and therefore you do not need to worry about garbage collection.
  • Restarts - in a real system, replicas might fail for a while and then come back online, necessitating snapshots and reconciliation. However, you may assume that replicas in the simulator will crash-fail, i.e., they will die completely and never return.

Implementing Raft

The Raft paper is specifically designed to be easy to read, and you should definitely start by reading it before implementing the protocol. Additional papers and resources are available on the Raft GitHub. I suggest the following series of steps to begin working on your datastore implementation:

  1. Add basic support for responding to client get() and put() requests. At this point, you can respond to all requests with a “type”: “fail” message.
  2. Implement the Raft election protocol (section 5.2 of the Raft paper); add the ability to respond to get() and put() requests with “type”: “redirect” messages.
  3. Add a timeout to detect leader failures (i.e., if you don’t hear from the leader in X milliseconds…) and make sure that the new election proceeds correctly (see the timeout sketch after this list).
  4. Implement a basic, empty version of the AppendEntries RPC call that doesn’t replicate any data, but acts as a keepalive message from the leader to other replicas to prevent unnecessary elections.
  5. Implement the transaction log and the “state machine” (i.e., a dictionary containing the key/value pairs from clients, Section 5.3). Don’t bother replicating the transactions, just ensure that the leader is able to correctly answer get() and put() requests.
  6. Improve your AppendEntries RPC call to actually send data to replicas. Ensure that updates are only committed when a quorum is in agreement.
  7. Add support for retrying failed commits and test it by experimenting with lossy network simulations.
  8. If you haven’t already, modify the leader election to support the additional restrictions in Section 5.4.1; test your implementation on lossy networks with failed leaders.
  9. Implement the subtle commit restriction given in Section 5.4.2.
  10. Improve your AppendEntries RPC call to implement batching, i.e., a single AppendEntries may send multiple outstanding log entries to a given replica.
  11. Test, test, test, and test some more ;)
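
Here is the timeout sketch referenced in step 3. Raft avoids split votes by giving each replica a randomized election timeout; the 150-300 ms range below comes from the Raft paper, and start_election is a hypothetical function.

```
import random, time

def new_election_timeout():
    # Randomize within the Raft paper's suggested 150-300 ms range.
    return random.uniform(0.150, 0.300)

last_heartbeat = time.monotonic()      # updated whenever the leader is heard
timeout = new_election_timeout()

def check_election_timer():
    global timeout
    if time.monotonic() - last_heartbeat > timeout:
        timeout = new_election_timeout()   # re-randomize for the next round
        start_election()                   # hypothetical: become candidate,
                                           # increment term, request votes
```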

Step 6 will probably require the most time in terms of writing code and debugging, since it is the crux of the algorithm. Steps 7-10 are necessary to ensure the correctness of the protocol, but shouldn’t be too difficult to implement.
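
As a point of reference for steps 4, 6, and 10, here is one possible shape for a custom append message, borrowing the field names from Figure 2 of the Raft paper. The type name and exact layout are yours to choose; the values below reuse the earlier example.

```
append = {"src": "0000", "dst": "0001", "leader": "0000",
          "type": "append_entries",    # a custom type; any name works
          "term": 3,                   # leader's current term
          "prev_log_index": 41,        # index of the entry preceding the new ones
          "prev_log_term": 2,          # term of that entry
          "leader_commit": 40,         # leader's commit index
          # An empty list is a pure heartbeat (step 4); several entries
          # in one message is batching (step 10).
          "entries": [{"term": 3, "key": "name",
                       "value": "Christo Wilson", "MID": "9AB4CE50023"}]}
```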

Grading

The grading in this project will be broken down as follows:

Item                        Percentage of Grade
Program correctness         60%
Performance                 20%
Style and documentation     15%
Milestone functionality     5%

The final grading in this project will be based on the number of test configurations that your datastore successfully completes. More weight will be given to more difficult configurations. At a minimum, your code must pass the test suite without errors or crashes, and it must obey the requirements specified above. All student code will be scanned by plagiarism detection software to ensure that students are not copying code from the internet or each other.

Project Submission

Milestone

This is a very challenging project. In order to ensure that you are making sufficient progress, you will have an interim milestone deadline. For the milestone, your 3700kvstore program must be able to pass three of the test cases: simple-1, simple-2, and crash-1.

You should submit your milestone on Gradescope to the Project 6 Milestone project. Be sure to indicate who your teammate is, otherwise, they will not get any credit!

Final project

For the final submission, you should submit your (thoroughly documented) code, a Makefile, and a plain-text (no Word or PDF) README.md file. In this file, you should describe your high-level approach, the challenges you faced, a list of properties/features of your design that you think are good, and an overview of how you tested your code.

You should submit your final project on Gradescope to the Project 6 Final project. Be sure to indicate who your teammate is, otherwise, they will not get any credit!