Project 6: Distributed Key-Value Database

Achieve consistent replication in a hostile network

Description

In this project, you will build a (relatively) simple, distributed, replicated key-value datastore. A key-value datastore is a very simple type of database that supports two API calls from clients: put(key, value) and get(key). The former API allows a client application to store a key-value pair in the database, while the latter API allows a client to retrieve a previously stored value by supplying its key. Real-world examples of distributed key-value datastores include memcached, Redis, DynamoDB, etc.

Of course, it would be simple to build a key-value store if it were a single process. However, your system must be replicated and support strong consistency guarantees. Thus, you will be implementing a simplified version of the Raft consensus protocol. Your datastore will be run multiple times, in parallel, and it will use the Raft protocol to maintain consensus among the replicas.

Your datastore will be tested for both correctness and performance. We will provide a simulator for your datastore that will simulate clients who execute put() and get() commands, as well as an unreliable network that can drop packets or make hosts unavailable. Part of your grade will come from the overhead your system has (i.e., fewer packets will result in a higher score), while another part will depend on the speed at which your datastore answers client queries (i.e., what is the query latency).

Your Program

For this project, you will submit source code for one program named 3700kvstore that implements your replicated datastore. You may use any language of your choice, and we will give you basic starter code in Python. Keep in mind that you are writing a program that will be run multiple times, in parallel, to form a distributed system.

If you use C or any other compiled language, your executable should be named 3700kvstore. If you use an interpreted language, your script must be called 3700kvstore, must include a shebang, and be marked as executable. If you use a virtual machine-based language (like Java or C#), you must write a brief Bash shell script, named 3700kvstore, that conforms to the input syntax given below and then launches your program using whatever incantations are necessary. For example, if you write your solution in Java, your Bash script might resemble

#!/bin/bash
# Forward every command line argument to the JVM unchanged.
exec java -jar 3700kvstore.jar "$@"

Or, if you use Python, your script might start with

#!/usr/bin/env python3
from bar import foo

and should be marked executable.

Language and Libraries

You may write your code in whatever language you choose, as long as your code compiles and runs on unmodified Khoury College Linux machines on the command line. Do not use libraries that are not installed by default on the Khoury College Linux machines. You may use IDEs (e.g., Eclipse) during development, but do not turn in your IDE project without a Makefile. Make sure your code has no dependencies on your IDE.

You may not use libraries or modules that implement consensus protocols. This includes any library that implements Raft, Paxos, Viewstamped Replication, or similar protocols. Obviously, you cannot use any libraries or software packages that implement a replicated key-value datastore. For example, your program cannot be a thin wrapper around memcached, etc. You may use libraries or modules that implement local database storage (e.g., SQLite, BerkeleyDB, LevelDB) if you want to use them for persistent storage within each replica. If you have any questions about whether a particular library or module is allowed, post on Piazza.

Starter Code

Very basic starter code for the assignment in Python, as well as the simulator, is available here. To get started, you should download and unpack this archive into your own local directory, since you will need the sim.py script, as well as the example configuration files. The included starter code and simulator are written in Python 3, and should be able to run on any system with a relatively modern version of Python installed (e.g., Python 3.6 or greater).

The starter code provides a bare-bones implementation of a datastore that simply connects to the LAN and broadcasts a “no-op” message once every second. You may use this code as a basis for your project if you wish, but it is strongly recommended that you do not do so unless you are comfortable with Python.

Testing Your Code

To evaluate your replicated datastore, we have provided a simulated test environment. The simulator will create an emulated network and all necessary sockets, execute several copies of your datastore with the appropriate command line arguments, route messages between the datastore replicas, and generate requests from clients. You will not need to modify the simulator during this project, although you may look at its source code if you wish.

The capabilities of the sim.py script are as follows:

$ ./sim.py --help
usage: sim.py [-h] [--replica REPLICA] [--silence] [--config_directory CONFIG_DIRECTORY] test

positional arguments:
  test                  Path to a test config file, or "all" to run all tests.

optional arguments:
  -h, --help            show this help message and exit
  --replica REPLICA, -r REPLICA
                        Fully qualified path to your replica program (Default: ./3700kvstore)
  --silence, -s         Pipe stdout and stderr of replicas to /dev/null. (Default: False)
  --config_directory CONFIG_DIRECTORY, -c CONFIG_DIRECTORY
                        Path to a directory containing test configs. Only used when running "all" tests. (Default: ./tests/)

By default, the simulator expects your replica program to be named 3700kvstore and for it to be in the same directory as the simulator. This default can be overridden using the --replica command line argument, which we have provided as a convenience to aid you in testing. Note that when we grade, your replica program must be named 3700kvstore and we will not use the --replica option.

To execute a single test in the simulator, run sim.py with a single command line argument: the path to the desired JSON test configuration.

$ ./sim.py <path-to-test>

As a convenience, if you want the simulator to run all test cases, the last argument may be replaced with “all” (no quotes). In this case, the simulator will assume that the test configuration files are available in a folder named ./tests/, although this default may be overridden using the optional command line parameter --config_directory.

$ ./sim.py all

The simulator has one additional, optional command line parameter. The simulator executes copies of your replica during each test. If you want to suppress the console output of your replicas, i.e., what they print to STDOUT and STDERR, pass the --silence option to the simulator.

Config File Format

The configuration file(s) that you pass to ./sim.py contains a number of parameters that control the simulation. We have provided a number of test configurations in the ./tests/ folder. Your replicas do not need to read, modify, or parse these configurations; all configuration is handled by the simulator.

Each configuration file is formatted in JSON and has the following elements

  • lifetime (Required) The number of seconds the simulation should run for. Must be at least 5.

  • replicas (Required) The number of replicas to execute, i.e., copies of your 3700kvstore program. Must be at least 3.

  • requests (Required) The number of get() and put() requests to randomly generate from clients.

  • mix (Optional) Float between 0 and 1 representing the fraction of client queries that are get()s. Defaults to 0.8.

  • wait (Optional) The number of seconds to wait before sending any client requests. Defaults to 2 seconds.

  • end_wait (Optional) The number of seconds to wait at the end of the simulation before measuring performance. Defaults to 2 seconds.

  • seed (Optional) The random seed to choose. If not specified, a random value is chosen. Setting this value will allow for a semi-reproducible set of clients and requests.

  • drops (Optional) Float between 0 and 1 representing the fraction of messages between replicas to drop. Defaults to 0.

  • events (Optional) A list of events that will occur during the simulation. Each event has a type and a time when it will trigger:

    • type (Required) The type of event. Valid types are:

      • kill_non_leader: will crash fail a random non-leader replica
      • kill_leader: will crash fail the current leader.
      • part_easy: partition the network, such that the leader has a quorum
      • part_hard: partition the network, such that the leader does not have a quorum
      • part_end: remove all network partitions
    • time (Required) The timestamp, in seconds, when the event should occur.

  • tests (Required) Information about how to test the replicas for performance and correctness:

    • maximum_get_fail_fraction (Optional) Float between 0 and 1 specifying the maximum fraction of get() requests that may fail. Defaults to 0.5.

    • maximum_put_fail_fraction (Optional) Float between 0 and 1 specifying the maximum fraction of put() requests that may fail. Defaults to 0.5.

    • maximum_get_generation_fail_fraction (Optional) Float between 0 and 1 specifying the maximum fraction of get() requests that may fail to be generated (due to insufficient put() requests succeeding). Defaults to 0.1.

    • maximum_appends_batched_fraction (Optional) Float between 0 and 1 specifying the maximum fraction of append messages that may be batched together. Defaults to 0.5.

    • benchmarks (Required) Sets thresholds across four categories for assessing the performance of the replicas. In each category, three thresholds are specified, which separate the extra credit, full credit, partial credit, and no credit performance ranges:

      • total_msgs: how many total messages were sent between the replicas (lower is better)?
      • failures: how many fail messages were sent to clients AND how many client queries were unanswered (lower is better)?
      • duplicates: how many duplicate responses were returned to clients (e.g., the same get() or put() was answered more than once) (lower is better)?
      • median_latency: median latency of answering client requests (lower is better)

For example, a simple configuration with no events and a read-heavy workload might look like the following

{
  "lifetime": 30,
  "replicas": 5,
  "requests": 500,
  "mix": 0.9,
  "tests" : {
    "benchmarks" : {
      "total_msgs"     : [1200, 3000, 5000],
      "failures"       : [0, 1, 2],
      "duplicates"     : [0, 2, 5],
      "median_latency" : [0.0004, 0.002, 0.05]
    }
  }
}

and a more complex configuration with events and a lossy network might be

{
  "lifetime": 30,
  "replicas": 5,
  "requests": 300,
  "mix" : 0.2,
  "drops" : 0.15,
  "end_wait" : 5,
  "events" : [{"type": "kill_leader", "time": 8},
              {"type": "kill_leader", "time": 16}],
  "tests" : {
    "benchmarks" : {
      "total_msgs"     : [1000, 3000, 4000],
      "failures"       : [1, 10, 100],
      "duplicates"     : [0, 2, 10],
      "median_latency" : [0.00015, 0.005, 0.05]
    }
  }
}
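
If you write your own test configurations, the constraints in the element list above can be checked before running the simulator. The following is a minimal sketch in Python; check_config is a hypothetical helper and is not part of sim.py, whose own checks may differ.

```python
import json

def check_config(cfg):
    """Return a list of problems found in a test configuration dictionary.

    check_config is a hypothetical helper, not part of the simulator.
    It only enforces the constraints stated in the element list above.
    """
    problems = []
    for key in ("lifetime", "replicas", "requests", "tests"):
        if key not in cfg:
            problems.append("missing required element: " + key)
    if cfg.get("lifetime", 5) < 5:
        problems.append("lifetime must be at least 5")
    if cfg.get("replicas", 3) < 3:
        problems.append("replicas must be at least 3")
    for key in ("mix", "drops"):
        if key in cfg and not 0 <= cfg[key] <= 1:
            problems.append(key + " must be between 0 and 1")
    if "benchmarks" not in cfg.get("tests", {}):
        problems.append("tests must contain benchmarks")
    for event in cfg.get("events", []):
        if "type" not in event or "time" not in event:
            problems.append("each event needs a type and a time")
    return problems

# A shortened version of the lossy-network configuration shown above.
example = json.loads("""
{"lifetime": 30, "replicas": 5, "requests": 300, "mix": 0.2, "drops": 0.15,
 "events": [{"type": "kill_leader", "time": 8}],
 "tests": {"benchmarks": {"total_msgs": [1000, 3000, 4000]}}}
""")
print(check_config(example))  # an empty list means no problems were found
```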

./sim.py Output, Single Test Configuration

The ./sim.py script will output any errors it encounters during the simulation, including malformed messages, messages to unknown destinations, replicas that unexpectedly quit, etc. Once the simulation completes, ./sim.py prints (1) some statistics about your datastore’s performance and behavior, (2) whether your datastore passed the correctness checks, and if not, why not, and (3) how your datastore fared on the performance benchmarks. Note that performance is assessed only if the datastore passes the correctness checks.

Here is an example of the simulator’s output when a datastore fails the correctness checks:

$ ./sim.py config.json
...
# Simulation Finished

## Useful Information and Statistics
Leaders: FFFF 0001 FFFF 0003
Replicas that died/were killed: 0/2
Total messages sent: 6370
Total messages dropped: 183
Total client get()/put() requests: 60/40
Total duplicate responses: 3
Total unanswered get()/put() requests: 33/3
Total redirects: 19
Total get()/put() failures: 15/31
Total get() with incorrect response: 7

## Correctness Checks
Error: >0 incorrect responses to get()
Error: insufficient get() requests answered (33 > 60 * 0.50)

## Correctness Checks Failed, Skipping Performance Tests

Ideally, you would like all get() and put() requests to succeed without failing and for them to have low latency. Obviously, if your system is returning incorrect values to get() requests then your datastore has consistency issues. Furthermore, you would like the total number of packets to be as low as possible, i.e., the overhead of your datastore on the network should be low.

Here is another example when the correctness checks pass; notice the performance results are now printed:

$ ./sim.py config.json
...
# Simulation Finished
        
## Useful Information and Statistics
Leaders: FFFF 0001 FFFF 0003
Replicas that died/were killed: 0/2
Total messages sent: 6370
Total messages dropped: 183
Total client get()/put() requests: 60/40
Total duplicate responses: 3
Total unanswered get()/put() requests: 0/3
Total redirects: 19
Total get()/put() failures: 15/31
Total get() with incorrect response: 0
        
## Correctness Checks
All correctness tests passed

## Performance Tests
## <test metric>: <your score> <benchmark score>, <test result>
Total Messages Between Replicas: 6370 >= 1000, Failed
Total Failures and Unanswered Requests: 49 < 60, Passed
Duplicate Responses to Clients: 3 < 4, Partial credit, needs improvement
Median Response Latency to Clients: 0.0001 < 0.0002, Bonus!

In this case, the performance results of the datastore are mixed. This implementation has extremely low median latency and is earning bonus points, and the number of failures/unanswered requests is acceptable, but the datastore could be improved by sending fewer duplicate responses and many fewer messages overall.

./sim.py Output, All Test Configurations

Additionally, ./sim.py can be run in “all” mode, which tests your replica with all available test cases. “all” mode is equivalent to what we will use when we grade your project submission. If your replica fails when the simulator is run in “all” mode, you can be assured that your replica will fare poorly when run under the grading script. To run the simulator in “all” mode, simply execute:

$ ./sim.py all
Basic tests (5 replicas, 30 seconds, 100 requests):
No drops, no failures, 80% read		[PASS]     Performance Tiers: 3 1 2 0
No drops, no failures, 60% read		[PASS]     Performance Tiers: 2 1 2 0
No drops, no failures, 40% read		[PASS]     Performance Tiers: 2 1 1 0
No drops, no failures, 20% read		[PASS]     Performance Tiers: 3 2 2 1
Unreliable network tests (5 replicas, 30 seconds, 150 requests):
10% drops, no failures, 80% read	[FAIL]
...

This will run your replica on a number of test configurations, and will output whether your program performs sufficiently in each case. Note that the performance information is only printed if your replica passes the correctness checks for a given test. The performance tier numbers correspond to Bonus (0), Passed (1), Needs Improvement (2), and Failed (3) with respect to total messages, failures/unanswered, duplicates, and median latency, respectively.
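
To make the tier numbers concrete, here is a minimal sketch of how a score could be mapped onto the three thresholds of a benchmark category. It assumes a strict less-than comparison, which is inferred from the sample output above and not confirmed against the simulator's source; the function name tier is hypothetical.

```python
TIER_NAMES = ["Bonus", "Passed", "Needs Improvement", "Failed"]

def tier(score, thresholds):
    """Map a score to a tier number from 0 (Bonus) to 3 (Failed).

    thresholds is the three-element list from the "benchmarks" section of
    a test configuration. ASSUMPTION: a score must be strictly below a
    threshold to earn that tier; the simulator may compare differently.
    """
    for number, limit in enumerate(thresholds):
        if score < limit:
            return number
    return len(thresholds)

# Thresholds taken from the lossy-network example configuration.
for score, limits in [(6370, [1000, 3000, 4000]),
                      (3, [0, 2, 10]),
                      (0.0001, [0.00015, 0.005, 0.05])]:
    print(score, TIER_NAMES[tier(score, limits)])
```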

Message Format

To simplify this project, instead of using real packet formats, we will be sending our data across the wire in JSON (many languages have utilities to encode and decode JSON, and you are welcome to use these libraries). All messages must be encoded as a dictionary, they must be terminated with a trailing newline (\n) character, and they must include the following four keys (at a minimum):

  • src - The ID of the source of the message.
  • dst - The ID of the destination of the message.
  • leader - The ID of the leader, or “FFFF” if the leader’s ID is unknown.
  • type - The type of the message.

The simulator uses src and dst instead of IP addresses in order to route and deliver messages. Furthermore, the simulator supports multicast: if dst is set to “FFFF”, the message will be delivered to all replicas (use multicast sparingly, since it is expensive). leader is the ID of the replica that the sender of the message believes is the leader. All messages must include the leader so that the simulator can learn which replica is the leader (otherwise, the simulator would have no way of determining this information).
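
As an illustration, a message with the four required keys can be built and serialized as shown below. This is a minimal sketch: the helper name encode, the "heartbeat" type, and the term key are hypothetical examples of a custom message, not part of the required protocol.

```python
import json

BROADCAST = "FFFF"  # multicast destination; also the "unknown leader" value

def encode(src, dst, leader, msg_type, **extra):
    """Serialize one message as a single newline-terminated JSON line."""
    msg = {"src": src, "dst": dst, "leader": leader, "type": msg_type}
    msg.update(extra)  # custom keys for custom message types
    # json.dumps emits no raw newlines by default, so the trailing "\n"
    # is the only newline in the encoded message.
    return (json.dumps(msg) + "\n").encode("utf-8")

# Hypothetical custom message sent to all replicas by replica 0001,
# which does not yet know who the leader is.
wire = encode("0001", BROADCAST, BROADCAST, "heartbeat", term=3)
print(wire)
```

The returned bytes can be passed directly to the socket's sendall() method.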

type describes the type of the message. You may define custom types in order to implement your datastore (and you may add custom keys to the message dictionary in these cases). However, there are several message types that your replicas must support in order to handle requests from clients.

  • get - get() messages are read requests from clients. They have the following format:

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "get", "MID": "<a unique string>",
    "key": "<some key>"}\n
    

    Your replicas may respond with an OK message that includes the corresponding value:

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "ok", "MID": "<a unique string>",
    "value": "<value of the key>"}\n
    

    Or your replicas may respond with a failure message, in which case the client will retry the get():

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "fail", "MID": "<a unique string>"}\n
    

    If the client issues a get() for a key that does not exist (i.e., it was never put()), your datastore should return an empty value (i.e., an empty string).

  • put - put() messages are write requests from clients. They have the following format:

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "put", "MID": "<a unique string>",
    "key": "<some key>", "value": "<value of the key>"}\n
    

    Your replicas may respond with an OK message if the write was successful:

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "ok", "MID": "<a unique string>"}\n
    

    Or your replicas may respond with a failure message, in which case the client will retry the put():

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "fail", "MID": "<a unique string>"}\n
    
  • redirect - If the client sends any message (get() or put()) to a replica that is not the leader, the replica should respond with a redirect:

    {"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "redirect", "MID": "<a unique string>"}\n
    

    In this case, the client will retry the request by sending it to the specified leader.

Note that in all of the above cases, the MID in a request must match the MID in the response. For example, the following would be a legal series of requests and responses, where 001A is a client and 0000 and 0001 are replicas:

Request 1     {"src": "001A", "dst": "0001", "leader": "FFFF",
               "type": "get", "MID": "4D61ACF83027", "key": "name"}\n
Response 1    {"src": "0001", "dst": "001A", "leader": "0000",
               "type": "redirect", "MID": "4D61ACF83027"}\n

Request 2     {"src": "001A", "dst": "0000", "leader": "0000",
               "type": "get", "MID": "9AB4CE50023", "key": "name"}\n
Response 2    {"src": "0000", "dst": "001A", "leader": "0000", "type": "ok",
               "MID": "9AB4CE50023", "value": "Christo Wilson"}\n

Again, you will need to develop additional, custom message types in order to implement the Raft consensus protocol. As long as your messages include the four minimum required fields (src, dst, leader, type) and end with a newline, the simulator will ensure that your messages are delivered.
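
The client-facing rules above (echo the MID, redirect when not the leader, return an empty string for a missing key) can be sketched as a single function. This is a simplified illustration, not a complete replica: reply_to_client is a hypothetical name, store is a plain dictionary, and a put() is applied immediately, whereas a Raft leader should answer only after the entry has been committed. Replying with a fail message when no leader is known is one possible choice among several.

```python
def reply_to_client(request, my_id, leader_id, store):
    """Build the response dictionary for a client get() or put().

    store is a plain dict standing in for the replicated state machine.
    """
    reply = {"src": my_id, "dst": request["src"], "leader": leader_id,
             "MID": request["MID"]}  # the MID must match the request
    if leader_id == "FFFF":
        reply["type"] = "fail"       # no known leader: the client retries
    elif leader_id != my_id:
        reply["type"] = "redirect"   # the client retries at the named leader
    elif request["type"] == "get":
        reply["type"] = "ok"
        reply["value"] = store.get(request["key"], "")  # missing key -> ""
    elif request["type"] == "put":
        store[request["key"]] = request["value"]
        reply["type"] = "ok"
    else:
        reply["type"] = "fail"
    return reply

# Replays the example exchange above: 0001 redirects, 0000 answers.
store = {"name": "Christo Wilson"}
req = {"src": "001A", "dst": "0001", "leader": "FFFF",
       "type": "get", "MID": "4D61ACF83027", "key": "name"}
print(reply_to_client(req, "0001", "0000", store))
print(reply_to_client(req, "0000", "0000", store))
```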

Command Line Specification

The command line syntax for your 3700kvstore program is given below. The simulator will pass parameters to each replica representing (1) the ID of the replica, and (2) the IDs of all other replicas in the system. The syntax for launching your datastore is therefore:

./3700kvstore <your ID> <ID of second replica> [ID3 [ID4 ...]]

For simplicity, all replica IDs are unique four-digit hexadecimal numbers (e.g., 0AA1 or F29A). You will use these IDs as the src and dst in your messages. Clients will also be assigned unique IDs by the simulator.
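
A minimal sketch of reading these arguments in Python follows. The names parse_ids and quorum_size are hypothetical; the quorum calculation simply counts a majority of the whole cluster, i.e., the other replicas plus this one.

```python
def parse_ids(argv):
    """Split the command line into this replica's ID and its peers' IDs."""
    if len(argv) < 3:
        raise SystemExit(
            "usage: ./3700kvstore <your ID> <ID of second replica> [ID3 [ID4 ...]]")
    return argv[1], argv[2:]

def quorum_size(peer_count):
    """Smallest majority of the full cluster (peers plus this replica)."""
    cluster_size = peer_count + 1
    return cluster_size // 2 + 1

# A real replica would pass sys.argv; a fixed list is used here so the
# example runs on its own.
my_id, peers = parse_ids(["./3700kvstore", "0000", "0001", "0002", "0003", "0004"])
print(my_id, peers, quorum_size(len(peers)))
```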

Connecting to the LAN

We will be using UNIX domain sockets to emulate a LAN. Each of your replicas will connect to a single domain socket (the way a server would connect to a single Ethernet cable). A replica will send and receive all messages over this socket (i.e., messages to/from other replicas, as well as messages to/from clients). Your program should be constantly reading from the socket to make sure it receives all messages (they will be buffered if you don’t read immediately). The simulator will take care of routing all sent messages to the appropriate destinations; thus, it’s okay if you’re not intimately familiar with how domain sockets work, or with how the simulator works.

Each replica should connect to a domain socket named “ID” (no quotes), where ID is the replica’s ID (i.e., the first ID it receives on the command line). We will be using the SOCK_STREAM socket type, which provides a reliable, TCP-like stream.

Handling SOCK_STREAM sockets

When your program receives data from a SOCK_STREAM socket, there is no guarantee that one complete message will be returned. Rather, you might receive a partial (incomplete) message, or multiple complete messages, or anything in between. This is why we mandate that all messages in the project be terminated with a newline (\n) character: so that your program will know once a complete message (or messages) have been read from the socket. It is your responsibility to make sure that your replica handles the socket properly. The starter code already includes functionality that handles partial or multiple messages from the socket.
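
One common way to handle this is to keep a byte buffer, append whatever recv() returns, and decode only the portion that ends in a newline. The sketch below illustrates the idea; LineBuffer is a hypothetical class and is not taken from the starter code, which has its own implementation.

```python
import json

class LineBuffer:
    """Accumulates raw socket data and yields complete JSON messages."""

    def __init__(self):
        self.pending = b""  # bytes of a message that has not ended yet

    def feed(self, data):
        """Add received bytes; return the list of fully received messages."""
        self.pending += data
        # Everything before the last newline is complete; the final piece
        # (possibly empty) is an unfinished message kept for the next call.
        *lines, self.pending = self.pending.split(b"\n")
        return [json.loads(line) for line in lines if line.strip()]

buf = LineBuffer()
print(buf.feed(b'{"type": "g'))                          # partial message
print(buf.feed(b'et"}\n{"type": "put"}\n{"ty'))          # two complete messages
```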


Exactly how to connect to a UNIX domain socket depends on your programming language. For example, if you were using Perl to complete the project, your code for connecting would look like:

use IO::Socket::UNIX;
my $lan = IO::Socket::UNIX->new(
  Type => SOCK_STREAM,
  Peer => "<lan>"
);

You can then read and write from the $lan variable. In Python, your code would look like

from socket import socket, SOCK_STREAM, AF_UNIX
s = socket(AF_UNIX, SOCK_STREAM)
s.connect('<lan>')

with similar results.

We encourage you to write your code in an event-driven style using select() or poll(). This will keep your code single-threaded and will make debugging your code significantly easier. Alternatively, you can implement your datastore in a threaded or asynchronous model, but expect it to be significantly more difficult to debug.
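
A single iteration of such an event loop might look like the following sketch. The names run_once, on_data, and on_timeout are hypothetical, and a local socket pair stands in for the simulator's LAN so that the example runs on its own. The deadline would typically be the next heartbeat or election timeout.

```python
import select
import socket
import time

def run_once(sock, deadline, on_data, on_timeout):
    """Wait for socket data or for the deadline, whichever comes first."""
    remaining = max(0.0, deadline - time.monotonic())
    readable, _, _ = select.select([sock], [], [], remaining)
    if readable:
        on_data(sock.recv(32768))   # may hold partial or multiple messages
    else:
        on_timeout()                # e.g., send a heartbeat or start an election

# Demonstration: one iteration with data waiting, one that times out.
ours, theirs = socket.socketpair()
events = []
theirs.sendall(b'{"type": "noop"}\n')
for _ in range(2):
    run_once(ours, time.monotonic() + 0.05,
             events.append, lambda: events.append("timeout"))
print(events)
ours.close()
theirs.close()
```

A real replica would call run_once in a loop and recompute the deadline after every message and every timeout.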

Datastore Requirements and Assumptions

The goal of your system is to accept put()s from clients and retrieve the corresponding data when a get() is issued. To ensure that data is not lost when a process crashes, all data from clients must be replicated, which then raises the dueling issues of how to maintain consistency and achieve high availability. To meet these goals, your datastore will implement the Raft consensus protocol. Ultimately, your datastore should achieve the following two goals:

  1. Consistency - clients should always receive correct answers to get() requests.
  2. Availability - clients should be able to execute put() and get() requests at any time with low latency (i.e., your system should execute requests quickly).

Raft is a complicated protocol, and real-world datastores are extremely complicated artifacts. To simplify this project, there are several things you do not need to implement:

  • True persistence - you do not need to write client updates to disk, or worry about committing data to permanent storage. All of the data from clients and the log of updates can live in memory.
  • Garbage collection - Raft maintains a log of all updates. In a real system, this log periodically needs to be garbage collected, since it cannot grow infinitely long. However, your system will not be running for long periods of time, and therefore you do not need to worry about garbage collection.
  • Restarts - in a real system, replicas might fail for a while then come back online, necessitating snapshots and reconciliation. However, you may assume that replicas in the simulator will crash fail, i.e., they will die completely and never return.

Implementing Raft

The Raft paper is specifically designed to be easy to read. To implement the protocol you should definitely start by reading the paper. Additional papers and resources are available on the Raft GitHub. I suggest the following series of steps to begin working on your datastore implementation:

  1. Add basic support for responding to client get() and put() requests. At this point, you can respond to all requests with a "type": "fail" message.
  2. Implement the Raft election protocol (section 5.2 of the Raft paper); add the ability to respond to get() and put() requests with "type": "redirect" messages.
  3. Add a timeout to detect leader failures (i.e., if you don’t hear from the leader in X milliseconds…) and make sure that the new election proceeds correctly.
  4. Implement a basic, empty version of the AppendEntries RPC call that doesn’t replicate any data, but acts as a keepalive message from the leader to other replicas to prevent unnecessary elections.
  5. Implement the transaction log and the “state machine” (i.e., a dictionary containing the key/value pairs from clients, Section 5.3). Don’t bother replicating the transactions, just ensure that the leader is able to correctly answer get() and put() requests.
  6. Improve your AppendEntries RPC call to actually send data to replicas. Ensure that updates are only committed when a quorum is in agreement.
  7. Add support for retrying failed commits and test it by experimenting with lossy network simulations.
  8. If you haven’t already, modify the leader election to support the additional restrictions in Section 5.4.1; test your implementation on lossy networks with failed leaders.
  9. Implement the subtle commit restriction given in Section 5.4.2.
  10. Improve your AppendEntries RPC call to implement batching, i.e., a single AppendEntries may send multiple outstanding log entries to a given replica.
  11. Test, test, test, and test some more ;)
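
Steps 2 through 4 rely on a randomized election timeout, which the Raft paper uses to make split votes unlikely. A minimal sketch is shown below. ElectionTimer is a hypothetical class, and the 150 to 300 millisecond range is the one suggested in the Raft paper; it is an assumption that these values suit the simulator, so expect to tune them.

```python
import random
import time

class ElectionTimer:
    """Tracks when a follower should give up on the leader."""

    def __init__(self, low=0.15, high=0.30, clock=time.monotonic):
        self.low, self.high, self.clock = low, high, clock
        self.reset()

    def reset(self):
        """Call when a valid AppendEntries arrives or a vote is granted."""
        # A fresh random timeout each time keeps replicas from timing out
        # in lockstep and repeatedly splitting the vote.
        self.deadline = self.clock() + random.uniform(self.low, self.high)

    def expired(self):
        """True once the timeout has passed without a reset."""
        return self.clock() >= self.deadline

timer = ElectionTimer()
print(timer.expired())  # False immediately after creation
```

The clock parameter exists so that tests can substitute a fake clock for time.monotonic.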

Step 6 will probably require the most time in terms of writing code and debugging, since it is the crux of the algorithm. Steps 7-10 are necessary to ensure correctness of the protocol, but shouldn’t be too difficult.
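
The quorum rule from step 6 and the commit restriction from step 9 can be combined into one calculation on the leader. The sketch below follows the rule in the Raft paper: an index may be committed once a majority of replicas store it and its entry belongs to the leader's current term. The function and parameter names are hypothetical.

```python
def new_commit_index(match_index, log_terms, current_term, commit_index):
    """Return the highest log index the leader may now mark committed.

    match_index:  highest replicated index for EVERY replica in the
                  cluster, including the leader's own last index and
                  including replicas that may have crashed.
    log_terms:    log_terms[i] is the term of the entry at index i + 1
                  (Raft log indices start at 1).
    """
    ranked = sorted(match_index, reverse=True)
    # The value at position len // 2 is stored on a majority of replicas.
    majority_index = ranked[len(ranked) // 2]
    for n in range(majority_index, commit_index, -1):
        # Section 5.4.2: only entries from the current term are committed
        # by counting replicas; earlier entries commit indirectly.
        if log_terms[n - 1] == current_term:
            return n
    return commit_index

# Five replicas; entries 4 and 5 are from term 3, the leader's current term.
print(new_commit_index([5, 5, 4, 2, 1], [1, 1, 2, 3, 3], 3, 2))
```

In this example index 4 is stored on three of the five replicas and belongs to term 3, so the function returns 4.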

Project Submission

Submitting Your Milestone

This is a very challenging project. To make sure that students start early, we require that students turn in a milestone. To complete the milestone, you must turn in a 3700kvstore program that is able to pass three of the test cases: simple-1, simple-2, and crash-1. The milestone is worth 1% of your final grade.

To submit the milestone, follow the turn-in instructions below, but submit to the Project 6 Milestone in Gradescope.

Submitting Your Final Project

To turn in your project, you should submit your (thoroughly documented) code along with two other files:

  • A Makefile that compiles your code. Your Makefile may be blank, but it must exist.
  • A plain-text (no Word or PDF) README.md file. In this file, you should briefly describe your high-level approach, any challenges you faced, and an overview of how you tested your code.

Your README.md, Makefile, source code, etc. should all be placed in the root of a compressed archive (e.g., a .zip or .tar.gz) and then uploaded to Gradescope. Alternatively, you can check these items in to GitHub and then instruct Gradescope to clone your GitHub repository.

Double Checking Your Submission

To try and make sure that your submission is (1) complete and (2) will work with our grading scripts, we provide a simple script that checks the formatting of your submission. You can download the script here and invoke it using the following command:

$ ./raft_fmt_chk.py [path to your project directory]

Note that you may need to chmod +x raft_fmt_chk.py to make the script executable.

This script will attempt to make sure that the correct files (e.g., README.md and Makefile) are available in the given directory, that your Makefile will run without errors (or is empty), and that after running the Makefile a program named 3700kvstore exists in the directory. The script will also try to determine if your files use Windows-style line endings (\r\n) as opposed to Unix-style line endings (\n). If your files are Windows-encoded, you should convert them to Unix-encoding using the dos2unix utility before turning in.
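
If dos2unix is not available, the same check and conversion can be approximated in a few lines of Python. This is a minimal sketch with hypothetical helper names; it is not how raft_fmt_chk.py performs its check.

```python
def has_crlf(data):
    """True if the bytes contain any Windows-style line ending."""
    return b"\r\n" in data

def to_unix(data):
    """Replace every Windows-style line ending with a Unix-style one."""
    return data.replace(b"\r\n", b"\n")

sample = b"#!/usr/bin/env python3\r\nprint('hi')\r\n"
print(has_crlf(sample), has_crlf(to_unix(sample)))
```

To apply it to a file, read the file in binary mode, pass the bytes through to_unix, and write the result back in binary mode.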

Grading

This project is worth 15% of your final grade in total. 1% comes from the milestone and 14% comes from the rest of the project. The final grading in this project will consist of

  • 70% Program correctness, based on passing the correctness checks of each test case
  • 30% Performance, based on the benchmark tiers in each test case

At a minimum, your code must pass the test suite without errors or crashes, and it must obey the requirements specified above. All student code will be scanned by plagiarism detection software to ensure that students are not copying code from the internet or each other.