Commit 3be87eb5 authored by Matthieu Schaller's avatar Matthieu Schaller
Browse files

Merge branch 'parallel_scheduler' into 'master'

Parallel scheduler

Make the scheduler itself use tasks to prepare each step. This is kinda black magic, as the scheduler has to use parts of itself to get itself up and running, but it works. 

See merge request !60
parents e13682e0 b5654b2a
......@@ -34,6 +34,7 @@
#include "runner.h"
/* Local headers. */
#include "atomic.h"
#include "const.h"
#include "engine.h"
#include "error.h"
......@@ -1217,12 +1218,12 @@ void *runner_main(void *data) {
t->rid = r->cpuid;
/* Set super to the first cell that I own. */
if (ci != NULL && ci->super != NULL && ci->super->owner == r->qid)
super = ci->super;
else if (cj != NULL && cj->super != NULL && cj->super->owner == r->qid)
super = cj->super;
else
super = NULL;
if (t->type != task_type_rewait && t->type != task_type_psort) {
if (ci->super != NULL && ci->super->owner == r->qid)
super = ci->super;
else if (cj != NULL && cj->super != NULL && cj->super->owner == r->qid)
super = cj->super;
}
/* Different types of tasks... */
switch (t->type) {
......@@ -1296,6 +1297,13 @@ void *runner_main(void *data) {
case task_type_split_cell:
space_split(e->s, t->ci);
break;
case task_type_rewait:
for (struct task *t2 = (struct task *)t->ci;
t2 != (struct task *)t->cj; t2++) {
for (k = 0; k < t2->nr_unlock_tasks; k++)
atomic_inc(&t2->unlock_tasks[k]->wait);
}
break;
default:
error("Unknown task type.");
}
......
......@@ -885,32 +885,56 @@ void scheduler_reweight(struct scheduler *s) {
void scheduler_start(struct scheduler *s, unsigned int mask) {
int k, j, nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
int nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
struct task *t, *tasks = s->tasks;
// ticks tic;
/* Store the mask */
s->mask = mask;
s->mask = mask | (1 << task_type_rewait);
/* Run through the tasks and set their waits. */
// tic = getticks();
for (k = nr_tasks - 1; k >= 0; k--) {
t = &tasks[tid[k]];
t->wait = 0;
t->rid = -1;
if (!((1 << t->type) & s->mask) || t->skip) continue;
for (j = 0; j < t->nr_unlock_tasks; j++)
atomic_inc(&t->unlock_tasks[j]->wait);
/* Clear all the waits and rids. */
// ticks tic = getticks();
for (int k = 0; k < s->nr_tasks; k++) {
s->tasks[k].wait = 0;
s->tasks[k].rid = -1;
}
// message( "waiting tasks took %.3f ms." , (double)( getticks() - tic ) /
// CPU_TPS * 1000 );
/* Enqueue a set of extraenous tasks to set the task waits. */
struct task *rewait_tasks = &s->tasks[s->nr_tasks];
const int num_rewait_tasks = s->nr_queues > s->size - s->nr_tasks
? s->size - s->nr_tasks
: s->nr_queues;
const int waiting_old =
s->waiting; // Remember that engine_launch may fiddle with this value.
for (int k = 0; k < num_rewait_tasks; k++) {
rewait_tasks[k].type = task_type_rewait;
rewait_tasks[k].ci = (struct cell *)&s->tasks[k * nr_tasks / s->nr_queues];
rewait_tasks[k].cj =
(struct cell *)&s->tasks[(k + 1) * nr_tasks / s->nr_queues];
rewait_tasks[k].skip = 0;
rewait_tasks[k].wait = 0;
rewait_tasks[k].rid = -1;
rewait_tasks[k].weight = 1;
rewait_tasks[k].implicit = 0;
rewait_tasks[k].nr_unlock_tasks = 0;
scheduler_enqueue(s, &rewait_tasks[k]);
pthread_cond_broadcast(&s->sleep_cond);
}
/* Wait for the rewait tasks to have executed. */
pthread_mutex_lock(&s->sleep_mutex);
while (s->waiting > waiting_old) {
pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
}
pthread_mutex_unlock(&s->sleep_mutex);
/* message("waiting tasks took %.3f ms.",
(double)(getticks() - tic) / CPU_TPS * 1000); */
/* Don't enqueue link tasks directly. */
s->mask &= ~(1 << task_type_link);
/* Loop over the tasks and enqueue whoever is ready. */
// tic = getticks();
for (k = 0; k < nr_tasks; k++) {
for (int k = 0; k < nr_tasks; k++) {
t = &tasks[tid[k]];
if (((1 << t->type) & s->mask) && !t->skip) {
if (t->wait == 0) {
......@@ -940,8 +964,9 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
/* Ignore skipped tasks and tasks not in the mask. */
if (t->skip || ((1 << t->type) & ~(s->mask) && t->type != task_type_link) ||
atomic_cas(&t->rid, -1, 0) != -1)
atomic_cas(&t->rid, -1, 0) != -1) {
return;
}
/* If this is an implicit task, just pretend it's done. */
if (t->implicit) {
......
......@@ -46,7 +46,7 @@ const char *taskID_names[task_type_count] = {
"none", "sort", "self", "pair", "sub",
"ghost", "kick1", "kick2", "send", "recv",
"link", "grav_pp", "grav_mm", "grav_up", "grav_down",
"psort", "split_cell"};
"psort", "split_cell", "rewait"};
/**
* @brief Unlock the cell held by this task.
......
......@@ -46,6 +46,7 @@ enum task_types {
task_type_grav_down,
task_type_psort,
task_type_split_cell,
task_type_rewait,
task_type_count
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment