Commit 7a4be38d authored by Peter W. Draper

Merge branch 'mpi_skip' into 'master'

Mpi skip

Fixes #171.

Adds new tasks to propagate cell end times to other nodes so 
that the related tasks can be skipped, avoiding unnecessary
MPI requests.

See merge request !191
parents 86851808 17118801
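
For orientation, the mechanism works like this: the end-of-step times of each foreign cell hierarchy are packed depth-first into a flat integer buffer, exchanged through a new send/recv task of subtype "tend", and tasks whose cells contain no particle ending its time step at the current step are marked as skipped, so no particle data is requested for them. Below is a minimal, self-contained sketch of those two ingredients; it uses simplified, hypothetical toy_* types rather than the real SWIFT structures, and the actual implementation is in the diff that follows.

#include <stddef.h>

/* Simplified stand-in for struct cell: only what the sketch needs. */
struct toy_cell {
  int ti_end_min;              /* Earliest end-of-step time of any particle held. */
  struct toy_cell *progeny[8]; /* Child cells, NULL where absent. */
};

/* Pack the ti_end_min of a cell and its progeny, depth-first, into a flat
   buffer; returns the number of ints written (mirrors cell_pack_ti_ends). */
static int toy_pack_ti_ends(const struct toy_cell *c, int *buf) {
  buf[0] = c->ti_end_min;
  int count = 1;
  for (int k = 0; k < 8; k++)
    if (c->progeny[k] != NULL)
      count += toy_pack_ti_ends(c->progeny[k], &buf[count]);
  return count;
}

/* A pair task between ci and cj can be skipped this step if neither cell
   contains a particle whose time step ends now; with up-to-date foreign
   ti_end_min values, the matching send/recv tasks can be skipped too. */
static int toy_pair_is_skippable(const struct toy_cell *ci,
                                 const struct toy_cell *cj, int ti_current) {
  return ci->ti_end_min > ti_current && cj->ti_end_min > ti_current;
}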
......@@ -26,8 +26,8 @@ Valid options are:
-f {int} Overwrite the CPU frequency (Hz) to be used for time measurements
-g Run with an external gravitational potential
-G Run with self-gravity
-n {int} Execute a fixed number of time steps. Defaults to -1, which means
use the time_end parameter to stop.
-n {int} Execute a fixed number of time steps. When unset use the time_end
parameter to stop.
-s Run with SPH
-t {int} The number of threads to use on each MPI rank. Defaults to 1 if not specified.
-v [12] Increase the level of verbosity 1: MPI-rank 0 writes
......
......@@ -74,7 +74,8 @@ void print_help_message() {
printf(" %2s %8s %s\n", "-g", "",
"Run with an external gravitational potential");
printf(" %2s %8s %s\n", "-G", "", "Run with self-gravity");
printf(" %2s %8s %s\n", "-n", "{int}", "Execute a fixed number of time steps");
printf(" %2s %8s %s\n", "-n", "{int}",
"Execute a fixed number of time steps");
printf(" %2s %8s %s\n", "-s", "", "Run with SPH");
printf(" %2s %8s %s\n", "-t", "{int}",
"The number of threads to use on each MPI rank. Defaults to 1 if not "
......@@ -139,7 +140,7 @@ int main(int argc, char *argv[]) {
int with_aff = 0;
int dry_run = 0;
int dump_tasks = 0;
int nsteps = -1;
int nsteps = -2;
int with_cosmology = 0;
int with_external_gravity = 0;
int with_self_gravity = 0;
......
......@@ -128,6 +128,7 @@ int cell_unpack(struct pcell *pc, struct cell *c, struct space *s) {
}
/* Return the total number of unpacked cells. */
c->pcell_size = count;
return count;
}
......@@ -213,6 +214,39 @@ int cell_pack(struct cell *c, struct pcell *pc) {
pc->progeny[k] = -1;
/* Return the number of packed cells used. */
c->pcell_size = count;
return count;
}
int cell_pack_ti_ends(struct cell *c, int *ti_ends) {
/* Pack this cell's data. */
ti_ends[0] = c->ti_end_min;
/* Fill in the progeny, depth-first recursion. */
int count = 1;
for (int k = 0; k < 8; k++)
if (c->progeny[k] != NULL) {
count += cell_pack_ti_ends(c->progeny[k], &ti_ends[count]);
}
/* Return the number of packed values. */
return count;
}
int cell_unpack_ti_ends(struct cell *c, int *ti_ends) {
/* Unpack this cell's data. */
c->ti_end_min = ti_ends[0];
/* Fill in the progeny, depth-first recursion. */
int count = 1;
for (int k = 0; k < 8; k++)
if (c->progeny[k] != NULL) {
count += cell_unpack_ti_ends(c->progeny[k], &ti_ends[count]);
}
  /* Return the number of unpacked values. */
return count;
}
......
......@@ -125,7 +125,10 @@ struct cell {
struct task *ghost, *init, *drift, *kick;
/* Task receiving data. */
struct task *recv_xv, *recv_rho;
struct task *recv_xv, *recv_rho, *recv_ti;
  /* Tasks to send data. */
struct link *send_xv, *send_rho, *send_ti;
/* Tasks for gravity tree. */
struct task *grav_up, *grav_down;
......@@ -185,6 +188,8 @@ int cell_glocktree(struct cell *c);
void cell_gunlocktree(struct cell *c);
int cell_pack(struct cell *c, struct pcell *pc);
int cell_unpack(struct pcell *pc, struct cell *c, struct space *s);
int cell_pack_ti_ends(struct cell *c, int *ti_ends);
int cell_unpack_ti_ends(struct cell *c, int *ti_ends);
int cell_getsize(struct cell *c);
int cell_link_parts(struct cell *c, struct part *parts);
int cell_link_gparts(struct cell *c, struct gpart *gparts);
......
......@@ -128,10 +128,12 @@ void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
engine_policy_external_gravity;
const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;
/* Is this the super-cell? */
if (super == NULL && (c->grav != NULL || (c->gcount > 0 && !c->split))) {
/* Am I the super-cell? */
/* TODO(pedro): Add a condition for gravity tasks as well. */
if (super == NULL &&
(c->density != NULL || (!c->split && (c->count > 0 || c->gcount > 0)))) {
/* This is the super cell, i.e. the first with gravity tasks attached. */
/* This is the super cell, i.e. the first with density tasks attached. */
super = c;
/* Local tasks only... */
......@@ -674,44 +676,63 @@ void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
*
* @param e The #engine.
* @param ci The sending #cell.
* @param cj The receiving #cell
* @param cj Dummy cell containing the nodeID of the receiving node.
* @param t_xv The send_xv #task, if it has already been created.
* @param t_rho The send_rho #task, if it has already been created.
* @param t_ti The send_ti #task, if required and has already been created.
*/
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj) {
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj,
struct task *t_xv, struct task *t_rho,
struct task *t_ti) {
#ifdef WITH_MPI
struct link *l = NULL;
struct scheduler *s = &e->sched;
const int nodeID = cj->nodeID;
/* Check if any of the density tasks are for the target node. */
for (l = ci->density; l != NULL; l = l->next)
if (l->t->ci->nodeID == cj->nodeID ||
(l->t->cj != NULL && l->t->cj->nodeID == cj->nodeID))
if (l->t->ci->nodeID == nodeID ||
(l->t->cj != NULL && l->t->cj->nodeID == nodeID))
break;
/* If so, attach send tasks. */
if (l != NULL) {
/* Create the tasks. */
struct task *t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
2 * ci->tag, 0, ci, cj, 0);
struct task *t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
2 * ci->tag + 1, 0, ci, cj, 0);
/* Create the tasks and their dependencies? */
if (t_xv == NULL) {
t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
3 * ci->tag, 0, ci, cj, 0);
t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
3 * ci->tag + 1, 0, ci, cj, 0);
if (!(e->policy & engine_policy_fixdt))
t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend,
3 * ci->tag + 2, 0, ci, cj, 0);
/* The send_rho task depends on the cell's ghost task. */
scheduler_addunlock(s, ci->super->ghost, t_rho);
/* The send_rho task depends on the cell's ghost task. */
scheduler_addunlock(s, ci->super->ghost, t_rho);
/* The send_rho task should unlock the super-cell's kick task. */
scheduler_addunlock(s, t_rho, ci->super->kick);
/* The send_rho task should unlock the super-cell's kick task. */
scheduler_addunlock(s, t_rho, ci->super->kick);
/* The send_xv task should unlock the super-cell's ghost task. */
scheduler_addunlock(s, t_xv, ci->super->ghost);
/* The send_xv task should unlock the super-cell's ghost task. */
scheduler_addunlock(s, t_xv, ci->super->ghost);
/* The super-cell's kick task should unlock the send_ti task. */
if (t_ti != NULL) scheduler_addunlock(s, ci->super->kick, t_ti);
}
/* Add them to the local cell. */
ci->send_xv = engine_addlink(e, ci->send_xv, t_xv);
ci->send_rho = engine_addlink(e, ci->send_rho, t_rho);
if (t_ti != NULL) ci->send_ti = engine_addlink(e, ci->send_ti, t_ti);
}
/* Recurse? */
else if (ci->split)
if (ci->split)
for (int k = 0; k < 8; k++)
if (ci->progeny[k] != NULL) engine_addtasks_send(e, ci->progeny[k], cj);
if (ci->progeny[k] != NULL)
engine_addtasks_send(e, ci->progeny[k], cj, t_xv, t_rho, t_ti);
#else
error("SWIFT was not compiled with MPI support.");
......@@ -722,40 +743,51 @@ void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj) {
* @brief Add recv tasks to a hierarchy of cells.
*
* @param e The #engine.
* @param c The #cell.
* @param c The foreign #cell.
* @param t_xv The recv_xv #task, if it has already been created.
* @param t_rho The recv_rho #task, if it has already been created.
* @param t_ti The recv_ti #task, if required and has already been created.
*/
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
struct task *t_rho) {
struct task *t_rho, struct task *t_ti) {
#ifdef WITH_MPI
struct scheduler *s = &e->sched;
/* Do we need to construct a recv task? */
if (t_xv == NULL && c->nr_density > 0) {
/* Do we need to construct a recv task?
Note that since c is a foreign cell, all its density tasks will involve
only the current rank, and thus we don't have to check them.*/
if (t_xv == NULL && c->density != NULL) {
/* Create the tasks. */
t_xv = c->recv_xv = scheduler_addtask(s, task_type_recv, task_subtype_none,
2 * c->tag, 0, c, NULL, 0);
t_rho = c->recv_rho = scheduler_addtask(
s, task_type_recv, task_subtype_none, 2 * c->tag + 1, 0, c, NULL, 0);
}
t_xv = scheduler_addtask(s, task_type_recv, task_subtype_none, 3 * c->tag,
0, c, NULL, 0);
t_rho = scheduler_addtask(s, task_type_recv, task_subtype_none,
3 * c->tag + 1, 0, c, NULL, 0);
if (!(e->policy & engine_policy_fixdt))
t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend,
3 * c->tag + 2, 0, c, NULL, 0);
}
c->recv_xv = t_xv;
c->recv_rho = t_rho;
c->recv_ti = t_ti;
/* Add dependencies. */
for (struct link *l = c->density; l != NULL; l = l->next) {
scheduler_addunlock(s, t_xv, l->t);
scheduler_addunlock(s, l->t, t_rho);
}
for (struct link *l = c->force; l != NULL; l = l->next)
for (struct link *l = c->force; l != NULL; l = l->next) {
scheduler_addunlock(s, t_rho, l->t);
if (t_ti != NULL) scheduler_addunlock(s, l->t, t_ti);
}
if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);
/* Recurse? */
if (c->split)
for (int k = 0; k < 8; k++)
if (c->progeny[k] != NULL)
engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho);
engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho, t_ti);
#else
error("SWIFT was not compiled with MPI support.");
......@@ -1566,12 +1598,13 @@ void engine_maketasks(struct engine *e) {
/* Loop through the proxy's incoming cells and add the
recv tasks. */
for (int k = 0; k < p->nr_cells_in; k++)
engine_addtasks_recv(e, p->cells_in[k], NULL, NULL);
engine_addtasks_recv(e, p->cells_in[k], NULL, NULL, NULL);
/* Loop through the proxy's outgoing cells and add the
send tasks. */
for (int k = 0; k < p->nr_cells_out; k++)
engine_addtasks_send(e, p->cells_out[k], p->cells_in[0]);
engine_addtasks_send(e, p->cells_out[k], p->cells_in[0], NULL, NULL,
NULL);
}
}
......@@ -1646,10 +1679,9 @@ int engine_marktasks(struct engine *e) {
for (int k = 0; k < nr_tasks; k++) {
/* Get a handle on the kth task. */
struct task *t = &tasks[ind[k]];
struct task *t = &tasks[k];
/* Sort-task? Note that due to the task ranking, the sorts
will all come before the pairs. */
/* Sort-task? */
if (t->type == task_type_sort) {
/* Re-set the flags. */
......@@ -1658,6 +1690,24 @@ int engine_marktasks(struct engine *e) {
}
/* Send/recv-task? */
else if (t->type == task_type_send || t->type == task_type_recv) {
t->skip = 1;
}
}
/* Run through the tasks and mark as skip or not. */
for (int k = 0; k < nr_tasks; k++) {
/* Get a handle on the kth task. */
struct task *t = &tasks[k];
/* Skip sorts, sends, and recvs. */
if (t->type == task_type_sort || t->type == task_type_send ||
t->type == task_type_recv) {
continue;
}
/* Single-cell task? */
else if (t->type == task_type_self || t->type == task_type_ghost ||
t->type == task_type_sub_self) {
......@@ -1673,9 +1723,6 @@ int engine_marktasks(struct engine *e) {
const struct cell *ci = t->ci;
const struct cell *cj = t->cj;
/* Set this task's skip. */
t->skip = (ci->ti_end_min > ti_end && cj->ti_end_min > ti_end);
/* Too much particle movement? */
if (t->tight &&
(fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin ||
......@@ -1683,8 +1730,13 @@ int engine_marktasks(struct engine *e) {
cj->dx_max > space_maxreldx * cj->h_max))
return 1;
/* Set this task's skip. */
if ((t->skip = (ci->ti_end_min > ti_end && cj->ti_end_min > ti_end)) ==
1)
continue;
/* Set the sort flags. */
if (!t->skip && t->type == task_type_pair) {
if (t->type == task_type_pair) {
if (!(ci->sorted & (1 << t->flags))) {
ci->sorts->flags |= (1 << t->flags);
ci->sorts->skip = 0;
......@@ -1695,6 +1747,68 @@ int engine_marktasks(struct engine *e) {
}
}
/* Activate the send/recv flags. */
if (ci->nodeID != e->nodeID) {
/* Activate the tasks to recv foreign cell ci's data. */
ci->recv_xv->skip = 0;
ci->recv_rho->skip = 0;
ci->recv_ti->skip = 0;
/* Look for the local cell cj's send tasks. */
struct link *l = NULL;
for (l = cj->send_xv; l != NULL && l->t->cj->nodeID != ci->nodeID;
l = l->next)
;
if (l == NULL) {
abort();
error("Missing link to send_xv task.");
}
l->t->skip = 0;
for (l = cj->send_rho; l != NULL && l->t->cj->nodeID != ci->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_rho task.");
l->t->skip = 0;
for (l = cj->send_ti; l != NULL && l->t->cj->nodeID != ci->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_ti task.");
l->t->skip = 0;
} else if (cj->nodeID != e->nodeID) {
/* Activate the tasks to recv foreign cell cj's data. */
cj->recv_xv->skip = 0;
cj->recv_rho->skip = 0;
cj->recv_ti->skip = 0;
/* Look for the local cell ci's send tasks. */
struct link *l = NULL;
for (l = ci->send_xv; l != NULL && l->t->cj->nodeID != cj->nodeID;
l = l->next)
;
if (l == NULL) {
abort();
error("Missing link to send_xv task.");
}
l->t->skip = 0;
for (l = ci->send_rho; l != NULL && l->t->cj->nodeID != cj->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_rho task.");
l->t->skip = 0;
for (l = ci->send_ti; l != NULL && l->t->cj->nodeID != cj->nodeID;
l = l->next)
;
if (l == NULL) error("Missing link to send_ti task.");
l->t->skip = 0;
}
}
/* Kick? */
......@@ -2202,9 +2316,9 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs) {
/* Add MPI tasks if need be */
if (e->policy & engine_policy_mpi) {
mask |= 1 << task_type_send;
mask |= 1 << task_type_recv;
submask |= 1 << task_subtype_tend;
}
/* Now, launch the calculation */
......@@ -2332,9 +2446,9 @@ void engine_step(struct engine *e) {
/* Add MPI tasks if need be */
if (e->policy & engine_policy_mpi) {
mask |= 1 << task_type_send;
mask |= 1 << task_type_recv;
submask |= 1 << task_subtype_tend;
}
/* Send off the runners. */
......
......@@ -980,19 +980,36 @@ void runner_do_recv_cell(struct runner *r, struct cell *c, int timer) {
int ti_end_max = 0;
float h_max = 0.f;
/* Collect everything... */
for (size_t k = 0; k < nr_parts; k++) {
const int ti_end = parts[k].ti_end;
// if(ti_end < ti_current) error("Received invalid particle !");
ti_end_min = min(ti_end_min, ti_end);
ti_end_max = max(ti_end_max, ti_end);
h_max = fmaxf(h_max, parts[k].h);
/* If this cell is a leaf, collect the particle data. */
if (!c->split) {
/* Collect everything... */
for (size_t k = 0; k < nr_parts; k++) {
const int ti_end = parts[k].ti_end;
// if(ti_end < ti_current) error("Received invalid particle !");
ti_end_min = min(ti_end_min, ti_end);
ti_end_max = max(ti_end_max, ti_end);
h_max = fmaxf(h_max, parts[k].h);
}
for (size_t k = 0; k < nr_gparts; k++) {
const int ti_end = gparts[k].ti_end;
// if(ti_end < ti_current) error("Received invalid particle !");
ti_end_min = min(ti_end_min, ti_end);
ti_end_max = max(ti_end_max, ti_end);
}
}
for (size_t k = 0; k < nr_gparts; k++) {
const int ti_end = gparts[k].ti_end;
// if(ti_end < ti_current) error("Received invalid particle !");
ti_end_min = min(ti_end_min, ti_end);
ti_end_max = max(ti_end_max, ti_end);
/* Otherwise, recurse and collect. */
else {
for (int k = 0; k < 8; k++) {
if (c->progeny[k] != NULL) {
runner_do_recv_cell(r, c->progeny[k], 0);
ti_end_min = min(ti_end_min, c->progeny[k]->ti_end_min);
ti_end_max = max(ti_end_max, c->progeny[k]->ti_end_max);
h_max = fmaxf(h_max, c->progeny[k]->h_max);
}
}
}
/* ... and store. */
......@@ -1098,9 +1115,17 @@ void *runner_main(void *data) {
runner_do_kick_fixdt(r, ci, 1);
break;
case task_type_send:
if (t->subtype == task_subtype_tend) {
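          /* Release the end-of-step time buffer allocated in scheduler_enqueue(). */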
free(t->buff);
}
break;
case task_type_recv:
runner_do_recv_cell(r, ci, 1);
if (t->subtype == task_subtype_tend) {
cell_unpack_ti_ends(ci, t->buff);
free(t->buff);
} else {
runner_do_recv_cell(r, ci, 1);
}
break;
case task_type_grav_external:
runner_do_grav_external(r, t->ci, 1);
......
......@@ -565,9 +565,6 @@ struct task *scheduler_addtask(struct scheduler *s, int type, int subtype,
t->rid = -1;
t->last_rid = -1;
/* Init the lock. */
lock_init(&t->lock);
/* Add an index for it. */
// lock_lock( &s->lock );
s->tasks_ind[atomic_inc(&s->nr_tasks)] = ind;
......@@ -978,8 +975,14 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
break;
case task_type_recv:
#ifdef WITH_MPI
err = MPI_Irecv(t->ci->parts, t->ci->count, part_mpi_type,
t->ci->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
if (t->subtype == task_subtype_tend) {
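        /* Receive the packed end-of-step times: one int per cell in the
           hierarchy (pcell_size entries); unpacked and freed in runner_main(). */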
t->buff = malloc(sizeof(int) * t->ci->pcell_size);
err = MPI_Irecv(t->buff, t->ci->pcell_size, MPI_INT, t->ci->nodeID,
t->flags, MPI_COMM_WORLD, &t->req);
} else {
err = MPI_Irecv(t->ci->parts, t->ci->count, part_mpi_type,
t->ci->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
}
if (err != MPI_SUCCESS) {
mpi_error(err, "Failed to emit irecv for particle data.");
}
......@@ -993,8 +996,15 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
break;
case task_type_send:
#ifdef WITH_MPI
err = MPI_Isend(t->ci->parts, t->ci->count, part_mpi_type,
t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
if (t->subtype == task_subtype_tend) {
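        /* Pack the end-of-step times into a temporary buffer; it is freed in
           runner_main() once the send task completes. */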
t->buff = malloc(sizeof(int) * t->ci->pcell_size);
cell_pack_ti_ends(t->ci, t->buff);
err = MPI_Isend(t->buff, t->ci->pcell_size, MPI_INT, t->cj->nodeID,
t->flags, MPI_COMM_WORLD, &t->req);
} else {
err = MPI_Isend(t->ci->parts, t->ci->count, part_mpi_type,
t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
}
if (err != MPI_SUCCESS) {
mpi_error(err, "Failed to emit isend for particle data.");
}
......
......@@ -54,7 +54,7 @@ const char *taskID_names[task_type_count] = {
"split_cell", "rewait"};
const char *subtaskID_names[task_type_count] = {"none", "density", "force",
"grav"};
"grav", "t_end"};
/**
* @brief Computes the overlap between the parts array of two given cells.
......@@ -213,77 +213,6 @@ int task_lock(struct task *t) {
return 1;
}
/**
* @brief Remove all unlocks to tasks that are of the given type.
*
* @param t The #task.
* @param type The task type ID to remove.
*/
void task_cleanunlock(struct task *t, int type) {
int k;
lock_lock(&t->lock);
for (k = 0; k < t->nr_unlock_tasks; k++)
if (t->unlock_tasks[k]->type == type) {
t->nr_unlock_tasks -= 1;
t->unlock_tasks[k] = t->unlock_tasks[t->nr_unlock_tasks];
}
lock_unlock_blind(&t->lock);
}
/**
* @brief Remove an unlock_task from the given task.
*
* @param ta The unlocking #task.
* @param tb The #task that will be unlocked.
*/
void task_rmunlock(struct task *ta, struct task *tb) {
int k;
lock_lock(&ta->lock);
for (k = 0; k < ta->nr_unlock_tasks; k++)
if (ta->unlock_tasks[k] == tb) {
ta->nr_unlock_tasks -= 1;
ta->unlock_tasks[k] = ta->unlock_tasks[ta->nr_unlock_tasks];
lock_unlock_blind(&ta->lock);
return;
}
error("Task not found.");
}
/**
* @brief Remove an unlock_task from the given task.
*
* @param ta The unlocking #task.
* @param tb The #task that will be unlocked.
*
* Differs from #task_rmunlock in that it will not fail if
* the task @c tb is not in the unlocks of @c ta.
*/
void task_rmunlock_blind(struct task *ta, struct task *tb) {
int k;
lock_lock(&ta->lock);