diff --git a/examples/test_fmm.c b/examples/test_fmm.c index 26e8c6a3046bb6df85428f66ad5e45773fbcb615..03ea2c5f93bb24a23915cac365bfa642cb870154 100644 --- a/examples/test_fmm.c +++ b/examples/test_fmm.c @@ -379,11 +379,44 @@ void cell_split(struct cell *c, struct qsched *s) { /* New tree walk */ /* -------------------------------------------------------------------------- */ + +/** + * @brief Checks whether the cells are direct neighbours ot not. Both cells have + * to be of the same size + * + */ +static inline int are_neighbours(struct cell *ci, struct cell *cj) { + +#ifdef SANITY_CHECKS + if (ci->h != cj->h) + error(" Cells of different size in distance calculation."); +#endif + + /* Maximum allowed distance */ + float min_dist = ci->h; + + /* (Manhattan) Distance between the cells */ + for (int k = 0; k < 3; k++) { + float center_i = ci->loc[k]; + float center_j = cj->loc[k]; + if (fabsf(center_i - center_j) > min_dist) return 0; + } + + return 1; +} + + + + /** - * @brief Interacts all particles in ci with the monopole in cj + * @brief Interacts all count particles in parts + * with the monopole in cj + * + * @param parts An array of particles + * @param count The number of particles in the array + * @param cj The cell with which the particles will interact */ -static inline void make_interact_pc(struct part *parts, int count, - double *loc, struct cell *cj) { +static inline void make_interact_pc(struct part *parts, int count, struct cell *cj) { int j, k; float com[3] = {0.0, 0.0, 0.0}; @@ -429,42 +462,51 @@ static inline void make_interact_pc(struct part *parts, int count, } + + /** - * @brief Checks whether the cells are direct neighbours ot not. Both cells have - * to be of the same size + * @brief Loops all siblings of cells ci and cj and launches all + * the cell-monopole interactions that can be done between the siblings + * + * @param ci The first cell + * @param cj The second cell */ -static inline int are_neighbours(struct cell *ci, struct cell *cj) { +static inline void iact_pair_pc(struct cell *ci, struct cell *cj) { + struct cell *cp, *cps; + #ifdef SANITY_CHECKS if (ci->h != cj->h) - error(" Cells of different size in distance calculation."); + error(" Cells of different size !."); + + if ( ! are_neighbours(ci, cj) ) + error(" Cells that are not neighbours !"); #endif - /* Maximum allowed distance */ - float min_dist = ci->h; + /* Let's split both cells and build all possible non-neighbouring pairs */ + for (cp = ci->firstchild; cp != ci->sibling; cp = cp->sibling) { + for (cps = cj->firstchild; cps != cj->sibling; cps = cps->sibling) { - /* (Manhattan) Distance between the cells */ - for (int k = 0; k < 3; k++) { - float center_i = ci->loc[k]; - float center_j = cj->loc[k]; - if (fabsf(center_i - center_j) > min_dist) return 0; - } + if ( ! are_neighbours(cp, cps) ) { - return 1; -} + make_interact_pc(cp->parts, cp->count, cps); + make_interact_pc(cps->parts, cps->count, cp ); + } + } + } +} -static inline void iact_pair_pc(struct cell *ci, struct cell *cj) { -#ifdef SANITY_CHECKS - if (ci->h != cj->h) - error(" Cells of different size in distance calculation."); -#endif - make_interact_pc(ci->parts, ci->count, ci->loc, cj); - make_interact_pc(cj->parts, cj->count, cj->loc, ci); -} +/** + * @brief Interacts all particles in a cell ci with all particles in another + * cell cj using a simple double for-loop. + * + * @param ci The first cell + * @param cj The second cell + */ static inline void iact_pair_direct(struct cell *ci, struct cell *cj) { @@ -556,6 +598,8 @@ static inline void iact_pair_direct(struct cell *ci, struct cell *cj) { } } + + /** * @brief Decides whether two cells use the direct summation interaction or the * multipole interactions @@ -599,17 +643,25 @@ void iact_pair(struct cell *ci, struct cell *cj) { if (are_neighbours(cp, cps)) { iact_pair(cp, cps); } - else { /* Otherwise do cell-mulitpole interactions */ - iact_pair_pc(cp, cps); - } } } + + /* Build all possible cell-multipole interaction pairs */ + iact_pair_pc(ci, cj); + } else { /* Otherwise, compute the interactions at this level directly. */ iact_pair_direct(ci, cj); } } + +/** + * @brief Interacts all particles in cell c with themselves + * either by recursing or by direct summation + * + * @param c The #cell containing the particles + */ void iact_self_direct(struct cell *c) { int i, j, k, count = c->count; float xi[3] = {0.0, 0.0, 0.0}; @@ -702,6 +754,7 @@ void iact_self_direct(struct cell *c) { } /* otherwise, compute interactions directly. */ } + /** * @brief Create the tasks for the cell pair/self. * @@ -746,14 +799,13 @@ void create_tasks(struct qsched *s, struct cell *ci, struct cell *cj) { data[1] = NULL; /* Create the task. */ - tid = - qsched_addtask(s, task_type_self, task_flag_none, data, - sizeof(struct cell *) * 2, ci->count * ci->count / 2); + tid = qsched_addtask(s, task_type_self, task_flag_none, data, + sizeof(struct cell *) * 2, ci->count * ci->count / 2); /* Add the resource (i.e. the cell) to the new task. */ qsched_addlock(s, tid, ci->res); -/* If this call might recurse, add a dependency on the cell's COM +/* Since this call might recurse, add a dependency on the cell's COM task. */ #ifdef COM_AS_TASK if (ci->split) qsched_addunlock(s, ci->com_tid, tid); @@ -763,55 +815,55 @@ void create_tasks(struct qsched *s, struct cell *ci, struct cell *cj) { /* Otherwise, it's a pair. */ } else { - /* Are the cells NOT neighbours ? */ - if (!are_neighbours(ci, cj)) { - - /* We can do particle-monopole tasks */ + /* Are both cells split and we are above the task limit ? */ + if (ci->split && cj->split && ci->count > task_limit / cj->count) { + /* Let's split both cells and build all possible pairs */ + for (cp = ci->firstchild; cp != ci->sibling; cp = cp->sibling) { + for (cps = cj->firstchild; cps != cj->sibling; cps = cps->sibling) { + + /* Recurse */ + if (are_neighbours(cp, cps)) { + create_tasks(s, cp, cps); + } + } + } + + /* Let's also build a particle-monopole task */ + /* Create the task. */ data[0] = ci; data[1] = cj; tid = qsched_addtask(s, task_type_pair_pc, task_flag_none, data, sizeof(struct cell *) * 2, ci->count + cj->count); - + /* Add the resource and dependance */ qsched_addlock(s, tid, ci->res); qsched_addlock(s, tid, cj->res); + #ifdef COM_AS_TASK qsched_addunlock(s, cj->com_tid, tid); qsched_addunlock(s, ci->com_tid, tid); #endif + + } else { /* Otherwise, at least one of the cells is not split, build a direct + * interaction. */ - } else {/* Cells are direct neighbours */ - - /* Are both cells split and we are above the task limit ? */ - if (ci->split && cj->split && ci->count > task_limit / cj->count) { - - /* Let's split both cells and build all possible pairs */ - for (cp = ci->firstchild; cp != ci->sibling; cp = cp->sibling) { - for (cps = cj->firstchild; cps != cj->sibling; cps = cps->sibling) { - /* Recurse */ - create_tasks(s, cp, cps); - } - } - /* Otherwise, at least one of the cells is not split, build a direct - * interaction. */ - } else { - - /* Set the data. */ - data[0] = ci; - data[1] = cj; + /* Set the data. */ + data[0] = ci; + data[1] = cj; + + /* Create the task. */ + tid = qsched_addtask(s, task_type_pair, task_flag_none, data, + sizeof(struct cell *) * 2, ci->count * cj->count); + + /* Add the resources. */ + qsched_addlock(s, tid, ci->res); + qsched_addlock(s, tid, cj->res); - /* Create the task. */ - tid = qsched_addtask(s, task_type_pair, task_flag_none, data, - sizeof(struct cell *) * 2, ci->count * cj->count); + } - /* Add the resources. */ - qsched_addlock(s, tid, ci->res); - qsched_addlock(s, tid, cj->res); - } - } /* Cells are direct neighbours */ } /* Otherwise it's a pair */ }