Commit 5c2c2f35 authored by Peter W. Draper's avatar Peter W. Draper Committed by Matthieu Schaller
Browse files

Use MPI communicators to reduce the use of tags.

parent 2b9c96d6
...@@ -186,6 +186,7 @@ int cell_pack(struct cell *restrict c, struct pcell *restrict pc) { ...@@ -186,6 +186,7 @@ int cell_pack(struct cell *restrict c, struct pcell *restrict pc) {
pc->count = c->count; pc->count = c->count;
pc->gcount = c->gcount; pc->gcount = c->gcount;
pc->scount = c->scount; pc->scount = c->scount;
c->tag = pc->tag = atomic_inc(&cell_next_tag) % cell_max_tag; c->tag = pc->tag = atomic_inc(&cell_next_tag) % cell_max_tag;
#ifdef SWIFT_DEBUG_CHECKS #ifdef SWIFT_DEBUG_CHECKS
pc->cellID = c->cellID; pc->cellID = c->cellID;
......
...@@ -1325,12 +1325,12 @@ void engine_addtasks_send_hydro(struct engine *e, struct cell *ci, ...@@ -1325,12 +1325,12 @@ void engine_addtasks_send_hydro(struct engine *e, struct cell *ci,
if (t_xv == NULL) { if (t_xv == NULL) {
t_xv = scheduler_addtask(s, task_type_send, task_subtype_xv, t_xv = scheduler_addtask(s, task_type_send, task_subtype_xv,
6 * ci->tag + 0, 0, ci, cj); ci->tag, 0, ci, cj);
t_rho = scheduler_addtask(s, task_type_send, task_subtype_rho, t_rho = scheduler_addtask(s, task_type_send, task_subtype_rho,
6 * ci->tag + 1, 0, ci, cj); ci->tag, 0, ci, cj);
#ifdef EXTRA_HYDRO_LOOP #ifdef EXTRA_HYDRO_LOOP
t_gradient = scheduler_addtask(s, task_type_send, task_subtype_gradient, t_gradient = scheduler_addtask(s, task_type_send, task_subtype_gradient,
6 * ci->tag + 3, 0, ci, cj); ci->tag, 0, ci, cj);
#endif #endif
#ifdef EXTRA_HYDRO_LOOP #ifdef EXTRA_HYDRO_LOOP
...@@ -1414,7 +1414,7 @@ void engine_addtasks_send_gravity(struct engine *e, struct cell *ci, ...@@ -1414,7 +1414,7 @@ void engine_addtasks_send_gravity(struct engine *e, struct cell *ci,
if (t_grav == NULL) { if (t_grav == NULL) {
t_grav = scheduler_addtask(s, task_type_send, task_subtype_gpart, t_grav = scheduler_addtask(s, task_type_send, task_subtype_gpart,
6 * ci->tag + 4, 0, ci, cj); ci->tag, 0, ci, cj);
/* The sends should unlock the down pass. */ /* The sends should unlock the down pass. */
scheduler_addunlock(s, t_grav, ci->super_gravity->grav_down); scheduler_addunlock(s, t_grav, ci->super_gravity->grav_down);
...@@ -1474,7 +1474,7 @@ void engine_addtasks_send_timestep(struct engine *e, struct cell *ci, ...@@ -1474,7 +1474,7 @@ void engine_addtasks_send_timestep(struct engine *e, struct cell *ci,
if (t_ti == NULL) { if (t_ti == NULL) {
t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend, t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend,
6 * ci->tag + 2, 0, ci, cj); ci->tag, 0, ci, cj);
/* The super-cell's timestep task should unlock the send_ti task. */ /* The super-cell's timestep task should unlock the send_ti task. */
scheduler_addunlock(s, ci->super->timestep, t_ti); scheduler_addunlock(s, ci->super->timestep, t_ti);
...@@ -1515,13 +1515,13 @@ void engine_addtasks_recv_hydro(struct engine *e, struct cell *c, ...@@ -1515,13 +1515,13 @@ void engine_addtasks_recv_hydro(struct engine *e, struct cell *c,
if (t_xv == NULL && c->density != NULL) { if (t_xv == NULL && c->density != NULL) {
/* Create the tasks. */ /* Create the tasks. */
t_xv = scheduler_addtask(s, task_type_recv, task_subtype_xv, 6 * c->tag + 0, t_xv = scheduler_addtask(s, task_type_recv, task_subtype_xv, c->tag,
0, c, NULL); 0, c, NULL);
t_rho = scheduler_addtask(s, task_type_recv, task_subtype_rho, t_rho = scheduler_addtask(s, task_type_recv, task_subtype_rho,
6 * c->tag + 1, 0, c, NULL); c->tag, 0, c, NULL);
#ifdef EXTRA_HYDRO_LOOP #ifdef EXTRA_HYDRO_LOOP
t_gradient = scheduler_addtask(s, task_type_recv, task_subtype_gradient, t_gradient = scheduler_addtask(s, task_type_recv, task_subtype_gradient,
6 * c->tag + 3, 0, c, NULL); c->tag, 0, c, NULL);
#endif #endif
} }
...@@ -1577,7 +1577,7 @@ void engine_addtasks_recv_gravity(struct engine *e, struct cell *c, ...@@ -1577,7 +1577,7 @@ void engine_addtasks_recv_gravity(struct engine *e, struct cell *c,
/* Create the tasks. */ /* Create the tasks. */
t_grav = scheduler_addtask(s, task_type_recv, task_subtype_gpart, t_grav = scheduler_addtask(s, task_type_recv, task_subtype_gpart,
6 * c->tag + 4, 0, c, NULL); c->tag, 0, c, NULL);
} }
c->recv_grav = t_grav; c->recv_grav = t_grav;
...@@ -1613,7 +1613,7 @@ void engine_addtasks_recv_timestep(struct engine *e, struct cell *c, ...@@ -1613,7 +1613,7 @@ void engine_addtasks_recv_timestep(struct engine *e, struct cell *c,
if (t_ti == NULL && (c->grav != NULL || c->density != NULL)) { if (t_ti == NULL && (c->grav != NULL || c->density != NULL)) {
t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend, t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend,
6 * c->tag + 2, 0, c, NULL); c->tag, 0, c, NULL);
} }
c->recv_ti = t_ti; c->recv_ti = t_ti;
...@@ -4793,7 +4793,7 @@ void engine_step(struct engine *e) { ...@@ -4793,7 +4793,7 @@ void engine_step(struct engine *e) {
if (e->verbose) engine_print_task_counts(e); if (e->verbose) engine_print_task_counts(e);
/* Dump local cells and active particle counts. */ /* Dump local cells and active particle counts. */
/* dumpCells("cells", 0, 0, 0, 0, e->s, e->nodeID, e->step); */ //dumpCells("cells", 1, 0, 0, 0, e->s, e->nodeID, e->step);
#ifdef SWIFT_DEBUG_CHECKS #ifdef SWIFT_DEBUG_CHECKS
/* Check that we have the correct total mass in the top-level multipoles */ /* Check that we have the correct total mass in the top-level multipoles */
...@@ -6341,6 +6341,7 @@ void engine_config(int restart, struct engine *e, struct swift_params *params, ...@@ -6341,6 +6341,7 @@ void engine_config(int restart, struct engine *e, struct swift_params *params,
#ifdef WITH_MPI #ifdef WITH_MPI
part_create_mpi_types(); part_create_mpi_types();
stats_create_MPI_type(); stats_create_MPI_type();
task_create_mpi_comms();
#endif #endif
/* Initialise the collection group. */ /* Initialise the collection group. */
......
...@@ -1558,27 +1558,31 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) { ...@@ -1558,27 +1558,31 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
t->ci->pcell_size); t->ci->pcell_size);
err = MPI_Irecv( err = MPI_Irecv(
t->buff, t->ci->pcell_size * sizeof(struct pcell_step), MPI_BYTE, t->buff, t->ci->pcell_size * sizeof(struct pcell_step), MPI_BYTE,
t->ci->nodeID, t->flags, MPI_COMM_WORLD, &t->req); t->ci->nodeID, t->flags, subtaskMPI_comms[t->subtype], &t->req);
} else if (t->subtype == task_subtype_xv || } else if (t->subtype == task_subtype_xv ||
t->subtype == task_subtype_rho || t->subtype == task_subtype_rho ||
t->subtype == task_subtype_gradient) { t->subtype == task_subtype_gradient) {
err = MPI_Irecv(t->ci->parts, t->ci->count, part_mpi_type, err = MPI_Irecv(t->ci->parts, t->ci->count, part_mpi_type,
t->ci->nodeID, t->flags, MPI_COMM_WORLD, &t->req); t->ci->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
// message( "receiving %i parts with tag=%i from %i to %i." , // message( "receiving %i parts with tag=%i from %i to %i." ,
// t->ci->count , t->flags , t->ci->nodeID , s->nodeID ); // t->ci->count , t->flags , t->ci->nodeID , s->nodeID );
// fflush(stdout); // fflush(stdout);
} else if (t->subtype == task_subtype_gpart) { } else if (t->subtype == task_subtype_gpart) {
err = MPI_Irecv(t->ci->gparts, t->ci->gcount, gpart_mpi_type, err = MPI_Irecv(t->ci->gparts, t->ci->gcount, gpart_mpi_type,
t->ci->nodeID, t->flags, MPI_COMM_WORLD, &t->req); t->ci->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
} else if (t->subtype == task_subtype_spart) { } else if (t->subtype == task_subtype_spart) {
err = MPI_Irecv(t->ci->sparts, t->ci->scount, spart_mpi_type, err = MPI_Irecv(t->ci->sparts, t->ci->scount, spart_mpi_type,
t->ci->nodeID, t->flags, MPI_COMM_WORLD, &t->req); t->ci->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
} else if (t->subtype == task_subtype_multipole) { } else if (t->subtype == task_subtype_multipole) {
t->buff = (struct gravity_tensors *)malloc( t->buff = (struct gravity_tensors *)malloc(
sizeof(struct gravity_tensors) * t->ci->pcell_size); sizeof(struct gravity_tensors) * t->ci->pcell_size);
err = MPI_Irecv( err = MPI_Irecv(
t->buff, sizeof(struct gravity_tensors) * t->ci->pcell_size, t->buff, sizeof(struct gravity_tensors) * t->ci->pcell_size,
MPI_BYTE, t->ci->nodeID, t->flags, MPI_COMM_WORLD, &t->req); MPI_BYTE, t->ci->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
} else { } else {
error("Unknown communication sub-type"); error("Unknown communication sub-type");
} }
...@@ -1600,44 +1604,53 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) { ...@@ -1600,44 +1604,53 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
s->mpi_message_limit) s->mpi_message_limit)
err = MPI_Isend( err = MPI_Isend(
t->buff, t->ci->pcell_size * sizeof(struct pcell_step), t->buff, t->ci->pcell_size * sizeof(struct pcell_step),
MPI_BYTE, t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req); MPI_BYTE, t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
else else
err = MPI_Issend( err = MPI_Issend(
t->buff, t->ci->pcell_size * sizeof(struct pcell_step), t->buff, t->ci->pcell_size * sizeof(struct pcell_step),
MPI_BYTE, t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req); MPI_BYTE, t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
} else if (t->subtype == task_subtype_xv || } else if (t->subtype == task_subtype_xv ||
t->subtype == task_subtype_rho || t->subtype == task_subtype_rho ||
t->subtype == task_subtype_gradient) { t->subtype == task_subtype_gradient) {
if ((t->ci->count * sizeof(struct part)) > s->mpi_message_limit) if ((t->ci->count * sizeof(struct part)) > s->mpi_message_limit)
err = MPI_Isend(t->ci->parts, t->ci->count, part_mpi_type, err = MPI_Isend(t->ci->parts, t->ci->count, part_mpi_type,
t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req); t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
else else
err = MPI_Issend(t->ci->parts, t->ci->count, part_mpi_type, err = MPI_Issend(t->ci->parts, t->ci->count, part_mpi_type,
t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req); t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
// message( "sending %i parts with tag=%i from %i to %i." , // message( "sending %i parts with tag=%i from %i to %i." ,
// t->ci->count , t->flags , s->nodeID , t->cj->nodeID ); // t->ci->count , t->flags , s->nodeID , t->cj->nodeID );
// fflush(stdout); // fflush(stdout);
} else if (t->subtype == task_subtype_gpart) { } else if (t->subtype == task_subtype_gpart) {
if ((t->ci->gcount * sizeof(struct gpart)) > s->mpi_message_limit) if ((t->ci->gcount * sizeof(struct gpart)) > s->mpi_message_limit)
err = MPI_Isend(t->ci->gparts, t->ci->gcount, gpart_mpi_type, err = MPI_Isend(t->ci->gparts, t->ci->gcount, gpart_mpi_type,
t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req); t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
else else
err = MPI_Issend(t->ci->gparts, t->ci->gcount, gpart_mpi_type, err = MPI_Issend(t->ci->gparts, t->ci->gcount, gpart_mpi_type,
t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req); t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
} else if (t->subtype == task_subtype_spart) { } else if (t->subtype == task_subtype_spart) {
if ((t->ci->scount * sizeof(struct spart)) > s->mpi_message_limit) if ((t->ci->scount * sizeof(struct spart)) > s->mpi_message_limit)
err = MPI_Isend(t->ci->sparts, t->ci->scount, spart_mpi_type, err = MPI_Isend(t->ci->sparts, t->ci->scount, spart_mpi_type,
t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req); t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
else else
err = MPI_Issend(t->ci->sparts, t->ci->scount, spart_mpi_type, err = MPI_Issend(t->ci->sparts, t->ci->scount, spart_mpi_type,
t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req); t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
} else if (t->subtype == task_subtype_multipole) { } else if (t->subtype == task_subtype_multipole) {
t->buff = (struct gravity_tensors *)malloc( t->buff = (struct gravity_tensors *)malloc(
sizeof(struct gravity_tensors) * t->ci->pcell_size); sizeof(struct gravity_tensors) * t->ci->pcell_size);
cell_pack_multipoles(t->ci, (struct gravity_tensors *)t->buff); cell_pack_multipoles(t->ci, (struct gravity_tensors *)t->buff);
err = MPI_Isend( err = MPI_Isend(
t->buff, t->ci->pcell_size * sizeof(struct gravity_tensors), t->buff, t->ci->pcell_size * sizeof(struct gravity_tensors),
MPI_BYTE, t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req); MPI_BYTE, t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
} else { } else {
error("Unknown communication sub-type"); error("Unknown communication sub-type");
} }
......
...@@ -63,6 +63,11 @@ const char *subtaskID_names[task_subtype_count] = { ...@@ -63,6 +63,11 @@ const char *subtaskID_names[task_subtype_count] = {
"none", "density", "gradient", "force", "grav", "external_grav", "none", "density", "gradient", "force", "grav", "external_grav",
"tend", "xv", "rho", "gpart", "multipole", "spart"}; "tend", "xv", "rho", "gpart", "multipole", "spart"};
#ifdef WITH_MPI
/* MPI communicators for the subtypes. */
MPI_Comm subtaskMPI_comms[task_subtype_count];
#endif
/** /**
* @brief Computes the overlap between the parts array of two given cells. * @brief Computes the overlap between the parts array of two given cells.
* *
...@@ -485,3 +490,15 @@ void task_print(const struct task *t) { ...@@ -485,3 +490,15 @@ void task_print(const struct task *t) {
taskID_names[t->type], subtaskID_names[t->subtype], t->wait, taskID_names[t->type], subtaskID_names[t->subtype], t->wait,
t->nr_unlock_tasks, t->skip); t->nr_unlock_tasks, t->skip);
} }
#ifdef WITH_MPI
/**
* @brief Create global communicators for each of the subtasks.
*/
void task_create_mpi_comms(void) {
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtaskMPI_comms[i]);
}
}
#endif
...@@ -110,6 +110,13 @@ extern const char *taskID_names[]; ...@@ -110,6 +110,13 @@ extern const char *taskID_names[];
*/ */
extern const char *subtaskID_names[]; extern const char *subtaskID_names[];
/**
* @brief The MPI communicators for the different subtypes.
*/
#ifdef WITH_MPI
extern MPI_Comm subtaskMPI_comms[task_subtype_count];
#endif
/** /**
* @brief A task to be run by the #scheduler. * @brief A task to be run by the #scheduler.
*/ */
...@@ -187,5 +194,7 @@ float task_overlap(const struct task *ta, const struct task *tb); ...@@ -187,5 +194,7 @@ float task_overlap(const struct task *ta, const struct task *tb);
int task_lock(struct task *t); int task_lock(struct task *t);
void task_do_rewait(struct task *t); void task_do_rewait(struct task *t);
void task_print(const struct task *t); void task_print(const struct task *t);
#ifdef WITH_MPI
void task_create_mpi_comms(void);
#endif
#endif /* SWIFT_TASK_H */ #endif /* SWIFT_TASK_H */
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment