diff --git a/src/engine.c b/src/engine.c index d8139468ee66296ac9da602716276fcd3feda0fd..e75c05ad6e40f75f572d692c83a60d141c12c53d 100644 --- a/src/engine.c +++ b/src/engine.c @@ -494,50 +494,111 @@ static void *engine_do_redistribute(int *counts, char *parts, } #endif -struct redist_mapper_data { - struct space *s; - int *dest; +#ifdef WITH_MPI + +/* Support for engine_redistribute threadpool mappers. */ +/* Struct for sharing data with various mappers. */ +struct redist_mapper { int *counts; + int *dest; + int nodeID; + int nr_nodes; + struct cell *cells; + struct space *s; void *base; }; -void engine_parts_dest_mapper(void *map_data, int num_elements, - void *extra_data) { - struct part *parts = (struct part *)map_data; - - struct redist_mapper_data *data = (struct redist_mapper_data *)extra_data; - const double *dim = data->s->dim; - const double *iwidth = data->s->iwidth; - const int *cdim = data->s->cdim; - struct cell *cells = data->s->cells_top; +/* Generic function for accumulating counts for TYPE parts. 
*/ +#define engine_redistribute_dest_mapper_simple(TYPE) \ + engine_redistribute_dest_mapper_##TYPE(void *map_data, int num_elements, \ + void *extra_data) { \ + struct TYPE *parts = (struct TYPE *)map_data; \ + struct redist_mapper *mydata = (struct redist_mapper *)extra_data; \ + struct space *s = mydata->s; \ + int *dest = \ + mydata->dest + (ptrdiff_t)(parts - (struct TYPE *)mydata->base); \ + for (int k = 0; k < num_elements; k++) { \ + for (int j = 0; j < 3; j++) { \ + if (parts[k].x[j] < 0.0) \ + parts[k].x[j] += s->dim[j]; \ + else if (parts[k].x[j] >= s->dim[j]) \ + parts[k].x[j] -= s->dim[j]; \ + } \ + const int cid = cell_getid(s->cdim, parts[k].x[0] * s->iwidth[0], \ + parts[k].x[1] * s->iwidth[1], \ + parts[k].x[2] * s->iwidth[2]); \ + dest[k] = s->cells_top[cid].nodeID; \ + size_t ind = mydata->nodeID * mydata->nr_nodes + dest[k]; \ + atomic_inc(&mydata->counts[ind]); \ + } \ + } - int *dest = data->dest + (parts - (struct part *)data->base); - int *counts = data->counts; +/* Generic function for accumulating counts for TYPE parts. Note + * we use a local counts array to avoid the atomic_add in the parts + * loop. 
*/ +#define engine_redistribute_dest_mapper_fast(TYPE) \ + engine_redistribute_dest_mapper_##TYPE(void *map_data, int num_elements, \ + void *extra_data) { \ + struct TYPE *parts = (struct TYPE *)map_data; \ + struct redist_mapper *mydata = (struct redist_mapper *)extra_data; \ + struct space *s = mydata->s; \ + int *dest = \ + mydata->dest + (ptrdiff_t)(parts - (struct TYPE *)mydata->base); \ + int *lcounts = NULL; \ + if ((lcounts = (int *)malloc(sizeof(int) * mydata->nr_nodes * \ + mydata->nr_nodes)) == NULL) \ + error("Failed to allocate counts thread-specific buffer"); \ + bzero(lcounts, sizeof(int) * mydata->nr_nodes * mydata->nr_nodes); \ + for (int k = 0; k < num_elements; k++) { \ + for (int j = 0; j < 3; j++) { \ + if (parts[k].x[j] < 0.0) \ + parts[k].x[j] += s->dim[j]; \ + else if (parts[k].x[j] >= s->dim[j]) \ + parts[k].x[j] -= s->dim[j]; \ + } \ + const int cid = cell_getid(s->cdim, parts[k].x[0] * s->iwidth[0], \ + parts[k].x[1] * s->iwidth[1], \ + parts[k].x[2] * s->iwidth[2]); \ + dest[k] = s->cells_top[cid].nodeID; \ + size_t ind = mydata->nodeID * mydata->nr_nodes + dest[k]; \ + lcounts[ind] += 1; \ + } \ + for (int k = 0; k < (mydata->nr_nodes * mydata->nr_nodes); k++) \ + atomic_add(&mydata->counts[k], lcounts[k]); \ + free(lcounts); \ + } - for (int k = 0; k < num_elements; k++) { +/** + * @brief Accumulate the counts of particles per cell. + * Threadpool helper for accumulating the counts of particles per cell. + * + * @param map_data address of the portion of parts to process. + * @param num_elements the number of parts to process. + * @param extra_data additional data defining the parts. 
+ */ +static void engine_redistribute_dest_mapper_simple(part); - /* Periodic boundary conditions */ - for (int j = 0; j < 3; j++) { - if (parts[k].x[j] < 0.0) - parts[k].x[j] += dim[j]; - else if (parts[k].x[j] >= dim[j]) - parts[k].x[j] -= dim[j]; - } - const int cid = - cell_getid(cdim, parts[k].x[0] * iwidth[0], parts[k].x[1] * iwidth[1], - parts[k].x[2] * iwidth[2]); -#ifdef SWIFT_DEBUG_CHECKS - if (cid < 0 || cid >= s->nr_cells) - error("Bad cell id %i for part %zu at [%.3e,%.3e,%.3e].", cid, k, - parts[k].x[0], parts[k].x[1], parts[k].x[2]); -#endif +/** + * @brief Accumulate the counts of star particles per cell. + * Threadpool helper for accumulating the counts of particles per cell. + * + * @param map_data address of the portion of sparts to process. + * @param num_elements the number of sparts to process. + * @param extra_data additional data defining the sparts. + */ +static void engine_redistribute_dest_mapper_simple(spart); - dest[k] = cells[cid].nodeID; +/** + * @brief Accumulate the counts of gravity particles per cell. + * Threadpool helper for accumulating the counts of particles per cell. + * + * @param map_data address of the portion of gparts to process. + * @param num_elements the number of gparts to process. + * @param extra_data additional data defining the gparts. + */ +static void engine_redistribute_dest_mapper_simple(gpart); - /* The counts array is indexed as count[from * nr_nodes + to]. 
*/ - atomic_inc(&counts[dest[k]]); - } -} +#endif /** * @brief Redistribute the particles amongst the nodes according @@ -564,9 +625,6 @@ void engine_redistribute(struct engine *e) { struct space *s = e->s; struct cell *cells = s->cells_top; const int nr_cells = s->nr_cells; - const int *cdim = s->cdim; - const double iwidth[3] = {s->iwidth[0], s->iwidth[1], s->iwidth[2]}; - const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]}; struct part *parts = s->parts; struct gpart *gparts = s->gparts; struct spart *sparts = s->sparts; @@ -583,15 +641,29 @@ void engine_redistribute(struct engine *e) { if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL) error("Failed to allocate dest temporary buffer."); + ticks tic1 = getticks(); + /* Get destination of each particle */ - struct redist_mapper_data parts_data = {s, dest, &counts[nodeID * nr_nodes], - parts}; - threadpool_map(&e->threadpool, engine_parts_dest_mapper, parts, s->nr_parts, - sizeof(struct part), 0, &parts_data); + struct redist_mapper extra_data; + extra_data.s = s; + extra_data.nodeID = nodeID; + extra_data.nr_nodes = nr_nodes; + + extra_data.counts = counts; + extra_data.dest = dest; + extra_data.base = (void *)parts; + + threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_part, parts, + s->nr_parts, sizeof(struct part), 0, &extra_data); + message("1: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* Sort the particles according to their cell index. */ + tic1 = getticks(); if (s->nr_parts > 0) space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose); + message("2: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); #ifdef SWIFT_DEBUG_CHECKS /* Verify that the part have been sorted correctly. */ @@ -618,7 +690,8 @@ void engine_redistribute(struct engine *e) { #endif /* We need to re-link the gpart partners of parts. 
*/ - if (s->nr_parts > 0) { + tic1 = getticks(); + if (s->nr_parts > 0 && s->nr_gparts > 0) { int current_dest = dest[0]; size_t count_this_dest = 0; for (size_t k = 0; k < s->nr_parts; ++k) { @@ -645,40 +718,33 @@ void engine_redistribute(struct engine *e) { } } free(dest); + message("3: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* Get destination of each s-particle */ + tic1 = getticks(); int *s_counts; if ((s_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL) error("Failed to allocate s_counts temporary buffer."); bzero(s_counts, sizeof(int) * nr_nodes * nr_nodes); + message("4: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); + tic1 = getticks(); int *s_dest; if ((s_dest = (int *)malloc(sizeof(int) * s->nr_sparts)) == NULL) error("Failed to allocate s_dest temporary buffer."); + message("5: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); - for (size_t k = 0; k < s->nr_sparts; k++) { - - /* Periodic boundary conditions */ - for (int j = 0; j < 3; j++) { - if (sparts[k].x[j] < 0.0) - sparts[k].x[j] += dim[j]; - else if (sparts[k].x[j] >= dim[j]) - sparts[k].x[j] -= dim[j]; - } - const int cid = - cell_getid(cdim, sparts[k].x[0] * iwidth[0], sparts[k].x[1] * iwidth[1], - sparts[k].x[2] * iwidth[2]); -#ifdef SWIFT_DEBUG_CHECKS - if (cid < 0 || cid >= s->nr_cells) - error("Bad cell id %i for spart %zu at [%.3e,%.3e,%.3e].", cid, k, - sparts[k].x[0], sparts[k].x[1], sparts[k].x[2]); -#endif - - s_dest[k] = cells[cid].nodeID; + extra_data.counts = s_counts; + extra_data.dest = s_dest; + extra_data.base = (void *)sparts; - /* The counts array is indexed as count[from * nr_nodes + to]. 
*/ - s_counts[nodeID * nr_nodes + s_dest[k]] += 1; - } + threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_spart, sparts, + s->nr_sparts, sizeof(struct spart), 0, &extra_data); + message("6: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* Sort the particles according to their cell index. */ if (s->nr_sparts > 0) @@ -739,6 +805,7 @@ void engine_redistribute(struct engine *e) { free(s_dest); /* Get destination of each g-particle */ + tic1 = getticks(); int *g_counts; if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL) error("Failed to allocate g_gcount temporary buffer."); @@ -748,29 +815,14 @@ void engine_redistribute(struct engine *e) { if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL) error("Failed to allocate g_dest temporary buffer."); - for (size_t k = 0; k < s->nr_gparts; k++) { - - /* Periodic boundary conditions */ - for (int j = 0; j < 3; j++) { - if (gparts[k].x[j] < 0.0) - gparts[k].x[j] += dim[j]; - else if (gparts[k].x[j] >= dim[j]) - gparts[k].x[j] -= dim[j]; - } - const int cid = - cell_getid(cdim, gparts[k].x[0] * iwidth[0], gparts[k].x[1] * iwidth[1], - gparts[k].x[2] * iwidth[2]); -#ifdef SWIFT_DEBUG_CHECKS - if (cid < 0 || cid >= s->nr_cells) - error("Bad cell id %i for gpart %zu at [%.3e,%.3e,%.3e].", cid, k, - gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]); -#endif - - g_dest[k] = cells[cid].nodeID; + extra_data.counts = g_counts; + extra_data.dest = g_dest; + extra_data.base = (void *)gparts; - /* The counts array is indexed as count[from * nr_nodes + to]. */ - g_counts[nodeID * nr_nodes + g_dest[k]] += 1; - } + threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_gpart, gparts, + s->nr_gparts, sizeof(struct gpart), 0, &extra_data); + message("7: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* Sort the gparticles according to their cell index. 
*/ if (s->nr_gparts > 0) @@ -803,6 +855,7 @@ void engine_redistribute(struct engine *e) { free(g_dest); + tic1 = getticks(); /* Get all the counts from all the nodes. */ if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS) @@ -817,53 +870,57 @@ void engine_redistribute(struct engine *e) { if (MPI_Allreduce(MPI_IN_PLACE, s_counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS) error("Failed to allreduce sparticle transfer counts."); + message("8: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* Report how many particles will be moved. */ - if (e->verbose) { - if (e->nodeID == 0) { - size_t total = 0, g_total = 0, s_total = 0; - size_t unmoved = 0, g_unmoved = 0, s_unmoved = 0; - for (int p = 0, r = 0; p < nr_nodes; p++) { - for (int n = 0; n < nr_nodes; n++) { - total += counts[r]; - g_total += g_counts[r]; - s_total += s_counts[r]; - if (p == n) { - unmoved += counts[r]; - g_unmoved += g_counts[r]; - s_unmoved += s_counts[r]; - } - r++; + // if (e->verbose) { + if (e->nodeID == 0) { + size_t total = 0, g_total = 0, s_total = 0; + size_t unmoved = 0, g_unmoved = 0, s_unmoved = 0; + for (int p = 0, r = 0; p < nr_nodes; p++) { + for (int n = 0; n < nr_nodes; n++) { + total += counts[r]; + g_total += g_counts[r]; + s_total += s_counts[r]; + if (p == n) { + unmoved += counts[r]; + g_unmoved += g_counts[r]; + s_unmoved += s_counts[r]; } + r++; } - if (total > 0) - message("%ld of %ld (%.2f%%) of particles moved", total - unmoved, - total, 100.0 * (double)(total - unmoved) / (double)total); - if (g_total > 0) - message("%ld of %ld (%.2f%%) of g-particles moved", g_total - g_unmoved, - g_total, - 100.0 * (double)(g_total - g_unmoved) / (double)g_total); - if (s_total > 0) - message("%ld of %ld (%.2f%%) of s-particles moved", s_total - s_unmoved, - s_total, - 100.0 * (double)(s_total - s_unmoved) / (double)s_total); } + if (total > 0) + message("%ld of %ld (%.2f%%) 
of particles moved", total - unmoved, total, + 100.0 * (double)(total - unmoved) / (double)total); + if (g_total > 0) + message("%ld of %ld (%.2f%%) of g-particles moved", g_total - g_unmoved, + g_total, 100.0 * (double)(g_total - g_unmoved) / (double)g_total); + if (s_total > 0) + message("%ld of %ld (%.2f%%) of s-particles moved", s_total - s_unmoved, + s_total, 100.0 * (double)(s_total - s_unmoved) / (double)s_total); } + //} /* Now each node knows how many parts, sparts and gparts will be transferred * to every other node. * Get the new numbers of particles for this node. */ + tic1 = getticks(); size_t nr_parts = 0, nr_gparts = 0, nr_sparts = 0; for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID]; for (int k = 0; k < nr_nodes; k++) nr_gparts += g_counts[k * nr_nodes + nodeID]; for (int k = 0; k < nr_nodes; k++) nr_sparts += s_counts[k * nr_nodes + nodeID]; + message("9: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* Now exchange the particles, type by type to keep the memory required * under control. */ /* SPH particles. */ + tic1 = getticks(); void *new_parts = engine_do_redistribute(counts, (char *)s->parts, nr_parts, sizeof(struct part), part_align, part_mpi_type, nr_nodes, nodeID); @@ -871,15 +928,21 @@ void engine_redistribute(struct engine *e) { s->parts = (struct part *)new_parts; s->nr_parts = nr_parts; s->size_parts = engine_redistribute_alloc_margin * nr_parts; + message("10: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* Extra SPH particle properties. */ + tic1 = getticks(); new_parts = engine_do_redistribute(counts, (char *)s->xparts, nr_parts, sizeof(struct xpart), xpart_align, xpart_mpi_type, nr_nodes, nodeID); free(s->xparts); s->xparts = (struct xpart *)new_parts; + message("11: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* Gravity particles. 
*/ + tic1 = getticks(); new_parts = engine_do_redistribute(g_counts, (char *)s->gparts, nr_gparts, sizeof(struct gpart), gpart_align, gpart_mpi_type, nr_nodes, nodeID); @@ -887,8 +950,11 @@ void engine_redistribute(struct engine *e) { s->gparts = (struct gpart *)new_parts; s->nr_gparts = nr_gparts; s->size_gparts = engine_redistribute_alloc_margin * nr_gparts; + message("11: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* Star particles. */ + tic1 = getticks(); new_parts = engine_do_redistribute(s_counts, (char *)s->sparts, nr_sparts, sizeof(struct spart), spart_align, spart_mpi_type, nr_nodes, nodeID); @@ -896,11 +962,14 @@ void engine_redistribute(struct engine *e) { s->sparts = (struct spart *)new_parts; s->nr_sparts = nr_sparts; s->size_sparts = engine_redistribute_alloc_margin * nr_sparts; + message("12: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* All particles have now arrived. Time for some final operations on the stuff we just received */ /* Restore the part<->gpart and spart<->gpart links */ + tic1 = getticks(); size_t offset_parts = 0, offset_sparts = 0, offset_gparts = 0; for (int node = 0; node < nr_nodes; ++node) { @@ -939,6 +1008,8 @@ void engine_redistribute(struct engine *e) { offset_gparts += count_gparts; offset_sparts += count_sparts; } + message("13: took %.3f %s.", clocks_from_ticks(getticks() - tic1), + clocks_getunit()); /* Clean up the counts now we done. */ free(counts); @@ -978,6 +1049,7 @@ void engine_redistribute(struct engine *e) { #endif /* Be verbose about what just happened. 
*/ + tic1 = getticks(); if (e->verbose) { int my_cells = 0; for (int k = 0; k < nr_cells; k++) @@ -989,8 +1061,10 @@ void engine_redistribute(struct engine *e) { /* Flag that a redistribute has taken place */ e->step_props |= engine_step_prop_redistribute; - // if (e->verbose) - message("took %.3f %s.", clocks_from_ticks(getticks() - tic), + if (e->verbose) + message("took %.3f %s.", clocks_from_ticks(getticks() - tic), + clocks_getunit()); + message("14: took %.3f %s.", clocks_from_ticks(getticks() - tic1), clocks_getunit()); #else error("SWIFT was not compiled with MPI support.");