Commit 4d022972 authored by Pedro Gonnet
Browse files

get rid of the rewait tasks.

parent 22808bfa
......@@ -2657,6 +2657,9 @@ void engine_init(struct engine *e, struct space *s,
part_create_mpi_types();
#endif
/* Initialize the threadpool. */
threadpool_init(&e->threadpool, e->nr_threads);
/* First of all, init the barrier and lock it. */
if (pthread_mutex_init(&e->barrier_mutex, NULL) != 0)
error("Failed to initialize barrier mutex.");
......@@ -2671,7 +2674,7 @@ void engine_init(struct engine *e, struct space *s,
/* Init the scheduler with enough tasks for the initial sorting tasks. */
const int nr_tasks = 2 * s->tot_cells + 2 * e->nr_threads;
scheduler_init(&e->sched, e->s, nr_tasks, nr_queues, scheduler_flag_steal,
e->nodeID);
e->nodeID, &e->threadpool);
/* Create the sorting tasks. */
for (int i = 0; i < e->nr_threads; i++) {
......@@ -2729,9 +2732,6 @@ void engine_init(struct engine *e, struct space *s,
while (e->barrier_running || e->barrier_launch)
if (pthread_cond_wait(&e->barrier_cond, &e->barrier_mutex) != 0)
error("Error while waiting for runner threads to get in place.");
/* Initialize the threadpool. */
threadpool_init(&e->threadpool, e->nr_threads);
}
/**
......
......@@ -1404,10 +1404,6 @@ void *runner_main(void *data) {
case task_type_split_cell:
space_do_split(e->s, t->ci);
break;
case task_type_rewait:
scheduler_do_rewait((struct task *)t->ci, (struct task *)t->cj,
t->flags, t->rank);
break;
case task_type_comm_root:
break;
default:
......
......@@ -948,6 +948,31 @@ void scheduler_reweight(struct scheduler *s) {
message( "task weights are in [ %i , %i ]." , min , max ); */
}
/**
 * @brief Mapper for #threadpool_map that re-computes task wait counters.
 *
 * For the given task, every task listed in its unlock list gets its
 * wait counter atomically incremented, provided the given task is
 * not skipped, is selected by the scheduler's mask and submask, and is
 * not a sort task whose flags are zero.
 *
 * @param map_data Pointer to the #task to process.
 * @param extra_data Pointer to the #scheduler holding the mask and submask.
 */
void scheduler_rewait_mapper(void *map_data, void *extra_data) {
  struct task *task = (struct task *)map_data;
  struct scheduler *sched = (struct scheduler *)extra_data;

  /* A task contributes to the waits only if it is not skipped, its type
     and subtype are both in the current masks, and it is not a sort task
     that has already been performed (flags == 0). */
  const int contributes =
      !task->skip && ((1 << task->type) & sched->mask) != 0 &&
      ((1 << task->subtype) & sched->submask) != 0 &&
      !(task->type == task_type_sort && task->flags == 0);

  if (contributes) {
    /* Bump the wait counter of each dependent task. */
    for (int j = 0; j < task->nr_unlock_tasks; j++) {
      struct task *dependent = task->unlock_tasks[j];
      atomic_inc(&dependent->wait);
    }
  }
}
/**
* @brief Start the scheduler, i.e. fill the queues with ready tasks.
*
......@@ -965,7 +990,7 @@ void scheduler_start(struct scheduler *s, unsigned int mask,
// ticks tic;
/* Store the masks */
s->mask = mask | (1 << task_type_rewait) | (1 << task_type_comm_root);
s->mask = mask | (1 << task_type_comm_root);
s->submask = submask | (1 << task_subtype_none);
/* Clear all the waits and rids. */
......@@ -974,54 +999,12 @@ void scheduler_start(struct scheduler *s, unsigned int mask,
s->tasks[k].wait = 1;
s->tasks[k].rid = -1;
}
// message( "waiting tasks took %.3f %s." ,
// message( "clearing task waits took %.3f %s." ,
// clocks_from_ticks(getticks() - tic), clocks_getunit() );
/* Enqueue a set of extraneous tasks to set the task waits. */
struct task *rewait_tasks = &s->tasks[s->nr_tasks];
const int num_rewait_tasks = s->nr_queues > s->size - s->nr_tasks
? s->size - s->nr_tasks
: s->nr_queues;
/* Remember that engine_launch may fiddle with this value. */
const int waiting_old = s->waiting;
/* We are going to use the task structure in a modified way to pass
information to the task. Don't do this at home !
- ci and cj will give the range of tasks to which the waits will be applied
- the flags will be used to transfer the mask
- the rank will be used to transfer the submask
- the rest is unused.
*/
for (int k = 0; k < num_rewait_tasks; k++) {
rewait_tasks[k].type = task_type_rewait;
rewait_tasks[k].ci = (struct cell *)&s->tasks[k * nr_tasks / s->nr_queues];
rewait_tasks[k].cj =
(struct cell *)&s->tasks[(k + 1) * nr_tasks / s->nr_queues];
rewait_tasks[k].flags = s->mask;
rewait_tasks[k].rank = s->submask;
rewait_tasks[k].skip = 0;
rewait_tasks[k].wait = 0;
rewait_tasks[k].rid = -1;
rewait_tasks[k].weight = 1;
rewait_tasks[k].implicit = 0;
rewait_tasks[k].nr_unlock_tasks = 0;
scheduler_enqueue(s, &rewait_tasks[k]);
pthread_cond_broadcast(&s->sleep_cond);
}
/* Wait for the rewait tasks to have executed. */
pthread_mutex_lock(&s->sleep_mutex);
pthread_cond_broadcast(&s->sleep_cond);
while (s->waiting > waiting_old) {
pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
}
pthread_mutex_unlock(&s->sleep_mutex);
/* message("waiting tasks took %.3f %s.",
clocks_from_ticks(getticks() - tic), clocks_getunit());*/
s->mask = mask | (1 << task_type_comm_root);
s->submask = submask | (1 << task_subtype_none);
/* Re-wait the tasks. */
threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tasks, nr_tasks,
sizeof(struct task), s);
/* Loop over the tasks and enqueue whoever is ready. */
// tic = getticks();
......@@ -1317,7 +1300,8 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
*/
void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
int nr_queues, unsigned int flags, int nodeID) {
int nr_queues, unsigned int flags, int nodeID,
struct threadpool *tp) {
/* Init the lock. */
lock_init(&s->lock);
......@@ -1349,6 +1333,7 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
s->flags = flags;
s->space = space;
s->nodeID = nodeID;
s->threadpool = tp;
/* Init the tasks array. */
s->size = 0;
......@@ -1381,31 +1366,3 @@ void scheduler_print_tasks(const struct scheduler *s, const char *fileName) {
fclose(file);
}
/**
 * @brief Increment the wait counters of the dependents of a range of tasks.
 *
 * Walks the half-open range [t_begin, t_end) and, for every task that is
 * not skipped, is selected by both masks, and is not a sort task with
 * zero flags, atomically increments the wait counter of each task in its
 * unlock list.
 *
 * @param t_begin First #task of the range.
 * @param t_end One-past-the-last #task of the range.
 * @param mask The scheduler task mask.
 * @param submask The scheduler subtask mask.
 */
void scheduler_do_rewait(struct task *t_begin, struct task *t_end,
                         unsigned int mask, unsigned int submask) {
  for (struct task *cur = t_begin; cur != t_end; ++cur) {

    /* Ignore tasks flagged as skipped. */
    if (cur->skip) continue;

    /* Ignore tasks whose type is not selected by the mask. */
    if (((1 << cur->type) & mask) == 0) continue;

    /* Ignore tasks whose subtype is not selected by the submask. */
    if (((1 << cur->subtype) & submask) == 0) continue;

    /* Ignore sort tasks that have already been performed. */
    if (cur->type == task_type_sort && cur->flags == 0) continue;

    /* Bump the wait counter of each dependent task. */
    for (int j = 0; j < cur->nr_unlock_tasks; j++) {
      struct task *dependent = cur->unlock_tasks[j];
      atomic_inc(&dependent->wait);
    }
  }
}
......@@ -37,6 +37,7 @@
#include "queue.h"
#include "space.h"
#include "task.h"
#include "threadpool.h"
/* Some constants. */
#define scheduler_maxwait 3
......@@ -97,6 +98,9 @@ struct scheduler {
/* The space associated with this scheduler. */
struct space *space;
/* Threadpool to use internally for mundane parallel work. */
struct threadpool *threadpool;
/* The node we are working on. */
int nodeID;
......@@ -104,7 +108,8 @@ struct scheduler {
/* Function prototypes. */
void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
int nr_queues, unsigned int flags, int nodeID);
int nr_queues, unsigned int flags, int nodeID,
struct threadpool *tp);
struct task *scheduler_gettask(struct scheduler *s, int qid,
const struct task *prev);
void scheduler_enqueue(struct scheduler *s, struct task *t);
......@@ -123,7 +128,5 @@ void scheduler_addunlock(struct scheduler *s, struct task *ta, struct task *tb);
void scheduler_set_unlocks(struct scheduler *s);
void scheduler_dump_queue(struct scheduler *s);
void scheduler_print_tasks(const struct scheduler *s, const char *fileName);
void scheduler_do_rewait(struct task *t_begin, struct task *t_end,
unsigned int mask, unsigned int submask);
#endif /* SWIFT_SCHEDULER_H */
......@@ -50,8 +50,7 @@ const char *taskID_names[task_type_count] = {
"none", "sort", "self", "pair", "sub",
"init", "ghost", "drift", "kick", "send",
"recv", "grav_pp", "grav_mm", "grav_up", "grav_down",
"grav_external", "part_sort", "gpart_sort", "split_cell", "rewait",
"comm_root"};
"grav_external", "part_sort", "gpart_sort", "split_cell", "comm_root "};
const char *subtaskID_names[task_type_count] = {"none", "density",
"force", "grav"};
......@@ -85,10 +84,9 @@ float task_overlap(const struct task *ta, const struct task *tb) {
use cells. */
if (ta == NULL || tb == NULL || ta->type == task_type_none ||
ta->type == task_type_part_sort || ta->type == task_type_gpart_sort ||
ta->type == task_type_split_cell || ta->type == task_type_rewait ||
tb->type == task_type_none || tb->type == task_type_part_sort ||
tb->type == task_type_gpart_sort || tb->type == task_type_split_cell ||
tb->type == task_type_rewait)
ta->type == task_type_split_cell || tb->type == task_type_none ||
tb->type == task_type_part_sort || tb->type == task_type_gpart_sort ||
tb->type == task_type_split_cell)
return 0.0f;
/* Compute the union of the cell data. */
......
......@@ -52,7 +52,6 @@ enum task_types {
task_type_part_sort,
task_type_gpart_sort,
task_type_split_cell,
task_type_rewait,
task_type_comm_root,
task_type_count
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment