diff --git a/examples/main.c b/examples/main.c index 6bb16d711cb3d44e8843279e76d4cd24f5ce0cc4..f770265c1b59dff93513a59ff7246258539325e3 100644 --- a/examples/main.c +++ b/examples/main.c @@ -363,7 +363,7 @@ int main(int argc, char *argv[]) { #if defined(WITH_MPI) #if defined(HAVE_PARALLEL_HDF5) read_ic_parallel(ICfileName, dim, &parts, &gparts, &Ngas, &Ngpart, &periodic, - myrank, nr_nodes, MPI_COMM_WORLD, MPI_INFO_NULL); + myrank, nr_nodes, MPI_COMM_WORLD, MPI_INFO_NULL); #else read_ic_serial(ICfileName, dim, &parts, &gparts, &Ngas, &Ngpart, &periodic, myrank, nr_nodes, MPI_COMM_WORLD, MPI_INFO_NULL); @@ -396,8 +396,8 @@ int main(int argc, char *argv[]) { /* MATTHIEU: Temporary fix to preserve master */ if (!with_gravity) { free(gparts); - for(size_t k = 0; k < Ngas; ++k) - parts[k].gpart = NULL; + gparts = NULL; + for (size_t k = 0; k < Ngas; ++k) parts[k].gpart = NULL; Ngpart = 0; #if defined(WITH_MPI) N_long[0] = Ngas; diff --git a/src/cell.c b/src/cell.c index df11782048dfa80c697f53feefe8fabc104eb23b..b0e299999e9073cc14d92b769b3e157f50d64c8f 100644 --- a/src/cell.c +++ b/src/cell.c @@ -89,14 +89,18 @@ int cell_unpack(struct pcell *pc, struct cell *c, struct space *s) { c->ti_end_min = pc->ti_end_min; c->ti_end_max = pc->ti_end_max; c->count = pc->count; + c->gcount = pc->gcount; c->tag = pc->tag; - /* Fill the progeny recursively, depth-first. */ + /* Number of new cells created. */ int count = 1; + + /* Fill the progeny recursively, depth-first. */ for (int k = 0; k < 8; k++) if (pc->progeny[k] >= 0) { struct cell *temp = space_getcell(s); temp->count = 0; + temp->gcount = 0; temp->loc[0] = c->loc[0]; temp->loc[1] = c->loc[1]; temp->loc[2] = c->loc[2]; @@ -122,7 +126,7 @@ int cell_unpack(struct pcell *pc, struct cell *c, struct space *s) { } /** - * @brief Link the cells recursively to the given part array. + * @brief Link the cells recursively to the given #part array. * * @param c The #cell. * @param parts The #part array. @@ -130,7 +134,7 @@ int cell_unpack(struct pcell *pc, struct cell *c, struct space *s) { * @return The number of particles linked. */ -int cell_link(struct cell *c, struct part *parts) { +int cell_link_parts(struct cell *c, struct part *parts) { c->parts = parts; @@ -139,14 +143,40 @@ int cell_link(struct cell *c, struct part *parts) { int offset = 0; for (int k = 0; k < 8; k++) { if (c->progeny[k] != NULL) - offset += cell_link(c->progeny[k], &parts[offset]); + offset += cell_link_parts(c->progeny[k], &parts[offset]); } } - /* Return the total number of unpacked cells. */ + /* Return the total number of linked particles. */ return c->count; } +/** + * @brief Link the cells recursively to the given #gpart array. + * + * @param c The #cell. + * @param gparts The #gpart array. + * + * @return The number of particles linked. + */ + +int cell_link_gparts(struct cell *c, struct gpart *gparts) { + + c->gparts = gparts; + + /* Fill the progeny recursively, depth-first. */ + if (c->split) { + int offset = 0; + for (int k = 0; k < 8; k++) { + if (c->progeny[k] != NULL) + offset += cell_link_gparts(c->progeny[k], &gparts[offset]); + } + } + + /* Return the total number of linked particles. */ + return c->gcount; +} + /** * @brief Pack the data of the given cell and all it's sub-cells. * @@ -164,6 +194,7 @@ int cell_pack(struct cell *c, struct pcell *pc) { pc->ti_end_min = c->ti_end_min; pc->ti_end_max = c->ti_end_max; pc->count = c->count; + pc->gcount = c->gcount; c->tag = pc->tag = atomic_inc(&cell_next_tag) % cell_max_tag; /* Fill in the progeny, depth-first recursion. */ diff --git a/src/cell.h b/src/cell.h index b0451b311fda9c300427da6b3a9a25955090d799..05b870b144dd425ee9e72ee3b529122b6e721523 100644 --- a/src/cell.h +++ b/src/cell.h @@ -44,7 +44,7 @@ struct pcell { int ti_end_min, ti_end_max; /* Number of particles in this cell. */ - int count; + int count, gcount; /* tag used for MPI communication. */ int tag; @@ -175,7 +175,8 @@ void cell_gunlocktree(struct cell *c); int cell_pack(struct cell *c, struct pcell *pc); int cell_unpack(struct pcell *pc, struct cell *c, struct space *s); int cell_getsize(struct cell *c); -int cell_link(struct cell *c, struct part *parts); +int cell_link_parts(struct cell *c, struct part *parts); +int cell_link_gparts(struct cell *c, struct gpart *gparts); void cell_init_parts(struct cell *c, void *data); void cell_convert_hydro(struct cell *c, void *data); void cell_clean_links(struct cell *c, void *data); diff --git a/src/common_io.c b/src/common_io.c index 5fb2d9513ec2acc0cd8d389a226b14d427e02539..6183effe9ce392ab930c581cbd118f025bbce773 100644 --- a/src/common_io.c +++ b/src/common_io.c @@ -502,7 +502,7 @@ void writeXMFline(FILE* xmfFile, char* fileName, char* partTypeGroupName, * @param gparts The array of #gpart freshly read in. * @param Ndm The number of DM particles read in. */ -void prepare_dm_gparts(struct gpart* gparts, size_t Ndm) { +void prepare_dm_gparts(struct gpart* const gparts, size_t Ndm) { /* Let's give all these gparts a negative id */ for (size_t i = 0; i < Ndm; ++i) { @@ -527,8 +527,9 @@ void prepare_dm_gparts(struct gpart* gparts, size_t Ndm) { * @param Ngas The number of gas particles read in. * @param Ndm The number of DM particles read in. */ -void duplicate_hydro_gparts(struct part* parts, struct gpart* gparts, - size_t Ngas, size_t Ndm) { +void duplicate_hydro_gparts(struct part* const parts, + struct gpart* const gparts, size_t Ngas, + size_t Ndm) { for (size_t i = 0; i < Ngas; ++i) { @@ -557,16 +558,19 @@ void duplicate_hydro_gparts(struct part* parts, struct gpart* gparts, * @param dmparts The array of #gpart containg DM particles to be filled. * @param Ndm The number of DM particles. */ -void collect_dm_gparts(struct gpart* gparts, size_t Ntot, struct gpart* dmparts, - size_t Ndm) { +void collect_dm_gparts(const struct gpart* const gparts, size_t Ntot, + struct gpart* const dmparts, size_t Ndm) { size_t count = 0; /* Loop over all gparts */ for (size_t i = 0; i < Ntot; ++i) { + /* message("i=%zd count=%zd id=%lld part=%p", i, count, gparts[i].id, + * gparts[i].part); */ + /* And collect the DM ones */ - if (gparts[i].id < 0) { + if (gparts[i].id < 0LL) { memcpy(&dmparts[count], &gparts[i], sizeof(struct gpart)); dmparts[count].id = -dmparts[count].id; count++; diff --git a/src/common_io.h b/src/common_io.h index 4ad0c6fb754c4288a0c731e2b1e2392998719d52..961f40e63d771e5e06ade525301caf59aae0bceb 100644 --- a/src/common_io.h +++ b/src/common_io.h @@ -78,11 +78,12 @@ extern const char* particle_type_names[]; hid_t hdf5Type(enum DATA_TYPE type); size_t sizeOfType(enum DATA_TYPE type); -void collect_dm_gparts(struct gpart* gparts, size_t Ntot, struct gpart* dmparts, - size_t Ndm); -void prepare_dm_gparts(struct gpart* gparts, size_t Ndm); -void duplicate_hydro_gparts(struct part* parts, struct gpart* gparts, - size_t Ngas, size_t Ndm); +void collect_dm_gparts(const struct gpart* const gparts, size_t Ntot, + struct gpart* const dmparts, size_t Ndm); +void prepare_dm_gparts(struct gpart* const gparts, size_t Ndm); +void duplicate_hydro_gparts(struct part* const parts, + struct gpart* const gparts, size_t Ngas, + size_t Ndm); void readAttribute(hid_t grp, char* name, enum DATA_TYPE type, void* data); diff --git a/src/engine.c b/src/engine.c index 9960c23096e0606ab8fdec14a25a55cf2554dccd..7ddbc2aacb85c3edca941c66a6a1ac15f13ebd93 100644 --- a/src/engine.c +++ b/src/engine.c @@ -139,39 +139,56 @@ void engine_make_ghost_tasks(struct engine *e, struct cell *c, * @brief Redistribute the particles amongst the nodes according * to their cell's node IDs. * + * The strategy here is as follows: + * 1) Each node counts the number of particles it has to send to each other + * node. + * 2) The number of particles of each type is then exchanged. + * 3) The particles to send are placed in a temporary buffer in which the + * part-gpart links are preserved. + * 4) Each node allocates enough space for the new particles. + * 5) (Asynchronous) communications are issued to transfer the data. + * + * * @param e The #engine. */ - void engine_redistribute(struct engine *e) { #ifdef WITH_MPI - int nr_nodes = e->nr_nodes, nodeID = e->nodeID; + const int nr_nodes = e->nr_nodes; + const int nodeID = e->nodeID; struct space *s = e->s; - int my_cells = 0; - int *cdim = s->cdim; struct cell *cells = s->cells; - int nr_cells = s->nr_cells; + const int nr_cells = s->nr_cells; + const int *cdim = s->cdim; + const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]}; + const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]}; + struct part *parts = s->parts; + struct xpart *xparts = s->xparts; + struct gpart *gparts = s->gparts; ticks tic = getticks(); - /* Start by sorting the particles according to their nodes and - getting the counts. The counts array is indexed as - count[from * nr_nodes + to]. */ - int *counts; - size_t *dest; - double ih[3], dim[3]; - ih[0] = s->ih[0]; - ih[1] = s->ih[1]; - ih[2] = s->ih[2]; - dim[0] = s->dim[0]; - dim[1] = s->dim[1]; - dim[2] = s->dim[2]; - if ((counts = (int *)malloc(sizeof(int) *nr_nodes *nr_nodes)) == NULL || - (dest = (size_t *)malloc(sizeof(size_t) * s->nr_parts)) == NULL) - error("Failed to allocate count and dest buffers."); + /* Allocate temporary arrays to store the counts of particles to be sent + and the destination of each particle */ + int *counts, *g_counts; + if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL) + error("Failed to allocate count temporary buffer."); + if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL) + error("Failed to allocate gcount temporary buffer."); bzero(counts, sizeof(int) * nr_nodes * nr_nodes); - struct part *parts = s->parts; + bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes); + + // Allocate the destination index arrays. + int *dest, *g_dest; + if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL) + error("Failed to allocate dest temporary buffer."); + if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL) + error("Failed to allocate g_dest temporary buffer."); + + /* Get destination of each particle */ for (size_t k = 0; k < s->nr_parts; k++) { + + /* Periodic boundary conditions */ for (int j = 0; j < 3; j++) { if (parts[k].x[j] < 0.0) parts[k].x[j] += dim[j]; @@ -184,36 +201,121 @@ void engine_redistribute(struct engine *e) { error("Bad cell id %i for part %i at [%.3e,%.3e,%.3e].", cid, k, parts[k].x[0], parts[k].x[1], parts[k].x[2]); */ dest[k] = cells[cid].nodeID; + + /* The counts array is indexed as count[from * nr_nodes + to]. */ counts[nodeID * nr_nodes + dest[k]] += 1; } + + /* Sort the particles according to their cell index. */ space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose); + /* We need to re-link the gpart partners of parts. */ + int current_dest = dest[0]; + size_t count_this_dest = 0; + for (size_t k = 0; k < s->nr_parts; ++k) { + if (s->parts[k].gpart != NULL) { + + /* As the addresses will be invalidated by the communications, we will */ + /* instead store the absolute index from the start of the sub-array */ + /* of particles to be sent to a given node. */ + /* Recall that gparts without partners have a negative id. */ + /* We will restore the pointers on the receiving node later on. */ + if (dest[k] != current_dest) { + current_dest = dest[k]; + count_this_dest = 0; + } + + /* Debug */ + /* if(s->parts[k].gpart->id < 0) */ + /* error("Trying to link a partnerless gpart !"); */ + + s->parts[k].gpart->id = count_this_dest; + count_this_dest++; + } + } + + /* Get destination of each g-particle */ + for (size_t k = 0; k < s->nr_gparts; k++) { + + /* Periodic boundary conditions */ + for (int j = 0; j < 3; j++) { + if (gparts[k].x[j] < 0.0) + gparts[k].x[j] += dim[j]; + else if (gparts[k].x[j] >= dim[j]) + gparts[k].x[j] -= dim[j]; + } + const int cid = cell_getid(cdim, gparts[k].x[0] * ih[0], + gparts[k].x[1] * ih[1], gparts[k].x[2] * ih[2]); + /* if (cid < 0 || cid >= s->nr_cells) + error("Bad cell id %i for part %i at [%.3e,%.3e,%.3e].", + cid, k, g_parts[k].x[0], g_parts[k].x[1], g_parts[k].x[2]); */ + g_dest[k] = cells[cid].nodeID; + + /* The counts array is indexed as count[from * nr_nodes + to]. */ + g_counts[nodeID * nr_nodes + g_dest[k]] += 1; + } + + /* Sort the gparticles according to their cell index. */ + space_gparts_sort(gparts, g_dest, s->nr_gparts, 0, nr_nodes - 1); + /* Get all the counts from all the nodes. */ if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS) error("Failed to allreduce particle transfer counts."); - /* Get the new number of parts for this node, be generous in allocating. */ - size_t nr_parts = 0; + /* Get all the g_counts from all the nodes. */ + if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT, + MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS) + error("Failed to allreduce gparticle transfer counts."); + + /* Each node knows how many parts and gparts will be transferred to every + other node. We can start preparing to receive data */ + + /* Get the new number of parts and gparts for this node */ + size_t nr_parts = 0, nr_gparts = 0; for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID]; + for (int k = 0; k < nr_nodes; k++) + nr_gparts += g_counts[k * nr_nodes + nodeID]; + + /* Allocate the new arrays with some extra margin */ struct part *parts_new = NULL; - struct xpart *xparts_new = NULL, *xparts = s->xparts; + struct xpart *xparts_new = NULL; + struct gpart *gparts_new = NULL; if (posix_memalign((void **)&parts_new, part_align, - sizeof(struct part) * nr_parts * 1.2) != 0 || - posix_memalign((void **)&xparts_new, part_align, - sizeof(struct xpart) * nr_parts * 1.2) != 0) + sizeof(struct part) * nr_parts * + engine_redistribute_alloc_margin) != 0) error("Failed to allocate new part data."); - - /* Emit the sends and recvs for the particle data. */ + if (posix_memalign((void **)&xparts_new, xpart_align, + sizeof(struct xpart) * nr_parts * + engine_redistribute_alloc_margin) != 0) + error("Failed to allocate new xpart data."); + if (posix_memalign((void **)&gparts_new, gpart_align, + sizeof(struct gpart) * nr_gparts * + engine_redistribute_alloc_margin) != 0) + error("Failed to allocate new gpart data."); + + /* Prepare MPI requests for the asynchronous communications */ MPI_Request *reqs; - if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 4 * nr_nodes)) == + if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) == NULL) error("Failed to allocate MPI request list."); - for (int k = 0; k < 4 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL; - for (size_t offset_send = 0, offset_recv = 0, k = 0; k < nr_nodes; k++) { - int ind_send = nodeID * nr_nodes + k; - int ind_recv = k * nr_nodes + nodeID; + for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL; + + /* Emit the sends and recvs for the particle and gparticle data. */ + size_t offset_send = 0, offset_recv = 0; + size_t g_offset_send = 0, g_offset_recv = 0; + for (int k = 0; k < nr_nodes; k++) { + + /* Indices in the count arrays of the node of interest */ + const int ind_send = nodeID * nr_nodes + k; + const int ind_recv = k * nr_nodes + nodeID; + + /* Are we sending any part/xpart ? */ if (counts[ind_send] > 0) { + + /* message("Sending %d part to node %d", counts[ind_send], k); */ + + /* If the send is to the same node, just copy */ if (k == nodeID) { memcpy(&parts_new[offset_recv], &s->parts[offset_send], sizeof(struct part) * counts[ind_recv]); @@ -221,36 +323,73 @@ void engine_redistribute(struct engine *e) { sizeof(struct xpart) * counts[ind_recv]); offset_send += counts[ind_send]; offset_recv += counts[ind_recv]; + + /* Else, emit some communications */ } else { - if (MPI_Isend(&s->parts[offset_send], counts[ind_send], - e->part_mpi_type, k, 2 * ind_send + 0, MPI_COMM_WORLD, - &reqs[4 * k]) != MPI_SUCCESS) - error("Failed to isend parts to node %zi.", k); - if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], - e->xpart_mpi_type, k, 2 * ind_send + 1, MPI_COMM_WORLD, - &reqs[4 * k + 1]) != MPI_SUCCESS) - error("Failed to isend xparts to node %zi.", k); + if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type, + k, 3 * ind_send + 0, MPI_COMM_WORLD, + &reqs[6 * k]) != MPI_SUCCESS) + error("Failed to isend parts to node %i.", k); + if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type, + k, 3 * ind_send + 1, MPI_COMM_WORLD, + &reqs[6 * k + 1]) != MPI_SUCCESS) + error("Failed to isend xparts to node %i.", k); offset_send += counts[ind_send]; } } + + /* Are we sending any gpart ? */ + if (g_counts[ind_send] > 0) { + + /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */ + + /* If the send is to the same node, just copy */ + if (k == nodeID) { + memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send], + sizeof(struct gpart) * g_counts[ind_recv]); + g_offset_send += g_counts[ind_send]; + g_offset_recv += g_counts[ind_recv]; + + /* Else, emit some communications */ + } else { + if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send], + gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD, + &reqs[6 * k + 2]) != MPI_SUCCESS) + error("Failed to isend gparts to node %i.", k); + g_offset_send += g_counts[ind_send]; + } + } + + /* Now emit the corresponding Irecv() */ + + /* Are we receiving any part/xpart from this node ? */ if (k != nodeID && counts[ind_recv] > 0) { - if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], e->part_mpi_type, - k, 2 * ind_recv + 0, MPI_COMM_WORLD, - &reqs[4 * k + 2]) != MPI_SUCCESS) - error("Failed to emit irecv of parts from node %zi.", k); - if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], - e->xpart_mpi_type, k, 2 * ind_recv + 1, MPI_COMM_WORLD, - &reqs[4 * k + 3]) != MPI_SUCCESS) - error("Failed to emit irecv of parts from node %zi.", k); + if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k, + 3 * ind_recv + 0, MPI_COMM_WORLD, + &reqs[6 * k + 3]) != MPI_SUCCESS) + error("Failed to emit irecv of parts from node %i.", k); + if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type, + k, 3 * ind_recv + 1, MPI_COMM_WORLD, + &reqs[6 * k + 4]) != MPI_SUCCESS) + error("Failed to emit irecv of xparts from node %i.", k); offset_recv += counts[ind_recv]; } + + /* Are we receiving any gpart from this node ? */ + if (k != nodeID && g_counts[ind_recv] > 0) { + if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv], + gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD, + &reqs[6 * k + 5]) != MPI_SUCCESS) + error("Failed to emit irecv of gparts from node %i.", k); + g_offset_recv += g_counts[ind_recv]; + } } /* Wait for all the sends and recvs to tumble in. */ - MPI_Status stats[4 * nr_nodes]; + MPI_Status stats[6 * nr_nodes]; int res; - if ((res = MPI_Waitall(4 * nr_nodes, reqs, stats)) != MPI_SUCCESS) { - for (int k = 0; k < 4 * nr_nodes; k++) { + if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) { + for (int k = 0; k < 6 * nr_nodes; k++) { char buff[MPI_MAX_ERROR_STRING]; int res; MPI_Error_string(stats[k].MPI_ERROR, buff, &res); @@ -259,6 +398,32 @@ void engine_redistribute(struct engine *e) { error("Failed during waitall for part data."); } + /* We now need to restore the part<->gpart links */ + size_t offset_parts = 0, offset_gparts = 0; + for (int node = 0; node < nr_nodes; ++node) { + + const int ind_recv = node * nr_nodes + nodeID; + const size_t count_parts = counts[ind_recv]; + const size_t count_gparts = g_counts[ind_recv]; + + /* Loop over the gparts received from that node */ + for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) { + + /* Does this gpart have a partner ? */ + if (gparts_new[k].id >= 0) { + + const size_t partner_index = offset_parts + gparts_new[k].id; + + /* Re-link */ + gparts_new[k].part = &parts_new[partner_index]; + gparts_new[k].part->gpart = &gparts_new[k]; + } + } + + offset_parts += count_parts; + offset_gparts += count_gparts; + } + /* Verify that all parts are in the right place. */ /* for ( int k = 0 ; k < nr_parts ; k++ ) { int cid = cell_getid( cdim , parts_new[k].x[0]*ih[0], @@ -268,26 +433,55 @@ void engine_redistribute(struct engine *e) { (nodeID=%i).", k , cells[ cid ].nodeID ); } */ + /* Verify that the links are correct */ + /* MATTHIEU: To be commented out once we are happy */ + for (size_t k = 0; k < nr_gparts; ++k) { + + if (gparts_new[k].id > 0) { + + if (gparts_new[k].part->gpart != &gparts_new[k]) + error("Linking problem !"); + + /* if (gparts_new[k].x[0] != gparts_new[k].part->x[0] || */ + /* gparts_new[k].x[1] != gparts_new[k].part->x[1] || */ + /* gparts_new[k].x[2] != gparts_new[k].part->x[2]) */ + /* error("Linked particles are not at the same position !"); */ + } + } + for (size_t k = 0; k < nr_parts; ++k) { + + if (parts_new[k].gpart != NULL) { + + if (parts_new[k].gpart->part != &parts_new[k]) error("Linking problem !"); + } + } + /* Set the new part data, free the old. */ free(parts); free(xparts); + free(gparts); s->parts = parts_new; s->xparts = xparts_new; + s->gparts = gparts_new; s->nr_parts = nr_parts; - s->size_parts = 1.2 * nr_parts; + s->nr_gparts = nr_gparts; + s->size_parts = engine_redistribute_alloc_margin * nr_parts; + s->size_gparts = engine_redistribute_alloc_margin * nr_gparts; - /* Be verbose about what just happened. */ - for (int k = 0; k < nr_cells; k++) - if (cells[k].nodeID == nodeID) my_cells += 1; - if (e->verbose) - message("node %i now has %zi parts in %i cells.", nodeID, nr_parts, - my_cells); - - /* Clean up other stuff. */ + /* Clean up the temporary stuff. */ free(reqs); free(counts); free(dest); + /* Be verbose about what just happened. */ + if (e->verbose) { + int my_cells = 0; + for (int k = 0; k < nr_cells; k++) + if (cells[k].nodeID == nodeID) my_cells += 1; + message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID, + nr_parts, nr_gparts, my_cells); + } + if (e->verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), clocks_getunit()); @@ -549,31 +743,40 @@ void engine_exchange_cells(struct engine *e) { /* Count the number of particles we need to import and re-allocate the buffer if needed. */ - int count_in = 0; + int count_parts_in = 0, count_gparts_in = 0; for (int k = 0; k < nr_proxies; k++) - for (int j = 0; j < e->proxies[k].nr_cells_in; j++) - count_in += e->proxies[k].cells_in[j]->count; - if (count_in > s->size_parts_foreign) { + for (int j = 0; j < e->proxies[k].nr_cells_in; j++) { + count_parts_in += e->proxies[k].cells_in[j]->count; + count_gparts_in += e->proxies[k].cells_in[j]->gcount; + } + if (count_parts_in > s->size_parts_foreign) { if (s->parts_foreign != NULL) free(s->parts_foreign); - s->size_parts_foreign = 1.1 * count_in; + s->size_parts_foreign = 1.1 * count_parts_in; if (posix_memalign((void **)&s->parts_foreign, part_align, sizeof(struct part) * s->size_parts_foreign) != 0) error("Failed to allocate foreign part data."); } + if (count_gparts_in > s->size_gparts_foreign) { + if (s->gparts_foreign != NULL) free(s->gparts_foreign); + s->size_gparts_foreign = 1.1 * count_gparts_in; + if (posix_memalign((void **)&s->gparts_foreign, gpart_align, + sizeof(struct gpart) * s->size_gparts_foreign) != 0) + error("Failed to allocate foreign gpart data."); + } /* Unpack the cells and link to the particle data. */ struct part *parts = s->parts_foreign; + struct gpart *gparts = s->gparts_foreign; for (int k = 0; k < nr_proxies; k++) { for (int j = 0; j < e->proxies[k].nr_cells_in; j++) { - cell_link(e->proxies[k].cells_in[j], parts); + cell_link_parts(e->proxies[k].cells_in[j], parts); + cell_link_gparts(e->proxies[k].cells_in[j], gparts); parts = &parts[e->proxies[k].cells_in[j]->count]; + gparts = &gparts[e->proxies[k].cells_in[j]->gcount]; } } s->nr_parts_foreign = parts - s->parts_foreign; - - /* Is the parts buffer large enough? */ - if (s->nr_parts_foreign > s->size_parts_foreign) - error("Foreign parts buffer too small."); + s->nr_gparts_foreign = gparts - s->gparts_foreign; /* Free the pcell buffer. */ free(pcells); @@ -591,16 +794,24 @@ void engine_exchange_cells(struct engine *e) { * @brief Exchange straying parts with other nodes. * * @param e The #engine. - * @param offset The index in the parts array as of which the foreign parts - *reside. - * @param ind The ID of the foreign #cell. - * @param N The number of stray parts. + * @param offset_parts The index in the parts array as of which the foreign + * parts reside. + * @param ind_part The foreign #cell ID of each part. + * @param Npart The number of stray parts, contains the number of parts received + * on return. + * @param offset_gparts The index in the gparts array as of which the foreign + * parts reside. + * @param ind_gpart The foreign #cell ID of each gpart. + * @param Ngpart The number of stray gparts, contains the number of gparts + * received on return. * - * @return The number of arrived parts copied to parts and xparts. + * Note that this function does not mess-up the linkage between parts and + * gparts, i.e. the received particles have correct linkeage. */ -int engine_exchange_strays(struct engine *e, int offset, size_t *ind, - size_t N) { +void engine_exchange_strays(struct engine *e, size_t offset_parts, + int *ind_part, size_t *Npart, size_t offset_gparts, + int *ind_gpart, size_t *Ngpart) { #ifdef WITH_MPI @@ -610,25 +821,49 @@ int engine_exchange_strays(struct engine *e, int offset, size_t *ind, /* Re-set the proxies. */ for (int k = 0; k < e->nr_proxies; k++) e->proxies[k].nr_parts_out = 0; - /* Put the parts into the corresponding proxies. */ - for (size_t k = 0; k < N; k++) { - const int node_id = e->s->cells[ind[k]].nodeID; + /* Put the parts and gparts into the corresponding proxies. */ + for (size_t k = 0; k < *Npart; k++) { + /* Get the target node and proxy ID. */ + const int node_id = e->s->cells[ind_part[k]].nodeID; if (node_id < 0 || node_id >= e->nr_nodes) error("Bad node ID %i.", node_id); const int pid = e->proxy_ind[node_id]; - if (pid < 0) + if (pid < 0) { error( "Do not have a proxy for the requested nodeID %i for part with " "id=%llu, x=[%e,%e,%e].", - node_id, s->parts[offset + k].id, s->parts[offset + k].x[0], - s->parts[offset + k].x[1], s->parts[offset + k].x[2]); - proxy_parts_load(&e->proxies[pid], &s->parts[offset + k], - &s->xparts[offset + k], 1); + node_id, s->parts[offset_parts + k].id, + s->parts[offset_parts + k].x[0], s->parts[offset_parts + k].x[1], + s->parts[offset_parts + k].x[2]); + } + + /* Re-link the associated gpart with the buffer offset of the part. */ + if (s->parts[offset_parts + k].gpart != NULL) { + s->parts[offset_parts + k].gpart->id = e->proxies[pid].nr_parts_in; + } + + /* Load the part and xpart into the proxy. */ + proxy_parts_load(&e->proxies[pid], &s->parts[offset_parts + k], + &s->xparts[offset_parts + k], 1); + } + for (size_t k = 0; k < *Ngpart; k++) { + const int node_id = e->s->cells[ind_gpart[k]].nodeID; + if (node_id < 0 || node_id >= e->nr_nodes) + error("Bad node ID %i.", node_id); + const int pid = e->proxy_ind[node_id]; + if (pid < 0) + error( + "Do not have a proxy for the requested nodeID %i for part with " + "id=%lli, x=[%e,%e,%e].", + node_id, s->gparts[offset_parts + k].id, + s->gparts[offset_gparts + k].x[0], s->gparts[offset_parts + k].x[1], + s->gparts[offset_gparts + k].x[2]); + proxy_gparts_load(&e->proxies[pid], &s->gparts[offset_gparts + k], 1); } /* Launch the proxies. */ - MPI_Request reqs_in[2 * engine_maxproxies]; - MPI_Request reqs_out[2 * engine_maxproxies]; + MPI_Request reqs_in[3 * engine_maxproxies]; + MPI_Request reqs_out[3 * engine_maxproxies]; for (int k = 0; k < e->nr_proxies; k++) { proxy_parts_exch1(&e->proxies[k]); reqs_in[k] = e->proxies[k].req_parts_count_in; @@ -652,11 +887,18 @@ int engine_exchange_strays(struct engine *e, int offset, size_t *ind, /* Count the total number of incoming particles and make sure we have enough space to accommodate them. */ - size_t count_in = 0; - for (int k = 0; k < e->nr_proxies; k++) count_in += e->proxies[k].nr_parts_in; - if (e->verbose) message("sent out %zi particles, got %zi back.", N, count_in); - if (offset + count_in > s->size_parts) { - s->size_parts = (offset + count_in) * 1.05; + int count_parts_in = 0; + int count_gparts_in = 0; + for (int k = 0; k < e->nr_proxies; k++) { + count_parts_in += e->proxies[k].nr_parts_in; + count_gparts_in += e->proxies[k].nr_gparts_in; + } + if (e->verbose) { + message("sent out %zi/%zi parts/gparts, got %i/%i back.", *Npart, *Ngpart, + count_parts_in, count_gparts_in); + } + if (offset_parts + count_parts_in > s->size_parts) { + s->size_parts = (offset_parts + count_parts_in) * engine_parts_size_grow; struct part *parts_new = NULL; struct xpart *xparts_new = NULL; if (posix_memalign((void **)&parts_new, part_align, @@ -664,37 +906,61 @@ int engine_exchange_strays(struct engine *e, int offset, size_t *ind, posix_memalign((void **)&xparts_new, part_align, sizeof(struct xpart) * s->size_parts) != 0) error("Failed to allocate new part data."); - memcpy(parts_new, s->parts, sizeof(struct part) * offset); - memcpy(xparts_new, s->xparts, sizeof(struct xpart) * offset); + memcpy(parts_new, s->parts, sizeof(struct part) * offset_parts); + memcpy(xparts_new, s->xparts, sizeof(struct xpart) * offset_parts); free(s->parts); free(s->xparts); s->parts = parts_new; s->xparts = xparts_new; } + if (offset_gparts + count_gparts_in > s->size_gparts) { + s->size_gparts = (offset_gparts + count_gparts_in) * engine_parts_size_grow; + struct gpart *gparts_new = NULL; + if (posix_memalign((void **)&gparts_new, gpart_align, + sizeof(struct gpart) * s->size_gparts) != 0) + error("Failed to allocate new gpart data."); + memcpy(gparts_new, s->gparts, sizeof(struct gpart) * offset_gparts); + free(s->gparts); + s->gparts = gparts_new; + } /* Collect the requests for the particle data from the proxies. */ int nr_in = 0, nr_out = 0; for (int k = 0; k < e->nr_proxies; k++) { if (e->proxies[k].nr_parts_in > 0) { - reqs_in[2 * k] = e->proxies[k].req_parts_in; - reqs_in[2 * k + 1] = e->proxies[k].req_xparts_in; + reqs_in[3 * k] = e->proxies[k].req_parts_in; + reqs_in[3 * k + 1] = e->proxies[k].req_xparts_in; + nr_in += 2; + } else { + reqs_in[3 * k] = reqs_in[3 * k + 1] = MPI_REQUEST_NULL; + } + if (e->proxies[k].nr_gparts_in > 0) { + reqs_in[3 * k + 2] = e->proxies[k].req_gparts_in; nr_in += 1; - } else - reqs_in[2 * k] = reqs_in[2 * k + 1] = MPI_REQUEST_NULL; + } else { + reqs_in[3 * k + 2] = MPI_REQUEST_NULL; + } if (e->proxies[k].nr_parts_out > 0) { - reqs_out[2 * k] = e->proxies[k].req_parts_out; - reqs_out[2 * k + 1] = e->proxies[k].req_xparts_out; + reqs_out[3 * k] = e->proxies[k].req_parts_out; + reqs_out[3 * k + 1] = e->proxies[k].req_xparts_out; + nr_out += 2; + } else { + reqs_out[3 * k] = reqs_out[3 * k + 1] = MPI_REQUEST_NULL; + } + if (e->proxies[k].nr_gparts_out > 0) { + reqs_out[3 * k + 2] = e->proxies[k].req_gparts_out; nr_out += 1; - } else - reqs_out[2 * k] = reqs_out[2 * k + 1] = MPI_REQUEST_NULL; + } else { + reqs_out[3 * k + 2] = MPI_REQUEST_NULL; + } } /* Wait for each part array to come in and collect the new parts from the proxies. */ - size_t count = 0; - for (int k = 0; k < 2 * (nr_in + nr_out); k++) { - int err = 0, pid = MPI_UNDEFINED; - if ((err = MPI_Waitany(2 * e->nr_proxies, reqs_in, &pid, + int count_parts = 0, count_gparts = 0; + for (int k = 0; k < nr_in; k++) { + int err, pid; + if ((err = MPI_Waitany(3 * e->nr_proxies, reqs_in, &pid, MPI_STATUS_IGNORE)) != MPI_SUCCESS) { char buff[MPI_MAX_ERROR_STRING]; int res; @@ -702,26 +968,46 @@ int engine_exchange_strays(struct engine *e, int offset, size_t *ind, error("MPI_Waitany failed (%s).", buff); } if (pid == MPI_UNDEFINED) break; - // message( "request from proxy %i has arrived." , pid ); - if (reqs_in[pid & ~1] == MPI_REQUEST_NULL && - reqs_in[pid | 1] == MPI_REQUEST_NULL) { + // message( "request from proxy %i has arrived." , pid / 3 ); + pid = 3 * (pid / 3); + + /* If all the requests for a given proxy have arrived... */ + if (reqs_in[pid + 0] == MPI_REQUEST_NULL && + reqs_in[pid + 1] == MPI_REQUEST_NULL && + reqs_in[pid + 2] == MPI_REQUEST_NULL) { + /* Copy the particle data to the part/xpart/gpart arrays. */ struct proxy *p = &e->proxies[pid >> 1]; - memcpy(&s->parts[offset + count], p->parts_in, + memcpy(&s->parts[offset_parts + count_parts], p->parts_in, sizeof(struct part) * p->nr_parts_in); - memcpy(&s->xparts[offset + count], p->xparts_in, + memcpy(&s->xparts[offset_parts + count_parts], p->xparts_in, sizeof(struct xpart) * p->nr_parts_in); + memcpy(&s->gparts[offset_gparts + count_gparts], p->gparts_in, + sizeof(struct gpart) * p->nr_gparts_in); /* for (int k = offset; k < offset + count; k++) message( "received particle %lli, x=[%.3e %.3e %.3e], h=%.3e, from node %i.", s->parts[k].id, s->parts[k].x[0], s->parts[k].x[1], s->parts[k].x[2], s->parts[k].h, p->nodeID); */ - count += p->nr_parts_in; + + /* Re-link the gparts. */ + for (int k = 0; k < p->nr_gparts_in; k++) { + struct gpart *gp = &s->gparts[offset_gparts + count_gparts + k]; + if (gp->id >= 0) { + struct part *p = &s->parts[offset_gparts + count_parts + gp->id]; + gp->part = p; + p->gpart = gp; + } + } + + /* Advance the counters. */ + count_parts += p->nr_parts_in; + count_gparts += p->nr_gparts_in; } } /* Wait for all the sends to have finished too. */ if (nr_out > 0) - if (MPI_Waitall(2 * e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != + if (MPI_Waitall(3 * e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS) error("MPI_Waitall on sends failed."); @@ -730,11 +1016,11 @@ int engine_exchange_strays(struct engine *e, int offset, size_t *ind, clocks_getunit()); /* Return the number of harvested parts. */ - return count; + *Npart = count_parts; + *Ngpart = count_gparts; #else error("SWIFT was not compiled with MPI support."); - return 0; #endif } @@ -1929,7 +2215,7 @@ void engine_split(struct engine *e, struct partition *initial_partition) { engine_makeproxies(e); /* Re-allocate the local parts. */ - if (e->nodeID == 0) + if (e->verbose) message("Re-allocating parts array from %zi to %zi.", s->size_parts, (size_t)(s->nr_parts * 1.2)); s->size_parts = s->nr_parts * 1.2; @@ -1937,7 +2223,7 @@ void engine_split(struct engine *e, struct partition *initial_partition) { struct xpart *xparts_new = NULL; if (posix_memalign((void **)&parts_new, part_align, sizeof(struct part) * s->size_parts) != 0 || - posix_memalign((void **)&xparts_new, part_align, + posix_memalign((void **)&xparts_new, xpart_align, sizeof(struct xpart) * s->size_parts) != 0) error("Failed to allocate new part data."); memcpy(parts_new, s->parts, sizeof(struct part) * s->nr_parts); @@ -1946,6 +2232,28 @@ void engine_split(struct engine *e, struct partition *initial_partition) { free(s->xparts); s->parts = parts_new; s->xparts = xparts_new; + + /* Re-link the gparts. */ + for (size_t k = 0; k < s->nr_parts; k++) + if (s->parts[k].gpart != NULL) s->parts[k].gpart->part = &s->parts[k]; + + /* Re-allocate the local gparts. */ + if (e->verbose) + message("Re-allocating gparts array from %zi to %zi.", s->size_gparts, + (size_t)(s->nr_gparts * 1.2)); + s->size_gparts = s->nr_gparts * 1.2; + struct gpart *gparts_new = NULL; + if (posix_memalign((void **)&gparts_new, gpart_align, + sizeof(struct gpart) * s->size_gparts) != 0) + error("Failed to allocate new gpart data."); + memcpy(gparts_new, s->gparts, sizeof(struct gpart) * s->nr_gparts); + free(s->gparts); + s->gparts = gparts_new; + + /* Re-link the parts. */ + for (size_t k = 0; k < s->nr_gparts; k++) + if (s->gparts[k].id > 0) s->gparts[k].part->gpart = &s->gparts[k]; + #else error("SWIFT was not compiled with MPI support."); #endif @@ -2178,8 +2486,7 @@ void engine_init(struct engine *e, struct space *s, float dt, int nr_threads, /* Construct types for MPI communications */ #ifdef WITH_MPI - part_create_mpi_type(&e->part_mpi_type); - xpart_create_mpi_type(&e->xpart_mpi_type); + part_create_mpi_types(); #endif /* First of all, init the barrier and lock it. */ diff --git a/src/engine.h b/src/engine.h index 741ae1f553494e435394f529606b4cb794b0e3d2..4d1860b9eed0203bf9bf75711ec6e6549d837fe7 100644 --- a/src/engine.h +++ b/src/engine.h @@ -62,6 +62,8 @@ extern const char *engine_policy_names[]; #define engine_maxtaskspercell 96 #define engine_maxproxies 64 #define engine_tasksreweight 10 +#define engine_parts_size_grow 1.05 +#define engine_redistribute_alloc_margin 1.2 /* The rank of the engine as a global variable (for messages). */ extern int engine_rank; @@ -160,12 +162,6 @@ struct engine { /* Are we talkative ? */ int verbose; - -#ifdef WITH_MPI - /* MPI data type for the particle transfers */ - MPI_Datatype part_mpi_type; - MPI_Datatype xpart_mpi_type; -#endif }; /* Function prototypes. */ @@ -182,7 +178,9 @@ void engine_init_particles(struct engine *e); void engine_step(struct engine *e); void engine_maketasks(struct engine *e); void engine_split(struct engine *e, struct partition *initial_partition); -int engine_exchange_strays(struct engine *e, int offset, size_t *ind, size_t N); +void engine_exchange_strays(struct engine *e, size_t offset_parts, + int *ind_part, size_t *Npart, size_t offset_gparts, + int *ind_gpart, size_t *Ngpart); void engine_rebuild(struct engine *e); void engine_repartition(struct engine *e); void engine_makeproxies(struct engine *e); diff --git a/src/part.c b/src/part.c index 6a99325ef23a7062fafb387fa3f3bd6b2203d057..b89abdde40fe8c7a57d1e9ac9e18fece83ba1f21 100644 --- a/src/part.c +++ b/src/part.c @@ -26,33 +26,21 @@ #endif /* This object's header. */ +#include "error.h" #include "part.h" #ifdef WITH_MPI -/** - * @brief Registers and returns an MPI type for the particles - * - * @param part_type The type container - */ -void part_create_mpi_type(MPI_Datatype* part_type) { - - /* This is not the recommended way of doing this. - One should define the structure field by field - But as long as we don't do serialization via MPI-IO - we don't really care. - Also we would have to modify this function everytime something - is added to the part structure. */ - MPI_Type_contiguous(sizeof(struct part) / sizeof(unsigned char), MPI_BYTE, - part_type); - MPI_Type_commit(part_type); -} +/* MPI data type for the particle transfers */ +MPI_Datatype part_mpi_type; +MPI_Datatype xpart_mpi_type; +MPI_Datatype gpart_mpi_type; +#endif +#ifdef WITH_MPI /** - * @brief Registers and returns an MPI type for the xparticles - * - * @param xpart_type The type container + * @brief Registers MPI particle types. */ -void xpart_create_mpi_type(MPI_Datatype* xpart_type) { +void part_create_mpi_types() { /* This is not the recommended way of doing this. One should define the structure field by field @@ -60,9 +48,20 @@ void xpart_create_mpi_type(MPI_Datatype* xpart_type) { we don't really care. Also we would have to modify this function everytime something is added to the part structure. */ - MPI_Type_contiguous(sizeof(struct xpart) / sizeof(unsigned char), MPI_BYTE, - xpart_type); - MPI_Type_commit(xpart_type); + if (MPI_Type_contiguous(sizeof(struct part) / sizeof(unsigned char), MPI_BYTE, + &part_mpi_type) != MPI_SUCCESS || + MPI_Type_commit(&part_mpi_type) != MPI_SUCCESS) { + error("Failed to create MPI type for parts."); + } + if (MPI_Type_contiguous(sizeof(struct xpart) / sizeof(unsigned char), + MPI_BYTE, &xpart_mpi_type) != MPI_SUCCESS || + MPI_Type_commit(&xpart_mpi_type) != MPI_SUCCESS) { + error("Failed to create MPI type for xparts."); + } + if (MPI_Type_contiguous(sizeof(struct gpart) / sizeof(unsigned char), + MPI_BYTE, &gpart_mpi_type) != MPI_SUCCESS || + MPI_Type_commit(&gpart_mpi_type) != MPI_SUCCESS) { + error("Failed to create MPI type for gparts."); + } } - #endif diff --git a/src/part.h b/src/part.h index 865403e8c2c157dc5a8ff7a32bc41be676d7919b..5d4c9c88a1acadea3d23a3df618c04da389fb61d 100644 --- a/src/part.h +++ b/src/part.h @@ -35,8 +35,8 @@ /* Some constants. */ #define part_align 64 -#define gpart_align 32 #define xpart_align 32 +#define gpart_align 32 /* Import the right particle definition */ #if defined(MINIMAL_SPH) @@ -52,8 +52,12 @@ #include "./gravity/Default/gravity_part.h" #ifdef WITH_MPI -void part_create_mpi_type(MPI_Datatype* part_type); -void xpart_create_mpi_type(MPI_Datatype* xpart_type); +/* MPI data type for the particle transfers */ +extern MPI_Datatype part_mpi_type; +extern MPI_Datatype xpart_mpi_type; +extern MPI_Datatype gpart_mpi_type; + +void part_create_mpi_types(); #endif #endif /* SWIFT_PART_H */ diff --git a/src/proxy.c b/src/proxy.c index f58847988946c347367a0f172048f8cc96f26914..02263a5653bdcdd2d1bf0a86523ed1a599d4bf21 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -189,20 +189,21 @@ void proxy_parts_exch1(struct proxy *p) { #ifdef WITH_MPI /* Send the number of particles. */ - if (MPI_Isend(&p->nr_parts_out, 1, MPI_INT, p->nodeID, + p->buff_out[0] = p->nr_parts_out; + p->buff_out[1] = p->nr_gparts_out; + if (MPI_Isend(p->buff_out, 2, MPI_INT, p->nodeID, p->mynodeID * proxy_tag_shift + proxy_tag_count, MPI_COMM_WORLD, &p->req_parts_count_out) != MPI_SUCCESS) error("Failed to isend nr of parts."); - // message( "isent particle count (%i) from node %i to node %i." , - // p->nr_parts_out , p->mynodeID , p->nodeID ); fflush(stdout); + /* message( "isent particle counts [%i, %i] from node %i to node %i." , + p->buff_out[0], p->buff_out[1], p->mynodeID , p->nodeID ); fflush(stdout); */ /* Send the particle buffers. */ if (p->nr_parts_out > 0) { - if (MPI_Isend(p->parts_out, sizeof(struct part) * p->nr_parts_out, MPI_BYTE, - p->nodeID, p->mynodeID * proxy_tag_shift + proxy_tag_parts, + if (MPI_Isend(p->parts_out, p->nr_parts_out, part_mpi_type, p->nodeID, + p->mynodeID * proxy_tag_shift + proxy_tag_parts, MPI_COMM_WORLD, &p->req_parts_out) != MPI_SUCCESS || - MPI_Isend(p->xparts_out, sizeof(struct xpart) * p->nr_parts_out, - MPI_BYTE, p->nodeID, + MPI_Isend(p->xparts_out, p->nr_parts_out, xpart_mpi_type, p->nodeID, p->mynodeID * proxy_tag_shift + proxy_tag_xparts, MPI_COMM_WORLD, &p->req_xparts_out) != MPI_SUCCESS) error("Failed to isend part data."); @@ -213,14 +214,20 @@ void proxy_parts_exch1(struct proxy *p) { p->parts_out[k].id, p->parts_out[k].x[0], p->parts_out[k].x[1], p->parts_out[k].x[2], p->parts_out[k].h, p->nodeID);*/ } + if (p->nr_gparts_out > 0) { + if (MPI_Isend(p->gparts_out, p->nr_gparts_out, gpart_mpi_type, p->nodeID, + p->mynodeID * proxy_tag_shift + proxy_tag_gparts, + MPI_COMM_WORLD, &p->req_gparts_out) != MPI_SUCCESS) + error("Failed to isend part data."); + // message( "isent gpart data (%i) to node %i." , p->nr_parts_out , + // p->nodeID ); fflush(stdout); + } /* Receive the number of particles. */ - if (MPI_Irecv(&p->nr_parts_in, 1, MPI_INT, p->nodeID, + if (MPI_Irecv(p->buff_in, 2, MPI_INT, p->nodeID, p->nodeID * proxy_tag_shift + proxy_tag_count, MPI_COMM_WORLD, &p->req_parts_count_in) != MPI_SUCCESS) error("Failed to irecv nr of parts."); -// message( "irecv particle count on node %i from node %i." , p->mynodeID , -// p->nodeID ); fflush(stdout); #else error("SWIFT was not compiled with MPI support."); @@ -231,6 +238,10 @@ void proxy_parts_exch2(struct proxy *p) { #ifdef WITH_MPI + /* Unpack the incomming parts counts. */ + p->nr_parts_in = p->buff_in[0]; + p->nr_gparts_in = p->buff_in[1]; + /* Is there enough space in the buffer? */ if (p->nr_parts_in > p->size_parts_in) { do { @@ -244,19 +255,36 @@ void proxy_parts_exch2(struct proxy *p) { p->size_parts_in)) == NULL) error("Failed to re-allocate parts_in buffers."); } + if (p->nr_gparts_in > p->size_gparts_in) { + do { + p->size_gparts_in *= proxy_buffgrow; + } while (p->nr_gparts_in > p->size_gparts_in); + free(p->gparts_in); + if ((p->gparts_in = (struct gpart *)malloc(sizeof(struct gpart) * + p->size_gparts_in)) == NULL) + error("Failed to re-allocate gparts_in buffers."); + } /* Receive the particle buffers. */ if (p->nr_parts_in > 0) { - if (MPI_Irecv(p->parts_in, sizeof(struct part) * p->nr_parts_in, MPI_BYTE, - p->nodeID, p->nodeID * proxy_tag_shift + proxy_tag_parts, - MPI_COMM_WORLD, &p->req_parts_in) != MPI_SUCCESS || - MPI_Irecv(p->xparts_in, sizeof(struct xpart) * p->nr_parts_in, MPI_BYTE, - p->nodeID, p->nodeID * proxy_tag_shift + proxy_tag_xparts, + if (MPI_Irecv(p->parts_in, p->nr_parts_in, part_mpi_type, p->nodeID, + p->nodeID * proxy_tag_shift + proxy_tag_parts, MPI_COMM_WORLD, + &p->req_parts_in) != MPI_SUCCESS || + MPI_Irecv(p->xparts_in, p->nr_parts_in, xpart_mpi_type, p->nodeID, + p->nodeID * proxy_tag_shift + proxy_tag_xparts, MPI_COMM_WORLD, &p->req_xparts_in) != MPI_SUCCESS) error("Failed to irecv part data."); // message( "irecv particle data (%i) from node %i." , p->nr_parts_in , // p->nodeID ); fflush(stdout); } + if (p->nr_gparts_in > 0) { + if (MPI_Irecv(p->gparts_in, p->nr_gparts_in, gpart_mpi_type, p->nodeID, + p->nodeID * proxy_tag_shift + proxy_tag_gparts, + MPI_COMM_WORLD, &p->req_gparts_in) != MPI_SUCCESS) + error("Failed to irecv gpart data."); + // message( "irecv gpart data (%i) from node %i." , p->nr_gparts_in , + // p->nodeID ); fflush(stdout); + } #else error("SWIFT was not compiled with MPI support."); @@ -303,6 +331,37 @@ void proxy_parts_load(struct proxy *p, const struct part *parts, p->nr_parts_out += N; } +/** + * @brief Load parts onto a proxy for exchange. + * + * @param p The #proxy. + * @param gparts Pointer to an array of #gpart to send. + * @param N The number of parts. + */ + +void proxy_gparts_load(struct proxy *p, const struct gpart *gparts, int N) { + + /* Is there enough space in the buffer? */ + if (p->nr_gparts_out + N > p->size_gparts_out) { + do { + p->size_gparts_out *= proxy_buffgrow; + } while (p->nr_gparts_out + N > p->size_gparts_out); + struct gpart *tp; + if ((tp = (struct gpart *)malloc(sizeof(struct gpart) * + p->size_gparts_out)) == NULL) + error("Failed to re-allocate gparts_out buffers."); + memcpy(tp, p->gparts_out, sizeof(struct gpart) * p->nr_gparts_out); + free(p->gparts_out); + p->gparts_out = tp; + } + + /* Copy the parts and xparts data to the buffer. */ + memcpy(&p->gparts_out[p->nr_gparts_out], gparts, sizeof(struct gpart) * N); + + /* Increase the counters. */ + p->nr_gparts_out += N; +} + /** * @brief Initialize the given proxy. * @@ -352,4 +411,20 @@ void proxy_init(struct proxy *p, int mynodeID, int nodeID) { error("Failed to allocate parts_out buffers."); } p->nr_parts_out = 0; + + /* Allocate the gpart send and receive buffers, if needed. */ + if (p->gparts_in == NULL) { + p->size_gparts_in = proxy_buffinit; + if ((p->gparts_in = (struct gpart *)malloc(sizeof(struct gpart) * + p->size_gparts_in)) == NULL) + error("Failed to allocate gparts_in buffers."); + } + p->nr_gparts_in = 0; + if (p->gparts_out == NULL) { + p->size_gparts_out = proxy_buffinit; + if ((p->gparts_out = (struct gpart *)malloc(sizeof(struct gpart) * + p->size_gparts_out)) == NULL) + error("Failed to allocate gparts_out buffers."); + } + p->nr_gparts_out = 0; } diff --git a/src/proxy.h b/src/proxy.h index 1a0272dd6c948df0130f733bb573477eeff1d0db..5a747187e05a78a109ce4523ebb3c9d5fe2ad717 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -32,7 +32,8 @@ #define proxy_tag_count 0 #define proxy_tag_parts 1 #define proxy_tag_xparts 2 -#define proxy_tag_cells 3 +#define proxy_tag_gparts 3 +#define proxy_tag_cells 4 /* Data structure for the proxy. */ struct proxy { @@ -53,14 +54,21 @@ struct proxy { /* The parts and xparts buffers for input and output. */ struct part *parts_in, *parts_out; struct xpart *xparts_in, *xparts_out; + struct gpart *gparts_in, *gparts_out; int size_parts_in, size_parts_out; int nr_parts_in, nr_parts_out; + int size_gparts_in, size_gparts_out; + int nr_gparts_in, nr_gparts_out; + + /* Buffer to hold the incomming/outgoing particle counts. */ + int buff_out[2], buff_in[2]; /* MPI request handles. */ #ifdef WITH_MPI MPI_Request req_parts_count_out, req_parts_count_in; MPI_Request req_parts_out, req_parts_in; MPI_Request req_xparts_out, req_xparts_in; + MPI_Request req_gparts_out, req_gparts_in; MPI_Request req_cells_count_out, req_cells_count_in; MPI_Request req_cells_out, req_cells_in; #endif @@ -70,6 +78,7 @@ struct proxy { void proxy_init(struct proxy *p, int mynodeID, int nodeID); void proxy_parts_load(struct proxy *p, const struct part *parts, const struct xpart *xparts, int N); +void proxy_gparts_load(struct proxy *p, const struct gpart *gparts, int N); void proxy_parts_exch1(struct proxy *p); void proxy_parts_exch2(struct proxy *p); void proxy_addcell_in(struct proxy *p, struct cell *c); diff --git a/src/scheduler.c b/src/scheduler.c index 327c7b115520ea1ec9ac5d3e22e528ee5f99d572..7bd2efbdd06ff794fa590e2c00ebab63d79ad4a0 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -1098,7 +1098,7 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) { break; case task_type_recv: #ifdef WITH_MPI - err = MPI_Irecv(t->ci->parts, t->ci->count, s->part_mpi_type, + err = MPI_Irecv(t->ci->parts, t->ci->count, part_mpi_type, t->ci->nodeID, t->flags, MPI_COMM_WORLD, &t->req); if (err != MPI_SUCCESS) { mpi_error(err, "Failed to emit irecv for particle data."); @@ -1113,7 +1113,7 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) { break; case task_type_send: #ifdef WITH_MPI - err = MPI_Isend(t->ci->parts, t->ci->count, s->part_mpi_type, + err = MPI_Isend(t->ci->parts, t->ci->count, part_mpi_type, t->cj->nodeID, t->flags, MPI_COMM_WORLD, &t->req); if (err != MPI_SUCCESS) { mpi_error(err, "Failed to emit isend for particle data."); @@ -1354,12 +1354,6 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks, s->tasks = NULL; s->tasks_ind = NULL; scheduler_reset(s, nr_tasks); - -/* Construct types for MPI communications */ -#ifdef WITH_MPI - part_create_mpi_type(&s->part_mpi_type); - xpart_create_mpi_type(&s->xpart_mpi_type); -#endif } /** diff --git a/src/scheduler.h b/src/scheduler.h index 9c2a5bd7307817bd536930200ad08e7e37502e08..64c694aea295c13810a20b626055fc6c15eb0af8 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -100,12 +100,6 @@ struct scheduler { /* The node we are working on. */ int nodeID; - -#ifdef WITH_MPI - /* MPI data type for the particle transfers */ - MPI_Datatype part_mpi_type; - MPI_Datatype xpart_mpi_type; -#endif }; /* Function prototypes. */ diff --git a/src/space.c b/src/space.c index 873ad7a395c50a03d357f594105eea42cf23b75b..72801c84619148e2bda34a2f7ae1bf1b1e7227b4 100644 --- a/src/space.c +++ b/src/space.c @@ -305,7 +305,7 @@ void space_regrid(struct space *s, double cell_max, int verbose) { */ void space_rebuild(struct space *s, double cell_max, int verbose) { - + const ticks tic = getticks(); /* Be verbose about this. */ @@ -318,23 +318,15 @@ void space_rebuild(struct space *s, double cell_max, int verbose) { int nr_gparts = s->nr_gparts; struct cell *restrict cells = s->cells; - double ih[3], dim[3]; - int cdim[3]; - ih[0] = s->ih[0]; - ih[1] = s->ih[1]; - ih[2] = s->ih[2]; - dim[0] = s->dim[0]; - dim[1] = s->dim[1]; - dim[2] = s->dim[2]; - cdim[0] = s->cdim[0]; - cdim[1] = s->cdim[1]; - cdim[2] = s->cdim[2]; + const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]}; + const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]}; + const int cdim[3] = {s->cdim[0], s->cdim[1], s->cdim[2]}; /* Run through the particles and get their cell index. */ // tic = getticks(); const size_t ind_size = s->size_parts; - size_t *ind; - if ((ind = (size_t *)malloc(sizeof(size_t) * ind_size)) == NULL) + int *ind; + if ((ind = (int *)malloc(sizeof(int) * ind_size)) == NULL) error("Failed to allocate temporary particle indices."); for (int k = 0; k < nr_parts; k++) { struct part *restrict p = &s->parts[k]; @@ -347,37 +339,91 @@ void space_rebuild(struct space *s, double cell_max, int verbose) { cell_getid(cdim, p->x[0] * ih[0], p->x[1] * ih[1], p->x[2] * ih[2]); cells[ind[k]].count++; } + // message( "getting particle indices took %.3f %s." , + // clocks_from_ticks(getticks() - tic), clocks_getunit()): + + /* Run through the gravity particles and get their cell index. */ + // tic = getticks(); + const size_t gind_size = s->size_gparts; + int *gind; + if ((gind = (int *)malloc(sizeof(int) * gind_size)) == NULL) + error("Failed to allocate temporary g-particle indices."); + for (int k = 0; k < nr_gparts; k++) { + struct gpart *restrict gp = &s->gparts[k]; + for (int j = 0; j < 3; j++) + if (gp->x[j] < 0.0) + gp->x[j] += dim[j]; + else if (gp->x[j] >= dim[j]) + gp->x[j] -= dim[j]; + gind[k] = + cell_getid(cdim, gp->x[0] * ih[0], gp->x[1] * ih[1], gp->x[2] * ih[2]); + cells[gind[k]].gcount++; + } // message( "getting particle indices took %.3f %s." , -// clocks_from_ticks(getticks() - tic), clocks_getunit()): +// clocks_from_ticks(getticks() - tic), clocks_getunit()); #ifdef WITH_MPI /* Move non-local parts to the end of the list. */ - const int nodeID = s->e->nodeID; + const int local_nodeID = s->e->nodeID; for (int k = 0; k < nr_parts; k++) - if (cells[ind[k]].nodeID != nodeID) { + if (cells[ind[k]].nodeID != local_nodeID) { cells[ind[k]].count -= 1; nr_parts -= 1; - struct part tp = s->parts[k]; + const struct part tp = s->parts[k]; s->parts[k] = s->parts[nr_parts]; s->parts[nr_parts] = tp; - struct xpart txp = s->xparts[k]; + if (s->parts[k].gpart != NULL) { + s->parts[k].gpart->part = &s->parts[k]; + } + if (s->parts[nr_parts].gpart != NULL) { + s->parts[nr_parts].gpart->part = &s->parts[nr_parts]; + } + const struct xpart txp = s->xparts[k]; s->xparts[k] = s->xparts[nr_parts]; s->xparts[nr_parts] = txp; - int t = ind[k]; + const int t = ind[k]; ind[k] = ind[nr_parts]; ind[nr_parts] = t; } + /* Move non-local gparts to the end of the list. */ + for (int k = 0; k < nr_gparts; k++) + if (cells[gind[k]].nodeID != local_nodeID) { + cells[gind[k]].gcount -= 1; + nr_gparts -= 1; + const struct gpart tp = s->gparts[k]; + s->gparts[k] = s->gparts[nr_gparts]; + s->gparts[nr_gparts] = tp; + if (s->gparts[k].id > 0) { + s->gparts[k].part->gpart = &s->gparts[k]; + } + if (s->gparts[nr_gparts].id > 0) { + s->gparts[nr_gparts].part->gpart = &s->gparts[nr_gparts]; + } + const int t = gind[k]; + gind[k] = gind[nr_gparts]; + gind[nr_gparts] = t; + } + /* Exchange the strays, note that this potentially re-allocates the parts arrays. */ - s->nr_parts = - nr_parts + engine_exchange_strays(s->e, nr_parts, &ind[nr_parts], - s->nr_parts - nr_parts); + /* TODO: This function also exchanges gparts, but this is shorted-out + until they are fully implemented. */ + size_t nr_parts_exchanged = s->nr_parts - nr_parts; + size_t nr_gparts_exchanged = s->nr_gparts - nr_gparts; + engine_exchange_strays(s->e, nr_parts, &ind[nr_parts], &nr_parts_exchanged, + nr_gparts, &gind[nr_gparts], &nr_gparts_exchanged); + + /* Add post-processing, i.e. re-linking/creating of gparts here. */ + + /* Set the new particle counts. */ + s->nr_parts = nr_parts + nr_parts_exchanged; + s->nr_gparts = nr_gparts + nr_gparts_exchanged; /* Re-allocate the index array if needed.. */ if (s->nr_parts > ind_size) { - size_t *ind_new; - if ((ind_new = (size_t *)malloc(sizeof(size_t) * s->nr_parts)) == NULL) + int *ind_new; + if ((ind_new = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL) error("Failed to allocate temporary particle indices."); memcpy(ind_new, ind, sizeof(size_t) * nr_parts); free(ind); @@ -386,7 +432,7 @@ void space_rebuild(struct space *s, double cell_max, int verbose) { /* Assign each particle to its cell. */ for (int k = nr_parts; k < s->nr_parts; k++) { - struct part *p = &s->parts[k]; + const struct part *const p = &s->parts[k]; ind[k] = cell_getid(cdim, p->x[0] * ih[0], p->x[1] * ih[1], p->x[2] * ih[2]); cells[ind[k]].count += 1; @@ -417,62 +463,21 @@ void space_rebuild(struct space *s, double cell_max, int verbose) { /* We no longer need the indices as of here. */ free(ind); - /* Run through the gravity particles and get their cell index. */ - // tic = getticks(); - const size_t gind_size = s->size_gparts; - size_t *gind; - if ((gind = (size_t *)malloc(sizeof(size_t) * gind_size)) == NULL) - error("Failed to allocate temporary g-particle indices."); - for (int k = 0; k < nr_gparts; k++) { - struct gpart *gp = &s->gparts[k]; - for (int j = 0; j < 3; j++) - if (gp->x[j] < 0.0) - gp->x[j] += dim[j]; - else if (gp->x[j] >= dim[j]) - gp->x[j] -= dim[j]; - gind[k] = - cell_getid(cdim, gp->x[0] * ih[0], gp->x[1] * ih[1], gp->x[2] * ih[2]); - cells[gind[k]].gcount++; - } -// message( "getting particle indices took %.3f %s." , -// clocks_from_ticks(getticks() - tic), clocks_getunit()); - #ifdef WITH_MPI - /* Move non-local gparts to the end of the list. */ - for (int k = 0; k < nr_gparts; k++) - if (cells[ind[k]].nodeID != nodeID) { - cells[ind[k]].gcount -= 1; - nr_gparts -= 1; - struct gpart tp = s->gparts[k]; - s->gparts[k] = s->gparts[nr_gparts]; - s->gparts[nr_gparts] = tp; - int t = ind[k]; - ind[k] = ind[nr_gparts]; - ind[nr_gparts] = t; - } - - /* Exchange the strays, note that this potentially re-allocates - the parts arrays. */ - // s->nr_gparts = - // nr_gparts + engine_exchange_strays(s->e, nr_gparts, &ind[nr_gparts], - // s->nr_gparts - nr_gparts); - if (nr_gparts > 0) - error("Need to implement the exchange of strays for the gparts"); - /* Re-allocate the index array if needed.. */ if (s->nr_gparts > gind_size) { - size_t *gind_new; - if ((gind_new = (size_t *)malloc(sizeof(size_t) * s->nr_gparts)) == NULL) + int *gind_new; + if ((gind_new = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL) error("Failed to allocate temporary g-particle indices."); - memcpy(gind_new, gind, sizeof(size_t) * nr_gparts); + memcpy(gind_new, gind, sizeof(int) * nr_gparts); free(gind); gind = gind_new; } /* Assign each particle to its cell. */ for (int k = nr_gparts; k < s->nr_gparts; k++) { - struct gpart *p = &s->gparts[k]; + const struct gpart *const p = &s->gparts[k]; gind[k] = cell_getid(cdim, p->x[0] * ih[0], p->x[1] * ih[1], p->x[2] * ih[2]); cells[gind[k]].count += 1; @@ -484,8 +489,8 @@ void space_rebuild(struct space *s, double cell_max, int verbose) { #endif - /* Sort the gparts according to their cells. */ - space_gparts_sort(s, gind, nr_gparts, 0, s->nr_cells - 1, verbose); + /* Sort the parts according to their cells. */ + space_gparts_sort(s->gparts, gind, nr_gparts, 0, s->nr_cells - 1); /* Re-link the parts. */ for (int k = 0; k < nr_gparts; k++) @@ -552,7 +557,8 @@ void space_split(struct space *s, struct cell *cells, int verbose) { * @param max highest index. * @param verbose Are we talkative ? */ -void space_parts_sort(struct space *s, size_t *ind, size_t N, int min, int max, + +void space_parts_sort(struct space *s, int *ind, size_t N, int min, int max, int verbose) { const ticks tic = getticks(); @@ -600,7 +606,7 @@ void space_parts_sort(struct space *s, size_t *ind, size_t N, int min, int max, void space_do_parts_sort() { /* Pointers to the sorting data. */ - size_t *ind = space_sort_struct.ind; + int *ind = space_sort_struct.ind; struct part *parts = space_sort_struct.parts; struct xpart *xparts = space_sort_struct.xparts; @@ -896,6 +902,15 @@ void space_do_gparts_sort() { atomic_dec(&space_sort_struct.waiting); } /* main loop. */ + + /* Verify space_sort_struct. */ + /* for ( i = 1 ; i < N ; i++ ) + if ( ind[i-1] > ind[i] ) + error( "Sorting failed (ind[%i]=%i,ind[%i]=%i)." , i-1 , ind[i-1] , i + , ind[i] ); */ + + /* Clean up. */ + free(qstack); } /** diff --git a/src/space.h b/src/space.h index db9463e03084fa52dc94ae58aae31e668faee547..0a19151152d155383a978eb1d27e95e39ebd7323 100644 --- a/src/space.h +++ b/src/space.h @@ -106,6 +106,8 @@ struct space { /* Buffers for parts that we will receive from foreign cells. */ struct part *parts_foreign; size_t nr_parts_foreign, size_parts_foreign; + struct gpart *gparts_foreign; + size_t nr_gparts_foreign, size_gparts_foreign; }; /* Interval stack necessary for parallel particle sorting. */ @@ -118,7 +120,7 @@ struct parallel_sort { struct part *parts; struct gpart *gparts; struct xpart *xparts; - size_t *ind; + int *ind; struct qstack *stack; unsigned int stack_size; volatile unsigned int first, last, waiting; @@ -126,7 +128,7 @@ struct parallel_sort { extern struct parallel_sort space_sort_struct; /* function prototypes. */ -void space_parts_sort(struct space *s, size_t *ind, size_t N, int min, int max, +void space_parts_sort(struct space *s, int *ind, size_t N, int min, int max, int verbose); void space_gparts_sort(struct space *s, size_t *ind, size_t N, int min, int max, int verbose);