diff --git a/src/partition.c b/src/partition.c index d3b7f7d285904c38c6424656ee27f3fbd0e1f1a2..b3e785ff9c9148069fa0868e382d33f819bbf91c 100644 --- a/src/partition.c +++ b/src/partition.c @@ -1056,55 +1056,54 @@ static void pick_metis(int nodeID, struct space *s, int nregions, #endif #if defined(WITH_MPI) && (defined(HAVE_METIS) || defined(HAVE_PARMETIS)) + +/* Helper struct for partition_gather weights. */ +struct weights_mapper_data { + double *weights_e; + double *weights_v; + idx_t *inds; + int eweights; + int nodeID; + int timebins; + int vweights; + int nr_cells; + struct cell *cells; +}; + +#ifdef SWIFT_DEBUG_CHECKS +static void check_weights(struct task *tasks, int nr_tasks, + struct weights_mapper_data *weights_data, + double *weights_v, double *weights_e); +#endif + /** - * @brief Repartition the cells amongst the nodes using weights of - * various kinds. + * @brief Threadpool mapper function to gather cell edge and vertex weights + * from the associated tasks. * - * @param vweights whether vertex weights will be used. - * @param eweights whether weights will be used. - * @param timebins use timebins as the edge weights. - * @param repartition the partition struct of the local engine. - * @param nodeID our nodeID. - * @param nr_nodes the number of nodes. - * @param s the space of cells holding our local particles. - * @param tasks the completed tasks from the last engine step for our node. - * @param nr_tasks the number of tasks. + * @map_data part of the data to process in this mapper. + * @num_elements the number of data elements to process. + * @extra_data additional data for the mapper context. */ -static void repart_edge_metis(int vweights, int eweights, int timebins, - struct repartition *repartition, int nodeID, - int nr_nodes, struct space *s, struct task *tasks, - int nr_tasks) { +void partition_gather_weights(void *map_data, int num_elements, + void *extra_data) { - /* Create weight arrays using task ticks for vertices and edges (edges - * assume the same graph structure as used in the part_ calls). */ - int nr_cells = s->nr_cells; - struct cell *cells = s->cells_top; + struct task *tasks = (struct task *)map_data; + struct weights_mapper_data *mydata = (struct weights_mapper_data *)extra_data; - /* Allocate and fill the adjncy indexing array defining the graph of - * cells. */ - idx_t *inds; - if ((inds = (idx_t *)malloc(sizeof(idx_t) * 26 * nr_cells)) == NULL) - error("Failed to allocate the inds array"); - graph_init(s, inds, NULL); + double *weights_e = mydata->weights_e; + double *weights_v = mydata->weights_v; + idx_t *inds = mydata->inds; + int eweights = mydata->eweights; + int nodeID = mydata->nodeID; + int nr_cells = mydata->nr_cells; + int timebins = mydata->timebins; + int vweights = mydata->vweights; - /* Allocate and init weights. */ - double *weights_v = NULL; - double *weights_e = NULL; - if (vweights) { - if ((weights_v = (double *)malloc(sizeof(double) * nr_cells)) == NULL) - error("Failed to allocate vertex weights arrays."); - bzero(weights_v, sizeof(double) * nr_cells); - } - if (eweights) { - if ((weights_e = (double *)malloc(sizeof(double) * 26 * nr_cells)) == NULL) - error("Failed to allocate edge weights arrays."); - bzero(weights_e, sizeof(double) * 26 * nr_cells); - } + struct cell *cells = mydata->cells; /* Loop over the tasks... */ - for (int j = 0; j < nr_tasks; j++) { - /* Get a pointer to the kth task. */ - struct task *t = &tasks[j]; + for (int i = 0; i < num_elements; i++) { + struct task *t = &tasks[i]; /* Skip un-interesting tasks. */ if (t->cost == 0.f) continue; @@ -1135,7 +1134,7 @@ static void repart_edge_metis(int vweights, int eweights, int timebins, t->type == task_type_grav_long_range) { /* Particle updates add only to vertex weight. */ - if (vweights) weights_v[cid] += w; + if (vweights) atomic_add_d(&weights_v[cid], w); } /* Self interaction? */ @@ -1143,7 +1142,7 @@ static void repart_edge_metis(int vweights, int eweights, int timebins, (t->type == task_type_sub_self && cj == NULL && ci->nodeID == nodeID)) { /* Self interactions add only to vertex weight. */ - if (vweights) weights_v[cid] += w; + if (vweights) atomic_add_d(&weights_v[cid], w); } @@ -1152,7 +1151,7 @@ static void repart_edge_metis(int vweights, int eweights, int timebins, /* In-cell pair? */ if (ci == cj) { /* Add weight to vertex for ci. */ - if (vweights) weights_v[cid] += w; + if (vweights) atomic_add_d(&weights_v[cid], w); } @@ -1163,8 +1162,8 @@ static void repart_edge_metis(int vweights, int eweights, int timebins, /* Local cells add weight to vertices. */ if (vweights && ci->nodeID == nodeID) { - weights_v[cid] += 0.5 * w; - if (cj->nodeID == nodeID) weights_v[cjd] += 0.5 * w; + atomic_add_d(&weights_v[cid], 0.5 * w); + if (cj->nodeID == nodeID) atomic_add_d(&weights_v[cjd], 0.5 * w); } if (eweights) { @@ -1200,20 +1199,91 @@ static void repart_edge_metis(int vweights, int eweights, int timebins, int dti = num_time_bins - get_time_bin(ci->hydro.ti_end_min); int dtj = num_time_bins - get_time_bin(cj->hydro.ti_end_min); double dt = (double)(1 << dti) + (double)(1 << dtj); - weights_e[ik] += dt; - weights_e[jk] += dt; + atomic_add_d(&weights_e[ik], dt); + atomic_add_d(&weights_e[jk], dt); } else { /* Add weights from task costs to the edge. */ - weights_e[ik] += w; - weights_e[jk] += w; + atomic_add_d(&weights_e[ik], w); + atomic_add_d(&weights_e[jk], w); } } } } } } +} + +/** + * @brief Repartition the cells amongst the nodes using weights of + * various kinds. + * + * @param vweights whether vertex weights will be used. + * @param eweights whether weights will be used. + * @param timebins use timebins as the edge weights. + * @param repartition the partition struct of the local engine. + * @param nodeID our nodeID. + * @param nr_nodes the number of nodes. + * @param s the space of cells holding our local particles. + * @param tasks the completed tasks from the last engine step for our node. + * @param nr_tasks the number of tasks. + */ +static void repart_edge_metis(int vweights, int eweights, int timebins, + struct repartition *repartition, int nodeID, + int nr_nodes, struct space *s, struct task *tasks, + int nr_tasks) { + + /* Create weight arrays using task ticks for vertices and edges (edges + * assume the same graph structure as used in the part_ calls). */ + int nr_cells = s->nr_cells; + struct cell *cells = s->cells_top; + + /* Allocate and fill the adjncy indexing array defining the graph of + * cells. */ + idx_t *inds; + if ((inds = (idx_t *)malloc(sizeof(idx_t) * 26 * nr_cells)) == NULL) + error("Failed to allocate the inds array"); + graph_init(s, inds, NULL); + + /* Allocate and init weights. */ + double *weights_v = NULL; + double *weights_e = NULL; + if (vweights) { + if ((weights_v = (double *)malloc(sizeof(double) * nr_cells)) == NULL) + error("Failed to allocate vertex weights arrays."); + bzero(weights_v, sizeof(double) * nr_cells); + } + if (eweights) { + if ((weights_e = (double *)malloc(sizeof(double) * 26 * nr_cells)) == NULL) + error("Failed to allocate edge weights arrays."); + bzero(weights_e, sizeof(double) * 26 * nr_cells); + } + + /* Gather weights. */ + struct weights_mapper_data weights_data; + + weights_data.cells = cells; + weights_data.eweights = eweights; + weights_data.inds = inds; + weights_data.nodeID = nodeID; + weights_data.nr_cells = nr_cells; + weights_data.timebins = timebins; + weights_data.vweights = vweights; + weights_data.weights_e = weights_e; + weights_data.weights_v = weights_v; + + ticks tic = getticks(); + + threadpool_map(&s->e->threadpool, partition_gather_weights, tasks, nr_tasks, + sizeof(struct task), 0, &weights_data); + if (s->e->verbose) + message("weight mapper took %.3f %s.", clocks_from_ticks(getticks() - tic), + clocks_getunit()); + +#ifdef SWIFT_DEBUG_CHECKS + check_weights(tasks, nr_tasks, &weights_data, weights_v, weights_e); +#endif /* Merge the weights arrays across all nodes. */ int res; @@ -1717,6 +1787,185 @@ static int check_complete(struct space *s, int verbose, int nregions) { return (!failed); } +#if defined(WITH_MPI) && (defined(HAVE_METIS) || defined(HAVE_PARMETIS)) +#ifdef SWIFT_DEBUG_CHECKS +/** + * @brief Check that the threadpool version of the weights construction is + * correct by comparing to the old serial code. + * + * @tasks the list of tasks + * @nr_tasks number of tasks + * @mydata additional values as passed to threadpool + * @ref_weights_v vertex weights to check + * @ref_weights_e edge weights to check + */ +static void check_weights(struct task *tasks, int nr_tasks, + struct weights_mapper_data *mydata, + double *ref_weights_v, double *ref_weights_e) { + + idx_t *inds = mydata->inds; + int eweights = mydata->eweights; + int nodeID = mydata->nodeID; + int nr_cells = mydata->nr_cells; + int timebins = mydata->timebins; + int vweights = mydata->vweights; + + struct cell *cells = mydata->cells; + + /* Allocate and init weights. */ + double *weights_v = NULL; + double *weights_e = NULL; + if (vweights) { + if ((weights_v = (double *)malloc(sizeof(double) * nr_cells)) == NULL) + error("Failed to allocate vertex weights arrays."); + bzero(weights_v, sizeof(double) * nr_cells); + } + if (eweights) { + if ((weights_e = (double *)malloc(sizeof(double) * 26 * nr_cells)) == NULL) + error("Failed to allocate edge weights arrays."); + bzero(weights_e, sizeof(double) * 26 * nr_cells); + } + + /* Loop over the tasks... */ + for (int j = 0; j < nr_tasks; j++) { + + /* Get a pointer to the kth task. */ + struct task *t = &tasks[j]; + + /* Skip un-interesting tasks. */ + if (t->cost == 0.f) continue; + + /* Get the task weight based on costs. */ + double w = (double)t->cost; + + /* Get the top-level cells involved. */ + struct cell *ci, *cj; + for (ci = t->ci; ci->parent != NULL; ci = ci->parent) + ; + if (t->cj != NULL) + for (cj = t->cj; cj->parent != NULL; cj = cj->parent) + ; + else + cj = NULL; + + /* Get the cell IDs. */ + int cid = ci - cells; + + /* Different weights for different tasks. */ + if (t->type == task_type_drift_part || t->type == task_type_drift_gpart || + t->type == task_type_ghost || t->type == task_type_extra_ghost || + t->type == task_type_kick1 || t->type == task_type_kick2 || + t->type == task_type_end_force || t->type == task_type_cooling || + t->type == task_type_timestep || t->type == task_type_init_grav || + t->type == task_type_grav_down || + t->type == task_type_grav_long_range) { + + /* Particle updates add only to vertex weight. */ + if (vweights) weights_v[cid] += w; + } + + /* Self interaction? */ + else if ((t->type == task_type_self && ci->nodeID == nodeID) || + (t->type == task_type_sub_self && cj == NULL && + ci->nodeID == nodeID)) { + /* Self interactions add only to vertex weight. */ + if (vweights) weights_v[cid] += w; + + } + + /* Pair? */ + else if (t->type == task_type_pair || (t->type == task_type_sub_pair)) { + /* In-cell pair? */ + if (ci == cj) { + /* Add weight to vertex for ci. */ + if (vweights) weights_v[cid] += w; + + } + + /* Distinct cells. */ + else { + /* Index of the jth cell. */ + int cjd = cj - cells; + + /* Local cells add weight to vertices. */ + if (vweights && ci->nodeID == nodeID) { + weights_v[cid] += 0.5 * w; + if (cj->nodeID == nodeID) weights_v[cjd] += 0.5 * w; + } + + if (eweights) { + + /* Find indices of ci/cj neighbours. Note with gravity these cells may + * not be neighbours, in that case we ignore any edge weight for that + * pair. */ + int ik = -1; + for (int k = 26 * cid; k < 26 * nr_cells; k++) { + if (inds[k] == cjd) { + ik = k; + break; + } + } + + /* cj */ + int jk = -1; + for (int k = 26 * cjd; k < 26 * nr_cells; k++) { + if (inds[k] == cid) { + jk = k; + break; + } + } + if (ik != -1 && jk != -1) { + + if (timebins) { + /* Add weights to edge for all cells based on the expected + * interaction time (calculated as the time to the last expected + * time) as we want to avoid having active cells on the edges, so + * we cut for that. Note that weight is added to the local and + * remote cells, as we want to keep both away from any cuts, this + * can overflow int, so take care. */ + int dti = num_time_bins - get_time_bin(ci->hydro.ti_end_min); + int dtj = num_time_bins - get_time_bin(cj->hydro.ti_end_min); + double dt = (double)(1 << dti) + (double)(1 << dtj); + weights_e[ik] += dt; + weights_e[jk] += dt; + + } else { + + /* Add weights from task costs to the edge. */ + weights_e[ik] += w; + weights_e[jk] += w; + } + } + } + } + } + } + + /* Now do the comparisons. */ + double refsum = 0.0; + double sum = 0.0; + for (int k = 0; k < nr_cells; k++) { + refsum += ref_weights_v[k]; + sum += weights_v[k]; + } + if (fabs(sum - refsum) > 1.0) { + error("vertex partition weights are not consistent (%f!=%f)", sum, refsum); + } else { + refsum = 0.0; + sum = 0.0; + for (int k = 0; k < 26 * nr_cells; k++) { + refsum += ref_weights_e[k]; + sum += weights_e[k]; + } + if (fabs(sum - refsum) > 1.0) { + error("edge partition weights are not consistent (%f!=%f)", sum, refsum); + } + } + message("partition weights checked successfully"); +} +#endif +#endif + /** * @brief Partition a space of cells based on another space of cells. *