diff --git a/examples/parameter_example.yml b/examples/parameter_example.yml
index 1d09719e011b93e01b39b02ecc3cd90590bd0a32..c029a0a5b862a2d20188c87873f71179a45f20e8 100644
--- a/examples/parameter_example.yml
+++ b/examples/parameter_example.yml
@@ -180,6 +180,7 @@ DomainDecomposition:
                                # "region", "memory", or "vectorized".
   initial_grid: [10,10,10]     # (Optional) Grid sizes if the "grid" strategy is chosen.
+  synchronous: 0               # (Optional) Use synchronous MPI requests when redistributing; uses less system memory, but is slower.
 
   repartition_type: fullcosts  # (Optional) The re-decomposition strategy, one of:
                                # "none", "fullcosts", "edgecosts", "memory" or
                                # "timecosts".
diff --git a/src/engine.c b/src/engine.c
index 1aed4b2a5e0ce44e92a8ee4de86428c2afd0c5c1..55a1f1d6081fe8c114ca39202a9d3bef5d02fc10 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -3684,6 +3684,11 @@ void engine_config(int restart, int fof, struct engine *e,
                    engine_maxproxies)) == NULL)
     error("Failed to allocate memory for proxies.");
   e->nr_proxies = 0;
+
+  /* Use synchronous MPI sends and receives when redistributing. */
+  e->syncredist =
+      parser_get_opt_param_int(params, "DomainDecomposition:synchronous", 0);
+
 #endif
 }
diff --git a/src/engine.h b/src/engine.h
index 72d528969553b6e24ab939ce05acea69c7cb1b0c..68a4df10c08325d5d810b361dab863bf9ee68ea6 100644
--- a/src/engine.h
+++ b/src/engine.h
@@ -363,6 +363,10 @@ struct engine {
 
   /* Step of last repartition. */
   int last_repartition;
+
+  /* Use synchronous redistributes. */
+  int syncredist;
+
 #endif
 
   /* Wallclock time of the last time-step */
diff --git a/src/engine_redistribute.c b/src/engine_redistribute.c
index 3132ad2665c67cd244ae1ec9ece75726788c1506..e2c3ea056e57e5097785a9014e4e0d2500d52a52 100644
--- a/src/engine_redistribute.c
+++ b/src/engine_redistribute.c
@@ -29,7 +29,6 @@
 #include "memswap.h"
 
 #ifdef WITH_MPI
-
 /**
  * Do the exchange of one type of particles with all the other nodes.
  *
@@ -44,6 +43,8 @@
  * @param mpi_type the MPI_Datatype for these particles.
  * @param nr_nodes the number of nodes to exchange with.
  * @param nodeID the id of this node.
+ * @param syncredist whether to use the slower, but more memory-friendly,
+ *                   synchronous exchanges.
  *
  * @result new particle data constructed from all the exchanges with the
  * given alignment.
@@ -51,7 +52,7 @@
 static void *engine_do_redistribute(const char *label, int *counts,
                                     char *parts, size_t new_nr_parts,
                                     size_t sizeofparts, size_t alignsize,
                                     MPI_Datatype mpi_type,
-                                    int nr_nodes, int nodeID) {
+                                    int nr_nodes, int nodeID, int syncredist) {
   /* Allocate a new particle array with some extra margin */
   char *parts_new = NULL;
@@ -60,100 +61,178 @@ static void *engine_do_redistribute(const char *label, int *counts,
                      sizeofparts * new_nr_parts *
                          engine_redistribute_alloc_margin) != 0)
     error("Failed to allocate new particle data.");
 
-  /* Prepare MPI requests for the asynchronous communications */
-  MPI_Request *reqs;
-  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 2 * nr_nodes)) ==
-      NULL)
-    error("Failed to allocate MPI request list.");
+  if (syncredist) {
+
+    /* Slow synchronous redistribute. */
+    size_t offset_send = 0, offset_recv = 0;
 
-  /* Only send and receive only "chunk" particles per request. So we need to
-   * loop as many times as necessary here. Make 2Gb/sizeofparts so we only
-   * send 2Gb packets. */
-  const int chunk = INT_MAX / sizeofparts;
-  int sent = 0;
-  int recvd = 0;
+    /* Send and receive only "chunk" particles per request, keeping
+     * each message below 2GB. */
+    const int chunk = INT_MAX / sizeofparts;
+    int res = 0;
+    for (int k = 0; k < nr_nodes; k++) {
+      int kk = k;
+
+      /* Rank 0 decides the index of the sending node. */
+      MPI_Bcast(&kk, 1, MPI_INT, 0, MPI_COMM_WORLD);
+
+      int ind_recv = kk * nr_nodes + nodeID;
+
+      if (nodeID == kk) {
+
+        /* Send out our particles. */
+        offset_send = 0;
+        for (int j = 0; j < nr_nodes; j++) {
+
+          int ind_send = kk * nr_nodes + j;
+
+          /* Just copy our own parts. */
+          if (counts[ind_send] > 0) {
+            if (j == nodeID) {
+              memcpy(&parts_new[offset_recv * sizeofparts],
+                     &parts[offset_send * sizeofparts],
+                     sizeofparts * counts[ind_recv]);
+              offset_send += counts[ind_send];
+              offset_recv += counts[ind_recv];
+            } else {
+              for (int i = 0, n = 0; i < counts[ind_send]; n++) {
+
+                /* Count and index, with chunk parts at most. */
+                size_t sendc = min(chunk, counts[ind_send] - i);
+                size_t sendo = offset_send + i;
+
+                res = MPI_Send(&parts[sendo * sizeofparts], sendc, mpi_type, j,
+                               n, MPI_COMM_WORLD);
+                if (res != MPI_SUCCESS) {
+                  mpi_error(res, "Failed to send parts to node %i from %i.", j,
+                            nodeID);
+                }
+                i += sendc;
+              }
+              offset_send += counts[ind_send];
+            }
+          }
+        }
+      } else {
+        /* Listen for sends from kk. */
+        if (counts[ind_recv] > 0) {
+          for (int i = 0, n = 0; i < counts[ind_recv]; n++) {
+            /* Count and index, with chunk parts at most. */
+            size_t recvc = min(chunk, counts[ind_recv] - i);
+            size_t recvo = offset_recv + i;
+
+            MPI_Status status;
+            res = MPI_Recv(&parts_new[recvo * sizeofparts], recvc, mpi_type,
+                           kk, n, MPI_COMM_WORLD, &status);
+            if (res != MPI_SUCCESS) {
+              mpi_error(res, "Failed to recv parts from node %i on %i.", kk,
+                        nodeID);
+            }
+            i += recvc;
+          }
+          offset_recv += counts[ind_recv];
+        }
+      }
+    }
 
-  int activenodes = 1;
-  while (activenodes) {
+  } else {
+    /* Asynchronous redistribute, can take a lot of memory. */
 
-    for (int k = 0; k < 2 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
+    /* Prepare MPI requests for the asynchronous communications */
+    MPI_Request *reqs;
+    if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 2 * nr_nodes)) ==
+        NULL)
+      error("Failed to allocate MPI request list.");
 
-    /* Emit the sends and recvs for the data. */
-    size_t offset_send = sent;
-    size_t offset_recv = recvd;
-    activenodes = 0;
+    /* Send and receive only "chunk" particles per request. So we need to
+     * loop as many times as necessary here. Make 2GB/sizeofparts so we only
+     * send 2GB packets. */
+    const int chunk = INT_MAX / sizeofparts;
+    int sent = 0;
+    int recvd = 0;
 
-    for (int k = 0; k < nr_nodes; k++) {
+    int activenodes = 1;
+    while (activenodes) {
 
-      /* Indices in the count arrays of the node of interest */
-      const int ind_send = nodeID * nr_nodes + k;
-      const int ind_recv = k * nr_nodes + nodeID;
+      for (int k = 0; k < 2 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
 
-      /* Are we sending any data this loop? */
-      int sending = counts[ind_send] - sent;
-      if (sending > 0) {
-        activenodes++;
-        if (sending > chunk) sending = chunk;
+      /* Emit the sends and recvs for the data. */
+      size_t offset_send = sent;
+      size_t offset_recv = recvd;
+      activenodes = 0;
 
-        /* If the send and receive is local then just copy. */
-        if (k == nodeID) {
-          int receiving = counts[ind_recv] - recvd;
-          if (receiving > chunk) receiving = chunk;
-          memcpy(&parts_new[offset_recv * sizeofparts],
-                 &parts[offset_send * sizeofparts], sizeofparts * receiving);
-        } else {
-          /* Otherwise send it. */
-          int res =
-              MPI_Isend(&parts[offset_send * sizeofparts], sending, mpi_type, k,
-                        ind_send, MPI_COMM_WORLD, &reqs[2 * k + 0]);
-          if (res != MPI_SUCCESS)
-            mpi_error(res, "Failed to isend parts to node %i.", k);
-        }
-      }
+      for (int k = 0; k < nr_nodes; k++) {
 
-      /* If we're sending to this node, then move past it to next. */
-      if (counts[ind_send] > 0) offset_send += counts[ind_send];
+        /* Indices in the count arrays of the node of interest */
+        const int ind_send = nodeID * nr_nodes + k;
+        const int ind_recv = k * nr_nodes + nodeID;
 
-      /* Are we receiving any data from this node? Note already done if coming
-       * from this node. */
-      if (k != nodeID) {
-        int receiving = counts[ind_recv] - recvd;
-        if (receiving > 0) {
+        /* Are we sending any data this loop? */
+        int sending = counts[ind_send] - sent;
+        if (sending > 0) {
           activenodes++;
-          if (receiving > chunk) receiving = chunk;
-          int res = MPI_Irecv(&parts_new[offset_recv * sizeofparts], receiving,
-                              mpi_type, k, ind_recv, MPI_COMM_WORLD,
-                              &reqs[2 * k + 1]);
-          if (res != MPI_SUCCESS)
-            mpi_error(res, "Failed to emit irecv of parts from node %i.", k);
+          if (sending > chunk) sending = chunk;
+
+          /* If the send and receive is local then just copy. */
+          if (k == nodeID) {
+            int receiving = counts[ind_recv] - recvd;
+            if (receiving > chunk) receiving = chunk;
+            memcpy(&parts_new[offset_recv * sizeofparts],
+                   &parts[offset_send * sizeofparts], sizeofparts * receiving);
+          } else {
+            /* Otherwise send it. */
+            int res =
+                MPI_Isend(&parts[offset_send * sizeofparts], sending, mpi_type,
+                          k, ind_send, MPI_COMM_WORLD, &reqs[2 * k + 0]);
+            if (res != MPI_SUCCESS)
+              mpi_error(res, "Failed to isend parts to node %i.", k);
+          }
         }
-      }
+        /* If we're sending to this node, then move past it to next. */
+        if (counts[ind_send] > 0) offset_send += counts[ind_send];
+
+        /* Are we receiving any data from this node? Note already done if coming
+         * from this node. */
+        if (k != nodeID) {
+          int receiving = counts[ind_recv] - recvd;
+          if (receiving > 0) {
+            activenodes++;
+            if (receiving > chunk) receiving = chunk;
+            int res = MPI_Irecv(&parts_new[offset_recv * sizeofparts],
+                                receiving, mpi_type, k, ind_recv,
+                                MPI_COMM_WORLD, &reqs[2 * k + 1]);
+            if (res != MPI_SUCCESS)
+              mpi_error(res, "Failed to emit irecv of parts from node %i.", k);
+          }
+        }
+
+        /* If we're receiving from this node, then move past it to next. */
+        if (counts[ind_recv] > 0) offset_recv += counts[ind_recv];
+      }
 
-      /* If we're receiving from this node, then move past it to next. */
-      if (counts[ind_recv] > 0) offset_recv += counts[ind_recv];
-    }
 
-    /* Wait for all the sends and recvs to tumble in. */
-    MPI_Status stats[2 * nr_nodes];
-    int res;
-    if ((res = MPI_Waitall(2 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
-      for (int k = 0; k < 2 * nr_nodes; k++) {
-        char buff[MPI_MAX_ERROR_STRING];
-        MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
-        message("request from source %i, tag %i has error '%s'.",
-                stats[k].MPI_SOURCE, stats[k].MPI_TAG, buff);
+      /* Wait for all the sends and recvs to tumble in. */
+      MPI_Status stats[2 * nr_nodes];
+      int res;
+      if ((res = MPI_Waitall(2 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
+        for (int k = 0; k < 2 * nr_nodes; k++) {
+          char buff[MPI_MAX_ERROR_STRING];
+          MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
+          message("request from source %i, tag %i has error '%s'.",
+                  stats[k].MPI_SOURCE, stats[k].MPI_TAG, buff);
+        }
+        error("Failed during waitall for part data.");
       }
-      error("Failed during waitall for part data.");
+
+      /* Move to next chunks. */
+      sent += chunk;
+      recvd += chunk;
     }
 
-    /* Move to next chunks. */
-    sent += chunk;
-    recvd += chunk;
+    /* Free temps. */
+    free(reqs);
   }
 
-  /* Free temps. */
-  free(reqs);
-
   /* And return new memory. */
   return parts_new;
 }
@@ -430,7 +509,8 @@ static void engine_redistribute_relink_mapper(void *map_data, int num_elements,
  * 3) The particles to send are placed in a temporary buffer in which the
  *    part-gpart links are preserved.
  * 4) Each node allocates enough space for the new particles.
- * 5) (Asynchronous) communications are issued to transfer the data.
+ * 5) Asynchronous or synchronous communications are issued to transfer the
+ *    data.
  *
  *
  * @param e The #engine.
@@ -895,7 +975,7 @@ void engine_redistribute(struct engine *e) {
   /* SPH particles. */
   void *new_parts = engine_do_redistribute(
       "parts", counts, (char *)s->parts, nr_parts_new, sizeof(struct part),
-      part_align, part_mpi_type, nr_nodes, nodeID);
+      part_align, part_mpi_type, nr_nodes, nodeID, e->syncredist);
   swift_free("parts", s->parts);
   s->parts = (struct part *)new_parts;
   s->nr_parts = nr_parts_new;
@@ -904,32 +984,35 @@ void engine_redistribute(struct engine *e) {
   /* Extra SPH particle properties. */
   new_parts = engine_do_redistribute(
       "xparts", counts, (char *)s->xparts, nr_parts_new, sizeof(struct xpart),
-      xpart_align, xpart_mpi_type, nr_nodes, nodeID);
+      xpart_align, xpart_mpi_type, nr_nodes, nodeID, e->syncredist);
   swift_free("xparts", s->xparts);
   s->xparts = (struct xpart *)new_parts;
 
   /* Gravity particles. */
-  new_parts = engine_do_redistribute(
-      "gparts", g_counts, (char *)s->gparts, nr_gparts_new,
-      sizeof(struct gpart), gpart_align, gpart_mpi_type, nr_nodes, nodeID);
+  new_parts =
+      engine_do_redistribute("gparts", g_counts, (char *)s->gparts,
+                             nr_gparts_new, sizeof(struct gpart), gpart_align,
+                             gpart_mpi_type, nr_nodes, nodeID, e->syncredist);
   swift_free("gparts", s->gparts);
   s->gparts = (struct gpart *)new_parts;
   s->nr_gparts = nr_gparts_new;
   s->size_gparts = engine_redistribute_alloc_margin * nr_gparts_new;
 
   /* Star particles. */
-  new_parts = engine_do_redistribute(
-      "sparts", s_counts, (char *)s->sparts, nr_sparts_new,
-      sizeof(struct spart), spart_align, spart_mpi_type, nr_nodes, nodeID);
+  new_parts =
+      engine_do_redistribute("sparts", s_counts, (char *)s->sparts,
+                             nr_sparts_new, sizeof(struct spart), spart_align,
+                             spart_mpi_type, nr_nodes, nodeID, e->syncredist);
   swift_free("sparts", s->sparts);
   s->sparts = (struct spart *)new_parts;
   s->nr_sparts = nr_sparts_new;
   s->size_sparts = engine_redistribute_alloc_margin * nr_sparts_new;
 
   /* Black holes particles. */
-  new_parts = engine_do_redistribute(
-      "bparts", b_counts, (char *)s->bparts, nr_bparts_new,
-      sizeof(struct bpart), bpart_align, bpart_mpi_type, nr_nodes, nodeID);
+  new_parts =
+      engine_do_redistribute("bparts", b_counts, (char *)s->bparts,
+                             nr_bparts_new, sizeof(struct bpart), bpart_align,
+                             bpart_mpi_type, nr_nodes, nodeID, e->syncredist);
   swift_free("bparts", s->bparts);
   s->bparts = (struct bpart *)new_parts;
   s->nr_bparts = nr_bparts_new;
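
Note on the two code paths above: the asynchronous branch posts up to 2 * nr_nodes
MPI_Isend/MPI_Irecv requests at once, so every send and receive buffer is in flight
simultaneously, while the synchronous branch serialises the exchange (rank 0
broadcasts whose turn it is to send) and moves at most one sub-2GB chunk per
blocking message pair. The standalone listing below is only a minimal sketch of
that chunked blocking-exchange pattern, not SWIFT code: the buffer size, the
particle size and the use of MPI_BYTE in place of a real particle MPI datatype
are all made up for illustration.

/* chunked_sync_send.c: sketch of a chunked, synchronous exchange.
 * Rank 0 streams a buffer to rank 1 in pieces small enough that the
 * MPI "count" argument (an int) cannot overflow, i.e. each message
 * stays below INT_MAX bytes (~2GB).
 * Build: mpicc chunked_sync_send.c -o chunked_sync_send
 * Run:   mpirun -np 2 ./chunked_sync_send */
#include <limits.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main(int argc, char *argv[]) {

  MPI_Init(&argc, &argv);
  int rank = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  const size_t sizeofparts = 96;   /* bytes per (fake) particle */
  const size_t nr_parts = 1000000; /* particles to exchange */

  /* At most "chunk" particles per message, as in the patch above. */
  const size_t chunk = INT_MAX / sizeofparts;

  char *buf = malloc(nr_parts * sizeofparts);
  if (buf == NULL) {
    fprintf(stderr, "Failed to allocate buffer.\n");
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  if (rank == 0) {
    memset(buf, 42, nr_parts * sizeofparts);

    /* One blocking send per chunk, tagged with the chunk index n so the
     * receiver matches them in order. Only one chunk is in flight at a
     * time, which is what keeps the memory footprint low. */
    for (size_t i = 0, n = 0; i < nr_parts; n++) {
      const size_t sendc = (nr_parts - i < chunk) ? nr_parts - i : chunk;
      MPI_Send(&buf[i * sizeofparts], (int)(sendc * sizeofparts), MPI_BYTE,
               /*dest=*/1, (int)n, MPI_COMM_WORLD);
      i += sendc;
    }
  } else if (rank == 1) {

    /* Matching blocking receives, chunk by chunk. */
    for (size_t i = 0, n = 0; i < nr_parts; n++) {
      const size_t recvc = (nr_parts - i < chunk) ? nr_parts - i : chunk;
      MPI_Recv(&buf[i * sizeofparts], (int)(recvc * sizeofparts), MPI_BYTE,
               /*source=*/0, (int)n, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      i += recvc;
    }
    printf("rank 1 received %zu particles\n", nr_parts);
  }

  free(buf);
  MPI_Finalize();
  return 0;
}

Setting synchronous: 1 in the DomainDecomposition section of the parameter file
selects this slower, lower-memory path during engine_redistribute(); the default
of 0 keeps the existing asynchronous behaviour.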