diff --git a/src/Makefile.am b/src/Makefile.am index 14e435f663f01d8faa5f12720398b58633300093..1da7a0d955e488c0b96ee209080b5438356a36bc 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -46,7 +46,7 @@ include_HEADERS = space.h runner.h queue.h task.h lock.h cell.h part.h const.h \ hydro_properties.h riemann.h threadpool.h cooling.h cooling_struct.h sourceterms.h \ sourceterms_struct.h statistics.h memswap.h cache.h runner_doiact_vec.h profiler.h \ dump.h logger.h active.h timeline.h xmf.h gravity_properties.h gravity_derivatives.h \ - vector_power.h hydro_space.h sort_part.h + vector_power.h collectgroup.h hydro_space.h sort_part.h # Common source files AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \ @@ -57,7 +57,7 @@ AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \ runner_doiact_fft.c threadpool.c cooling.c sourceterms.c \ statistics.c runner_doiact_vec.c profiler.c dump.c logger.c \ part_type.c xmf.c gravity_properties.c gravity.c \ - hydro_space.c + collectgroup.c hydro_space.c # Include files for distribution, not installation. nobase_noinst_HEADERS = align.h approx_math.h atomic.h cycle.h error.h inline.h kernel_hydro.h kernel_gravity.h \ diff --git a/src/collectgroup.c b/src/collectgroup.c new file mode 100644 index 0000000000000000000000000000000000000000..d115f5ab1b612fc0c82fb219fca392c5e28d7a6f --- /dev/null +++ b/src/collectgroup.c @@ -0,0 +1,200 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Copyright (c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ + +/* Config parameters. */ +#include "../config.h" + +/* MPI headers. */ +#ifdef WITH_MPI +#include <mpi.h> +#endif + +/* This object's header. */ +#include "collectgroup.h" + +/* Local headers. */ +#include "engine.h" +#include "error.h" + +#ifdef WITH_MPI + +/* Local collections for MPI reduces. */ +struct mpicollectgroup1 { + size_t updates, g_updates, s_updates; + integertime_t ti_end_min; + int forcerebuild; +}; + +/* Forward declarations. */ +static void mpicollect_create_MPI_type(); + +/** + * @brief MPI datatype for the #mpicollectgroup1 structure. + */ +static MPI_Datatype mpicollectgroup1_type; + +/** + * @brief MPI operator to reduce #mpicollectgroup1 structures. + */ +static MPI_Op mpicollectgroup1_reduce_op; + +#endif + +/** + * @brief Perform any once only initialisations. Must be called once. + */ +void collectgroup_init() { + +#ifdef WITH_MPI + /* Initialise the MPI types. */ + mpicollect_create_MPI_type(); +#endif +} + +/** + * @brief Apply the collectgroup1 values to the engine by copying all the + * values to the engine fields. + * + * @param grp1 The #collectgroup1 + * @param e The #engine + */ +void collectgroup1_apply(struct collectgroup1 *grp1, struct engine *e) { + e->ti_end_min = grp1->ti_end_min; + e->ti_end_max = grp1->ti_end_max; + e->ti_beg_max = grp1->ti_beg_max; + e->updates = grp1->updates; + e->g_updates = grp1->g_updates; + e->s_updates = grp1->s_updates; + e->forcerebuild = grp1->forcerebuild; +} + +/** + * @brief Initialises a collectgroup1 struct ready for processing. + * + * @param grp1 The #collectgroup1 to initialise + * @param updates the number of updated hydro particles on this node this step. + * @param g_updates the number of updated gravity particles on this node this step. + * @param s_updates the number of updated star particles on this node this step. + * @param ti_end_min the minimum end time for next time step after this step. + * @param ti_end_max the maximum end time for next time step after this step. + * @param ti_beg_max the maximum begin time for next time step after this step. + * @param forcerebuild whether a rebuild is required after this step. + */ +void collectgroup1_init(struct collectgroup1 *grp1, size_t updates, + size_t g_updates, size_t s_updates, + integertime_t ti_end_min, + integertime_t ti_end_max, + integertime_t ti_beg_max, + int forcerebuild) { + grp1->updates = updates; + grp1->g_updates = g_updates; + grp1->s_updates = s_updates; + grp1->ti_end_min = ti_end_min; + grp1->ti_end_max = ti_end_max; + grp1->ti_beg_max = ti_beg_max; + grp1->forcerebuild = forcerebuild; +} + +/** + * @brief Do any processing necessary to the group before it can be used. + * + * This may involve an MPI reduction across all nodes. + * + * @param grp1 the #collectgroup1 struct already initialised by a call + * to collectgroup1_init. + */ +void collectgroup1_reduce(struct collectgroup1 *grp1) { + +#ifdef WITH_MPI + + /* Populate an MPI group struct and reduce this across all nodes. */ + struct mpicollectgroup1 mpigrp11; + mpigrp11.updates = grp1->updates; + mpigrp11.g_updates = grp1->g_updates; + mpigrp11.s_updates = grp1->s_updates; + mpigrp11.ti_end_min = grp1->ti_end_min; + mpigrp11.forcerebuild = grp1->forcerebuild; + + struct mpicollectgroup1 mpigrp12; + if (MPI_Allreduce(&mpigrp11, &mpigrp12, 1, mpicollectgroup1_type, + mpicollectgroup1_reduce_op, MPI_COMM_WORLD) + != MPI_SUCCESS) + error("Failed to reduce mpicollection1."); + + /* And update. */ + grp1->updates = mpigrp12.updates; + grp1->g_updates = mpigrp12.g_updates; + grp1->s_updates = mpigrp12.s_updates; + grp1->ti_end_min = mpigrp12.ti_end_min; + grp1->forcerebuild = mpigrp12.forcerebuild; + +#endif +} + + +#ifdef WITH_MPI +/** + * @brief Do the reduction of two structs. + * + * @param mpigrp11 the first struct, this is updated on exit. + * @param mpigrp12 the second struct + */ +static void doreduce1(struct mpicollectgroup1 *mpigrp11, + const struct mpicollectgroup1 *mpigrp12) { + + /* Do what is needed for each part of the collection. */ + /* Sum of updates. */ + mpigrp11->updates += mpigrp12->updates; + mpigrp11->g_updates += mpigrp12->g_updates; + mpigrp11->s_updates += mpigrp12->s_updates; + + /* Minimum end time. */ + mpigrp11->ti_end_min = min(mpigrp11->ti_end_min, mpigrp12->ti_end_min); + + /* Everyone must agree to not rebuild. */ + if (mpigrp11->forcerebuild || mpigrp12->forcerebuild) + mpigrp11->forcerebuild = 1; +} + +/** + * @brief MPI reduce operator for #mpicollectgroup structures. + */ +static void mpicollectgroup1_reduce(void *in, void *inout, int *len, + MPI_Datatype *datatype) { + + for (int i = 0; i < *len; ++i) + doreduce1(&((struct mpicollectgroup1 *)inout)[0], + &((const struct mpicollectgroup1 *)in)[i]); +} + +/** + * @brief Registers any MPI collection types and reduction functions. + */ +static void mpicollect_create_MPI_type() { + + if (MPI_Type_contiguous(sizeof(struct mpicollectgroup1), MPI_BYTE, + &mpicollectgroup1_type) != MPI_SUCCESS || + MPI_Type_commit(&mpicollectgroup1_type) != MPI_SUCCESS) { + error("Failed to create MPI type for mpicollection1."); + } + + /* Create the reduction operation */ + MPI_Op_create(mpicollectgroup1_reduce, 1, &mpicollectgroup1_reduce_op); +} +#endif diff --git a/src/collectgroup.h b/src/collectgroup.h new file mode 100644 index 0000000000000000000000000000000000000000..f9b8e9ccca335a1fdfe2d6cd60af1573754feccd --- /dev/null +++ b/src/collectgroup.h @@ -0,0 +1,57 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Copyright (c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ +#ifndef SWIFT_COLLECTGROUP_H +#define SWIFT_COLLECTGROUP_H + +/* Config parameters. */ +#include "../config.h" + +/* Standard headers. */ +#include <stddef.h> + +/* Local headers. */ +#include "timeline.h" + +/* Forward declaration of engine struct (to avoid cyclic include). */ +struct engine; + +/* A collection of global quantities that can be processed at the same time. */ +struct collectgroup1 { + + /* Number of particles updated */ + size_t updates, g_updates, s_updates; + + /* Times for the time-step */ + integertime_t ti_end_min, ti_end_max, ti_beg_max; + + /* Force the engine to rebuild? */ + int forcerebuild; +}; + +void collectgroup_init(); +void collectgroup1_apply(struct collectgroup1 *grp1, struct engine *e); +void collectgroup1_init(struct collectgroup1 *grp1, size_t updates, + size_t g_updates, size_t s_updates, + integertime_t ti_end_min, + integertime_t ti_end_max, + integertime_t ti_beg_max, + int forcerebuild); +void collectgroup1_reduce(struct collectgroup1 *grp1); + +#endif /* SWIFT_COLLECTGROUP_H */ diff --git a/src/engine.c b/src/engine.c index afaa5ab297fdd5f4d4a4a1cebaa87cb1318236cd..d38d2c6ada6551fd2b8ff07de6fc86990a80f242 100644 --- a/src/engine.c +++ b/src/engine.c @@ -2802,12 +2802,23 @@ void engine_collect_kick(struct cell *c) { } /** - * @brief Collects the next time-step by making each super-cell recurse - * to collect the minimal of ti_end and the number of updated particles. + * @brief Collects the next time-step and rebuild flag. + * + * The next time-step is determined by making each super-cell recurse to + * collect the minimal of ti_end and the number of updated particles. When in + * MPI mode this routines reduces these across all nodes and also collects the + * forcerebuild flag -- this is so that we only use a single collective MPI + * call per step for all these values. + * + * Note that the results are stored in e->collect_group1 struct not in the + * engine fields, unless apply is true. These can be applied field-by-field + * or all at once using collectgroup1_copy(); * * @param e The #engine. + * @param apply whether to apply the results to the engine or just keep in the + * group1 struct. */ -void engine_collect_timestep(struct engine *e) { +void engine_collect_timestep_and_rebuild(struct engine *e, int apply) { const ticks tic = getticks(); int updates = 0, g_updates = 0, s_updates = 0; @@ -2837,37 +2848,58 @@ void engine_collect_timestep(struct engine *e) { } } -/* Aggregate the data from the different nodes. */ + /* Store these in the temporary collection group. */ + collectgroup1_init(&e->collect_group1, updates,g_updates, s_updates, + ti_end_min,ti_end_max,ti_beg_max,e->forcerebuild); + +/* Aggregate collective data from the different nodes for this step. */ #ifdef WITH_MPI + collectgroup1_reduce(&e->collect_group1); + +#ifdef SWIFT_DEBUG_CHECKS { + /* Check the above using the original MPI calls. */ integertime_t in_i[1], out_i[1]; in_i[0] = 0; out_i[0] = ti_end_min; if (MPI_Allreduce(out_i, in_i, 1, MPI_LONG_LONG_INT, MPI_MIN, MPI_COMM_WORLD) != MPI_SUCCESS) - error("Failed to aggregate t_end_min."); - ti_end_min = in_i[0]; - } - { + error("Failed to aggregate ti_end_min."); + if (in_i[0] != (long long)e->collect_group1.ti_end_min) + error("Failed to get same ti_end_min, is %lld, should be %lld", + in_i[0], e->collect_group1.ti_end_min); + long long in_ll[3], out_ll[3]; out_ll[0] = updates; out_ll[1] = g_updates; out_ll[2] = s_updates; if (MPI_Allreduce(out_ll, in_ll, 3, MPI_LONG_LONG_INT, MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS) - error("Failed to aggregate energies."); - updates = in_ll[0]; - g_updates = in_ll[1]; - s_updates = in_ll[2]; + error("Failed to aggregate particle counts."); + if (in_ll[0] != (long long)e->collect_group1.updates) + error("Failed to get same updates, is %lld, should be %ld", + in_ll[0], e->collect_group1.updates); + if (in_ll[1] != (long long)e->collect_group1.g_updates) + error("Failed to get same g_updates, is %lld, should be %ld", + in_ll[1], e->collect_group1.g_updates); + if (in_ll[2] != (long long)e->collect_group1.s_updates) + error("Failed to get same s_updates, is %lld, should be %ld", + in_ll[2], e->collect_group1.s_updates); + + int buff = 0; + if (MPI_Allreduce(&e->forcerebuild, &buff, 1, MPI_INT, MPI_MAX, + MPI_COMM_WORLD) != MPI_SUCCESS) + error("Failed to aggregate the rebuild flag across nodes."); + if (!!buff != !!e->collect_group1.forcerebuild) + error("Failed to get same rebuild flag from all nodes, is %d," + "should be %d", buff, e->collect_group1.forcerebuild); } +#endif #endif - e->ti_end_min = ti_end_min; - e->ti_end_max = ti_end_max; - e->ti_beg_max = ti_beg_max; - e->updates = updates; - e->g_updates = g_updates; - e->s_updates = s_updates; + /* Apply to the engine, if requested. */ + if (apply) + collectgroup1_apply(&e->collect_group1, e); if (e->verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -3108,7 +3140,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs) { #endif /* Recover the (integer) end of the next time-step */ - engine_collect_timestep(e); + engine_collect_timestep_and_rebuild(e, 1); clocks_gettime(&time2); @@ -3205,14 +3237,12 @@ void engine_step(struct engine *e) { gravity_exact_force_check(e->s, e, 1e-1); #endif -/* Collect the values of rebuild from all nodes. */ -#ifdef WITH_MPI - int buff = 0; - if (MPI_Allreduce(&e->forcerebuild, &buff, 1, MPI_INT, MPI_MAX, - MPI_COMM_WORLD) != MPI_SUCCESS) - error("Failed to aggregate the rebuild flag across nodes."); - e->forcerebuild = buff; -#endif + /* Collect the values of rebuild from all nodes and recover the (integer) + * end of the next time-step. Do these together to reduce the collective MPI + * calls per step, but some of the gathered information is not applied just + * yet (in case we save a snapshot or drift). */ + engine_collect_timestep_and_rebuild(e, 0); + e->forcerebuild = e->collect_group1.forcerebuild; /* Save some statistics ? */ if (e->time - e->timeLastStatistics >= e->deltaTimeStatistics) { @@ -3248,8 +3278,8 @@ void engine_step(struct engine *e) { e->timeLastStatistics += e->deltaTimeStatistics; } - /* Recover the (integer) end of the next time-step */ - engine_collect_timestep(e); + /* Now apply all the collected time step updates and particle counts. */ + collectgroup1_apply(&e->collect_group1, e); TIMER_TOC2(timer_step); @@ -3985,12 +4015,15 @@ void engine_init(struct engine *e, struct space *s, /* Find the time of the first output */ engine_compute_next_snapshot_time(e); -/* Construct types for MPI communications */ + /* Construct types for MPI communications */ #ifdef WITH_MPI part_create_mpi_types(); stats_create_MPI_type(); #endif + /* Initialise the collection group. */ + collectgroup_init(); + /* Initialize the threadpool. */ threadpool_init(&e->threadpool, e->nr_threads); diff --git a/src/engine.h b/src/engine.h index a0e32ad15b79c364d13d19589f8462ff8705ee29..97b978f9a22032a64f5798c8dcf14159f9f9d1af 100644 --- a/src/engine.h +++ b/src/engine.h @@ -38,6 +38,7 @@ /* Includes. */ #include "clocks.h" +#include "collectgroup.h" #include "cooling_struct.h" #include "gravity_properties.h" #include "parser.h" @@ -243,6 +244,10 @@ struct engine { /* The (parsed) parameter file */ const struct swift_params *parameter_file; + + /* Temporary struct to hold a group of deferable properties (in MPI mode + * these are reduced together, but may not be required just yet). */ + struct collectgroup1 collect_group1; }; /* Function prototypes. */