Commit 1c729318 authored by Matthieu Schaller's avatar Matthieu Schaller
Browse files

Add support for reduction over MPI of the 'statistics' structure.

parent e4486f24
......@@ -2460,61 +2460,28 @@ void engine_collect_timestep(struct engine *e) {
void engine_print_stats(struct engine *e) {
const ticks tic = getticks();
const struct space *s = e->s;
struct statistics stats;
bzero(&stats, sizeof(struct statistics));
stats_init(&stats);
/* Collect the stats on this node */
stats_collect(s, &stats);
stats_collect(e->s, &stats);
/* Aggregate the data from the different nodes. */
#ifdef WITH_MPI
/* { */
/* double in[12] = {0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.}; */
/* double out[12]; */
/* out[0] = e_kin; */
/* out[1] = e_int; */
/* out[2] = e_pot; */
/* out[3] = e_rad; */
/* out[4] = mom[0]; */
/* out[5] = mom[1]; */
/* out[6] = mom[2]; */
/* out[7] = ang_mom[0]; */
/* out[8] = ang_mom[1]; */
/* out[9] = ang_mom[2]; */
/* out[10] = mass; */
/* out[11] = entropy; */
/* if (MPI_Reduce(out, in, 11, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD) != */
/* MPI_SUCCESS) */
/* error("Failed to aggregate stats."); */
/* e_kin = out[0]; */
/* e_int = out[1]; */
/* e_pot = out[2]; */
/* e_rad = out[3]; */
/* mom[0] = out[4]; */
/* mom[1] = out[5]; */
/* mom[2] = out[6]; */
/* ang_mom[0] = out[7]; */
/* ang_mom[1] = out[8]; */
/* ang_mom[2] = out[9]; */
/* mass = out[10]; */
/* entropy = out[11]; */
/* } */
#endif
struct statistics global_stats;
stats_init(&global_stats);
const double E_tot = stats.E_kin + stats.E_int + stats.E_pot;
if (MPI_Reduce(&stats, &global_stats, 1, statistics_mpi_type,
statistics_mpi_reduce_op, 0, MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to aggregate stats.");
#else
struct statistics global_stats = stats;
#endif
/* Print info */
if (e->nodeID == 0) {
fprintf(e->file_stats,
" %14e %14e %14e %14e %14e %14e %14e %14e %14e %14e %14e %14e %14e "
"%14e\n",
e->time, stats.mass, E_tot, stats.E_kin, stats.E_int, stats.E_pot,
stats.E_rad, stats.entropy, stats.mom[0], stats.mom[1],
stats.mom[2], stats.ang_mom[0], stats.ang_mom[1], stats.ang_mom[2]);
fflush(e->file_stats);
}
if (e->nodeID == 0)
stats_print_to_file(e->file_stats, &global_stats, e->time);
if (e->verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
......@@ -3483,6 +3450,7 @@ void engine_init(struct engine *e, struct space *s,
/* Construct types for MPI communications */
#ifdef WITH_MPI
part_create_mpi_types();
stats_create_MPI_type();
#endif
/* Initialize the threadpool. */
......
......@@ -23,6 +23,11 @@
/* Some standard headers. */
#include <string.h>
/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif
/* This object's header. */
#include "statistics.h"
......@@ -35,13 +40,58 @@
#include "threadpool.h"
/**
* @brief Information required to compute the statistics
* @brief Information required to compute the statistics in the mapper
*/
struct index_data {
const struct space *s;
struct statistics *stats;
};
/**
* @brief Adds the content of one #statistics aggregator to another one.
*
* Performs a += b;
*
* @param a The #statistics structure to update.
* @param b The #statistics structure to add to a.
*/
void stats_add(struct statistics *a, const struct statistics *b) {
a->E_kin += b->E_kin;
a->E_int += b->E_int;
a->E_pot += b->E_pot;
a->E_rad += b->E_rad;
a->entropy += b->entropy;
a->mass += b->mass;
a->mom[0] += b->mom[0];
a->mom[1] += b->mom[1];
a->mom[2] += b->mom[2];
a->ang_mom[0] += b->ang_mom[0];
a->ang_mom[1] += b->ang_mom[1];
a->ang_mom[2] += b->ang_mom[2];
}
/**
* @brief Initialises a statistics aggregator to a valid state.
*
* @param s The #statistics aggregator to initialise
*/
void stats_init(struct statistics *s) {
/* Zero everything */
bzero(s, sizeof(struct statistics));
/* Set the lock */
lock_init(&s->lock);
}
/**
* @brief The #threadpool mapper function used to collect statistics
*
* @param map_data Pointer to the particles.
* @param nr_parts The number of particles in this chunk
* @param extra_data The #statistics aggregator.
*/
void stats_collect_part_mapper(void *map_data, int nr_parts, void *extra_data) {
/* Unpack the data */
......@@ -96,25 +146,16 @@ void stats_collect_part_mapper(void *map_data, int nr_parts, void *extra_data) {
}
/* Now write back to memory */
if (lock_lock(&global_stats->lock) == 0) {
global_stats->E_kin += stats.E_kin;
global_stats->E_int += stats.E_int;
global_stats->E_pot += stats.E_pot;
global_stats->E_rad += stats.E_rad;
global_stats->entropy += stats.entropy;
global_stats->mass += stats.mass;
global_stats->mom[0] += stats.mom[0];
global_stats->mom[1] += stats.mom[1];
global_stats->mom[2] += stats.mom[2];
global_stats->ang_mom[0] += stats.ang_mom[0];
global_stats->ang_mom[1] += stats.ang_mom[1];
global_stats->ang_mom[2] += stats.ang_mom[2];
}
if (lock_unlock(&global_stats->lock) != 0)
error("Failed to unlock stats holder.");
if (lock_lock(&global_stats->lock) == 0) stats_add(global_stats, &stats);
if (lock_unlock(&global_stats->lock) != 0) error("Failed to unlock stats.");
}
/**
* @brief Collect physical statistics over all particles in a #space.
*
* @param s The #space to collect from.
* @param stats The #statistics aggregator to fill.
*/
void stats_collect(const struct space *s, struct statistics *stats) {
const int chunk_size = 1000;
......@@ -128,3 +169,61 @@ void stats_collect(const struct space *s, struct statistics *stats) {
threadpool_map(&s->e->threadpool, stats_collect_part_mapper, s->parts,
s->nr_parts, sizeof(struct part), chunk_size, &extra_data);
}
/**
* @brief Prints the content of a #statistics aggregator to a file
*
* @param file File to write to.
* @param stats The #statistics object to write to the file
* @param time The current physical time.
*/
void stats_print_to_file(FILE *file, const struct statistics *stats,
double time) {
const double E_tot = stats->E_kin + stats->E_int + stats->E_pot;
fprintf(file,
" %14e %14e %14e %14e %14e %14e %14e %14e %14e %14e %14e %14e %14e "
"%14e\n",
time, stats->mass, E_tot, stats->E_kin, stats->E_int, stats->E_pot,
stats->E_rad, stats->entropy, stats->mom[0], stats->mom[1],
stats->mom[2], stats->ang_mom[0], stats->ang_mom[1],
stats->ang_mom[2]);
fflush(file);
}
#ifdef WITH_MPI
MPI_Datatype statistics_mpi_type;
MPI_Op statistics_mpi_reduce_op;
/**
* @brief MPI reduce operator for #statistics structures.
*/
void stats_add_MPI(void *in, void *inout, int *len, MPI_Datatype *datatype) {
for (int i = 0; i < *len; ++i)
stats_add(&((struct statistics *)inout)[0], &((struct statistics *)in)[i]);
}
/**
* @brief Registers MPI #statistics type and reduction function.
*/
void stats_create_MPI_type() {
/* This is not the recommended way of doing this.
One should define the structure field by field
But as long as we don't do serialization via MPI-IO
we don't really care.
Also we would have to modify this function everytime something
is added to the statistics structure. */
if (MPI_Type_contiguous(sizeof(struct statistics) / sizeof(unsigned char),
MPI_BYTE, &statistics_mpi_type) != MPI_SUCCESS ||
MPI_Type_commit(&statistics_mpi_type) != MPI_SUCCESS) {
error("Failed to create MPI type for statistics.");
}
/* Create the reduction operation */
MPI_Op_create(stats_add_MPI, 1, &statistics_mpi_reduce_op);
}
#endif
......@@ -60,5 +60,17 @@ struct statistics {
};
void stats_collect(const struct space* s, struct statistics* stats);
void stats_add(struct statistics* a, const struct statistics* b);
void stats_print_to_file(FILE* file, const struct statistics* stats,
double time);
void stats_init(struct statistics* s);
#ifdef WITH_MPI
extern MPI_Datatype statistics_mpi_type;
extern MPI_Op statistics_mpi_reduce_op;
void stats_add_MPI(void* in, void* out, int* len, MPI_Datatype* datatype);
void stats_create_MPI_type();
#endif
#endif /* SWIFT_STATISTICS_H */
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment