Commit 35ab2dac authored by Matthieu Schaller's avatar Matthieu Schaller

Merge branch 'mpi-fixes' into 'master'

Use non-buffered MPI sends for small messages

See merge request !457
parents bee23d1c b6d45870
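
With a standard non-blocking send (MPI_Isend), most MPI implementations complete small messages eagerly by copying them into internal buffers, so a burst of small task messages can pile up in library-side memory. A synchronous-mode send (MPI_Issend) only completes once the matching receive has started, which avoids that buffering. This merge therefore routes messages at or below a configurable size limit through MPI_Issend and keeps MPI_Isend for everything larger. As a minimal standalone sketch of the idea (not SWIFT code; the helper name, payload, and two-rank setup are invented for illustration):

    #include <mpi.h>
    #include <stddef.h>

    /* Hypothetical helper: choose the send mode from the message size. */
    static int isend_with_limit(void *buf, int count, MPI_Datatype type,
                                int dest, int tag, size_t limit,
                                MPI_Request *req) {
      int type_size;
      MPI_Type_size(type, &type_size); /* size of one element in bytes */
      if ((size_t)count * (size_t)type_size > limit)
        /* Large message: standard-mode send, which the library may buffer. */
        return MPI_Isend(buf, count, type, dest, tag, MPI_COMM_WORLD, req);
      /* Small message: synchronous-mode send, which completes only once the
       * matching receive has started, so nothing is buffered eagerly. */
      return MPI_Issend(buf, count, type, dest, tag, MPI_COMM_WORLD, req);
    }

    int main(int argc, char *argv[]) {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      const size_t limit = 4 * 1024; /* 4 KB, the default used in this merge */
      double payload[256] = {0.};    /* 2 KB, i.e. below the limit */

      if (rank == 0) { /* 2 KB <= 4 KB, so this goes through MPI_Issend */
        MPI_Request req;
        isend_with_limit(payload, 256, MPI_DOUBLE, 1, 0, limit, &req);
        MPI_Wait(&req, MPI_STATUS_IGNORE);
      } else if (rank == 1) {
        MPI_Recv(payload, 256, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
      }

      MPI_Finalize();
      return 0;
    }

Run with at least two ranks (e.g. mpirun -np 2). The trade-off is that a synchronous send only completes once the receive is posted, so the sender's wait can be delayed; for large messages most implementations already fall back to a rendezvous protocol anyway, so the limit mainly changes the behaviour of small, eager messages.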
@@ -15,6 +15,7 @@ Scheduler:
   cell_split_size: 400 # (Optional) Maximal number of particles per cell (this is the default value).
   max_top_level_cells: 12 # (Optional) Maximal number of top-level cells in any dimension. The number of top-level cells will be the cube of this (this is the default value).
   tasks_per_cell: 0 # (Optional) The average number of tasks per cell. If not large enough the simulation will fail (0 means guess).
+  mpi_message_limit: 4096 # (Optional) Maximum size, in KB, of an MPI task message that is sent non-buffered (using MPI_Issend).
 
 # Parameters governing the time integration (Set dt_min and dt_max to the same value for a fixed time-step run.)
 TimeIntegration:
@@ -4701,6 +4701,12 @@ void engine_init(struct engine *e, struct space *s,
   scheduler_init(&e->sched, e->s, engine_estimate_nr_tasks(e), nr_queues,
                  (policy & scheduler_flag_steal), e->nodeID, &e->threadpool);
 
+  /* Maximum size of MPI task messages that should not be buffered, i.e. that
+   * are sent using MPI_Issend rather than MPI_Isend. The parameter is given
+   * in KB and converted to bytes here; 4 KB by default. */
+  e->sched.mpi_message_limit =
+      parser_get_opt_param_int(params, "Scheduler:mpi_message_limit", 4) * 1024;
+
   /* Allocate and init the threads. */
   if ((e->runners = (struct runner *)malloc(sizeof(struct runner) *
                                             e->nr_threads)) == NULL)
@@ -1317,26 +1317,49 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
       if (t->subtype == task_subtype_tend) {
         t->buff = malloc(sizeof(struct pcell_step) * t->ci->pcell_size);
         cell_pack_end_step(t->ci, t->buff);
-        err = MPI_Isend(
-            t->buff, t->ci->pcell_size * sizeof(struct pcell_step), MPI_BYTE,
-            t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+        if ((t->ci->pcell_size * sizeof(struct pcell_step)) >
+            s->mpi_message_limit)
+          err = MPI_Isend(
+              t->buff, t->ci->pcell_size * sizeof(struct pcell_step),
+              MPI_BYTE, t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+        else
+          err = MPI_Issend(
+              t->buff, t->ci->pcell_size * sizeof(struct pcell_step),
+              MPI_BYTE, t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
       } else if (t->subtype == task_subtype_xv ||
                  t->subtype == task_subtype_rho ||
                  t->subtype == task_subtype_gradient) {
-        err = MPI_Isend(t->ci->parts, t->ci->count, part_mpi_type,
-                        t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+        if ((t->ci->count * sizeof(struct part)) > s->mpi_message_limit)
+          err = MPI_Isend(t->ci->parts, t->ci->count, part_mpi_type,
+                          t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+        else
+          err = MPI_Issend(t->ci->parts, t->ci->count, part_mpi_type,
+                           t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
         // message( "sending %i parts with tag=%i from %i to %i." ,
         //     t->ci->count , t->flags , s->nodeID , t->cj->nodeID );
         // fflush(stdout);
       } else if (t->subtype == task_subtype_gpart) {
-        err = MPI_Isend(t->ci->gparts, t->ci->gcount, gpart_mpi_type,
-                        t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+        if ((t->ci->gcount * sizeof(struct gpart)) > s->mpi_message_limit)
+          err = MPI_Isend(t->ci->gparts, t->ci->gcount, gpart_mpi_type,
+                          t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+        else
+          err = MPI_Issend(t->ci->gparts, t->ci->gcount, gpart_mpi_type,
+                           t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
       } else if (t->subtype == task_subtype_spart) {
-        err = MPI_Isend(t->ci->sparts, t->ci->scount, spart_mpi_type,
-                        t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+        if ((t->ci->scount * sizeof(struct spart)) > s->mpi_message_limit)
+          err = MPI_Isend(t->ci->sparts, t->ci->scount, spart_mpi_type,
+                          t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+        else
+          err = MPI_Issend(t->ci->sparts, t->ci->scount, spart_mpi_type,
+                           t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
       } else if (t->subtype == task_subtype_multipole) {
-        err = MPI_Isend(t->ci->multipole, 1, multipole_mpi_type,
-                        t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+        /* Only one multipole is sent, so its size alone picks the mode. */
+        if (sizeof(struct gravity_tensors) > s->mpi_message_limit)
+          err = MPI_Isend(t->ci->multipole, 1, multipole_mpi_type,
+                          t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+        else
+          err = MPI_Issend(t->ci->multipole, 1, multipole_mpi_type,
+                           t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
       } else {
         error("Unknown communication sub-type");
       }
@@ -103,6 +103,10 @@ struct scheduler {
   /* The node we are working on. */
   int nodeID;
 
+  /* Maximum size of task messages, in bytes, to be sent using non-buffered
+   * (synchronous) MPI. */
+  size_t mpi_message_limit;
+
   /* 'Pointer' to the seed for the random number generator */
   pthread_key_t local_seed_pointer;
 };
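
With the new scheduler field in place, the threshold can be tuned per run from the parameter file; the value is given in KB, as in the example file above. A hypothetical setting that keeps every message of up to 1 MB on the synchronous path would be:

    Scheduler:
      mpi_message_limit: 1024   # messages of up to 1 MB are sent with MPI_Issend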