Commit 13ff2fb8 authored by Pedro Gonnet's avatar Pedro Gonnet
Browse files

use chunked mappers in the threadpool, make sure that the map function actually waits for all the threads to have started.
parent 4b872bc1
......@@ -1165,48 +1165,54 @@ void engine_make_hydroloop_tasks(struct engine *e) {
*
* @param e The #engine.
*/
/**
 * @brief Threadpool mapper: count and link the tasks of a chunk of tasks.
 *
 * For each non-skipped task, wakes and links the sorts of split cells to
 * their progeny sorts, then links density tasks (self/pair/sub) to the
 * cell(s) they operate on. Per-cell counters are updated with atomic
 * increments since several threads process disjoint chunks concurrently.
 *
 * @param map_data Pointer to the first #task of this chunk.
 * @param num_elements Number of tasks in this chunk.
 * @param extra_data Pointer to the #engine.
 */
void engine_count_and_link_tasks_mapper(void *map_data, int num_elements,
                                        void *extra_data) {

  struct engine *e = (struct engine *)extra_data;
  struct task *tasks = (struct task *)map_data;
  struct scheduler *sched = &e->sched;

  for (int ind = 0; ind < num_elements; ind++) {

    struct task *t = &tasks[ind];

    if (t->skip) continue;

    /* Link sort tasks together. */
    if (t->type == task_type_sort && t->ci->split)
      for (int j = 0; j < 8; j++)
        if (t->ci->progeny[j] != NULL && t->ci->progeny[j]->sorts != NULL) {
          t->ci->progeny[j]->sorts->skip = 0;
          scheduler_addunlock(sched, t->ci->progeny[j]->sorts, t);
        }

    /* Link density tasks to cells. */
    if (t->type == task_type_self) {
      atomic_inc(&t->ci->nr_tasks);
      if (t->subtype == task_subtype_density) {
        engine_addlink(e, &t->ci->density, t);
        atomic_inc(&t->ci->nr_density);
      }
    } else if (t->type == task_type_pair) {
      atomic_inc(&t->ci->nr_tasks);
      atomic_inc(&t->cj->nr_tasks);
      if (t->subtype == task_subtype_density) {
        engine_addlink(e, &t->ci->density, t);
        atomic_inc(&t->ci->nr_density);
        engine_addlink(e, &t->cj->density, t);
        atomic_inc(&t->cj->nr_density);
      }
    } else if (t->type == task_type_sub) {
      atomic_inc(&t->ci->nr_tasks);
      /* Sub-tasks may be self-like (cj == NULL) or pair-like. */
      if (t->cj != NULL) atomic_inc(&t->cj->nr_tasks);
      if (t->subtype == task_subtype_density) {
        engine_addlink(e, &t->ci->density, t);
        atomic_inc(&t->ci->nr_density);
        if (t->cj != NULL) {
          engine_addlink(e, &t->cj->density, t);
          atomic_inc(&t->cj->nr_density);
        }
      }
    }
  }
}
......@@ -1246,89 +1252,94 @@ static inline void engine_make_hydro_loops_dependencies(struct scheduler *sched,
*
* @param e The #engine.
*/
void engine_make_extra_hydroloop_tasks_mapper(void *map_data,
void engine_make_extra_hydroloop_tasks_mapper(void *map_data, int num_elements,
void *extra_data) {
struct engine *e = (struct engine *)extra_data;
struct scheduler *sched = &e->sched;
const int nodeID = e->nodeID;
struct task *t = (struct task *)map_data;
struct task *tasks = (struct task *)map_data;
/* Skip? */
if (t->skip) return;
for (int ind = 0; ind < num_elements; ind++) {
struct task *t = &tasks[ind];
/* Self-interaction? */
if (t->type == task_type_self && t->subtype == task_subtype_density) {
/* Skip? */
if (t->skip) continue;
/* Start by constructing the task for the second hydro loop */
struct task *t2 = scheduler_addtask(
sched, task_type_self, task_subtype_force, 0, 0, t->ci, NULL, 0);
/* Self-interaction? */
if (t->type == task_type_self && t->subtype == task_subtype_density) {
/* Add the link between the new loop and the cell */
engine_addlink(e, &t->ci->force, t2);
atomic_inc(&t->ci->nr_force);
/* Start by constructing the task for the second hydro loop */
struct task *t2 = scheduler_addtask(
sched, task_type_self, task_subtype_force, 0, 0, t->ci, NULL, 0);
/* Now, build all the dependencies for the hydro */
engine_make_hydro_loops_dependencies(sched, t, t2, t->ci);
}
/* Add the link between the new loop and the cell */
engine_addlink(e, &t->ci->force, t2);
atomic_inc(&t->ci->nr_force);
/* Otherwise, pair interaction? */
else if (t->type == task_type_pair && t->subtype == task_subtype_density) {
/* Start by constructing the task for the second hydro loop */
struct task *t2 = scheduler_addtask(
sched, task_type_pair, task_subtype_force, 0, 0, t->ci, t->cj, 0);
/* Add the link between the new loop and both cells */
engine_addlink(e, &t->ci->force, t2);
atomic_inc(&t->ci->nr_force);
engine_addlink(e, &t->cj->force, t2);
atomic_inc(&t->cj->nr_force);
/* Now, build all the dependencies for the hydro for the cells */
/* that are local and are not descendant of the same super-cells */
if (t->ci->nodeID == nodeID) {
/* Now, build all the dependencies for the hydro */
engine_make_hydro_loops_dependencies(sched, t, t2, t->ci);
}
if (t->cj->nodeID == nodeID && t->ci->super != t->cj->super) {
engine_make_hydro_loops_dependencies(sched, t, t2, t->cj);
}
}
/* Otherwise, sub interaction? */
else if (t->type == task_type_sub && t->subtype == task_subtype_density) {
/* Otherwise, pair interaction? */
else if (t->type == task_type_pair && t->subtype == task_subtype_density) {
/* Start by constructing the task for the second hydro loop */
struct task *t2 = scheduler_addtask(
sched, task_type_sub, task_subtype_force, t->flags, 0, t->ci, t->cj, 0);
/* Start by constructing the task for the second hydro loop */
struct task *t2 = scheduler_addtask(
sched, task_type_pair, task_subtype_force, 0, 0, t->ci, t->cj, 0);
/* Add the link between the new loop and both cells */
engine_addlink(e, &t->ci->force, t2);
atomic_inc(&t->ci->nr_force);
if (t->cj != NULL) {
/* Add the link between the new loop and both cells */
engine_addlink(e, &t->ci->force, t2);
atomic_inc(&t->ci->nr_force);
engine_addlink(e, &t->cj->force, t2);
atomic_inc(&t->cj->nr_force);
}
/* Now, build all the dependencies for the hydro for the cells */
/* that are local and are not descendant of the same super-cells */
if (t->ci->nodeID == nodeID) {
engine_make_hydro_loops_dependencies(sched, t, t2, t->ci);
/* Now, build all the dependencies for the hydro for the cells */
/* that are local and are not descendant of the same super-cells */
if (t->ci->nodeID == nodeID) {
engine_make_hydro_loops_dependencies(sched, t, t2, t->ci);
}
if (t->cj->nodeID == nodeID && t->ci->super != t->cj->super) {
engine_make_hydro_loops_dependencies(sched, t, t2, t->cj);
}
}
if (t->cj != NULL && t->cj->nodeID == nodeID &&
t->ci->super != t->cj->super) {
engine_make_hydro_loops_dependencies(sched, t, t2, t->cj);
/* Otherwise, sub interaction? */
else if (t->type == task_type_sub && t->subtype == task_subtype_density) {
/* Start by constructing the task for the second hydro loop */
struct task *t2 =
scheduler_addtask(sched, task_type_sub, task_subtype_force, t->flags,
0, t->ci, t->cj, 0);
/* Add the link between the new loop and both cells */
engine_addlink(e, &t->ci->force, t2);
atomic_inc(&t->ci->nr_force);
if (t->cj != NULL) {
engine_addlink(e, &t->cj->force, t2);
atomic_inc(&t->cj->nr_force);
}
/* Now, build all the dependencies for the hydro for the cells */
/* that are local and are not descendant of the same super-cells */
if (t->ci->nodeID == nodeID) {
engine_make_hydro_loops_dependencies(sched, t, t2, t->ci);
}
if (t->cj != NULL && t->cj->nodeID == nodeID &&
t->ci->super != t->cj->super) {
engine_make_hydro_loops_dependencies(sched, t, t2, t->cj);
}
}
}
/* /\* Kick tasks should rely on the grav_down tasks of their cell. *\/ */
/* else if (t->type == task_type_kick && t->ci->grav_down != NULL) */
/* scheduler_addunlock(sched, t->ci->grav_down, t); */
/* /\* Kick tasks should rely on the grav_down tasks of their cell. *\/ */
/* else if (t->type == task_type_kick && t->ci->grav_down != NULL) */
/* scheduler_addunlock(sched, t->ci->grav_down, t); */
/* External gravity tasks should depend on init and unlock the kick */
else if (t->type == task_type_grav_external) {
scheduler_addunlock(sched, t->ci->init, t);
scheduler_addunlock(sched, t, t->ci->kick);
/* External gravity tasks should depend on init and unlock the kick */
else if (t->type == task_type_grav_external) {
scheduler_addunlock(sched, t->ci->init, t);
scheduler_addunlock(sched, t, t->ci->kick);
}
}
}
......@@ -1445,7 +1456,7 @@ void engine_maketasks(struct engine *e) {
store the density tasks in each cell, and make each sort
depend on the sorts of its progeny. */
threadpool_map(&e->threadpool, engine_count_and_link_tasks_mapper,
sched->tasks, sched->nr_tasks, sizeof(struct task), e);
sched->tasks, sched->nr_tasks, sizeof(struct task), 1000, e);
/* Append hierarchical tasks to each cells */
for (int k = 0; k < nr_cells; k++)
......@@ -1455,7 +1466,7 @@ void engine_maketasks(struct engine *e) {
Each force task depends on the cell ghosts and unlocks the kick task
of its super-cell. */
threadpool_map(&e->threadpool, engine_make_extra_hydroloop_tasks_mapper,
sched->tasks, sched->nr_tasks, sizeof(struct task), e);
sched->tasks, sched->nr_tasks, sizeof(struct task), 1000, e);
/* Add the communication tasks if MPI is being used. */
if (e->policy & engine_policy_mpi) {
......@@ -1507,140 +1518,67 @@ void engine_maketasks(struct engine *e) {
* @return 1 if the space has to be rebuilt, 0 otherwise.
*/
/**
 * @brief Threadpool mapper: mark tasks for the fixed-timestep case.
 *
 * Checks pair-like tasks for excessive particle movement (which forces a
 * space rebuild) and makes fully-sorted sort tasks implicit.
 *
 * @param map_data Pointer to the first #task of this chunk.
 * @param num_elements Number of tasks in this chunk.
 * @param extra_data Pointer to the rebuild-space flag (int), set to 1 if
 *        particles have moved too much.
 */
void engine_marktasks_fixdt_mapper(void *map_data, int num_elements,
                                   void *extra_data) {
  /* Unpack the arguments. */
  struct task *tasks = (struct task *)map_data;
  int *rebuild_space = (int *)extra_data;

  for (int ind = 0; ind < num_elements; ind++) {

    struct task *t = &tasks[ind];

    /* Pair? */
    if (t->type == task_type_pair ||
        (t->type == task_type_sub && t->cj != NULL)) {

      /* Local pointers. */
      const struct cell *ci = t->ci;
      const struct cell *cj = t->cj;

      /* Too much particle movement? */
      if (t->tight &&
          (fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin ||
           ci->dx_max > space_maxreldx * ci->h_max ||
           cj->dx_max > space_maxreldx * cj->h_max))
        *rebuild_space = 1;
    }

    /* Sort? */
    else if (t->type == task_type_sort) {

      /* If all the sorts have been done, make this task implicit. */
      if (!(t->flags & (t->flags ^ t->ci->sorted))) t->implicit = 1;
    }
  }
}
/**
 * @brief Threadpool mapper: reset all sort tasks in a chunk.
 *
 * Clears the sort flags and marks each sort task as skipped; the subsequent
 * marktasks pass re-activates the sorts that are actually needed.
 *
 * @param map_data Pointer to the first #task of this chunk.
 * @param num_elements Number of tasks in this chunk.
 * @param extra_data Unused.
 */
void engine_marktasks_sorts_mapper(void *map_data, int num_elements,
                                   void *extra_data) {
  /* Unpack the arguments. */
  struct task *tasks = (struct task *)map_data;

  for (int ind = 0; ind < num_elements; ind++) {
    struct task *t = &tasks[ind];
    if (t->type == task_type_sort) {
      t->flags = 0;
      t->skip = 1;
    }
  }
}
void engine_marktasks_mapper(void *map_data, void *extra_data) {
void engine_marktasks_mapper(void *map_data, int num_elements,
void *extra_data) {
/* Unpack the arguments. */
struct task *t = (struct task *)map_data;
struct task *tasks = (struct task *)map_data;
const int ti_end = ((int *)extra_data)[0];
int *rebuild_space = &((int *)extra_data)[1];
/* Single-cell task? */
if (t->type == task_type_self || t->type == task_type_ghost ||
(t->type == task_type_sub && t->cj == NULL)) {
/* Set this task's skip. */
t->skip = (t->ci->ti_end_min > ti_end);
}
/* Pair? */
else if (t->type == task_type_pair ||
(t->type == task_type_sub && t->cj != NULL)) {
/* Local pointers. */
const struct cell *ci = t->ci;
const struct cell *cj = t->cj;
/* Set this task's skip. */
t->skip = (ci->ti_end_min > ti_end && cj->ti_end_min > ti_end);
/* Too much particle movement? */
if (t->tight &&
(fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin ||
ci->dx_max > space_maxreldx * ci->h_max ||
cj->dx_max > space_maxreldx * cj->h_max))
*rebuild_space = 1;
/* Set the sort flags. */
if (!t->skip && t->type == task_type_pair) {
if (!(ci->sorted & (1 << t->flags))) {
atomic_or(&ci->sorts->flags, (1 << t->flags));
ci->sorts->skip = 0;
}
if (!(cj->sorted & (1 << t->flags))) {
atomic_or(&cj->sorts->flags, (1 << t->flags));
cj->sorts->skip = 0;
}
}
}
/* Kick? */
else if (t->type == task_type_kick) {
t->skip = (t->ci->ti_end_min > ti_end);
t->ci->updated = 0;
t->ci->g_updated = 0;
}
/* Drift? */
else if (t->type == task_type_drift)
t->skip = 0;
/* Init? */
else if (t->type == task_type_init) {
/* Set this task's skip. */
t->skip = (t->ci->ti_end_min > ti_end);
}
/* None? */
else if (t->type == task_type_none)
t->skip = 1;
}
int engine_marktasks_serial(struct engine *e) {
struct scheduler *s = &e->sched;
const int ti_end = e->ti_current;
const int nr_tasks = s->nr_tasks;
const int *const ind = s->tasks_ind;
struct task *tasks = s->tasks;
/* Run through the tasks and mark as skip or not. */
for (int k = 0; k < nr_tasks; k++) {
/* Get a handle on the kth task. */
struct task *t = &tasks[ind[k]];
/* Sort-task? Note that due to the task ranking, the sorts
will all come before the pairs. */
if (t->type == task_type_sort) {
/* Re-set the flags. */
t->flags = 0;
t->skip = 1;
}
for (int ind = 0; ind < num_elements; ind++) {
struct task *t = &tasks[ind];
/* Single-cell task? */
else if (t->type == task_type_self || t->type == task_type_ghost ||
(t->type == task_type_sub && t->cj == NULL)) {
if (t->type == task_type_self || t->type == task_type_ghost ||
(t->type == task_type_sub && t->cj == NULL)) {
/* Set this task's skip. */
t->skip = (t->ci->ti_end_min > ti_end);
......@@ -1662,16 +1600,16 @@ int engine_marktasks_serial(struct engine *e) {
(fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin ||
ci->dx_max > space_maxreldx * ci->h_max ||
cj->dx_max > space_maxreldx * cj->h_max))
return 1;
*rebuild_space = 1;
/* Set the sort flags. */
if (!t->skip && t->type == task_type_pair) {
if (!(ci->sorted & (1 << t->flags))) {
ci->sorts->flags |= (1 << t->flags);
atomic_or(&ci->sorts->flags, (1 << t->flags));
ci->sorts->skip = 0;
}
if (!(cj->sorted & (1 << t->flags))) {
cj->sorts->flags |= (1 << t->flags);
atomic_or(&cj->sorts->flags, (1 << t->flags));
cj->sorts->skip = 0;
}
}
......@@ -1699,9 +1637,6 @@ int engine_marktasks_serial(struct engine *e) {
else if (t->type == task_type_none)
t->skip = 1;
}
/* All is well... */
return 0;
}
int engine_marktasks(struct engine *e) {
......@@ -1715,20 +1650,19 @@ int engine_marktasks(struct engine *e) {
/* Run through the tasks and mark as skip or not. */
threadpool_map(&e->threadpool, engine_marktasks_fixdt_mapper, s->tasks,
s->nr_tasks, sizeof(struct task), &rebuild_space);
s->nr_tasks, sizeof(struct task), 1000, &rebuild_space);
return rebuild_space;
/* Multiple-timestep case */
} else {
/* Run through the tasks and mark as skip or not. */
/* int extra_data[2] = {e->ti_current, rebuild_space};
int extra_data[2] = {e->ti_current, rebuild_space};
threadpool_map(&e->threadpool, engine_marktasks_sorts_mapper, s->tasks,
s->nr_tasks, sizeof(struct task), NULL);
s->nr_tasks, sizeof(struct task), 1000, NULL);
threadpool_map(&e->threadpool, engine_marktasks_mapper, s->tasks,
s->nr_tasks, sizeof(struct task), extra_data);
rebuild_space = extra_data[1]; */
rebuild_space = engine_marktasks_serial(e);
s->nr_tasks, sizeof(struct task), 1000, extra_data);
rebuild_space = extra_data[1];
}
if (e->verbose)
......
This diff is collapsed.
......@@ -654,7 +654,7 @@ void space_split(struct space *s, struct cell *cells, int verbose) {
const ticks tic = getticks();
threadpool_map(&s->e->threadpool, space_split_mapper, cells, s->nr_cells,
sizeof(struct cell), s);
sizeof(struct cell), 1, s);
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
......@@ -684,8 +684,8 @@ void space_parts_sort(struct space *s, int *ind, size_t N, int min, int max,
sort_struct.xparts = s->xparts;
sort_struct.ind = ind;
sort_struct.stack_size = 2 * (max - min + 1) + 10 + s->e->nr_threads;
if ((sort_struct.stack = malloc(sizeof(struct qstack) *
sort_struct.stack_size)) == NULL)
if ((sort_struct.stack =
malloc(sizeof(struct qstack) * sort_struct.stack_size)) == NULL)
error("Failed to allocate sorting stack.");
for (int i = 0; i < sort_struct.stack_size; i++)
sort_struct.stack[i].ready = 0;
......@@ -702,8 +702,8 @@ void space_parts_sort(struct space *s, int *ind, size_t N, int min, int max,
/* Launch the sorting tasks with a stride of zero such that the same
map data is passed to each thread. */
threadpool_map(&s->e->threadpool, space_parts_sort_mapper,
&sort_struct, s->e->threadpool.num_threads, 0, NULL);
threadpool_map(&s->e->threadpool, space_parts_sort_mapper, &sort_struct,
s->e->threadpool.num_threads, 0, 1, NULL);
/* Verify sort_struct. */
/* for (int i = 1; i < N; i++)
......@@ -721,7 +721,7 @@ void space_parts_sort(struct space *s, int *ind, size_t N, int min, int max,
clocks_getunit());
}
void space_parts_sort_mapper(void *map_data, void *extra_data) {
void space_parts_sort_mapper(void *map_data, int num_elements, void *extra_data) {
/* Unpack the mapping data. */
struct parallel_sort *sort_struct = (struct parallel_sort *)map_data;
......@@ -735,8 +735,7 @@ void space_parts_sort_mapper(void *map_data, void *extra_data) {
while (sort_struct->waiting) {
/* Grab an interval off the queue. */
int qid =
atomic_inc(&sort_struct->first) % sort_struct->stack_size;
int qid = atomic_inc(&sort_struct->first) % sort_struct->stack_size;
/* Wait for the entry to be ready, or for the sorting do be done. */
while (!sort_struct->stack[qid].ready)
......@@ -795,16 +794,14 @@ void space_parts_sort_mapper(void *map_data, void *extra_data) {
/* Recurse on the left? */
if (jj > i && pivot > min) {
qid = atomic_inc(&sort_struct->last) %
sort_struct->stack_size;
qid = atomic_inc(&sort_struct->last) % sort_struct->stack_size;
while (sort_struct->stack[qid].ready)
;
sort_struct->stack[qid].i = i;
sort_struct->stack[qid].j = jj;
sort_struct->stack[qid].min = min;
sort_struct->stack[qid].max = pivot;
if (atomic_inc(&sort_struct->waiting) >=
sort_struct->stack_size)
if (atomic_inc(&sort_struct->waiting) >= sort_struct->stack_size)
error("Qstack overflow.");
sort_struct->stack[qid].ready = 1;
}
......@@ -820,16 +817,14 @@ void space_parts_sort_mapper(void *map_data, void *extra_data) {
/* Recurse on the right? */
if (pivot + 1 < max) {
qid = atomic_inc(&sort_struct->last) %
sort_struct->stack_size;
qid = atomic_inc(&sort_struct->last) % sort_struct->stack_size;
while (sort_struct->stack[qid].ready)
;
sort_struct->stack[qid].i = jj + 1;
sort_struct->stack[qid].j = j;
sort_struct->stack[qid].min = pivot + 1;
sort_struct->stack[qid].max = max;
if (atomic_inc(&sort_struct->waiting) >=
sort_struct->stack_size)
if (atomic_inc(&sort_struct->waiting) >= sort_struct->stack_size)
error("Qstack overflow.");
sort_struct->stack[qid].ready = 1;
}
......@@ -870,8 +865,8 @@ void space_gparts_sort(struct space *s, int *ind, size_t N, int min, int max,