MIT6.824: MapReduce
This is my record of finishing the first lab, MapReduce, of MIT 6.824 (Spring 2022).
Sequential Version
Before we dive into how to solve this problem, we first need to understand the sequential version. First, let's look at how Map operates.
    intermediate := []mr.KeyValue{}
The idea behind the code above is simple: it calls the user-provided Map function to generate mr.KeyValue pairs and appends the results to intermediate.
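To make the map phase concrete, here is a minimal, self-contained sketch. It is not the lab's code: KeyValue mirrors mr.KeyValue, while wordCountMap, runMapPhase, and the in-memory inputs map (standing in for reading files from disk) are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// KeyValue mirrors the mr.KeyValue pair used in the lab.
type KeyValue struct {
	Key   string
	Value string
}

// wordCountMap is a hypothetical user-provided Map function:
// it emits (word, "1") for every word found in contents.
func wordCountMap(filename string, contents string) []KeyValue {
	isSep := func(r rune) bool { return !unicode.IsLetter(r) }
	kva := []KeyValue{}
	for _, w := range strings.FieldsFunc(contents, isSep) {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// runMapPhase calls mapf once per input and collects every emitted pair.
// inputs maps a file name to its contents (an assumption of this sketch).
func runMapPhase(mapf func(string, string) []KeyValue,
	names []string, inputs map[string]string) []KeyValue {
	intermediate := []KeyValue{}
	for _, name := range names {
		intermediate = append(intermediate, mapf(name, inputs[name])...)
	}
	return intermediate
}

func main() {
	inputs := map[string]string{"a.txt": "foo bar", "b.txt": "bar baz bar"}
	out := runMapPhase(wordCountMap, []string{"a.txt", "b.txt"}, inputs)
	fmt.Println(len(out), out)
}
```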
Next we need to partition the intermediate results into groups. In practice, we can simply sort them by key, which is what the code does.
    sort.Sort(ByKey(intermediate))
Next we need to call the user-provided Reduce function. This is also easy: since intermediate is already sorted, we just handle the groups of equal keys one by one.
    i := 0
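The sort-then-group step can be sketched as follows. This is a minimal illustration, assuming a hypothetical countReduce function and a hypothetical helper reduceSorted that returns the results in memory instead of writing them to an output file.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

type KeyValue struct {
	Key   string
	Value string
}

// ByKey implements sort.Interface so the pairs can be sorted by key.
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// countReduce is a hypothetical Reduce function: it counts the values of a key.
func countReduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// reduceSorted sorts the pairs, then walks each run of equal keys
// and calls reducef once per distinct key.
func reduceSorted(reducef func(string, []string) string, intermediate []KeyValue) []KeyValue {
	sort.Sort(ByKey(intermediate))
	out := []KeyValue{}
	i := 0
	for i < len(intermediate) {
		// Find the end j of the run of pairs sharing intermediate[i].Key.
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		key := intermediate[i].Key
		out = append(out, KeyValue{Key: key, Value: reducef(key, values)})
		i = j
	}
	return out
}

func main() {
	pairs := []KeyValue{
		{Key: "foo", Value: "1"}, {Key: "bar", Value: "1"},
		{Key: "bar", Value: "1"}, {Key: "baz", Value: "1"},
		{Key: "bar", Value: "1"},
	}
	fmt.Println(reduceSorted(countReduce, pairs))
}
```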
Requirements Analysis
Coordinator
MapReduce needs a coordinator that allocates the tasks, and the coordinator should use a data structure to hold their status. The coordinator does not need to record the result files of the Map operation, because the file names follow a fixed naming pattern that can be matched later.
Worker
The worker, however, is more complicated. In the code, a worker can perform both Map and Reduce, and I don’t think it is a good idea to make them mutually exclusive. So a worker should start two goroutines, one for Map and another for Reduce.
For the Map operation, we should achieve the following functionality:
- Divide the intermediate keys into buckets for nReduce reduce tasks, where nReduce is the number of reduce tasks.
- Name the intermediate files mr-X-Y, where X is the Map task number and Y is the reduce task number.
- Use the encoding/json package to write the key/value pairs in JSON format to the intermediate files.
For the Reduce operation, we should achieve the following functionality:
- Use the encoding/json package to read the key/value pairs in JSON format from the intermediate files.
- Put the output of the Xth reduce task in the file mr-out-X.
Data Structure Design
There are the following things we need to consider:
- We need semantic names to represent the states of both the coordinator and the workers. However, Go doesn’t provide an enum type like C/C++, so I decided to use const to emulate one:

    type WorkingStatus int32

    const (
        idle       WorkingStatus = 0
        processing WorkingStatus = 1
        failed     WorkingStatus = 2
        terminated WorkingStatus = 3
    )

- We need to store the tasks to be handled in the coordinator and allocate them to workers, so we need a way to represent Map and Reduce tasks:

    type MapTask struct {
        ID       int    // the current map task id
        Filename string // the file which the map task is processing
    }

    type ReduceTask struct {
        ID           int // the current reduce task id
        MapTaskTotal int // the total number of map tasks for the reduce task
    }

- We need to store the worker information, because a worker may crash. If we do not store this state, we cannot do any crash recovery. Since a worker handles both Map and Reduce tasks, we also need state to hold the status of each task. I create a WorkerState that combines MapWorker and ReduceWorker:

    type MapWorker struct {
        status    WorkingStatus // status
        task      MapTask       // current task
        startTime time.Time     // map task start time
    }

    type ReduceWorker struct {
        status    WorkingStatus // status
        task      ReduceTask    // current task
        startTime time.Time     // reduce task start time
    }

    type WorkerState struct {
        id           int          // worker identifier
        mapWorker    MapWorker    // the map worker
        reduceWorker ReduceWorker // the reduce worker
    }

And we could use the following data structure for the coordinator:
    type Coordinator struct {
RPC Operations
Next, we need to handle the RPC operations, where we need to consider the synchronization between the workers and the coordinator.
Register
When a worker is initialized, it should call the Register RPC. The coordinator should lock the mutex, update the worker field, and return the id and the reduce bucket count to the worker, as follows:
    type RegisterRequest struct{}
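As a rough illustration of the locking pattern described above, here is a minimal sketch. The RegisterResponse type, its fields, and the workers and nextID fields are hypothetical; the Coordinator here holds only what the sketch needs and is not the full structure from my solution. The method follows the usual net/rpc shape (two arguments, the second a pointer, and an error return) but is called directly here rather than over a real RPC connection.

```go
package main

import (
	"fmt"
	"sync"
)

type RegisterRequest struct{}

// RegisterResponse is a hypothetical reply type for this sketch.
type RegisterResponse struct {
	ID      int // identifier assigned to the worker
	NReduce int // number of reduce buckets
}

// Coordinator holds only the fields this sketch needs.
type Coordinator struct {
	mu      sync.Mutex
	nReduce int
	workers map[int]bool // hypothetical: set of registered worker ids
	nextID  int
}

// Register assigns a fresh id under the mutex so that concurrent
// registrations never observe or produce the same id.
func (c *Coordinator) Register(req *RegisterRequest, resp *RegisterResponse) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	resp.ID = c.nextID
	resp.NReduce = c.nReduce
	c.workers[resp.ID] = true
	c.nextID++
	return nil
}

func main() {
	c := &Coordinator{nReduce: 10, workers: map[int]bool{}}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			var resp RegisterResponse
			_ = c.Register(&RegisterRequest{}, &resp)
		}()
	}
	wg.Wait()
	fmt.Println(len(c.workers)) // number of distinct ids handed out
}
```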
Request Map Task
When a worker wants to request a map task, it should call the MapRequest RPC. However, when no task is available to allocate to the worker, we should block inside the RPC call. See the following code for the details.
    type MapTaskRequest struct {
Finish Map Task
When a worker finishes the Map operation, it should call MapFinish to tell the coordinator “I have finished this task”. However, there are many details we need to consider:
- Some tasks run for so long that we consider them failed. If the coordinator later receives a MapFinish request for such a task, it should simply tell the worker to shut down.
- Because workers may crash, they should never write to the final file name directly. Instead, workers should write to a temp file, and the coordinator should rename it in the RPC call. So we need to add a new field Info representing the mapping between the temp file name and the original file name.
    type MapTaskRequest struct {
The ReduceRequest and ReduceFinish functions work like MapRequest and MapFinish, so I omit the details here.
Coordinator Operations
There are two operations we need to consider for the coordinator. One is that the coordinator needs to check the liveness of each worker in order to perform crash recovery. I define a goroutine checkLiveness for this.
    func (c *Coordinator) checkLiveness() {
Worker Operations
The entry point is the Worker function. We start two goroutines in this function: one handles mapProcess and the other handles reduceProcess.
    func Worker(mapf func(string, string) []KeyValue,
The mapProcess goroutine handles the following things:
- First, it uses the MapRequest RPC to request one job from the coordinator. The most important thing it gets back from the coordinator is the MapTask structure.
- It maintains the mapping between the temp file names and the original file names.
    func mapProcess(mapf func(string, string) []KeyValue, id int, nReduce int) {
reduceProcess works just like mapProcess. From my perspective, there is essentially no difference.
    func reduceProcess(reducef func(string, []string) string, id int, nReduce int) {