diff --git a/src/cell.c b/src/cell.c
index cb78ee51c43e6692697fe213b062fccad75087ec..4408938d9fc84dc509d848a1f7655b703f59b390 100644
--- a/src/cell.c
+++ b/src/cell.c
@@ -52,6 +52,7 @@
 #include "gravity.h"
 #include "hydro.h"
 #include "hydro_properties.h"
+#include "scheduler.h"
 #include "space.h"
 #include "timers.h"
 
@@ -901,7 +902,7 @@ int cell_is_drift_needed(struct cell *c, int ti_current) {
  *
  * @return 1 If the space needs rebuilding. 0 otherwise.
  */
-int cell_unskip_tasks(struct cell *c) {
+int cell_unskip_tasks(struct cell *c, struct scheduler *s) {
 
   /* Un-skip the density tasks involved with this cell. */
   for (struct link *l = c->density; l != NULL; l = l->next) {
@@ -914,11 +915,13 @@
     if (t->type == task_type_pair) {
       if (!(ci->sorted & (1 << t->flags))) {
         atomic_or(&ci->sorts->flags, (1 << t->flags));
-        ci->sorts->skip = 0;
+        if (atomic_cas(&ci->sorts->skip, 1, 0))
+          scheduler_add_active(s, ci->sorts);
       }
       if (!(cj->sorted & (1 << t->flags))) {
         atomic_or(&cj->sorts->flags, (1 << t->flags));
-        cj->sorts->skip = 0;
+        if (atomic_cas(&cj->sorts->skip, 1, 0))
+          scheduler_add_active(s, cj->sorts);
       }
     }
 
@@ -942,19 +945,22 @@
         /* Look for the local cell cj's send tasks. */
         struct link *l = NULL;
         for (l = cj->send_xv; l != NULL && l->t->cj->nodeID != ci->nodeID;
-             l = l->next);
+             l = l->next)
+          ;
         if (l == NULL) error("Missing link to send_xv task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
 
         for (l = cj->send_rho; l != NULL && l->t->cj->nodeID != ci->nodeID;
-             l = l->next);
+             l = l->next)
+          ;
         if (l == NULL) error("Missing link to send_rho task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
 
         for (l = cj->send_ti; l != NULL && l->t->cj->nodeID != ci->nodeID;
-             l = l->next);
+             l = l->next)
+          ;
         if (l == NULL) error("Missing link to send_ti task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
 
       } else if (cj->nodeID != engine_rank) {
 
@@ -965,35 +971,48 @@
         /* Look for the local cell ci's send tasks. */
         struct link *l = NULL;
         for (l = ci->send_xv; l != NULL && l->t->cj->nodeID != cj->nodeID;
-             l = l->next);
+             l = l->next)
+          ;
         if (l == NULL) error("Missing link to send_xv task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
 
         for (l = ci->send_rho; l != NULL && l->t->cj->nodeID != cj->nodeID;
-             l = l->next);
+             l = l->next)
+          ;
         if (l == NULL) error("Missing link to send_rho task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
 
         for (l = ci->send_ti; l != NULL && l->t->cj->nodeID != cj->nodeID;
-             l = l->next);
+             l = l->next)
+          ;
         if (l == NULL) error("Missing link to send_ti task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
       }
 #endif
     }
   }
 
   /* Unskip all the other task types. */
-  for (struct link *l = c->gradient; l != NULL; l = l->next) l->t->skip = 0;
-  for (struct link *l = c->force; l != NULL; l = l->next) l->t->skip = 0;
-  for (struct link *l = c->grav; l != NULL; l = l->next) l->t->skip = 0;
-  if (c->extra_ghost != NULL) c->extra_ghost->skip = 0;
-  if (c->ghost != NULL) c->ghost->skip = 0;
-  if (c->init != NULL) c->init->skip = 0;
-  if (c->kick != NULL) c->kick->skip = 0;
-  if (c->cooling != NULL) c->cooling->skip = 0;
-  if (c->sourceterms != NULL) c->sourceterms->skip = 0;
-  if (c->grav_external != NULL) c->grav_external->skip = 0;
+  for (struct link *l = c->gradient; l != NULL; l = l->next)
+    if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
+  for (struct link *l = c->force; l != NULL; l = l->next)
+    if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
+  for (struct link *l = c->grav; l != NULL; l = l->next)
+    if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
+  if (c->extra_ghost != NULL && atomic_cas(&c->extra_ghost->skip, 1, 0))
+    scheduler_add_active(s, c->extra_ghost);
+  if (c->ghost != NULL && atomic_cas(&c->ghost->skip, 1, 0))
+    scheduler_add_active(s, c->ghost);
+  if (c->init != NULL && atomic_cas(&c->init->skip, 1, 0))
+    scheduler_add_active(s, c->init);
+  if (c->kick != NULL && atomic_cas(&c->kick->skip, 1, 0))
+    scheduler_add_active(s, c->kick);
+  if (c->cooling != NULL && atomic_cas(&c->cooling->skip, 1, 0))
+    scheduler_add_active(s, c->cooling);
+  if (c->sourceterms != NULL && atomic_cas(&c->sourceterms->skip, 1, 0))
+    scheduler_add_active(s, c->sourceterms);
+  if (c->grav_external != NULL && atomic_cas(&c->grav_external->skip, 1, 0))
+    scheduler_add_active(s, c->grav_external);
 
   return 0;
 }
diff --git a/src/cell.h b/src/cell.h
index 289a68e5456eb2f6d1917de65170aa590c37e0a4..4b970d587923f53dab909250de2b31da3252d6ae 100644
--- a/src/cell.h
+++ b/src/cell.h
@@ -38,6 +38,7 @@
 
 /* Avoid cyclic inclusions */
 struct space;
+struct scheduler;
 
 /* Max tag size set to 2^29 to take into account some MPI implementations
  * that use 2^31 as the upper bound on MPI tags and the fact that
@@ -240,6 +241,6 @@ int cell_are_neighbours(const struct cell *restrict ci,
 void cell_check_multipole(struct cell *c, void *data);
 void cell_clean(struct cell *c);
 int cell_is_drift_needed(struct cell *c, int ti_current);
-int cell_unskip_tasks(struct cell *c);
+int cell_unskip_tasks(struct cell *c, struct scheduler *s);
 
 #endif /* SWIFT_CELL_H */
diff --git a/src/engine.c b/src/engine.c
index fadaffd32c3eaf0a4165ad47cf5e2cc99e6059db..a372312a6430828cda79d9bbe493c3df2e729b50 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -1381,6 +1381,7 @@ void engine_count_and_link_tasks(struct engine *e) {
     if (t->type == task_type_sort && t->ci->split)
       for (int j = 0; j < 8; j++)
         if (t->ci->progeny[j] != NULL && t->ci->progeny[j]->sorts != NULL) {
+          t->ci->progeny[j]->sorts->skip = 0;
          scheduler_addunlock(sched, t->ci->progeny[j]->sorts, t);
         }
 
@@ -2004,8 +2005,9 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
                              void *extra_data) {
   /* Unpack the arguments. */
   struct task *tasks = (struct task *)map_data;
-  const int ti_end = ((int *)extra_data)[0];
-  int *rebuild_space = &((int *)extra_data)[1];
+  const int ti_end = ((size_t *)extra_data)[0];
+  size_t *rebuild_space = &((size_t *)extra_data)[1];
+  struct scheduler *s = (struct scheduler *)(((size_t *)extra_data)[2]);
 
   for (int ind = 0; ind < num_elements; ind++) {
     struct task *t = &tasks[ind];
@@ -2017,7 +2019,10 @@
         t->type == task_type_sourceterms || t->type == task_type_sub_self) {
 
       /* Set this task's skip. */
-      t->skip = (t->ci->ti_end_min > ti_end);
+      if ((t->skip = (t->ci->ti_end_min > ti_end)))
+        continue;
+      else
+        scheduler_add_active(s, t);
     }
 
     /* Pair? */
@@ -2037,16 +2042,22 @@
       /* Set this task's skip. */
       if ((t->skip = (ci->ti_end_min > ti_end && cj->ti_end_min > ti_end)) == 1)
         continue;
+      else scheduler_add_active(s, t);
+
+      /* If this is not a density task, we don't have to do any of the below. */
+      if (t->subtype != task_subtype_density) continue;
 
       /* Set the sort flags. */
-      if (t->type == task_type_pair && t->subtype != task_subtype_grav) {
+      if (t->type == task_type_pair) {
         if (!(ci->sorted & (1 << t->flags))) {
           atomic_or(&ci->sorts->flags, (1 << t->flags));
-          ci->sorts->skip = 0;
+          if (atomic_cas(&ci->sorts->skip, 1, 0))
+            scheduler_add_active(s, ci->sorts);
         }
         if (!(cj->sorted & (1 << t->flags))) {
          atomic_or(&cj->sorts->flags, (1 << t->flags));
-          cj->sorts->skip = 0;
+          if (atomic_cas(&cj->sorts->skip, 1, 0))
+            scheduler_add_active(s, cj->sorts);
         }
       }
 
@@ -2066,19 +2077,19 @@
              l = l->next)
          ;
         if (l == NULL) error("Missing link to send_xv task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
 
         for (l = cj->send_rho; l != NULL && l->t->cj->nodeID != ci->nodeID;
              l = l->next)
          ;
         if (l == NULL) error("Missing link to send_rho task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
 
         for (l = cj->send_ti; l != NULL && l->t->cj->nodeID != ci->nodeID;
              l = l->next)
          ;
         if (l == NULL) error("Missing link to send_ti task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
 
       } else if (cj->nodeID != engine_rank) {
 
@@ -2092,19 +2103,19 @@
              l = l->next)
          ;
         if (l == NULL) error("Missing link to send_xv task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
 
         for (l = ci->send_rho; l != NULL && l->t->cj->nodeID != cj->nodeID;
              l = l->next)
          ;
         if (l == NULL) error("Missing link to send_rho task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
 
         for (l = ci->send_ti; l != NULL && l->t->cj->nodeID != cj->nodeID;
              l = l->next)
          ;
         if (l == NULL) error("Missing link to send_ti task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
       }
 #endif
     }
@@ -2112,20 +2123,22 @@
 
     /* Kick? */
     else if (t->type == task_type_kick) {
-      t->skip = (t->ci->ti_end_min > ti_end);
       t->ci->updated = 0;
       t->ci->g_updated = 0;
+      if ((t->skip = (t->ci->ti_end_min > ti_end)))
+        continue;
+      else
+        scheduler_add_active(s, t);
     }
 
     /* Init? */
     else if (t->type == task_type_init) {
      /* Set this task's skip. */
-      t->skip = (t->ci->ti_end_min > ti_end);
+      if ((t->skip = (t->ci->ti_end_min > ti_end)))
+        continue;
+      else
+        scheduler_add_active(s, t);
     }
-
-    /* None? */
-    else if (t->type == task_type_none)
-      t->skip = 1;
   }
 }
 
@@ -2152,7 +2165,8 @@ int engine_marktasks(struct engine *e) {
   } else {
 
     /* Run through the tasks and mark as skip or not. */
-    int extra_data[2] = {e->ti_current, rebuild_space};
+    size_t extra_data[3] = {e->ti_current, rebuild_space, (size_t)&e->sched};
+    scheduler_clear_active(&e->sched);
     threadpool_map(&e->threadpool, engine_marktasks_mapper, s->tasks,
                    s->nr_tasks, sizeof(struct task), 10000, extra_data);
     rebuild_space = extra_data[1];
@@ -2634,6 +2648,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs) {
   }
 
   /* Now, launch the calculation */
+  engine_print_task_counts(e);
   TIMER_TIC;
   engine_launch(e, e->nr_threads, mask, submask);
   TIMER_TOC(timer_runners);
diff --git a/src/runner.c b/src/runner.c
index bef07aadcbee64aa9a93dc1c62027d866c666575..7294d0e1c862af4f31c0ef32b5f936df56e9df70 100644
--- a/src/runner.c
+++ b/src/runner.c
@@ -758,14 +758,13 @@ static void runner_do_drift(struct cell *c, struct engine *e, int drift) {
 
   /* Unskip any active tasks. */
   if (c->ti_end_min == e->ti_current) {
-    const int forcerebuild = cell_unskip_tasks(c);
+    const int forcerebuild = cell_unskip_tasks(c, &e->sched);
     if (forcerebuild) atomic_inc(&e->forcerebuild);
   }
 
   /* Do we really need to drift? */
   if (drift) {
-    if (!e->drift_all && !cell_is_drift_needed(c, ti_current))
-      return;
+    if (!e->drift_all && !cell_is_drift_needed(c, ti_current)) return;
   } else {
 
     /* Not drifting, but may still need to recurse for task skipping. */
@@ -813,8 +812,8 @@ static void runner_do_drift(struct cell *c, struct engine *e, int drift) {
 
       /* Compute (square of) motion since last cell construction */
       const float dx2 = gp->x_diff[0] * gp->x_diff[0] +
-          gp->x_diff[1] * gp->x_diff[1] +
-          gp->x_diff[2] * gp->x_diff[2];
+                        gp->x_diff[1] * gp->x_diff[1] +
+                        gp->x_diff[2] * gp->x_diff[2];
       dx2_max = (dx2_max > dx2) ? dx2_max : dx2;
     }
 
@@ -831,8 +830,8 @@ static void runner_do_drift(struct cell *c, struct engine *e, int drift) {
 
      /* Compute (square of) motion since last cell construction */
      const float dx2 = xp->x_diff[0] * xp->x_diff[0] +
-          xp->x_diff[1] * xp->x_diff[1] +
-          xp->x_diff[2] * xp->x_diff[2];
+                        xp->x_diff[1] * xp->x_diff[1] +
+                        xp->x_diff[2] * xp->x_diff[2];
      dx2_max = (dx2_max > dx2) ? dx2_max : dx2;
 
      /* Maximal smoothing length */
@@ -841,7 +840,7 @@ static void runner_do_drift(struct cell *c, struct engine *e, int drift) {
 
      /* Now collect quantities for statistics */
      const float half_dt =
-            (ti_current - (p->ti_begin + p->ti_end) / 2) * timeBase;
+          (ti_current - (p->ti_begin + p->ti_end) / 2) * timeBase;
      const double x[3] = {p->x[0], p->x[1], p->x[2]};
      const float v[3] = {xp->v_full[0] + p->a_hydro[0] * half_dt,
                          xp->v_full[1] + p->a_hydro[1] * half_dt,
diff --git a/src/scheduler.c b/src/scheduler.c
index 9fa2ecf775d0ef307bfb6eeac01f1435995db1a2..fb41c126df9f02b71dbc5bb58c383d48b8193e27 100644
--- a/src/scheduler.c
+++ b/src/scheduler.c
@@ -42,7 +42,6 @@
 #include "atomic.h"
 #include "const.h"
 #include "cycle.h"
-#include "engine.h"
 #include "error.h"
 #include "intrinsics.h"
 #include "kernel_hydro.h"
@@ -51,6 +50,24 @@
 #include "task.h"
 #include "timers.h"
 
+/**
+ * @brief Add a task to the list of active tasks.
+ *
+ * @param s The #scheduler.
+ * @param t The task to be added.
+ */
+
+void scheduler_add_active(struct scheduler *s, struct task *t) {
+  int ind = atomic_inc(&s->active_count);
+  s->tid_active[ind] = t - s->tasks;
+}
+
+/**
+ * @brief Re-set the list of active tasks.
+ */
+
+void scheduler_clear_active(struct scheduler *s) { s->active_count = 0; }
+
 /**
  * @brief Add an unlock_task to the given task.
  *
@@ -864,6 +881,7 @@ void scheduler_reset(struct scheduler *s, int size) {
     /* Free existing task lists if necessary. */
     if (s->tasks != NULL) free(s->tasks);
     if (s->tasks_ind != NULL) free(s->tasks_ind);
+    if (s->tid_active != NULL) free(s->tid_active);
 
     /* Allocate the new lists. */
     if (posix_memalign((void *)&s->tasks, task_align,
@@ -872,6 +890,9 @@
 
     if ((s->tasks_ind = (int *)malloc(sizeof(int) * size)) == NULL)
       error("Failed to allocate task lists.");
+
+    if ((s->tid_active = (int *)malloc(sizeof(int) * size)) == NULL)
+      error("Failed to allocate active task lists.");
   }
 
   /* Reset the counters. */
@@ -883,6 +904,7 @@
   s->submask = 0;
   s->nr_unlocks = 0;
   s->completed_unlock_writes = 0;
+  s->active_count = 0;
 
   /* Set the task pointers in the queues. */
   for (int k = 0; k < s->nr_queues; k++) s->queues[k].tasks = s->tasks;
@@ -892,11 +914,11 @@
  * @brief Compute the task weights
  *
  * @param s The #scheduler.
- * @param verbose Are we talkative ?
+ * @param verbose Are we talkative?
  */
+
 void scheduler_reweight(struct scheduler *s, int verbose) {
 
-  const ticks tic = getticks();
   const int nr_tasks = s->nr_tasks;
   int *tid = s->tasks_ind;
   struct task *tasks = s->tasks;
@@ -905,6 +927,7 @@
                                 0.4025, 0.1897, 0.4025, 0.1897,
                                 0.4025, 0.5788, 0.4025, 0.5788};
   const float wscale = 0.001;
+  const ticks tic = getticks();
 
   /* Run through the tasks backwards and set their weights. */
   for (int k = nr_tasks - 1; k >= 0; k--) {
@@ -1040,7 +1063,7 @@
   s->mask = mask;
   s->submask = submask | (1 << task_subtype_none);
 
-  /* Clear all the waits, rids and times. */
+  /* Clear all the waits, rids, and times. */
   for (int k = 0; k < s->nr_tasks; k++) {
     s->tasks[k].wait = 1;
     s->tasks[k].rid = -1;
@@ -1049,6 +1072,7 @@
   }
 
   /* Re-wait the tasks. */
+  message("scheduler_rewait_mapper...");
   threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tasks, s->nr_tasks,
                  sizeof(struct task), 1000, s);
 
@@ -1095,8 +1119,17 @@
 #endif
 
   /* Loop over the tasks and enqueue whoever is ready. */
+  /* message("scheduler_enqueue_mapper...");
   threadpool_map(s->threadpool, scheduler_enqueue_mapper, s->tasks_ind,
-                 s->nr_tasks, sizeof(int), 1000, s);
+                 s->nr_tasks, sizeof(int), 1000, s); */
+  for (int k = 0; k < s->active_count; k++) {
+    struct task *t = &s->tasks[s->tid_active[k]];
+    if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & s->mask) &&
+        ((1 << t->subtype) & s->submask)) {
+      scheduler_enqueue(s, t);
+      pthread_cond_signal(&s->sleep_cond);
+    }
+  }
 
   /* To be safe, fire of one last sleep_cond in a safe way. */
   pthread_mutex_lock(&s->sleep_mutex);
diff --git a/src/scheduler.h b/src/scheduler.h
index c4eb5e99447d623e5fb8e442efc1c254c00bfadd..b463747146be6c3bdc9ce366d48aaed9dd7b0fa6 100644
--- a/src/scheduler.h
+++ b/src/scheduler.h
@@ -83,6 +83,10 @@ struct scheduler {
   /* The task indices. */
   int *tasks_ind;
 
+  /* List of initial tasks. */
+  int *tid_active;
+  int active_count;
+
   /* The task unlocks. */
   struct task **volatile unlocks;
   int *volatile unlock_ind;
@@ -106,6 +110,8 @@
 };
 
 /* Function prototypes. */
+void scheduler_add_active(struct scheduler *s, struct task *t);
+void scheduler_clear_active(struct scheduler *s);
 void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
                     int nr_queues, unsigned int flags, int nodeID,
                     struct threadpool *tp);
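
Note on the activation pattern used throughout the patch: guarding every "skip = 0" with atomic_cas means that when several threads try to unskip the same task, only the one that wins the 1 -> 0 swap appends it to tid_active, so each task appears in the list at most once and scheduler_start() can enqueue from that short list instead of sweeping all tasks. The following standalone sketch illustrates the idea; it is not SWIFT code (the stripped-down structs and the activate()/add_active() helpers are hypothetical stand-ins for the patch's atomic_cas + scheduler_add_active pair), built on C11 atomics.

/* sketch_active_list.c -- illustration only, not part of SWIFT. */

#include <stdatomic.h>
#include <stdio.h>

struct task {
  atomic_int skip; /* 1 = skipped (inactive), 0 = active */
};

struct sched {
  struct task *tasks;      /* the full task array */
  int *tid_active;         /* indices of tasks activated this step */
  atomic_int active_count; /* number of entries in tid_active */
};

/* Counterpart of scheduler_add_active(): append the task's index. */
static void add_active(struct sched *s, struct task *t) {
  int ind = atomic_fetch_add(&s->active_count, 1); /* returns old value */
  s->tid_active[ind] = (int)(t - s->tasks);
}

/* Unskip a task; only the thread that wins the 1 -> 0 swap records it,
 * mirroring "if (atomic_cas(&t->skip, 1, 0)) scheduler_add_active(s, t);". */
static void activate(struct sched *s, struct task *t) {
  int expected = 1;
  if (atomic_compare_exchange_strong(&t->skip, &expected, 0)) add_active(s, t);
}

int main(void) {
  struct task tasks[4] = {{1}, {1}, {1}, {1}};
  int tid[4];
  struct sched s = {tasks, tid, 0};

  activate(&s, &tasks[2]); /* recorded */
  activate(&s, &tasks[2]); /* already active: CAS fails, not recorded again */
  activate(&s, &tasks[0]); /* recorded */

  /* The start-up code can now walk this short list instead of all tasks. */
  for (int k = 0; k < s.active_count; k++)
    printf("active task %d\n", s.tid_active[k]);
  return 0;
}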
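A second note, on the engine_marktasks() change: extra_data switches from int[2] to size_t[3] so that the scheduler pointer can ride through the threadpool mapper's single void *extra_data argument alongside the integer values, with the second slot still carrying the rebuild flag back to the caller (a size_t is assumed wide enough to hold a pointer, as the patch itself assumes). A minimal sketch of that packing, with illustrative names rather than SWIFT's:

/* sketch_extra_data.c -- illustration only, not SWIFT code. */

#include <stddef.h>
#include <stdio.h>

struct sched { int active_count; }; /* stand-in for struct scheduler */

/* Stand-in for engine_marktasks_mapper(): unpack in the same order. */
static void mapper(void *map_data, int num_elements, void *extra_data) {
  const int ti_end = (int)((size_t *)extra_data)[0];
  size_t *rebuild_space = &((size_t *)extra_data)[1];
  struct sched *s = (struct sched *)((size_t *)extra_data)[2];

  (void)map_data;
  (void)num_elements;
  s->active_count = 0; /* the mapper can use the scheduler... */
  *rebuild_space = 1;  /* ...and report back through the array. */
  printf("ti_end = %d\n", ti_end);
}

int main(void) {
  struct sched sched = {0};

  /* Pack: an integer, a result slot, and a pointer squeezed into size_t. */
  size_t extra_data[3] = {42, 0, (size_t)&sched};
  mapper(NULL, 0, extra_data);

  printf("rebuild_space = %zu\n", extra_data[1]);
  return 0;
}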