ITaskSystem::run() executes a bulk launch of num_total_tasks tasks. Execution is synchronous with the calling thread, so run() returns only when all tasks are complete.
ITaskSystem::runAsyncWithDeps() executes an asynchronous bulk launch of num_total_tasks tasks, with dependencies on previously launched tasks.
In Part A, we do not need to consider ITaskSystem::runAsyncWithDeps().
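To make the two entry points concrete, here is an illustrative call pattern. This is a sketch only: runnableA, runnableB, and the task counts are hypothetical, and ts stands for some ITaskSystem implementation.

```cpp
// Launch group B only after group A completes, then wait for everything.
std::vector<TaskID> noDeps;
TaskID a = ts->runAsyncWithDeps(runnableA, 64, noDeps); // returns immediately
std::vector<TaskID> deps{a};
TaskID b = ts->runAsyncWithDeps(runnableB, 64, deps);   // B waits on A internally
ts->sync(); // block until both groups have finished
```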
Serial Program
We first look at the class TaskSystemSerial. Its run function is defined as follows.
```cpp
void TaskSystemSerial::run(IRunnable* runnable, int num_total_tasks) {
    for (int i = 0; i < num_total_tasks; i++) {
        runnable->runTask(i, num_total_tasks);
    }
}
```
Step 1
The simplest answer is to make the run function the master: it creates the threads to do the job, and joins them at the end. (I use C++14 for its generalized lambda captures.)
```cpp
void TaskSystemParallelSpawn::run(IRunnable* runnable, int num_total_tasks) {
    // Static interleaving: thread i handles tasks i, i + N, i + 2N, ...
    auto thread_func = [runnable_ = runnable, num = _num_threads,
                        total = num_total_tasks](int i) {
        while (i < total) {
            runnable_->runTask(i, total);
            i += num;
        }
    };
    std::thread threads[_num_threads];
    for (int i = 0; i < _num_threads; ++i) {
        threads[i] = std::thread(thread_func, i);
    }
    for (int i = 0; i < _num_threads; ++i) {
        threads[i].join();
    }
}
```
Step 2
Well, it is not so easy to write a thread pool; there are many details to deal with. The most important one: remember to join all the threads when the class's lifetime ends.
It may seem that the threads need to pull task indices one by one to dynamically decide what to do. That is a bad idea: we should minimize the amount of synchronization. So we reuse the static interleaving idea from Step 1.
The following code shows the details. First, we add some private members.
```cpp
private:
    int _num_threads;
    std::vector<std::thread> threads;       // the thread pool
    unsigned int jobs = 0x00;               // bitmap: bit i set means thread i still has a job
    unsigned int bitmap_init_value = 0x00;  // bitmap with the low _num_threads bits set
    IRunnable* runnable_;                   // the runnable of the current launch
    std::mutex queue_mutex;                 // the big lock
    bool terminate = false;                 // whether the threads should terminate
    int total_tasks = 0;                    // total tasks of the current launch
    void start(int num_threads);            // start the thread pool
    void threadLoop(int i);                 // per-thread functionality
    bool busy();                            // whether any thread is still busy with its job
```
In the constructor, we need to initialize bitmap_init_value and start the thread pool:
```cpp
void TaskSystemParallelThreadPoolSpinning::start(int num_threads) {
    threads.resize(num_threads);
    for (int i = 0; i < num_threads; ++i) {
        threads[i] = std::thread(&TaskSystemParallelThreadPoolSpinning::threadLoop, this, i);
    }
}
```
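The constructor body is not shown here; a minimal sketch, assuming the class derives from ITaskSystem as in the assignment skeleton and uses the members listed above:

```cpp
TaskSystemParallelThreadPoolSpinning::TaskSystemParallelThreadPoolSpinning(int num_threads)
    : ITaskSystem(num_threads), _num_threads(num_threads) {
    // Set the low num_threads bits: bit i belongs to thread i.
    // (An unsigned int bitmap limits us to at most 32 threads.)
    for (int i = 0; i < num_threads; ++i) {
        bitmap_init_value |= (1u << i);
    }
    start(num_threads);
}
```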
Now we come to the most important part: how does a thread tell whether it has a job? We use jobs as a bitmap, and when a thread finishes its job, it sets the corresponding bit back to 0.
When a caller invokes run, it sets runnable_ and the number of tasks, then initializes jobs to bitmap_init_value. Once jobs becomes 0 again, all the threads have completed their jobs, and run can return.
```cpp
void TaskSystemParallelThreadPoolSpinning::run(IRunnable* runnable, int num_total_tasks) {
    total_tasks = num_total_tasks;
    runnable_ = runnable;
    {
        std::lock_guard<std::mutex> guard{queue_mutex};
        jobs = bitmap_init_value;
    }
    while (busy()); // spin until every thread has cleared its bit
}
```
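threadLoop and busy are declared above but their bodies are not shown. Here is one sketch consistent with the bitmap design just described; the bodies are my reconstruction, not the original code:

```cpp
void TaskSystemParallelThreadPoolSpinning::threadLoop(int i) {
    const unsigned int mask = 1u << i;
    while (!terminate) {
        bool has_job;
        {
            std::lock_guard<std::mutex> guard{queue_mutex};
            has_job = (jobs & mask) != 0;
        }
        if (!has_job) continue; // spin until run() publishes a new launch
        // Same static interleaving as Step 1: thread i runs i, i+N, i+2N, ...
        for (int t = i; t < total_tasks; t += _num_threads) {
            runnable_->runTask(t, total_tasks);
        }
        std::lock_guard<std::mutex> guard{queue_mutex};
        jobs &= ~mask; // our job is done: clear our bit
    }
}

bool TaskSystemParallelThreadPoolSpinning::busy() {
    std::lock_guard<std::mutex> guard{queue_mutex};
    return jobs != 0; // any set bit means some thread is still working
}
```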
Remember to join the threads in the destructor:
```cpp
TaskSystemParallelThreadPoolSpinning::~TaskSystemParallelThreadPoolSpinning() {
    /*
     * We don't need to synchronize here, because the worker threads
     * only read `terminate` and never write it. Even if a thread
     * briefly reads a stale value, it doesn't matter.
     */
    terminate = true;
    for (int i = 0; i < _num_threads; ++i) {
        threads[i].join();
    }
}
```
Step 3
In Step 2, all the threads spin while idle, which is inefficient: they burn CPU doing nothing. So we should make them sleep instead. The idea is simple: use condition variables. It is just the classic producer-consumer problem, so I omit the full details here.
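Since the details are omitted, here is one possible shape of Step 3 as a minimal, self-contained sketch. Everything in it is my assumption rather than the assignment's code: the class name SleepingPoolSketch is hypothetical, and a plain function pointer stands in for IRunnable so it compiles on its own. It keeps Step 2's bitmap, but idle workers sleep on a condition variable, and run() sleeps until the bitmap empties.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

class SleepingPoolSketch {
public:
    explicit SleepingPoolSketch(int n) : _num_threads(n) {
        for (int i = 0; i < n; ++i) bitmap_init_value |= (1u << i);
        for (int i = 0; i < n; ++i)
            threads.emplace_back(&SleepingPoolSketch::threadLoop, this, i);
    }

    ~SleepingPoolSketch() {
        {
            std::lock_guard<std::mutex> guard{m};
            terminate = true;
        }
        worker_cv.notify_all();
        for (auto& t : threads) t.join();
    }

    // `task` stands in for IRunnable::runTask to keep the sketch self-contained.
    void run(void (*task)(int, int), int num_total_tasks) {
        std::unique_lock<std::mutex> guard{m};
        task_ = task;
        total_tasks = num_total_tasks;
        jobs = bitmap_init_value;                          // publish the launch
        worker_cv.notify_all();                            // wake every worker
        main_cv.wait(guard, [this] { return jobs == 0; }); // sleep until done
    }

private:
    void threadLoop(int i) {
        const unsigned int mask = 1u << i;
        while (true) {
            std::unique_lock<std::mutex> guard{m};
            worker_cv.wait(guard, [&] { return terminate || (jobs & mask); });
            if (terminate) return;
            guard.unlock();
            for (int t = i; t < total_tasks; t += _num_threads)
                task_(t, total_tasks); // run our stripe without holding the lock
            guard.lock();
            jobs &= ~mask;
            if (jobs == 0) main_cv.notify_one(); // last worker wakes run()
        }
    }

    int _num_threads;
    unsigned int jobs = 0, bitmap_init_value = 0;
    int total_tasks = 0;
    void (*task_)(int, int) = nullptr;
    bool terminate = false;
    std::mutex m;
    std::condition_variable worker_cv, main_cv;
    std::vector<std::thread> threads;
};
```

The producer-consumer pairing: run() is the producer (it publishes the bitmap and wakes the workers), and the last worker to clear its bit wakes run().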
Conclusion for Part A
I want to point out that sometimes spinning is better than sleeping: putting a thread to sleep causes a context switch, and that overhead can outweigh a short spin when tasks complete quickly.
Part B
For Part B, the most interesting question is how we should handle dependencies.
When the user calls runAsyncWithDeps, they pass a list of task IDs that the new launch depends on. So there is an important question: what is an efficient data structure to represent the dependencies?
Every task can have dependencies, so I use unordered_map<TaskID, unordered_set<Task*>>, mapping a TaskID to the set of tasks that depend on it, for the following reasons:
When a given task finishes, we can immediately look up every task that was waiting on it.
Because each value is an unordered_set, inserting and deleting dependents is efficient.
Since each launch is a distinct task with its own state, I define a helper class Task:
```cpp
class Task {
public:
    TaskID id;
    IRunnable* runnable;
    int processing = 0;   // index of the next sub-task to hand out
    int finished = 0;     // number of sub-tasks completed so far
    int total_tasks;
    size_t dependencies;  // number of unfinished tasks this one depends on
    std::mutex task_mutex;

    Task(TaskID id_, IRunnable* runnable_, int total_tasks_, size_t deps)
        : id(id_), runnable(runnable_), total_tasks(total_tasks_), dependencies(deps) {}
};
```
As you can see, the processing field indicates the next sub-task of this launch to hand out, and the finished field counts how many sub-tasks have completed. These fields need protection, so each task carries its own mutex.
If a task's dependency count (Task::dependencies) is not zero, we should not run it yet. So I use the following data structures:
vector<Task*> ready: the tasks that are ready, so we can process them.
unordered_set<Task*> blocked: the tasks that must not run yet.
The whole idea: when the user calls runAsyncWithDeps, we record the dependency information and simply put the task into blocked. In the thread loop, we first check whether ready contains a task; if so, we randomly choose one ready task to work on, and once a task is fully finished, we update the dependency information again. When all tasks are finished, we should terminate.
It may sound easy; a correct implementation, however, is hard.
```cpp
class TaskSystemParallelThreadPoolSleeping : public ITaskSystem {
private:
    bool terminate = false;  // whether to stop the thread pool
    int _num_threads = 0;    // how many threads
    int sleepThreadNum = 0;  // the number of threads currently sleeping
    std::unordered_map<TaskID, Task*> finished {};  // records the finished tasks
    std::vector<Task*> ready {};                    // tasks ready to be processed
    std::unordered_set<Task*> blocked {};           // tasks that are blocked
    std::vector<std::thread> threads;
    std::unordered_map<TaskID, std::unordered_set<Task*>> dependency {};  // the dependency information
    TaskID id = 0;
    std::mutex queue_mutex;
    std::condition_variable consumer;
    std::condition_variable producer;

    void start(int num_threads);
    void threadLoop(int index);
    void deleteFinishedTask(Task* task);
    void moveBlockTaskToReady();
    void signalSync();

public:
    TaskSystemParallelThreadPoolSleeping(int num_threads);
    ~TaskSystemParallelThreadPoolSleeping();
    const char* name();
    void run(IRunnable* runnable, int num_total_tasks);
    TaskID runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
                            const std::vector<TaskID>& deps);
    void sync();
};
```
```cpp
/**
 * For simplicity and easy handling, we just add the new task to `blocked`,
 * record the dependency information, notify the workers, and immediately
 * return to the user, since this is an async operation. This also keeps
 * the implementation simple.
 */
TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable,
                                                              int num_total_tasks,
                                                              const std::vector<TaskID>& deps) {
    Task* task = new Task(id, runnable, num_total_tasks, deps.size());
    {
        std::unique_lock<std::mutex> guard{queue_mutex};
        // We just simply add the task to the blocked set.
        blocked.insert(task);

        // Record dependency information for later processing. A dependency
        // that has already completed must not be counted, otherwise this
        // task would stay blocked forever.
        for (TaskID dep : deps) {
            if (finished.count(dep)) {
                task->dependencies--;
                continue;
            }
            if (dependency.count(dep)) {
                dependency[dep].insert(task);
            } else {
                dependency[dep] = std::unordered_set<Task*>{task};
            }
        }
        // Notify the workers so they can continue processing.
        producer.notify_all();
    }
    return id++;
}
```
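The class also declares run, whose body is not shown in this write-up. A plausible sketch, under the assumption that a synchronous launch is simply an asynchronous launch followed by a sync:

```cpp
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
    // A synchronous launch is just an async launch with no deps, plus a sync.
    runAsyncWithDeps(runnable, num_total_tasks, std::vector<TaskID>{});
    sync();
}
```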
The main functionality is in the threadLoop function:
```cpp
void TaskSystemParallelThreadPoolSleeping::threadLoop(int id_) {
    while (true) {
        int index = -1;
        Task* task = nullptr;
        {
            std::unique_lock<std::mutex> guard{queue_mutex};
            if (ready.empty()) {
                if (!blocked.empty()) {
                    // Check whether any blocked tasks can move to ready.
                    moveBlockTaskToReady();
                }
                // If ready is still empty, put this thread to sleep.
                if (ready.empty()) {
                    sleepThreadNum++;
                    producer.wait(guard);
                    sleepThreadNum--;
                }
            }
            /*
             * We must check whether `ready` is empty here: when
             * ready.size() == 0, rand() % 0 raises a floating point
             * exception. It sucks.
             */
            if (!ready.empty()) {
                // Pick a random ready task for each thread, for simplicity.
                index = rand() % ready.size();
                task = ready[index];
            }
        }
        if (terminate) {
            return;
        }
        if (task == nullptr) continue;

        int processing = -1, finished = -1;
        {
            std::unique_lock<std::mutex> guard{task->task_mutex};
            processing = task->processing;
            // `processing` can exceed the total number of sub-tasks, because
            // we don't know when `deleteFinishedTask` has run: we may have
            // picked a task that is actually already finished.
            if (processing >= task->total_tasks) continue;
            task->processing++;
        }
        if (processing < task->total_tasks) {
            task->runnable->runTask(processing, task->total_tasks);
            std::unique_lock<std::mutex> guard{task->task_mutex};
            task->finished++;
            finished = task->finished;
        }
        if (finished == task->total_tasks) {
            std::unique_lock<std::mutex> guard{queue_mutex};
            deleteFinishedTask(task);
            // After signalSync, some threads may still be doing useless work
            // and could return to the destructor. So the destructor must wait
            // until every thread is asleep, and then call `notify_all` to make
            // all the threads stop. This design could be optimized, but I
            // didn't have enough time...
            signalSync();
        }
    }
}
```
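deleteFinishedTask is called above but its body is omitted; the following is a hypothetical reconstruction consistent with the surrounding code (it assumes <algorithm> is included for std::find, and that the caller holds queue_mutex, as threadLoop does):

```cpp
void TaskSystemParallelThreadPoolSleeping::deleteFinishedTask(Task* task) {
    // Remove the task from `ready` so no thread picks it again.
    ready.erase(std::find(ready.begin(), ready.end(), task));

    // Every task waiting on this one loses a dependency; some may now be
    // eligible for `ready` (moveBlockTaskToReady handles the actual move).
    auto it = dependency.find(task->id);
    if (it != dependency.end()) {
        for (Task* dependent : it->second) {
            dependent->dependencies--; // guarded by queue_mutex
        }
        dependency.erase(it);
    }

    // Keep the task in `finished` so the destructor can free it, and so a
    // later runAsyncWithDeps can see that this TaskID already completed.
    finished[task->id] = task;

    // New tasks may have become ready, so wake the sleeping workers.
    producer.notify_all();
}
```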
```cpp
/**
 * Move blocked tasks to `ready` once all of a task's
 * dependencies have finished.
 */
void TaskSystemParallelThreadPoolSleeping::moveBlockTaskToReady() {
    std::vector<Task*> moved {};
    for (auto task : blocked) {
        if (task->dependencies == 0) {
            ready.push_back(task);
            moved.push_back(task);
        }
    }
    for (auto task : moved) {
        blocked.erase(task);
    }
}
```
```cpp
/**
 * When all the tasks are finished, which means `ready` and `blocked`
 * are both empty, we can signal the ONLY consumer.
 */
void TaskSystemParallelThreadPoolSleeping::signalSync() {
    if (ready.empty() && blocked.empty()) {
        consumer.notify_one();
    }
}
```
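sync() itself, the consumer that signalSync wakes, is not shown either; here is a sketch matching the signalSync logic (my reconstruction):

```cpp
// Hypothetical sketch of sync(): block the caller until every launched
// task has finished, i.e. both `ready` and `blocked` are empty.
void TaskSystemParallelThreadPoolSleeping::sync() {
    std::unique_lock<std::mutex> guard{queue_mutex};
    while (!(ready.empty() && blocked.empty())) {
        consumer.wait(guard);
    }
}
```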
Finally, the destructor (the excerpt below was truncated in the original; the signature and the `terminate = true;` line are reconstructed from the threadLoop comments, which require exactly this shutdown order):

```cpp
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    // It may seem odd that we spin here testing sleepThreadNum. Because of
    // the design, some threads may not be sleeping yet at this point, and
    // signaling them to exit too early would dead-lock the join below.
    // See `threadLoop` for more detail.
    while (true) {
        std::unique_lock<std::mutex> guard{queue_mutex};
        if (sleepThreadNum == _num_threads) break;
    }

    // Every thread is now asleep on `producer`; notify them all to return.
    terminate = true;
    producer.notify_all();

    for (int i = 0; i < _num_threads; i++) {
        threads[i].join();
    }

    // Free the memory of all finished tasks.
    for (auto task : finished) {
        delete task.second;
    }
}
```