Commit 48bd17a9 authored by Matthieu Schaller's avatar Matthieu Schaller
Browse files

Merge branch 'logger_mpi2' into 'master'

Logger mpi

See merge request !970
parents 39ff4fb3 a6ed09c1
......@@ -169,6 +169,7 @@ int main(int argc, char *argv[]) {
int with_drift_all = 0;
int with_mpole_reconstruction = 0;
int with_structure_finding = 0;
int with_logger = 0;
int with_eagle = 0;
int verbose = 0;
int nr_threads = 1;
......@@ -252,6 +253,8 @@ int main(int argc, char *argv[]) {
"Overwrite the CPU "
"frequency (Hz) to be used for time measurements.",
NULL, 0, 0),
OPT_BOOLEAN(0, "logger", &with_logger, "Run with the particle logger.",
NULL, 0, 0),
OPT_INTEGER('n', "steps", &nsteps,
"Execute a fixed number of time steps. When unset use the "
"time_end parameter to stop.",
......@@ -330,6 +333,15 @@ int main(int argc, char *argv[]) {
}
#endif
#if !defined(WITH_LOGGER)
if (with_logger) {
printf(
"Error: the particle logger is not available, please compile with "
"--enable-logger.");
return 1;
}
#endif
#ifndef HAVE_FE_ENABLE_EXCEPT
if (with_fp_exceptions) {
printf("Error: no support for floating point exceptions\n");
......@@ -583,12 +595,9 @@ int main(int argc, char *argv[]) {
#ifdef WITH_MPI
if (with_mpole_reconstruction && nr_nodes > 1)
error("Cannot reconstruct m-poles every step over MPI (yet).");
#ifdef WITH_LOGGER
error("Can't run with the particle logger over MPI (yet)");
#endif
#endif
/* Temporary early aborts for modes not supported with hand-vec. */
/* Temporary early aborts for modes not supported with hand-vec. */
#if defined(WITH_VECTORIZATION) && defined(GADGET2_SPH) && \
!defined(CHEMISTRY_NONE)
error(
......@@ -1143,6 +1152,7 @@ int main(int argc, char *argv[]) {
if (with_structure_finding)
engine_policies |= engine_policy_structure_finding;
if (with_fof) engine_policies |= engine_policy_fof;
if (with_logger) engine_policies |= engine_policy_logger;
/* Initialize the engine with the space and policies. */
if (myrank == 0) clocks_gettime(&tic);
......@@ -1205,7 +1215,12 @@ int main(int argc, char *argv[]) {
#ifdef WITH_MPI
/* Split the space. */
engine_split(&e, &initial_partition);
/* Turn off the logger to avoid writing the communications */
if (with_logger) e.policy &= ~engine_policy_logger;
engine_redistribute(&e);
/* Turn it back on */
if (with_logger) e.policy |= engine_policy_logger;
#endif
/* Initialise the particles */
......@@ -1213,8 +1228,10 @@ int main(int argc, char *argv[]) {
/* Write the state of the system before starting time integration. */
#ifdef WITH_LOGGER
logger_log_all(e.logger, &e);
engine_dump_index(&e);
if (e.policy & engine_policy_logger) {
logger_log_all(e.logger, &e);
engine_dump_index(&e);
}
#endif
/* Dump initial state snapshot, if not working with an output list */
if (!e.output_list_snapshots) engine_dump_snapshot(&e);
......@@ -1416,11 +1433,16 @@ int main(int argc, char *argv[]) {
engine_print_stats(&e);
}
#ifdef WITH_LOGGER
logger_log_all(e.logger, &e);
if (e.policy & engine_policy_logger) {
logger_log_all(e.logger, &e);
/* Write a final index file */
engine_dump_index(&e);
/* Write a sentinel timestamp */
logger_log_timestamp(e.logger, e.ti_current, e.time,
&e.logger->timestamp_offset);
/* Write a sentinel timestamp */
logger_log_timestamp(e.logger, e.ti_current, e.time,
&e.logger->timestamp_offset);
}
#endif
/* Write final snapshot? */
......
......@@ -122,7 +122,8 @@ const char *engine_policy_names[] = {"none",
"black holes",
"fof search",
"time-step limiter",
"time-step sync"};
"time-step sync",
"logger"};
/** The rank of the engine as a global variable (for messages). */
int engine_rank;
......@@ -494,7 +495,6 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
size_t *Nbpart) {
#ifdef WITH_MPI
struct space *s = e->s;
ticks tic = getticks();
......@@ -540,6 +540,19 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
/* Load the part and xpart into the proxy. */
proxy_parts_load(&e->proxies[pid], &s->parts[offset_parts + k],
&s->xparts[offset_parts + k], 1);
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
const uint32_t logger_flag =
logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);
/* Log the particle when leaving a rank. */
logger_log_part(
e->logger, &s->parts[offset_parts + k], &s->xparts[offset_parts + k],
logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
logger_flag);
}
#endif
}
/* Put the sparts into the corresponding proxies. */
......@@ -575,6 +588,19 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
/* Load the spart into the proxy */
proxy_sparts_load(&e->proxies[pid], &s->sparts[offset_sparts + k], 1);
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
const uint32_t logger_flag =
logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);
/* Log the particle when leaving a rank. */
logger_log_spart(
e->logger, &s->sparts[offset_sparts + k],
logger_masks_all_spart | logger_mask_data[logger_special_flags].mask,
logger_flag);
}
#endif
}
/* Put the bparts into the corresponding proxies. */
......@@ -610,6 +636,12 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
/* Load the bpart into the proxy */
proxy_bparts_load(&e->proxies[pid], &s->bparts[offset_bparts + k], 1);
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
error("Not yet implemented.");
}
#endif
}
/* Put the gparts into the corresponding proxies. */
......@@ -639,6 +671,22 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
/* Load the gpart into the proxy */
proxy_gparts_load(&e->proxies[pid], &s->gparts[offset_gparts + k], 1);
#ifdef WITH_LOGGER
/* Write only the dark matter particles */
if ((e->policy & engine_policy_logger) &&
s->gparts[offset_gparts + k].type == swift_type_dark_matter) {
const uint32_t logger_flag =
logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);
/* Log the particle when leaving a rank. */
logger_log_gpart(
e->logger, &s->gparts[offset_gparts + k],
logger_masks_all_gpart | logger_mask_data[logger_special_flags].mask,
logger_flag);
}
#endif
}
/* Launch the proxies. */
......@@ -858,6 +906,41 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
sizeof(struct spart) * prox->nr_sparts_in);
memcpy(&s->bparts[offset_bparts + count_bparts], prox->bparts_in,
sizeof(struct bpart) * prox->nr_bparts_in);
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
const uint32_t flag =
logger_pack_flags_and_data(logger_flag_mpi_enter, prox->nodeID);
struct part *parts = &s->parts[offset_parts + count_parts];
struct xpart *xparts = &s->xparts[offset_parts + count_parts];
struct spart *sparts = &s->sparts[offset_sparts + count_sparts];
struct gpart *gparts = &s->gparts[offset_gparts + count_gparts];
/* Log the gas particles */
logger_log_parts(
e->logger, parts, xparts, prox->nr_parts_in,
logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
flag);
/* Log the stellar particles */
logger_log_sparts(e->logger, sparts, prox->nr_sparts_in,
logger_masks_all_spart |
logger_mask_data[logger_special_flags].mask,
flag);
/* Log the gparts */
logger_log_gparts(e->logger, gparts, prox->nr_gparts_in,
logger_masks_all_gpart |
logger_mask_data[logger_special_flags].mask,
flag);
/* Log the bparts */
if (prox->nr_bparts_in > 0) {
error("TODO");
}
}
#endif
/* for (int k = offset; k < offset + count; k++)
message(
"received particle %lli, x=[%.3e %.3e %.3e], h=%.3e, from node %i.",
......@@ -1482,7 +1565,9 @@ int engine_estimate_nr_tasks(const struct engine *e) {
}
#if defined(WITH_LOGGER)
/* each cell logs its particles */
n1 += 1;
if (e->policy & engine_policy_logger) {
n1 += 1;
}
#endif
#ifdef WITH_MPI
......@@ -2073,13 +2158,15 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs,
cooling_update(e->cosmology, e->cooling_func, e->s);
#ifdef WITH_LOGGER
/* Mark the first time step in the particle logger file. */
logger_log_timestamp(e->logger, e->ti_current, e->time,
&e->logger->timestamp_offset);
/* Make sure that we have enough space in the particle logger file
* to store the particles in current time step. */
logger_ensure_size(e->logger, s->nr_parts, s->nr_gparts, s->nr_sparts);
logger_write_description(e->logger, e);
if (e->policy & engine_policy_logger) {
/* Mark the first time step in the particle logger file. */
logger_log_timestamp(e->logger, e->ti_current, e->time,
&e->logger->timestamp_offset);
/* Make sure that we have enough space in the particle logger file
* to store the particles in current time step. */
logger_ensure_size(e->logger, s->nr_parts, s->nr_gparts, s->nr_sparts);
logger_write_description(e->logger, e);
}
#endif
/* Now, launch the calculation */
......@@ -2396,13 +2483,15 @@ void engine_step(struct engine *e) {
}
#ifdef WITH_LOGGER
/* Mark the current time step in the particle logger file. */
logger_log_timestamp(e->logger, e->ti_current, e->time,
&e->logger->timestamp_offset);
/* Make sure that we have enough space in the particle logger file
* to store the particles in current time step. */
logger_ensure_size(e->logger, e->s->nr_parts, e->s->nr_gparts,
e->s->nr_sparts);
if (e->policy & engine_policy_logger) {
/* Mark the current time step in the particle logger file. */
logger_log_timestamp(e->logger, e->ti_current, e->time,
&e->logger->timestamp_offset);
/* Make sure that we have enough space in the particle logger file
* to store the particles in current time step. */
logger_ensure_size(e->logger, e->s->nr_parts, e->s->nr_gparts,
e->s->nr_sparts);
}
#endif
/* Are we drifting everything (a la Gadget/GIZMO) ? */
......@@ -2523,7 +2612,9 @@ void engine_step(struct engine *e) {
engine_check_for_dumps(e);
#ifdef WITH_LOGGER
engine_check_for_index_dump(e);
if (e->policy & engine_policy_logger) {
engine_check_for_index_dump(e);
}
#endif
TIMER_TOC2(timer_step);
......@@ -3424,7 +3515,7 @@ void engine_dump_snapshot(struct engine *e) {
*/
void engine_dump_index(struct engine *e) {
#if defined(WITH_LOGGER) && !defined(WITH_MPI)
#if defined(WITH_LOGGER)
struct clocks_time time1, time2;
clocks_gettime(&time1);
......@@ -3710,8 +3801,10 @@ void engine_init(struct engine *e, struct space *s, struct swift_params *params,
e->total_nr_tasks = 0;
#if defined(WITH_LOGGER)
e->logger = (struct logger_writer *)malloc(sizeof(struct logger_writer));
logger_init(e->logger, params);
if (e->policy & engine_policy_logger) {
e->logger = (struct logger_writer *)malloc(sizeof(struct logger_writer));
logger_init(e->logger, params);
}
#endif
/* Make the space link back to the engine. */
......@@ -4233,7 +4326,7 @@ void engine_config(int restart, int fof, struct engine *e,
#endif
#if defined(WITH_LOGGER)
if (e->nodeID == 0)
if ((e->policy & engine_policy_logger) && e->nodeID == 0)
message(
"WARNING: There is currently no way of predicting the output "
"size, please use it carefully");
......@@ -4456,7 +4549,7 @@ void engine_config(int restart, int fof, struct engine *e,
}
#ifdef WITH_LOGGER
if (!restart) {
if ((e->policy & engine_policy_logger) && !restart) {
/* Write the particle logger header */
logger_write_file_header(e->logger);
}
......@@ -4982,8 +5075,10 @@ void engine_clean(struct engine *e, const int fof) {
swift_free("links", e->links);
#if defined(WITH_LOGGER)
logger_free(e->logger);
free(e->logger);
if (e->policy & engine_policy_logger) {
logger_free(e->logger);
free(e->logger);
}
#endif
scheduler_clean(&e->sched);
space_clean(e->s);
......@@ -5064,7 +5159,9 @@ void engine_struct_dump(struct engine *e, FILE *stream) {
if (e->output_list_stf) output_list_struct_dump(e->output_list_stf, stream);
#ifdef WITH_LOGGER
logger_struct_dump(e->logger, stream);
if (e->policy & engine_policy_logger) {
logger_struct_dump(e->logger, stream);
}
#endif
}
......@@ -5211,10 +5308,12 @@ void engine_struct_restore(struct engine *e, FILE *stream) {
}
#ifdef WITH_LOGGER
struct logger_writer *log =
(struct logger_writer *)malloc(sizeof(struct logger_writer));
logger_struct_restore(log, stream);
e->logger = log;
if (e->policy & engine_policy_logger) {
struct logger_writer *log =
(struct logger_writer *)malloc(sizeof(struct logger_writer));
logger_struct_restore(log, stream);
e->logger = log;
}
#endif
#ifdef EOS_PLANETARY
......
......@@ -77,9 +77,10 @@ enum engine_policy {
engine_policy_black_holes = (1 << 19),
engine_policy_fof = (1 << 20),
engine_policy_timestep_limiter = (1 << 21),
engine_policy_timestep_sync = (1 << 22)
engine_policy_timestep_sync = (1 << 22),
engine_policy_logger = (1 << 23),
};
#define engine_maxpolicy 23
#define engine_maxpolicy 24
extern const char *engine_policy_names[engine_maxpolicy + 1];
/**
......
......@@ -831,6 +831,9 @@ void engine_make_hierarchical_tasks_common(struct engine *e, struct cell *c) {
const int with_timestep_limiter =
(e->policy & engine_policy_timestep_limiter);
const int with_timestep_sync = (e->policy & engine_policy_timestep_sync);
#ifdef WITH_LOGGER
const int with_logger = e->policy & engine_policy_logger;
#endif
/* Are we at the top-level? */
if (c->top == c && c->nodeID == e->nodeID) {
......@@ -855,15 +858,20 @@ void engine_make_hierarchical_tasks_common(struct engine *e, struct cell *c) {
c, NULL);
#if defined(WITH_LOGGER)
/* Add the hydro logger task. */
c->logger = scheduler_addtask(s, task_type_logger, task_subtype_none, 0,
0, c, NULL);
struct task *kick2_or_logger;
if (with_logger) {
/* Add the hydro logger task. */
c->logger = scheduler_addtask(s, task_type_logger, task_subtype_none, 0,
0, c, NULL);
/* Add the kick2 dependency */
scheduler_addunlock(s, c->kick2, c->logger);
/* Add the kick2 dependency */
scheduler_addunlock(s, c->kick2, c->logger);
/* Create a variable in order to avoid to many ifdef */
struct task *kick2_or_logger = c->logger;
/* Create a variable in order to avoid to many ifdef */
kick2_or_logger = c->logger;
} else {
kick2_or_logger = c->kick2;
}
#else
struct task *kick2_or_logger = c->kick2;
#endif
......@@ -1070,6 +1078,9 @@ void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c,
const int with_cooling = (e->policy & engine_policy_cooling);
const int with_star_formation = (e->policy & engine_policy_star_formation);
const int with_black_holes = (e->policy & engine_policy_black_holes);
#ifdef WITH_LOGGER
const int with_logger = (e->policy & engine_policy_logger);
#endif
/* Are we are the level where we create the stars' resort tasks?
* If the tree is shallow, we need to do this at the super-level if the
......@@ -1176,7 +1187,11 @@ void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c,
task_subtype_none, 0, 0, c, NULL);
#ifdef WITH_LOGGER
scheduler_addunlock(s, c->super->logger, c->stars.stars_in);
if (with_logger) {
scheduler_addunlock(s, c->super->logger, c->stars.stars_in);
} else {
scheduler_addunlock(s, c->super->kick2, c->stars.stars_in);
}
#else
scheduler_addunlock(s, c->super->kick2, c->stars.stars_in);
#endif
......@@ -1209,7 +1224,13 @@ void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c,
s, task_type_bh_swallow_ghost3, task_subtype_none, 0, 0, c, NULL);
#ifdef WITH_LOGGER
scheduler_addunlock(s, c->super->logger, c->black_holes.black_holes_in);
if (with_logger) {
scheduler_addunlock(s, c->super->logger,
c->black_holes.black_holes_in);
} else {
scheduler_addunlock(s, c->super->kick2,
c->black_holes.black_holes_in);
}
#else
scheduler_addunlock(s, c->super->kick2, c->black_holes.black_holes_in);
#endif
......
......@@ -976,6 +976,59 @@ void engine_redistribute(struct engine *e) {
for (int k = 0; k < nr_nodes; k++)
nr_bparts_new += b_counts[k * nr_nodes + nodeID];
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
/* Log the particles before sending them out */
size_t part_offset = 0;
size_t spart_offset = 0;
size_t gpart_offset = 0;
size_t bpart_offset = 0;
for (int i = 0; i < nr_nodes; i++) {
const size_t c_ind = engine_rank * nr_nodes + i;
/* No need to log the local particles. */
if (i == engine_rank) {
part_offset += counts[c_ind];
spart_offset += s_counts[c_ind];
gpart_offset += g_counts[c_ind];
bpart_offset += b_counts[c_ind];
continue;
}
const uint32_t flag = logger_pack_flags_and_data(logger_flag_mpi_exit, i);
/* Log the hydro parts. */
logger_log_parts(
e->logger, &parts[part_offset], &xparts[part_offset], counts[c_ind],
logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
flag);
/* Log the stellar parts. */
logger_log_sparts(
e->logger, &sparts[spart_offset], s_counts[c_ind],
logger_masks_all_spart | logger_mask_data[logger_special_flags].mask,
flag);
/* Log the gparts */
logger_log_gparts(
e->logger, &gparts[gpart_offset], g_counts[c_ind],
logger_masks_all_gpart | logger_mask_data[logger_special_flags].mask,
flag);
/* Log the bparts */
if (b_counts[c_ind] > 0) {
error("TODO");
}
/* Update the counters */
part_offset += counts[c_ind];
spart_offset += s_counts[c_ind];
gpart_offset += g_counts[c_ind];
bpart_offset += b_counts[c_ind];
}
}
#endif
/* Now exchange the particles, type by type to keep the memory required
* under control. */
......@@ -1028,6 +1081,60 @@ void engine_redistribute(struct engine *e) {
/* All particles have now arrived. Time for some final operations on the
stuff we just received */
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
size_t part_offset = 0;
size_t spart_offset = 0;
size_t gpart_offset = 0;
size_t bpart_offset = 0;
for (int i = 0; i < nr_nodes; i++) {
const size_t c_ind = i * nr_nodes + engine_rank;
/* No need to log the local particles. */
if (i == engine_rank) {
part_offset += counts[c_ind];
spart_offset += s_counts[c_ind];
gpart_offset += g_counts[c_ind];
bpart_offset += b_counts[c_ind];
continue;
}
const uint32_t flag =
logger_pack_flags_and_data(logger_flag_mpi_enter, i);
/* Log the hydro parts. */
logger_log_parts(
e->logger, &s->parts[part_offset], &s->xparts[part_offset], counts[c_ind],
logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
flag);
/* Log the stellar parts. */
logger_log_sparts(
e->logger, &s->sparts[spart_offset], s_counts[c_ind],
logger_masks_all_spart | logger_mask_data[logger_special_flags].mask,
flag);
/* Log the gparts */
logger_log_gparts(
e->logger, &s->gparts[gpart_offset], g_counts[c_ind],
logger_masks_all_gpart | logger_mask_data[logger_special_flags].mask,
flag);
/* Log the bparts */
if (b_counts[c_ind] > 0) {
error("TODO");
}
/* Update the counters */
part_offset += counts[c_ind];
spart_offset += s_counts[c_ind];
gpart_offset += g_counts[c_ind];
bpart_offset += b_counts[c_ind];
}
}
#endif
/* Restore the part<->gpart and spart<->gpart links.
* Generate indices and counts for threadpool tasks. Note we process a node
* at a time. */
......
......@@ -78,7 +78,7 @@ const struct mask_data logger_mask_data[logger_count_mask] = {
/* Particle's constants: mass (float) and ID (long long). */
{sizeof(float) + sizeof(long long), 1 << logger_consts, "consts"},
/* Flag for special cases (e.g. change of MPI rank, star formation, ...) */
{sizeof(int), 1 << logger_special_flags, "special flags"},
{sizeof(uint32_t), 1 << logger_special_flags, "special flags"},
/* Simulation time stamp: integertime and double time (e.g. scale
factor or time). */
{sizeof(integertime_t) + sizeof(double), 1 << logger_timestamp,
......@@ -178,74 +178,44 @@ void logger_log_all(struct logger_writer *log, const struct engine *e) {
/* some constants. */
const struct space *s = e->s;
const unsigned int mask_hydro =
logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask | logger_mask_data[logger_u].mask |
logger_mask_data[logger_h].mask | logger_mask_data[logger_rho].mask |
logger_mask_data[logger_consts].mask;
/* loop over all parts. */
for (size_t i = 0; i < s->nr_parts; i++) {
logger_log_part(log, &s->parts[i], mask_hydro,
&s->xparts[i].logger_data.last_offset,
logger_log_part(log, &s->parts[i], &s->xparts[i], logger_masks_all_part,
/* Special flags */ 0);
s->xparts[i].logger_data.steps_since_last_output = 0;
}
const unsigned int mask_grav =
logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask | logger_mask_data[logger_consts].mask;
/* loop over all gparts */
for (size_t i = 0; i < s->nr_gparts; i++) {
/* Log only the dark matter */
if (s->gparts[i].type != swift_type_dark_matter) continue;
logger_log_gpart(log, &s->gparts[i], mask_grav,
&s->gparts[i].logger_data.last_offset,