diff --git a/src/engine.c b/src/engine.c
index 2d3a5337433280a2af736351f3b0611b01a8d2a0..9254a7184de06e4119f319df55c99edfd9b865b7 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -139,39 +139,56 @@ void engine_make_ghost_tasks(struct engine *e, struct cell *c,
  * @brief Redistribute the particles amongst the nodes according
  * to their cell's node IDs.
  *
+ * The strategy here is as follows:
+ * 1) Each node counts the number of particles it has to send to each other
+ *    node.
+ * 2) The number of particles of each type is then exchanged.
+ * 3) The particles to send are placed in a temporary buffer in which the
+ *    part-gpart links are preserved.
+ * 4) Each node allocates enough space for the new particles.
+ * 5) (Asynchronous) communications are issued to transfer the data.
+ *
+ *
  * @param e The #engine.
  */
-
 void engine_redistribute(struct engine *e) {
 
 #ifdef WITH_MPI
 
-  int nr_nodes = e->nr_nodes, nodeID = e->nodeID;
+  const int nr_nodes = e->nr_nodes;
+  const int nodeID = e->nodeID;
   struct space *s = e->s;
-  int my_cells = 0;
-  int *cdim = s->cdim;
   struct cell *cells = s->cells;
-  int nr_cells = s->nr_cells;
+  const int nr_cells = s->nr_cells;
+  const int *cdim = s->cdim;
+  const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]};
+  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
+  struct part *parts = s->parts;
+  struct xpart *xparts = s->xparts;
+  struct gpart *gparts = s->gparts;
   ticks tic = getticks();
 
-  /* Start by sorting the particles according to their nodes and
-     getting the counts. The counts array is indexed as
-     count[from * nr_nodes + to]. */
-  int *counts;
-  size_t *dest;
-  double ih[3], dim[3];
-  ih[0] = s->ih[0];
-  ih[1] = s->ih[1];
-  ih[2] = s->ih[2];
-  dim[0] = s->dim[0];
-  dim[1] = s->dim[1];
-  dim[2] = s->dim[2];
-  if ((counts = (int *)malloc(sizeof(int) *nr_nodes *nr_nodes)) == NULL ||
-      (dest = (size_t *)malloc(sizeof(size_t) * s->nr_parts)) == NULL)
-    error("Failed to allocate count and dest buffers.");
+  /* Allocate temporary arrays to store the counts of particles to be sent
+     and the destination of each particle */
+  int *counts, *g_counts;
+  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
+    error("Failed to allocate count temporary buffer.");
+  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
+    error("Failed to allocate gcount temporary buffer.");
   bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
-  struct part *parts = s->parts;
+  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);
+
+  // MATTHIEU: Should be int and not size_t once Pedro's changes are merged.
+  size_t *dest, *g_dest;
+  if ((dest = (size_t *)malloc(sizeof(size_t) * s->nr_parts)) == NULL)
+    error("Failed to allocate dest temporary buffer.");
+  if ((g_dest = (size_t *)malloc(sizeof(size_t) * s->nr_gparts)) == NULL)
+    error("Failed to allocate g_dest temporary buffer.");
+
+  /* Get destination of each particle */
   for (size_t k = 0; k < s->nr_parts; k++) {
+
+    /* Periodic boundary conditions */
     for (int j = 0; j < 3; j++) {
       if (parts[k].x[j] < 0.0)
         parts[k].x[j] += dim[j];
@@ -184,36 +201,115 @@ void engine_redistribute(struct engine *e) {
       error("Bad cell id %i for part %i at [%.3e,%.3e,%.3e].",
             cid, k, parts[k].x[0], parts[k].x[1], parts[k].x[2]); */
     dest[k] = cells[cid].nodeID;
+
+    /* The counts array is indexed as count[from * nr_nodes + to]. */
     counts[nodeID * nr_nodes + dest[k]] += 1;
   }
+
+  /* Sort the particles according to their cell index. */
   space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);
+
+  /* We need to re-link the gpart partners of parts. */
+  int current_dest = dest[0];
+  size_t count_this_dest = 0;
+  for (size_t k = 0; k < s->nr_parts; ++k) {
+    if (s->parts[k].gpart != NULL) {
+
+      /* As the addresses will be invalidated by the communications, we will */
+      /* instead store the absolute index from the start of the sub-array */
+      /* of particles to be sent to a given node. */
+      /* Recall that gparts without partners have a negative id. */
+      /* We will restore the pointers on the receiving node later on. */
+      if (dest[k] != current_dest) {
+        current_dest = dest[k];
+        count_this_dest = 0;
+      }
+
+      s->parts[k].gpart->id = count_this_dest;
+      count_this_dest++;
+    }
+  }
+
+  /* Get destination of each g-particle */
+  for (size_t k = 0; k < s->nr_gparts; k++) {
+
+    /* Periodic boundary conditions */
+    for (int j = 0; j < 3; j++) {
+      if (gparts[k].x[j] < 0.0)
+        gparts[k].x[j] += dim[j];
+      else if (gparts[k].x[j] >= dim[j])
+        gparts[k].x[j] -= dim[j];
+    }
+    const int cid = cell_getid(cdim, gparts[k].x[0] * ih[0],
+                               gparts[k].x[1] * ih[1], gparts[k].x[2] * ih[2]);
+    /* if (cid < 0 || cid >= s->nr_cells)
+       error("Bad cell id %i for gpart %i at [%.3e,%.3e,%.3e].",
+             cid, k, gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]); */
+    g_dest[k] = cells[cid].nodeID;
+
+    /* The counts array is indexed as count[from * nr_nodes + to]. */
+    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
+  }
+
+  /* Sort the gparticles according to their cell index. */
+  space_gparts_sort(gparts, g_dest, s->nr_gparts, 0, nr_nodes - 1);
 
   /* Get all the counts from all the nodes. */
   if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                     MPI_COMM_WORLD) != MPI_SUCCESS)
     error("Failed to allreduce particle transfer counts.");
 
-  /* Get the new number of parts for this node, be generous in allocating. */
-  size_t nr_parts = 0;
+  /* Get all the g_counts from all the nodes. */
+  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
+                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
+    error("Failed to allreduce gparticle transfer counts.");
+
+  /* Each node knows how many parts and gparts will be transferred to every
+     other node. We can start preparing to receive data. */
+
+  /* Get the new number of parts and gparts for this node */
+  size_t nr_parts = 0, nr_gparts = 0;
   for (int k = 0; k < nr_nodes; k++)
     nr_parts += counts[k * nr_nodes + nodeID];
+  for (int k = 0; k < nr_nodes; k++)
+    nr_gparts += g_counts[k * nr_nodes + nodeID];
+
+  /* Allocate the new arrays with some extra margin */
   struct part *parts_new = NULL;
-  struct xpart *xparts_new = NULL, *xparts = s->xparts;
+  struct xpart *xparts_new = NULL;
+  struct gpart *gparts_new = NULL;
   if (posix_memalign((void **)&parts_new, part_align,
-                     sizeof(struct part) * nr_parts * 1.2) != 0 ||
-      posix_memalign((void **)&xparts_new, part_align,
-                     sizeof(struct xpart) * nr_parts * 1.2) != 0)
+                     sizeof(struct part) * nr_parts *
+                         engine_redistribute_alloc_margin) != 0)
     error("Failed to allocate new part data.");
-
-  /* Emit the sends and recvs for the particle data. */
+  if (posix_memalign((void **)&xparts_new, xpart_align,
+                     sizeof(struct xpart) * nr_parts *
+                         engine_redistribute_alloc_margin) != 0)
+    error("Failed to allocate new xpart data.");
+  if (posix_memalign((void **)&gparts_new, gpart_align,
+                     sizeof(struct gpart) * nr_gparts *
+                         engine_redistribute_alloc_margin) != 0)
+    error("Failed to allocate new gpart data.");
+
+  /* Prepare MPI requests for the asynchronous communications */
   MPI_Request *reqs;
-  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 4 * nr_nodes)) ==
+  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
       NULL)
     error("Failed to allocate MPI request list.");
-  for (int k = 0; k < 4 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
-  for (size_t offset_send = 0, offset_recv = 0, k = 0; k < nr_nodes; k++) {
-    int ind_send = nodeID * nr_nodes + k;
-    int ind_recv = k * nr_nodes + nodeID;
+  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
+
+  /* Emit the sends and recvs for the particle and gparticle data. */
+  size_t offset_send = 0, offset_recv = 0;
+  size_t g_offset_send = 0, g_offset_recv = 0;
+  for (int k = 0; k < nr_nodes; k++) {
+
+    /* Indices in the count arrays of the node of interest */
+    const int ind_send = nodeID * nr_nodes + k;
+    const int ind_recv = k * nr_nodes + nodeID;
+
+    /* Are we sending any part/xpart ? */
     if (counts[ind_send] > 0) {
+
+      /* If the send is to the same node, just copy */
       if (k == nodeID) {
         memcpy(&parts_new[offset_recv], &s->parts[offset_send],
                sizeof(struct part) * counts[ind_recv]);
@@ -221,36 +317,71 @@ void engine_redistribute(struct engine *e) {
                sizeof(struct xpart) * counts[ind_recv]);
         offset_send += counts[ind_send];
         offset_recv += counts[ind_recv];
+
+        /* Else, emit some communications */
       } else {
         if (MPI_Isend(&s->parts[offset_send], counts[ind_send],
-                      e->part_mpi_type, k, 2 * ind_send + 0, MPI_COMM_WORLD,
-                      &reqs[4 * k]) != MPI_SUCCESS)
-          error("Failed to isend parts to node %zi.", k);
+                      e->part_mpi_type, k, 3 * ind_send + 0, MPI_COMM_WORLD,
+                      &reqs[6 * k]) != MPI_SUCCESS)
+          error("Failed to isend parts to node %i.", k);
         if (MPI_Isend(&s->xparts[offset_send], counts[ind_send],
-                      e->xpart_mpi_type, k, 2 * ind_send + 1, MPI_COMM_WORLD,
-                      &reqs[4 * k + 1]) != MPI_SUCCESS)
-          error("Failed to isend xparts to node %zi.", k);
+                      e->xpart_mpi_type, k, 3 * ind_send + 1, MPI_COMM_WORLD,
+                      &reqs[6 * k + 1]) != MPI_SUCCESS)
+          error("Failed to isend xparts to node %i.", k);
         offset_send += counts[ind_send];
       }
     }
+
+    /* Are we sending any gpart ? */
+    if (g_counts[ind_send] > 0) {
+
+      /* If the send is to the same node, just copy */
+      if (k == nodeID) {
+        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
+               sizeof(struct gpart) * g_counts[ind_recv]);
+        g_offset_send += g_counts[ind_send];
+        g_offset_recv += g_counts[ind_recv];
+
+        /* Else, emit some communications */
+      } else {
+        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
+                      e->gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
+                      &reqs[6 * k + 2]) != MPI_SUCCESS)
+          error("Failed to isend gparts to node %i.", k);
+        g_offset_send += g_counts[ind_send];
+      }
+    }
+
+    /* Now emit the corresponding Irecv() */
+
+    /* Are we receiving any part/xpart from this node ? */
     if (k != nodeID && counts[ind_recv] > 0) {
       if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], e->part_mpi_type,
-                    k, 2 * ind_recv + 0, MPI_COMM_WORLD,
-                    &reqs[4 * k + 2]) != MPI_SUCCESS)
-        error("Failed to emit irecv of parts from node %zi.", k);
+                    k, 3 * ind_recv + 0, MPI_COMM_WORLD,
+                    &reqs[6 * k + 3]) != MPI_SUCCESS)
+        error("Failed to emit irecv of parts from node %i.", k);
       if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv],
-                    e->xpart_mpi_type, k, 2 * ind_recv + 1, MPI_COMM_WORLD,
-                    &reqs[4 * k + 3]) != MPI_SUCCESS)
-        error("Failed to emit irecv of parts from node %zi.", k);
+                    e->xpart_mpi_type, k, 3 * ind_recv + 1, MPI_COMM_WORLD,
+                    &reqs[6 * k + 4]) != MPI_SUCCESS)
+        error("Failed to emit irecv of xparts from node %i.", k);
       offset_recv += counts[ind_recv];
     }
+
+    /* Are we receiving any gpart from this node ? */
+    if (k != nodeID && g_counts[ind_recv] > 0) {
+      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
+                    e->gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
+                    &reqs[6 * k + 5]) != MPI_SUCCESS)
+        error("Failed to emit irecv of gparts from node %i.", k);
+      g_offset_recv += g_counts[ind_recv];
+    }
   }
 
   /* Wait for all the sends and recvs to tumble in. */
-  MPI_Status stats[4 * nr_nodes];
+  MPI_Status stats[6 * nr_nodes];
   int res;
-  if ((res = MPI_Waitall(4 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
-    for (int k = 0; k < 4 * nr_nodes; k++) {
+  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
+    for (int k = 0; k < 6 * nr_nodes; k++) {
       char buff[MPI_MAX_ERROR_STRING];
       int res;
       MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
@@ -259,6 +390,32 @@ void engine_redistribute(struct engine *e) {
     error("Failed during waitall for part data.");
   }
 
+  /* We now need to restore the part<->gpart links */
+  size_t offset_parts = 0, offset_gparts = 0;
+  for (int node = 0; node < nr_nodes; ++node) {
+
+    const int ind_recv = node * nr_nodes + nodeID;
+    const size_t count_parts = counts[ind_recv];
+    const size_t count_gparts = g_counts[ind_recv];
+
+    /* Loop over the gparts received from that node */
+    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {
+
+      /* Does this gpart have a partner ? */
+      if (gparts_new[k].id >= 0) {
+
+        const size_t partner_index = offset_parts + gparts_new[k].id;
+
+        /* Re-link */
+        gparts_new[k].part = &parts_new[partner_index];
+        gparts_new[k].part->gpart = &gparts_new[k];
+      }
+    }
+
+    offset_parts += count_parts;
+    offset_gparts += count_gparts;
+  }
+
   /* Verify that all parts are in the right place. */
   /* for ( int k = 0 ; k < nr_parts ; k++ ) {
      int cid = cell_getid( cdim , parts_new[k].x[0]*ih[0], parts_new[k].x[1]*ih[1], parts_new[k].x[2]*ih[2] );
@@ -266,26 +423,47 @@ void engine_redistribute(struct engine *e) {
       error( "Received particle (%i) that does not belong here (nodeID=%i).",
         k , cells[ cid ].nodeID );
     } */
 
+  /* Verify that the links are correct */
+  /* MATTHIEU: To be commented out once we are happy */
+  for (size_t k = 0; k < nr_gparts; ++k) {
+
+    if (gparts_new[k].id >= 0) {
+
+      if (gparts_new[k].x[0] != gparts_new[k].part->x[0] ||
+          gparts_new[k].x[1] != gparts_new[k].part->x[1] ||
+          gparts_new[k].x[2] != gparts_new[k].part->x[2])
+        message("Linked particles are not at the same position !");
+    }
+  }
+
   /* Set the new part data, free the old. */
   free(parts);
   free(xparts);
+  free(gparts);
   s->parts = parts_new;
   s->xparts = xparts_new;
+  s->gparts = gparts_new;
   s->nr_parts = nr_parts;
-  s->size_parts = 1.2 * nr_parts;
-
-  /* Be verbose about what just happened. */
-  for (int k = 0; k < nr_cells; k++)
-    if (cells[k].nodeID == nodeID) my_cells += 1;
-  if (e->verbose)
-    message("node %i now has %zi parts in %i cells.", nodeID, nr_parts,
-            my_cells);
+  s->nr_gparts = nr_gparts;
+  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
+  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;
 
-  /* Clean up other stuff. */
+  /* Clean up the temporary stuff. */
   free(reqs);
   free(counts);
   free(dest);
+  free(g_counts);
+  free(g_dest);
 
+  /* Be verbose about what just happened. */
+  if (e->verbose) {
+    int my_cells = 0;
+    for (int k = 0; k < nr_cells; k++)
+      if (cells[k].nodeID == nodeID) my_cells += 1;
+    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
+            nr_parts, nr_gparts, my_cells);
+  }
+
   if (e->verbose)
     message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
             clocks_getunit());
 
@@ -2175,6 +2351,7 @@ void engine_init(struct engine *e, struct space *s, float dt, int nr_threads,
 #ifdef WITH_MPI
   part_create_mpi_type(&e->part_mpi_type);
   xpart_create_mpi_type(&e->xpart_mpi_type);
+  gpart_create_mpi_type(&e->gpart_mpi_type);
 #endif
 
   /* First of all, init the barrier and lock it. */
diff --git a/src/engine.h b/src/engine.h
index 741ae1f553494e435394f529606b4cb794b0e3d2..45a36acec326e0897ddc3c2bc1d9567263cab70d 100644
--- a/src/engine.h
+++ b/src/engine.h
@@ -62,6 +62,7 @@ extern const char *engine_policy_names[];
 #define engine_maxtaskspercell 96
 #define engine_maxproxies 64
 #define engine_tasksreweight 10
+#define engine_redistribute_alloc_margin 1.2
 
 /* The rank of the engine as a global variable (for messages). */
 extern int engine_rank;
@@ -165,6 +166,7 @@ struct engine {
   /* MPI data type for the particle transfers */
   MPI_Datatype part_mpi_type;
   MPI_Datatype xpart_mpi_type;
+  MPI_Datatype gpart_mpi_type;
 #endif
 };
 
diff --git a/src/part.c b/src/part.c
index 6a99325ef23a7062fafb387fa3f3bd6b2203d057..bf4adcf270ac54cd79887481486f946b040e8c28 100644
--- a/src/part.c
+++ b/src/part.c
@@ -65,4 +65,22 @@ void xpart_create_mpi_type(MPI_Datatype* xpart_type) {
   MPI_Type_commit(xpart_type);
 }
 
+/**
+ * @brief Registers and returns an MPI type for the gparticles
+ *
+ * @param gpart_type The type container
+ */
+void gpart_create_mpi_type(MPI_Datatype* gpart_type) {
+
+  /* This is not the recommended way of doing this.
+     One should define the structure field by field.
+     But as long as we don't do serialization via MPI-IO
+     we don't really care.
+     Also we would have to modify this function every time something
+     is added to the gpart structure. */
+  MPI_Type_contiguous(sizeof(struct gpart) / sizeof(unsigned char), MPI_BYTE,
+                      gpart_type);
+  MPI_Type_commit(gpart_type);
+}
+
 #endif
diff --git a/src/part.h b/src/part.h
index 865403e8c2c157dc5a8ff7a32bc41be676d7919b..03eb28a648070e2735dfc20f4d50730aba7aed2c 100644
--- a/src/part.h
+++ b/src/part.h
@@ -54,6 +54,7 @@
 #ifdef WITH_MPI
 void part_create_mpi_type(MPI_Datatype* part_type);
 void xpart_create_mpi_type(MPI_Datatype* xpart_type);
+void gpart_create_mpi_type(MPI_Datatype* gpart_type);
 #endif
 
 #endif /* SWIFT_PART_H */
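
Note on the count matrix: the patch relies on each rank filling only its own row of count[from * nr_nodes + to], so that an in-place summing Allreduce assembles the full matrix everywhere; row nodeID then lists what this rank sends and column nodeID what it receives. A minimal standalone sketch of just that exchange (not part of the patch; all names are local to the example, only standard MPI calls are used):

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int nr_nodes, nodeID;
  MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
  MPI_Comm_rank(MPI_COMM_WORLD, &nodeID);

  /* count[from * nr_nodes + to], zero-initialised. */
  int *counts = calloc(nr_nodes * nr_nodes, sizeof(int));
  if (counts == NULL) MPI_Abort(MPI_COMM_WORLD, 1);

  /* Pretend this rank sends (nodeID + to + 1) particles to each rank 'to'. */
  for (int to = 0; to < nr_nodes; to++)
    counts[nodeID * nr_nodes + to] = nodeID + to + 1;

  /* Every other row is zero on this rank, so a SUM assembles the matrix. */
  MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                MPI_COMM_WORLD);

  /* Column nodeID now tells us how many particles to expect in total. */
  size_t nr_recv = 0;
  for (int from = 0; from < nr_nodes; from++)
    nr_recv += counts[from * nr_nodes + nodeID];
  printf("rank %d will receive %zu particles\n", nodeID, nr_recv);

  free(counts);
  MPI_Finalize();
  return 0;
}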
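
Note on the part<->gpart re-linking: before sending, each linked gpart's id is overwritten with the particle's offset inside the chunk headed to the same node; the receiver adds that offset to the start of the corresponding chunk to rebuild both pointers. A simplified, self-contained sketch of the receive side (relink_chunk and the two toy structs are hypothetical stand-ins, not the real SWIFT types). Offset 0 is a valid partner index, which is why the checks test id >= 0 rather than id > 0:

#include <stddef.h>
#include <stdio.h>

struct part;
struct gpart {
  long long id; /* negative: no partner; >= 0: offset within the chunk */
  struct part *part;
};
struct part {
  struct gpart *gpart;
};

/* Re-link one received chunk: parts[...] and gparts[0..m) came from the
   same sender, in the order in which they were packed. */
void relink_chunk(struct part *parts, struct gpart *gparts, size_t m) {
  for (size_t k = 0; k < m; k++) {
    if (gparts[k].id >= 0) {
      gparts[k].part = &parts[gparts[k].id];
      gparts[k].part->gpart = &gparts[k];
    }
  }
}

int main(void) {
  struct part parts[3] = {{NULL}, {NULL}, {NULL}};
  /* gpart 1 has no partner (negative id); 0 and 2 point at parts 0 and 2. */
  struct gpart gparts[3] = {{0, NULL}, {-1, NULL}, {2, NULL}};
  relink_chunk(parts, gparts, 3);
  printf("gpart 2 linked to part %td\n", gparts[2].part - parts);
  return 0;
}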
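
Note on the MPI datatype: the comment in gpart_create_mpi_type mentions the recommended field-by-field alternative to the byte-blob shortcut. A hedged sketch of what that would look like, on a hypothetical toy_gpart rather than the real SWIFT struct; MPI_Type_create_struct captures the fields, and MPI_Type_create_resized fixes the extent so arrays stride correctly over any trailing padding:

#include <mpi.h>
#include <stddef.h>

struct toy_gpart {
  double x[3];
  long long id;
};

void toy_gpart_create_mpi_type(MPI_Datatype *type) {
  int block_lengths[2] = {3, 1};
  MPI_Aint displs[2] = {offsetof(struct toy_gpart, x),
                        offsetof(struct toy_gpart, id)};
  MPI_Datatype types[2] = {MPI_DOUBLE, MPI_LONG_LONG};
  MPI_Datatype tmp;
  MPI_Type_create_struct(2, block_lengths, displs, types, &tmp);
  /* Resize so arrays of toy_gpart have the correct stride. */
  MPI_Type_create_resized(tmp, 0, sizeof(struct toy_gpart), type);
  MPI_Type_commit(type);
  MPI_Type_free(&tmp);
}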