diff --git a/examples/parameter_example.yml b/examples/parameter_example.yml
index 6327540c27753ea61a79d7fb4c16d60c5f00635d..a7fa05a2948a2f4281f8afb8e5526f46cef1c6d3 100644
--- a/examples/parameter_example.yml
+++ b/examples/parameter_example.yml
@@ -15,6 +15,7 @@ Scheduler:
   cell_split_size:       400        # (Optional) Maximal number of particles per cell (this is the default value).
   max_top_level_cells:   12         # (Optional) Maximal number of top-level cells in any dimension. The number of top-level cells will be the cube of this (this is the default value).
   tasks_per_cell:        0          # (Optional) The average number of tasks per cell. If not large enough the simulation will fail (means guess...).
+  mpi_message_limit:     4194304    # (Optional) Maximum MPI task message size, in bytes, to send non-buffered, i.e. using MPI_Issend.
 
 # Parameters governing the time integration (Set dt_min and dt_max to the same value for a fixed time-step run.)
 TimeIntegration:
diff --git a/src/engine.c b/src/engine.c
index 7a397e1a00b331946602b73dae9c5a11351b429b..fdc534f9a6938c95dc7f955e67488959612838eb 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -4636,6 +4636,13 @@ void engine_init(struct engine *e, struct space *s,
   scheduler_init(&e->sched, e->s, engine_estimate_nr_tasks(e), nr_queues,
                  (policy & scheduler_flag_steal), e->nodeID, &e->threadpool);
 
+  /* Maximum size of MPI task messages that should not be buffered,
+   * that is, sent using MPI_Issend rather than MPI_Isend. 4MB by default.
+   */
+  e->sched.mpi_message_limit =
+      parser_get_opt_param_int(params, "Scheduler:mpi_message_limit",
+                               4194304);
+
   /* Allocate and init the threads. */
   if ((e->runners = (struct runner *)malloc(sizeof(struct runner) *
                                             e->nr_threads)) == NULL)
diff --git a/src/scheduler.c b/src/scheduler.c
index ea2c6bde010dd387b355e961903411c0b18a41bf..8c6fedfb51c36ac3bdb65e7f86537ca190ef6b2a 100644
--- a/src/scheduler.c
+++ b/src/scheduler.c
@@ -1317,31 +1317,58 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
         if (t->subtype == task_subtype_tend) {
           t->buff = malloc(sizeof(struct pcell_step) * t->ci->pcell_size);
           cell_pack_end_step(t->ci, t->buff);
-          err = MPI_Isend(
-              t->buff, t->ci->pcell_size * sizeof(struct pcell_step), MPI_BYTE,
-              t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+          if ((t->ci->pcell_size * sizeof(struct pcell_step)) > s->mpi_message_limit)
+            err = MPI_Isend(t->buff, t->ci->pcell_size * sizeof(struct pcell_step),
+                            MPI_BYTE, t->cj->nodeID, t->flags, MPI_COMM_WORLD,
+                            &t->req);
+          else
+            err = MPI_Issend(t->buff, t->ci->pcell_size * sizeof(struct pcell_step),
+                             MPI_BYTE, t->cj->nodeID, t->flags, MPI_COMM_WORLD,
+                             &t->req);
         } else if (t->subtype == task_subtype_xv ||
                    t->subtype == task_subtype_rho ||
                    t->subtype == task_subtype_gradient) {
-          err = MPI_Isend(t->ci->parts, t->ci->count, part_mpi_type,
-                          t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+          if ((t->ci->count * sizeof(struct part)) > s->mpi_message_limit)
+            err = MPI_Isend(t->ci->parts, t->ci->count, part_mpi_type,
+                            t->cj->nodeID, t->flags, MPI_COMM_WORLD,
+                            &t->req);
+          else
+            err = MPI_Issend(t->ci->parts, t->ci->count, part_mpi_type,
+                             t->cj->nodeID, t->flags, MPI_COMM_WORLD,
+                             &t->req);
           // message( "sending %i parts with tag=%i from %i to %i." ,
          //     t->ci->count , t->flags , s->nodeID , t->cj->nodeID );
          // fflush(stdout);
         } else if (t->subtype == task_subtype_gpart) {
-          err = MPI_Isend(t->ci->gparts, t->ci->gcount, gpart_mpi_type,
-                          t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+          if ((t->ci->gcount * sizeof(struct gpart)) > s->mpi_message_limit)
+            err = MPI_Isend(t->ci->gparts, t->ci->gcount, gpart_mpi_type,
+                            t->cj->nodeID, t->flags, MPI_COMM_WORLD,
+                            &t->req);
+          else
+            err = MPI_Issend(t->ci->gparts, t->ci->gcount, gpart_mpi_type,
+                             t->cj->nodeID, t->flags, MPI_COMM_WORLD,
+                             &t->req);
         } else if (t->subtype == task_subtype_spart) {
-          err = MPI_Isend(t->ci->sparts, t->ci->scount, spart_mpi_type,
-                          t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+          if ((t->ci->scount * sizeof(struct spart)) > s->mpi_message_limit)
+            err = MPI_Isend(t->ci->sparts, t->ci->scount, spart_mpi_type,
+                            t->cj->nodeID, t->flags, MPI_COMM_WORLD,
+                            &t->req);
+          else
+            err = MPI_Issend(t->ci->sparts, t->ci->scount, spart_mpi_type,
+                             t->cj->nodeID, t->flags, MPI_COMM_WORLD,
+                             &t->req);
         } else if (t->subtype == task_subtype_multipole) {
-          err = MPI_Isend(t->ci->multipole, 1, multipole_mpi_type,
-                          t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+          if (sizeof(struct gravity_tensors) > s->mpi_message_limit)
+            err = MPI_Isend(t->ci->multipole, 1, multipole_mpi_type,
+                            t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
+          else
+            err = MPI_Issend(t->ci->multipole, 1, multipole_mpi_type,
+                             t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req);
         } else {
           error("Unknown communication sub-type");
         }
         if (err != MPI_SUCCESS) {
-          mpi_error(err, "Failed to emit isend for particle data.");
+          mpi_error(err, "Failed to emit isend for particle data.");
         }
         qid = 0;
 #else
diff --git a/src/scheduler.h b/src/scheduler.h
index c5ccbf43f3048dfb47d2d3eb7a7db6634b646700..86c6dd0c5184636eab244b62ff306d44974f4a8c 100644
--- a/src/scheduler.h
+++ b/src/scheduler.h
@@ -102,6 +102,10 @@ struct scheduler {
 
   /* The node we are working on. */
   int nodeID;
+
+  /* Maximum size of task messages, in bytes, to be sent using non-buffered
+   * MPI, i.e. MPI_Issend. */
+  int mpi_message_limit;
 
   /* 'Pointer' to the seed for the random number generator */
   pthread_key_t local_seed_pointer;
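
For reference, the selection rule introduced by this patch can be read in isolation as: a message whose payload is no larger than Scheduler:mpi_message_limit (4194304 bytes by default) is posted with the synchronous, non-buffered MPI_Issend, while anything larger falls back to the standard MPI_Isend. The following standalone sketch illustrates that rule outside of SWIFT; the helper name send_with_limit() and the two-rank test harness are hypothetical and assume only a working MPI installation.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Illustrative sketch (not SWIFT code): post a non-blocking send, choosing the
 * synchronous (non-buffered) MPI_Issend for payloads up to `limit` bytes and
 * the standard MPI_Isend above it, mirroring the checks added to
 * scheduler_enqueue(). */
static int send_with_limit(void *buf, size_t size_bytes, int dest, int tag,
                           MPI_Comm comm, MPI_Request *req, size_t limit) {
  if (size_bytes > limit)
    return MPI_Isend(buf, (int)size_bytes, MPI_BYTE, dest, tag, comm, req);
  else
    return MPI_Issend(buf, (int)size_bytes, MPI_BYTE, dest, tag, comm, req);
}

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);

  int rank = 0, nranks = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nranks);

  const size_t limit = 4194304; /* Same 4MB default as the patch. */
  const size_t nbytes = 1024;   /* <= limit, so MPI_Issend is chosen. */
  char *buf = malloc(nbytes);
  memset(buf, 0, nbytes);

  if (nranks >= 2) {
    if (rank == 0) {
      MPI_Request req;
      if (send_with_limit(buf, nbytes, /*dest=*/1, /*tag=*/0, MPI_COMM_WORLD,
                          &req, limit) != MPI_SUCCESS)
        fprintf(stderr, "failed to post send\n");
      MPI_Wait(&req, MPI_STATUS_IGNORE);
    } else if (rank == 1) {
      MPI_Recv(buf, (int)nbytes, MPI_BYTE, /*source=*/0, /*tag=*/0,
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }
  }

  free(buf);
  MPI_Finalize();
  return 0;
}

Under these assumptions the sketch builds with mpicc and can be exercised with two ranks, e.g. mpicc sketch.c -o sketch && mpirun -np 2 ./sketch.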