Commit 6796bc8a authored by Peter W. Draper

Merge branch 'mark_tasks_in_drift2' into 'master'

Mark tasks in drift2

This "works" as in "I'm going to be now, it runs 100 steps of CosmoVolume without barfing".

@pdraper, if you have any spare cycles, can you run it on anything you've got until it breaks? Otherwise, I'll just keep on hacking at it as soon as I've got another hour or two. Thanks!

See merge request !267
parents cd34c3b6 98fff2d2
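The change, in short: instead of marking tasks as skipped each step, tasks now start out skipped by default and only those belonging to active cells are re-activated (via scheduler_activate() and the new cell_unskip_tasks()) while the cells are drifted. Below is a minimal, self-contained sketch of that skip-by-default pattern; the toy_* names and fields are invented stand-ins, not the real SWIFT structures.

/* Toy sketch (not SWIFT code, not part of the diff below): illustrates the
 * skip-by-default scheme introduced by this merge. All names are invented. */
#include <stdio.h>

struct toy_task {
  int skip;       /* 1 = task is inactive this step (the default). */
  int ti_end_min; /* Earliest end-of-timestep among the task's cells. */
};

/* Stand-in for scheduler_activate(): un-skip a task. */
static void toy_activate(struct toy_task *t) { t->skip = 0; }

/* Stand-in for the marking/unskipping pass: everything starts skipped,
 * and only tasks whose cells are active at ti_current are switched on. */
static void toy_marktasks(struct toy_task *tasks, int n, int ti_current) {
  for (int k = 0; k < n; k++) {
    tasks[k].skip = 1;
    if (tasks[k].ti_end_min <= ti_current) toy_activate(&tasks[k]);
  }
}

int main(void) {
  struct toy_task tasks[3] = {{1, 8}, {1, 16}, {1, 8}};
  toy_marktasks(tasks, 3, 8); /* Step at ti_current = 8. */
  for (int k = 0; k < 3; k++) printf("task %d: skip=%d\n", k, tasks[k].skip);
  return 0;
}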
@@ -572,8 +572,8 @@ int main(int argc, char *argv[]) {
fprintf(file_thread, " %03i 0 0 0 0 %lli %lli 0 0 0 0 %lli\n", myrank,
e.tic_step, e.toc_step, cpufreq);
int count = 0;
for (int l = 0; l < e.sched.nr_tasks; l++)
if (!e.sched.tasks[l].skip && !e.sched.tasks[l].implicit) {
for (int l = 0; l < e.sched.nr_tasks; l++) {
if (!e.sched.tasks[l].implicit && e.sched.tasks[l].toc != 0) {
fprintf(
file_thread, " %03i %i %i %i %i %lli %lli %i %i %i %i %i\n",
myrank, e.sched.tasks[l].rid, e.sched.tasks[l].type,
@@ -588,11 +588,10 @@ int main(int argc, char *argv[]) {
(e.sched.tasks[l].cj != NULL) ? e.sched.tasks[l].cj->gcount
: 0,
e.sched.tasks[l].flags);
fflush(stdout);
count++;
}
message("rank %d counted %d tasks", myrank, count);
fflush(stdout);
count++;
}
fclose(file_thread);
}
@@ -608,8 +607,8 @@ int main(int argc, char *argv[]) {
/* Add some information to help with the plots */
fprintf(file_thread, " %i %i %i %i %lli %lli %i %i %i %lli\n", -2, -1, -1,
1, e.tic_step, e.toc_step, 0, 0, 0, cpufreq);
for (int l = 0; l < e.sched.nr_tasks; l++)
if (!e.sched.tasks[l].skip && !e.sched.tasks[l].implicit)
for (int l = 0; l < e.sched.nr_tasks; l++) {
if (!e.sched.tasks[l].implicit && e.sched.tasks[l].toc != 0) {
fprintf(
file_thread, " %i %i %i %i %lli %lli %i %i %i %i\n",
e.sched.tasks[l].rid, e.sched.tasks[l].type,
@@ -619,6 +618,8 @@ int main(int argc, char *argv[]) {
(e.sched.tasks[l].cj == NULL) ? 0 : e.sched.tasks[l].cj->count,
(e.sched.tasks[l].ci == NULL) ? 0 : e.sched.tasks[l].ci->gcount,
(e.sched.tasks[l].cj == NULL) ? 0 : e.sched.tasks[l].cj->gcount);
}
}
fclose(file_thread);
#endif
}
......
@@ -52,6 +52,7 @@
#include "gravity.h"
#include "hydro.h"
#include "hydro_properties.h"
#include "scheduler.h"
#include "space.h"
#include "timers.h"
@@ -896,6 +897,120 @@ int cell_is_drift_needed(struct cell *c, int ti_current) {
return 0;
}
/**
* @brief Un-skips all the tasks associated with a given cell and checks
* if the space needs to be rebuilt.
*
* @param c the #cell.
* @param s the #scheduler.
*
* @return 1 If the space needs rebuilding. 0 otherwise.
*/
int cell_unskip_tasks(struct cell *c, struct scheduler *s) {
/* Un-skip the density tasks involved with this cell. */
for (struct link *l = c->density; l != NULL; l = l->next) {
struct task *t = l->t;
const struct cell *ci = t->ci;
const struct cell *cj = t->cj;
scheduler_activate(s, t);
/* Set the correct sorting flags */
if (t->type == task_type_pair) {
if (!(ci->sorted & (1 << t->flags))) {
atomic_or(&ci->sorts->flags, (1 << t->flags));
scheduler_activate(s, ci->sorts);
}
if (!(cj->sorted & (1 << t->flags))) {
atomic_or(&cj->sorts->flags, (1 << t->flags));
scheduler_activate(s, cj->sorts);
}
}
/* Check whether there was too much particle motion */
if (t->type == task_type_pair || t->type == task_type_sub_pair) {
if (t->tight &&
(max(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin ||
ci->dx_max > space_maxreldx * ci->h_max ||
cj->dx_max > space_maxreldx * cj->h_max))
return 1;
#ifdef WITH_MPI
/* Activate the send/recv flags. */
if (ci->nodeID != engine_rank) {
/* Activate the tasks to recv foreign cell ci's data. */
scheduler_activate(s, ci->recv_xv);
scheduler_activate(s, ci->recv_rho);
scheduler_activate(s, ci->recv_ti);
/* Look for the local cell cj's send tasks. */
struct link *l = NULL;
for (l = cj->send_xv; l != NULL && l->t->cj->nodeID != ci->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_xv task.");
scheduler_activate(s, l->t);
for (l = cj->send_rho; l != NULL && l->t->cj->nodeID != ci->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_rho task.");
scheduler_activate(s, l->t);
for (l = cj->send_ti; l != NULL && l->t->cj->nodeID != ci->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_ti task.");
scheduler_activate(s, l->t);
} else if (cj->nodeID != engine_rank) {
/* Activate the tasks to recv foreign cell cj's data. */
scheduler_activate(s, cj->recv_xv);
scheduler_activate(s, cj->recv_rho);
scheduler_activate(s, cj->recv_ti);
/* Look for the local cell ci's send tasks. */
struct link *l = NULL;
for (l = ci->send_xv; l != NULL && l->t->cj->nodeID != cj->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_xv task.");
scheduler_activate(s, l->t);
for (l = ci->send_rho; l != NULL && l->t->cj->nodeID != cj->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_rho task.");
scheduler_activate(s, l->t);
for (l = ci->send_ti; l != NULL && l->t->cj->nodeID != cj->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_ti task.");
scheduler_activate(s, l->t);
}
#endif
}
}
/* Unskip all the other task types. */
for (struct link *l = c->gradient; l != NULL; l = l->next)
scheduler_activate(s, l->t);
for (struct link *l = c->force; l != NULL; l = l->next)
scheduler_activate(s, l->t);
for (struct link *l = c->grav; l != NULL; l = l->next)
scheduler_activate(s, l->t);
if (c->extra_ghost != NULL) scheduler_activate(s, c->extra_ghost);
if (c->ghost != NULL) scheduler_activate(s, c->ghost);
if (c->init != NULL) scheduler_activate(s, c->init);
if (c->kick != NULL) scheduler_activate(s, c->kick);
if (c->cooling != NULL) scheduler_activate(s, c->cooling);
if (c->sourceterms != NULL) scheduler_activate(s, c->sourceterms);
return 0;
}
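Usage note: a hedged fragment (not standalone code) showing the intended call pattern for cell_unskip_tasks(); it mirrors the call site added to runner_do_drift() further down in this diff.

/* Sketch of the call pattern for cell_unskip_tasks() on an active cell. */
if (c->ti_end_min == e->ti_current) {
  /* Re-activate the cell's tasks; a non-zero return means particles have
   * moved too far and the space needs rebuilding. */
  if (cell_unskip_tasks(c, &e->sched)) atomic_inc(&e->forcerebuild);
}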
/**
* @brief Set the super-cell pointers for all cells in a hierarchy.
*
......
@@ -38,6 +38,7 @@
/* Avoid cyclic inclusions */
struct space;
struct scheduler;
/* Max tag size set to 2^29 to take into account some MPI implementations
* that use 2^31 as the upper bound on MPI tags and the fact that
@@ -232,6 +233,7 @@ int cell_are_neighbours(const struct cell *restrict ci,
void cell_check_multipole(struct cell *c, void *data);
void cell_clean(struct cell *c);
int cell_is_drift_needed(struct cell *c, int ti_current);
int cell_unskip_tasks(struct cell *c, struct scheduler *s);
void cell_set_super(struct cell *c, struct cell *super);
#endif /* SWIFT_CELL_H */
@@ -1346,13 +1346,10 @@ void engine_count_and_link_tasks(struct engine *e) {
struct cell *const ci = t->ci;
struct cell *const cj = t->cj;
if (t->skip) continue;
/* Link sort tasks together. */
if (t->type == task_type_sort && ci->split)
for (int j = 0; j < 8; j++)
if (ci->progeny[j] != NULL && ci->progeny[j]->sorts != NULL) {
ci->progeny[j]->sorts->skip = 0;
scheduler_addunlock(sched, ci->progeny[j]->sorts, t);
}
@@ -1493,9 +1490,6 @@ void engine_link_gravity_tasks(struct engine *e) {
/* Get a pointer to the task. */
struct task *t = &sched->tasks[k];
/* Skip? */
if (t->skip) continue;
/* Multipole construction */
if (t->type == task_type_grav_up) {
scheduler_addunlock(sched, t, gather);
@@ -1643,9 +1637,6 @@ void engine_make_extra_hydroloop_tasks(struct engine *e) {
for (int ind = 0; ind < nr_tasks; ind++) {
struct task *t = &sched->tasks[ind];
/* Skip? */
if (t->skip) continue;
/* Self-interaction? */
if (t->type == task_type_self && t->subtype == task_subtype_density) {
@@ -2002,7 +1993,7 @@ void engine_maketasks(struct engine *e) {
}
/**
* @brief Mark tasks to be skipped and set the sort flags accordingly.
* @brief Mark tasks to be un-skipped and set the sort flags accordingly.
* Threadpool mapper function for fixdt version.
*
* @param map_data pointer to the tasks
@@ -2013,11 +2004,15 @@ void engine_marktasks_fixdt_mapper(void *map_data, int num_elements,
void *extra_data) {
/* Unpack the arguments. */
struct task *tasks = (struct task *)map_data;
int *rebuild_space = (int *)extra_data;
size_t *rebuild_space = &((size_t *)extra_data)[0];
struct scheduler *s = (struct scheduler *)(((size_t *)extra_data)[1]);
for (int ind = 0; ind < num_elements; ind++) {
struct task *t = &tasks[ind];
/* All tasks are unskipped (we skip by default). */
scheduler_activate(s, t);
/* Pair? */
if (t->type == task_type_pair || t->type == task_type_sub_pair) {
@@ -2044,28 +2039,7 @@ void engine_marktasks_fixdt_mapper(void *map_data, int num_elements,
}
/**
* @brief Mark any sort tasks as initially skipped.
* Threadpool mapper function.
*
* @param map_data pointer to the tasks
* @param num_elements number of tasks
* @param extra_data unused
*/
void engine_marktasks_sorts_mapper(void *map_data, int num_elements,
void *extra_data) {
/* Unpack the arguments. */
struct task *tasks = (struct task *)map_data;
for (int ind = 0; ind < num_elements; ind++) {
struct task *t = &tasks[ind];
if (t->type == task_type_sort) {
t->flags = 0;
t->skip = 1;
}
}
}
/**
* @brief Mark tasks to be skipped and set the sort flags accordingly.
* @brief Mark tasks to be un-skipped and set the sort flags accordingly.
* Threadpool mapper function.
*
* @param map_data pointer to the tasks
@@ -2076,18 +2050,20 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
void *extra_data) {
/* Unpack the arguments. */
struct task *tasks = (struct task *)map_data;
const int ti_end = ((int *)extra_data)[0];
int *rebuild_space = &((int *)extra_data)[1];
const int ti_end = ((size_t *)extra_data)[0];
size_t *rebuild_space = &((size_t *)extra_data)[1];
struct scheduler *s = (struct scheduler *)(((size_t *)extra_data)[2]);
for (int ind = 0; ind < num_elements; ind++) {
struct task *t = &tasks[ind];
/* Single-cell task? */
if (t->type == task_type_self || t->type == task_type_ghost ||
t->type == task_type_sub_self) {
t->type == task_type_extra_ghost || t->type == task_type_cooling ||
t->type == task_type_sourceterms || t->type == task_type_sub_self) {
/* Set this task's skip. */
t->skip = (t->ci->ti_end_min > ti_end);
if (t->ci->ti_end_min <= ti_end) scheduler_activate(s, t);
}
/* Pair? */
@@ -2104,19 +2080,24 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
cj->dx_max > space_maxreldx * cj->h_max))
*rebuild_space = 1;
/* Set this task's skip. */
if ((t->skip = (ci->ti_end_min > ti_end && cj->ti_end_min > ti_end)) == 1)
/* Set this task's skip, otherwise nothing to do. */
if (ci->ti_end_min <= ti_end || cj->ti_end_min <= ti_end)
scheduler_activate(s, t);
else
continue;
/* If this is not a density task, we don't have to do any of the below. */
if (t->subtype != task_subtype_density) continue;
/* Set the sort flags. */
if (t->type == task_type_pair && t->subtype != task_subtype_grav) {
if (t->type == task_type_pair) {
if (!(ci->sorted & (1 << t->flags))) {
atomic_or(&ci->sorts->flags, (1 << t->flags));
ci->sorts->skip = 0;
scheduler_activate(s, ci->sorts);
}
if (!(cj->sorted & (1 << t->flags))) {
atomic_or(&cj->sorts->flags, (1 << t->flags));
cj->sorts->skip = 0;
scheduler_activate(s, cj->sorts);
}
}
@@ -2126,9 +2107,9 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
if (ci->nodeID != engine_rank) {
/* Activate the tasks to recv foreign cell ci's data. */
ci->recv_xv->skip = 0;
ci->recv_rho->skip = 0;
ci->recv_ti->skip = 0;
scheduler_activate(s, ci->recv_xv);
scheduler_activate(s, ci->recv_rho);
scheduler_activate(s, ci->recv_ti);
/* Look for the local cell cj's send tasks. */
struct link *l = NULL;
@@ -2136,45 +2117,46 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
l = l->next)
;
if (l == NULL) error("Missing link to send_xv task.");
l->t->skip = 0;
scheduler_activate(s, l->t);
for (l = cj->send_rho; l != NULL && l->t->cj->nodeID != ci->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_rho task.");
l->t->skip = 0;
scheduler_activate(s, l->t);
for (l = cj->send_ti; l != NULL && l->t->cj->nodeID != ci->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_ti task.");
l->t->skip = 0;
scheduler_activate(s, l->t);
} else if (cj->nodeID != engine_rank) {
/* Activate the tasks to recv foreign cell cj's data. */
cj->recv_xv->skip = 0;
cj->recv_rho->skip = 0;
cj->recv_ti->skip = 0;
scheduler_activate(s, cj->recv_xv);
scheduler_activate(s, cj->recv_rho);
scheduler_activate(s, cj->recv_ti);
/* Look for the local cell ci's send tasks. */
struct link *l = NULL;
for (l = ci->send_xv; l != NULL && l->t->cj->nodeID != cj->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_xv task.");
l->t->skip = 0;
scheduler_activate(s, l->t);
for (l = ci->send_rho; l != NULL && l->t->cj->nodeID != cj->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_rho task.");
l->t->skip = 0;
scheduler_activate(s, l->t);
for (l = ci->send_ti; l != NULL && l->t->cj->nodeID != cj->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_ti task.");
l->t->skip = 0;
scheduler_activate(s, l->t);
}
#endif
@@ -2182,25 +2164,26 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
/* Kick? */
else if (t->type == task_type_kick) {
t->skip = (t->ci->ti_end_min > ti_end);
t->ci->updated = 0;
t->ci->g_updated = 0;
if (t->ci->ti_end_min <= ti_end) scheduler_activate(s, t);
}
/* Init? */
else if (t->type == task_type_init) {
/* Set this task's skip. */
t->skip = (t->ci->ti_end_min > ti_end);
if (t->ci->ti_end_min <= ti_end) scheduler_activate(s, t);
}
/* None? */
else if (t->type == task_type_none)
t->skip = 1;
/* Tasks with no cells should not be skipped? */
else if (t->type == task_type_grav_gather_m ||
t->type == task_type_grav_fft) {
scheduler_activate(s, t);
}
}
}
/**
* @brief Mark tasks to be skipped and set the sort flags accordingly.
* @brief Mark tasks to be un-skipped and set the sort flags accordingly.
*
* @return 1 if the space has to be rebuilt, 0 otherwise.
*/
@@ -2214,31 +2197,16 @@ int engine_marktasks(struct engine *e) {
if (e->policy & engine_policy_fixdt) {
/* Run through the tasks and mark as skip or not. */
size_t extra_data[2] = {rebuild_space, (size_t)&e->sched};
threadpool_map(&e->threadpool, engine_marktasks_fixdt_mapper, s->tasks,
s->nr_tasks, sizeof(struct task), 1000, &rebuild_space);
s->nr_tasks, sizeof(struct task), 1000, extra_data);
return rebuild_space;
/* Multiple-timestep case */
} else {
/* Run through the tasks and mark as skip or not. */
int extra_data[2] = {e->ti_current, rebuild_space};
threadpool_map(&e->threadpool, engine_marktasks_sorts_mapper, s->tasks,
s->nr_tasks, sizeof(struct task), 10000, NULL);
#ifdef WITH_MPI
if (e->policy & engine_policy_mpi) {
/* Skip all sends and recvs, we will unmark if needed. */
for (int k = 0; k < s->nr_tasks; k++) {
struct task *t = &s->tasks[k];
if (t->type == task_type_send || t->type == task_type_recv) {
t->skip = 1;
}
}
}
#endif
size_t extra_data[3] = {e->ti_current, rebuild_space, (size_t)&e->sched};
threadpool_map(&e->threadpool, engine_marktasks_mapper, s->tasks,
s->nr_tasks, sizeof(struct task), 10000, extra_data);
rebuild_space = extra_data[1];
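Both mappers above receive their arguments packed into a size_t array: an integer (ti_current, rebuild_space) and the scheduler pointer are carried through size_t slots and unpacked on the other side. The following small standalone illustration of that convention uses invented toy_* names in place of the real threadpool API; strictly speaking, uintptr_t is the portable type for round-tripping a pointer, but size_t is wide enough on the common LP64/LLP64 platforms.

/* Toy illustration (invented names, not the SWIFT threadpool API) of the
 * extra_data convention used by engine_marktasks() above: mixed values are
 * packed into a size_t array and unpacked inside the mapper. */
#include <stddef.h>
#include <stdio.h>

struct toy_sched { int nr_tasks; };

static void toy_mapper(void *extra_data) {
  /* Unpack in the same order the caller packed the slots. */
  const int ti_current = (int)((size_t *)extra_data)[0];
  size_t *rebuild_space = &((size_t *)extra_data)[1];
  struct toy_sched *s = (struct toy_sched *)((size_t *)extra_data)[2];
  printf("ti_current=%d rebuild=%zu nr_tasks=%d\n", ti_current, *rebuild_space,
         s->nr_tasks);
  *rebuild_space = 1; /* Written back so the caller can read it afterwards. */
}

int main(void) {
  struct toy_sched sched = {42};
  /* Slot 0: an int, slot 1: an in/out flag, slot 2: a pointer cast to
   * size_t (uintptr_t would be the strictly portable choice). */
  size_t extra_data[3] = {8, 0, (size_t)&sched};
  toy_mapper(extra_data);
  printf("rebuild after mapping: %zu\n", extra_data[1]);
  return 0;
}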
@@ -2259,16 +2227,21 @@
*/
void engine_print_task_counts(struct engine *e) {
struct scheduler *sched = &e->sched;
const ticks tic = getticks();
struct scheduler *const sched = &e->sched;
const int nr_tasks = sched->nr_tasks;
const struct task *const tasks = sched->tasks;
/* Count and print the number of each task type. */
int counts[task_type_count + 1];
for (int k = 0; k <= task_type_count; k++) counts[k] = 0;
for (int k = 0; k < sched->nr_tasks; k++)
if (!sched->tasks[k].skip)
counts[(int)sched->tasks[k].type] += 1;
else
for (int k = 0; k < nr_tasks; k++) {
if (tasks[k].skip)
counts[task_type_count] += 1;
else
counts[(int)tasks[k].type] += 1;
}
message("Total = %d", nr_tasks);
#ifdef WITH_MPI
printf("[%04i] %s engine_print_task_counts: task counts are [ %s=%i",
e->nodeID, clocks_get_timesincestart(), taskID_names[0], counts[0]);
@@ -2282,6 +2255,10 @@ void engine_print_task_counts(struct engine *e) {
fflush(stdout);
message("nr_parts = %zu.", e->s->nr_parts);
message("nr_gparts = %zu.", e->s->nr_gparts);
if (e->verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
clocks_getunit());
}
/**
@@ -2334,7 +2311,7 @@ void engine_prepare(struct engine *e, int nodrift) {
TIMER_TIC;
/* Run through the tasks and mark as skip or not. */
int rebuild = (e->forcerebuild || engine_marktasks(e));
int rebuild = e->forcerebuild;
/* Collect the values of rebuild from all nodes. */
#ifdef WITH_MPI
@@ -2444,6 +2421,10 @@ void engine_collect_kick(struct cell *c) {
ti_end_min = min(ti_end_min, cp->ti_end_min);
updated += cp->updated;
g_updated += cp->g_updated;
/* Collected, so clear for next time. */
cp->updated = 0;
cp->g_updated = 0;
}
}
}
@@ -2479,6 +2460,10 @@ void engine_collect_timestep(struct engine *e) {
ti_end_min = min(ti_end_min, c->ti_end_min);
updates += c->updated;
g_updates += c->g_updated;
/* Collected, so clear for next time. */
c->updated = 0;
c->g_updated = 0;
}
/* Aggregate the data from the different nodes. */
@@ -2807,7 +2792,7 @@ void engine_step(struct engine *e) {
e->timeLastStatistics += e->deltaTimeStatistics;
}
/* Drift only the necessary particles, that all means all particles
/* Drift only the necessary particles, that means all particles
* if we are about to repartition. */
int repart = (e->forcerepart != REPART_NONE);
e->drift_all = repart || e->drift_all;
@@ -2928,8 +2913,9 @@ void engine_drift(struct engine *e) {
const ticks tic = getticks();
threadpool_map(&e->threadpool, runner_do_drift_mapper, e->s->cells_top,
e->s->nr_cells, sizeof(struct cell), 1, e);
if (e->verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
message("took %.3f %s (including task unskipping).", clocks_from_ticks(getticks() - tic),
clocks_getunit());
}
......
@@ -747,26 +747,47 @@ void runner_do_ghost(struct runner *r, struct cell *c) {
}
/**
* @brief Drift particles and g-particles in a cell forward in time
* @brief Drift particles and g-particles in a cell forward in time,
* unskipping any tasks associated with active cells.
*
* @param c The cell.
* @param e The engine.
* @param drift whether to actually drift the particles, will not be
* necessary for non-local cells.
*/
static void runner_do_drift(struct cell *c, struct engine *e) {
static void runner_do_drift(struct cell *c, struct engine *e, int drift) {
const int ti_current = e->ti_current;
/* Unskip any active tasks. */
if (c->ti_end_min == e->ti_current) {
const int forcerebuild = cell_unskip_tasks(c, &e->sched);
if (forcerebuild) atomic_inc(&e->forcerebuild);
}
/* Do we really need to drift? */
if (drift) {
if (!e->drift_all && !cell_is_drift_needed(c, ti_current)) return;
} else {
/* Not drifting, but may still need to recurse for task skipping. */
if (c->split) {
for (int k = 0; k < 8; k++) {
if (c->progeny[k] != NULL) {
struct cell *cp = c->progeny[k];
runner_do_drift(cp, e, 0);
}
}
}
return;
}
const double timeBase = e->timeBase;
const int ti_old = c->ti_old;
const int ti_current = e->ti_current;
struct part *const parts = c->parts;
struct xpart *const xparts = c->xparts;
struct gpart *const gparts = c->gparts;
/* Do we need to drift ? */
if (!e->drift_all && !cell_is_drift_needed(c, ti_current)) return;
/* Check that we are actually going to move forward. */