MIT 6.824: MapReduce
This is my record of finishing the first lab of MIT 6.824 (Spring 2022), MapReduce.
Sequential Version
Before we dive into how to solve this problem, we first need to understand the sequential version. First, let's look at how Map operates.
```go
intermediate := []mr.KeyValue{}
```
The idea behind the code above is simple: it calls the user-provided Map function, which returns mr.KeyValue pairs, and appends the results to intermediate.
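As a rough illustration of this Map loop, the sketch below reads its inputs from an in-memory map instead of from files, so it runs without any input data. KeyValue mirrors the shape of mr.KeyValue, while wordCountMap and runMapPhase are hypothetical names; wordCountMap splits on non-letters the way the lab's word-count application does.

```go
package main

import (
	"sort"
	"strings"
	"unicode"
)

// KeyValue mirrors the shape of mr.KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

// wordCountMap is a hypothetical word-count Map function:
// it emits ("word", "1") for every word in contents.
func wordCountMap(filename string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := []KeyValue{}
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// runMapPhase calls mapf on every input and appends all results to one slice,
// like the sequential version does with intermediate.
func runMapPhase(inputs map[string]string, mapf func(string, string) []KeyValue) []KeyValue {
	// Visit inputs in sorted order so the result is deterministic.
	names := make([]string, 0, len(inputs))
	for name := range inputs {
		names = append(names, name)
	}
	sort.Strings(names)

	intermediate := []KeyValue{}
	for _, name := range names {
		kva := mapf(name, inputs[name])
		intermediate = append(intermediate, kva...)
	}
	return intermediate
}
```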
Next, we need to partition intermediate into groups by key. In fact, sorting the results is enough, because equal keys then end up adjacent, and this is exactly what the code does.
```go
sort.Sort(ByKey(intermediate))
```
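sort.Sort needs a type that implements sort.Interface. A minimal ByKey adapter of the kind this call relies on could look like the following; sortIntermediate is a hypothetical wrapper added only for illustration.

```go
package main

import "sort"

type KeyValue struct {
	Key   string
	Value string
}

// ByKey implements sort.Interface so a []KeyValue can be sorted by Key.
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// sortIntermediate sorts in place; afterwards all pairs with equal keys are adjacent.
func sortIntermediate(intermediate []KeyValue) {
	sort.Sort(ByKey(intermediate))
}
```

Since Go 1.8, sort.Slice with a comparison closure achieves the same result without declaring a named type.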
Next, we call the user-provided Reduce function. This is also easy: since intermediate is already sorted, we just process the groups of equal keys one by one.
```go
i := 0
```
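The listing above starts a two-pointer walk over the sorted slice: for each run of equal keys, collect the values and call Reduce once. A self-contained sketch of that loop follows. It writes to a strings.Builder instead of an output file, and reduceSorted and wordCountReduce are hypothetical names.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type KeyValue struct {
	Key   string
	Value string
}

// wordCountReduce is a hypothetical Reduce function: it counts the values.
func wordCountReduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// reduceSorted walks a slice already sorted by key. For each run [i, j) of
// equal keys it gathers the values, calls reducef once, and writes one
// "key value" line (the "%v %v" format the lab expects in mr-out-* files).
func reduceSorted(intermediate []KeyValue, reducef func(string, []string) string) string {
	var out strings.Builder
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		fmt.Fprintf(&out, "%v %v\n", intermediate[i].Key, output)
		i = j
	}
	return out.String()
}
```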
Requirements Analysis
Coordinator
MapReduce needs a coordinator that allocates tasks and keeps their status in its own data structures. The coordinator does not need to record the output files of each Map task, because intermediate files follow a fixed naming pattern, so a Reduce task can find its inputs by name alone.
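To show why no per-file bookkeeping is needed, here is a small sketch. It assumes every Map task writes exactly one file per reduce bucket (even an empty one) using the lab's mr-X-Y naming; intermediateName and reduceInputs are hypothetical helpers.

```go
package main

import "fmt"

// intermediateName returns the file name for Map task mapID and reduce bucket
// reduceID, following the mr-X-Y convention.
func intermediateName(mapID, reduceID int) string {
	return fmt.Sprintf("mr-%d-%d", mapID, reduceID)
}

// reduceInputs lists every intermediate file reduce task reduceID must read,
// derived only from the total number of Map tasks.
func reduceInputs(reduceID, mapTaskTotal int) []string {
	files := make([]string, 0, mapTaskTotal)
	for x := 0; x < mapTaskTotal; x++ {
		files = append(files, intermediateName(x, reduceID))
	}
	return files
}
```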
Worker
For a worker, however, things are more complicated. In the lab's code, a worker can run both Map and Reduce tasks, and I don't think it is a good idea to make the two roles mutually exclusive. So each worker starts two goroutines, one for Map and another for Reduce.
For the Map operation, we need the following functionality:
- Divide the intermediate keys into buckets for nReduce reduce tasks, where nReduce is the number of reduce tasks.
- Name the intermediate files mr-X-Y, where X is the Map task number and Y is the reduce task number.
- Use the encoding/json package to write the key/value pairs in JSON format to the intermediate files.
For the Reduce operation, we need the following functionality:
- Use the encoding/json package to read the key/value pairs in JSON format from the intermediate files.
- Put the output of the Xth reduce task in the file mr-out-X.
Data Structure Design
There are several things we need to consider.

We need meaningful names for the states of both the coordinator and the workers. However, Go doesn't provide an enum type like C/C++, so I use const to emulate an enum:

```go
type WorkingStatus int32

const (
	idle       WorkingStatus = 0
	processing WorkingStatus = 1
	failed     WorkingStatus = 2
	terminated WorkingStatus = 3
)
```

The coordinator needs to store the tasks to be processed and allocate them to workers, so we need a way to represent Map and Reduce tasks:

```go
type MapTask struct {
	ID       int    // the current map task id
	Filename string // the input file the map task processes
}

type ReduceTask struct {
	ID           int // the current reduce task id
	MapTaskTotal int // the total number of map tasks, whose outputs this reduce task reads
}
```

We need to store worker information because a worker may crash; without this state we cannot do any crash recovery. Since a worker handles both Map and Reduce tasks, we also need state to track the status of each kind of task. I create a WorkerState that combines a MapWorker and a ReduceWorker:

```go
type MapWorker struct {
	status    WorkingStatus // current status
	task      MapTask       // current task
	startTime time.Time     // map task start time
}

type ReduceWorker struct {
	status    WorkingStatus // current status
	task      ReduceTask    // current task
	startTime time.Time     // reduce task start time
}

type WorkerState struct {
	id           int          // worker identifier
	mapWorker    MapWorker    // the map side of this worker
	reduceWorker ReduceWorker // the reduce side of this worker
}
```
The coordinator could use the following data structure:

```go
type Coordinator struct {
```
RPC Operations
Next, we handle the RPC operations, where we need to consider synchronization between the workers and the coordinator.
Register
When a worker starts, it calls the Register RPC. The coordinator locks its mutex, records the new worker, and returns the worker's id and the number of reduce buckets to the worker, as follows:
```go
type RegisterRequest struct{}
```
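A minimal sketch of the handler, assuming a reply that carries the worker ID and nReduce and a coordinator that tracks registered workers in a map. RegisterReply, its fields, and the Coordinator fields shown here are hypothetical, not the author's definitions. The method follows net/rpc's rules (exported method, pointer reply, error result), so it could be registered with rpc.Register.

```go
package main

import "sync"

type RegisterRequest struct{}

// Hypothetical reply shape.
type RegisterReply struct {
	ID      int // identifier assigned to the new worker
	NReduce int // number of reduce buckets the worker partitions into
}

// Hypothetical coordinator fields, reduced to what Register needs.
type Coordinator struct {
	mu      sync.Mutex
	nextID  int
	nReduce int
	workers map[int]bool // registered worker ids (stand-in for WorkerState)
}

// Register hands out a fresh worker id while holding the mutex, so concurrent
// registrations never receive the same id.
func (c *Coordinator) Register(args *RegisterRequest, reply *RegisterReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	id := c.nextID
	c.nextID++
	c.workers[id] = true
	reply.ID = id
	reply.NReduce = c.nReduce
	return nil
}
```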
Request Map Task
When a worker wants a map task, it calls the MapRequest RPC. When no task is currently available, the RPC call blocks until one can be allocated to the worker. The following code shows the details:
```go
type MapTaskRequest struct {
```
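One way to block inside the handler is a sync.Cond tied to the coordinator's mutex: the handler waits while the queue is empty and is woken when a task is queued or the Map phase ends. This is a sketch under assumptions; the WorkerID field, MapTaskReply, addMapTask and finishMapping are hypothetical, since the original fields are not shown.

```go
package main

import "sync"

type MapTask struct {
	ID       int
	Filename string
}

// Hypothetical request/reply shapes.
type MapTaskRequest struct{ WorkerID int }

type MapTaskReply struct {
	Task MapTask
	Done bool // true once no more map tasks will be handed out
}

type Coordinator struct {
	mu      sync.Mutex
	cond    *sync.Cond // signalled when pending changes or mapping finishes
	pending []MapTask  // map tasks waiting for a worker
	mapDone bool
}

func NewCoordinator() *Coordinator {
	c := &Coordinator{}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// MapRequest blocks until a task is pending or all map work is finished.
// net/rpc serves each call on its own goroutine, so blocking here only
// stalls the calling worker.
func (c *Coordinator) MapRequest(args *MapTaskRequest, reply *MapTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for len(c.pending) == 0 && !c.mapDone {
		c.cond.Wait()
	}
	if len(c.pending) == 0 {
		reply.Done = true
		return nil
	}
	reply.Task = c.pending[0]
	c.pending = c.pending[1:]
	return nil
}

// addMapTask queues a new or re-queued task and wakes waiting handlers.
func (c *Coordinator) addMapTask(t MapTask) {
	c.mu.Lock()
	c.pending = append(c.pending, t)
	c.mu.Unlock()
	c.cond.Broadcast()
}

// finishMapping releases every blocked caller once no more tasks will appear.
func (c *Coordinator) finishMapping() {
	c.mu.Lock()
	c.mapDone = true
	c.mu.Unlock()
	c.cond.Broadcast()
}
```

The alternative is to reply "no task yet" immediately and let the worker sleep and retry. Blocking avoids polling, but the coordinator must broadcast at the end so no worker stays stuck.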
Finish Map Task
When a worker finishes a Map task, it calls MapFinish to tell the coordinator "I have finished this task". However, there are many details to consider:
- A task that runs for too long is considered failed. If the coordinator later receives a MapFinish request for it, it simply tells the worker to shut down.
- Because workers may crash, a worker should never write to the final file name directly. Instead, it writes to a temporary file, and the coordinator renames that file inside the RPC call. So we add a new field, Info, which maps each temporary file name to its final file name.
```go
type MapTaskRequest struct {
```
The ReduceRequest and ReduceFinish functions are similar to MapRequest and MapFinish, so I omit the details here.
Coordinator Operations
There are two operations we need to consider for the coordinator. One is checking the liveness of each worker so that it can perform crash recovery; for this I define a goroutine, checkLiveness.
```go
func (c *Coordinator) checkLiveness() {
```
Worker Operations
The entry point is the Worker function, which starts two goroutines: one runs mapProcess and the other runs reduceProcess.
```go
func Worker(mapf func(string, string) []KeyValue,
```
The mapProcess goroutine handles the following:
- First, it calls the MapRequest RPC to request a task from the coordinator. The two most important things it gets back, the task ID and the input file name, are in the MapTask structure.
- It maintains the mapping between each temporary file name and its final file name.
```go
func mapProcess(mapf func(string, string) []KeyValue, id int, nReduce int) {
```
reduceProcess works just like mapProcess; from my perspective, there is nothing fundamentally different.
```go
func reduceProcess(reducef func(string, []string) string, id int, nReduce int) {
```