Commit b07c911a authored by Loic Hausammann's avatar Loic Hausammann
Browse files

Logger: first review of MPI

parent 511bef09
......@@ -169,6 +169,7 @@ int main(int argc, char *argv[]) {
int with_drift_all = 0;
int with_mpole_reconstruction = 0;
int with_structure_finding = 0;
int with_logger = 0;
int with_eagle = 0;
int verbose = 0;
int nr_threads = 1;
......@@ -252,6 +253,8 @@ int main(int argc, char *argv[]) {
"Overwrite the CPU "
"frequency (Hz) to be used for time measurements.",
NULL, 0, 0),
OPT_BOOLEAN(0, "logger", &with_logger,
"Run with the logger.", NULL, 0, 0),
OPT_INTEGER('n', "steps", &nsteps,
"Execute a fixed number of time steps. When unset use the "
"time_end parameter to stop.",
......@@ -330,6 +333,13 @@ int main(int argc, char *argv[]) {
}
#endif
#if !defined(WITH_LOGGER)
  /* The logger command-line flag is only usable when the code was configured
   * with --enable-logger; otherwise report the problem and stop. */
  if (with_logger) {
    printf(
        "Error: the logger is not available, please compile with "
        "--enable-logger.\n");
    return 1;
  }
#endif
#ifndef HAVE_FE_ENABLE_EXCEPT
if (with_fp_exceptions) {
printf("Error: no support for floating point exceptions\n");
......@@ -1144,6 +1154,7 @@ int main(int argc, char *argv[]) {
if (with_structure_finding)
engine_policies |= engine_policy_structure_finding;
if (with_fof) engine_policies |= engine_policy_fof;
if (with_logger) engine_policies |= engine_policy_logger;
/* Initialize the engine with the space and policies. */
if (myrank == 0) clocks_gettime(&tic);
......@@ -1206,7 +1217,15 @@ int main(int argc, char *argv[]) {
#ifdef WITH_MPI
/* Split the space. */
engine_split(&e, &initial_partition);
engine_redistribute(&e, /* initial_redistribute */ 1);
if (with_logger) {
/* Turn off the logger to avoid writing the communications */
e.policy &= ~engine_policy_logger;
}
engine_redistribute(&e);
if (with_logger) {
/* Turn it back on */
e.policy |= engine_policy_logger;
}
#endif
/* Initialise the particles */
......@@ -1214,8 +1233,10 @@ int main(int argc, char *argv[]) {
/* Write the state of the system before starting time integration. */
#ifdef WITH_LOGGER
if (e.policy & engine_policy_logger) {
logger_log_all(e.logger, &e);
engine_dump_index(&e);
}
#endif
/* Dump initial state snapshot, if not working with an output list */
if (!e.output_list_snapshots) engine_dump_snapshot(&e);
......@@ -1417,6 +1438,7 @@ int main(int argc, char *argv[]) {
engine_print_stats(&e);
}
#ifdef WITH_LOGGER
if (e.policy & engine_policy_logger) {
logger_log_all(e.logger, &e);
/* Write a final index file */
......@@ -1425,6 +1447,7 @@ int main(int argc, char *argv[]) {
/* Write a sentinel timestamp */
logger_log_timestamp(e.logger, e.ti_current, e.time,
&e.logger->timestamp_offset);
}
#endif
/* Write final snapshot? */
......
......@@ -122,7 +122,9 @@ const char *engine_policy_names[] = {"none",
"black holes",
"fof search",
"time-step limiter",
"time-step sync"};
"time-step sync",
"logger"
};
/** The rank of the engine as a global variable (for messages). */
int engine_rank;
......@@ -223,7 +225,7 @@ void engine_repartition(struct engine *e) {
Finally, the space, tasks, and proxies need to be rebuilt. */
/* Redistribute the particles between the nodes. */
engine_redistribute(e, /* initial_redistribute */ 0);
engine_redistribute(e);
/* Make the proxies. */
engine_makeproxies(e);
......@@ -454,21 +456,17 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
&s->xparts[offset_parts + k], 1);
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
const int logger_flag = logger_generate_flag(
logger_flag_mpi | logger_flag_delete, node_id);
logger_flag_mpi_exit, node_id);
/* Log the particle when leaving a rank. */
logger_log_part(e->logger, &s->parts[offset_parts + k],
logger_mask_data[logger_x].mask |
logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask |
logger_mask_data[logger_u].mask |
logger_mask_data[logger_h].mask |
logger_mask_data[logger_rho].mask |
logger_mask_data[logger_consts].mask |
logger_masks_all_part |
logger_mask_data[logger_special_flags].mask,
&s->xparts[offset_parts + k].logger_data.last_offset,
logger_flag);
}
#endif
}
......@@ -507,17 +505,17 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
proxy_sparts_load(&e->proxies[pid], &s->sparts[offset_sparts + k], 1);
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
const int logger_flag = logger_generate_flag(
logger_flag_mpi | logger_flag_delete, node_id);
logger_flag_mpi_exit, node_id);
/* Log the particle when leaving a rank. */
logger_log_spart(e->logger, &s->sparts[offset_sparts + k],
logger_mask_data[logger_x].mask |
logger_mask_data[logger_v].mask |
logger_mask_data[logger_consts].mask |
logger_masks_all_spart |
logger_mask_data[logger_special_flags].mask,
/* The offset must belong to the spart being logged, i.e. use the spart
 * index base (offset_sparts), not the gas one. */
&s->sparts[offset_sparts + k].logger_data.last_offset,
logger_flag);
}
#endif
}
......@@ -556,7 +554,9 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
proxy_bparts_load(&e->proxies[pid], &s->bparts[offset_bparts + k], 1);
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
error("TODO");
}
#endif
}
......@@ -590,17 +590,15 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
#ifdef WITH_LOGGER
/* Write only the dark matter particles */
if (s->gparts[offset_gparts + k].type == swift_type_dark_matter) {
if ((e->policy & engine_policy_logger) &&
s->gparts[offset_gparts + k].type == swift_type_dark_matter) {
const int logger_flag = logger_generate_flag(
logger_flag_mpi | logger_flag_delete, node_id);
logger_flag_mpi_exit, node_id);
/* Log the particle when leaving a rank. */
logger_log_gpart(e->logger, &s->gparts[offset_gparts + k],
logger_mask_data[logger_x].mask |
logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask |
logger_mask_data[logger_consts].mask |
logger_masks_all_gpart |
logger_mask_data[logger_special_flags].mask,
/* The offset must belong to the gpart being logged, not to an unrelated
 * spart. */
&s->gparts[offset_gparts + k].logger_data.last_offset,
logger_flag);
......@@ -827,12 +825,57 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
sizeof(struct bpart) * prox->nr_bparts_in);
#ifdef WITH_LOGGER
logger_log_recv_strays(e->logger, &s->parts[offset_parts + count_parts],
&s->xparts[offset_parts + count_parts], prox->nr_parts_in,
&s->gparts[offset_gparts + count_gparts], prox->nr_gparts_in,
&s->sparts[offset_sparts + count_sparts], prox->nr_sparts_in,
&s->bparts[offset_bparts + count_bparts], prox->nr_bparts_in,
if (e->policy & engine_policy_logger) {
const int flag = logger_generate_flag(logger_flag_mpi_enter,
prox->nodeID);
struct part *parts = &s->parts[offset_parts + count_parts];
struct xpart *xparts = &s->xparts[offset_parts + count_parts];
struct spart *sparts = &s->sparts[offset_sparts + count_sparts];
struct gpart *gparts = &s->gparts[offset_gparts + count_gparts];
/* Log the gas particles */
const unsigned int mask_hydro =
logger_masks_all_part |
logger_mask_data[logger_special_flags].mask;
for(int i = 0; i < prox->nr_parts_in; i++) {
logger_log_part(e->logger, &parts[i], mask_hydro,
&xparts[i].logger_data.last_offset,
flag);
/* Reset the counter */
xparts[i].logger_data.steps_since_last_output = 0;
}
/* Log the stellar particles */
const unsigned int mask_stars = logger_masks_all_spart |
logger_mask_data[logger_special_flags].mask;
for(int i = 0; i < prox->nr_sparts_in; i++) {
logger_log_spart(e->logger, &sparts[i], mask_stars,
&sparts[i].logger_data.last_offset,
flag);
sparts[i].logger_data.steps_since_last_output = 0;
}
/* Log the gparts */
const unsigned int mask_grav =
logger_masks_all_gpart |
logger_mask_data[logger_special_flags].mask;
for(int i = 0; i < prox->nr_gparts_in; i++) {
/* Log only the dark matter */
if (gparts[i].type != swift_type_dark_matter) continue;
logger_log_gpart(e->logger, &gparts[i], mask_grav,
&gparts[i].logger_data.last_offset,
flag);
gparts[i].logger_data.steps_since_last_output = 0;
}
/* Log the bparts */
if (prox->nr_bparts_in > 0) {
error("TODO");
}
}
#endif
/* for (int k = offset; k < offset + count; k++)
message(
......@@ -1458,7 +1501,9 @@ int engine_estimate_nr_tasks(const struct engine *e) {
}
#if defined(WITH_LOGGER)
/* each cell logs its particles */
if (e->policy & engine_policy_logger) {
n1 += 1;
}
#endif
#ifdef WITH_MPI
......@@ -2027,6 +2072,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs,
cooling_update(e->cosmology, e->cooling_func, e->s);
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
/* Mark the first time step in the particle logger file. */
logger_log_timestamp(e->logger, e->ti_current, e->time,
&e->logger->timestamp_offset);
......@@ -2034,6 +2080,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs,
* to store the particles in current time step. */
logger_ensure_size(e->logger, s->nr_parts, s->nr_gparts, s->nr_sparts);
logger_write_description(e->logger, e);
}
#endif
/* Now, launch the calculation */
......@@ -2350,6 +2397,7 @@ void engine_step(struct engine *e) {
}
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
/* Mark the current time step in the particle logger file. */
logger_log_timestamp(e->logger, e->ti_current, e->time,
&e->logger->timestamp_offset);
......@@ -2357,6 +2405,7 @@ void engine_step(struct engine *e) {
* to store the particles in current time step. */
logger_ensure_size(e->logger, e->s->nr_parts, e->s->nr_gparts,
e->s->nr_sparts);
}
#endif
/* Are we drifting everything (a la Gadget/GIZMO) ? */
......@@ -2459,7 +2508,9 @@ void engine_step(struct engine *e) {
engine_check_for_dumps(e);
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
engine_check_for_index_dump(e);
}
#endif
TIMER_TOC2(timer_step);
......@@ -3645,8 +3696,10 @@ void engine_init(struct engine *e, struct space *s, struct swift_params *params,
e->total_nr_tasks = 0;
#if defined(WITH_LOGGER)
if (e->policy & engine_policy_logger) {
e->logger = (struct logger_writer *)malloc(sizeof(struct logger_writer));
logger_init(e->logger, params);
}
#endif
/* Make the space link back to the engine. */
......@@ -4168,7 +4221,7 @@ void engine_config(int restart, int fof, struct engine *e,
#endif
#if defined(WITH_LOGGER)
if (e->nodeID == 0)
if ((e->policy & engine_policy_logger) && e->nodeID == 0)
message(
"WARNING: There is currently no way of predicting the output "
"size, please use it carefully");
......@@ -4391,7 +4444,7 @@ void engine_config(int restart, int fof, struct engine *e,
}
#ifdef WITH_LOGGER
if (!restart) {
if ((e->policy & engine_policy_logger) && !restart) {
/* Write the particle logger header */
logger_write_file_header(e->logger);
}
......@@ -4917,8 +4970,10 @@ void engine_clean(struct engine *e, const int fof) {
swift_free("links", e->links);
#if defined(WITH_LOGGER)
if (e->policy & engine_policy_logger) {
logger_free(e->logger);
free(e->logger);
}
#endif
scheduler_clean(&e->sched);
space_clean(e->s);
......@@ -4999,7 +5054,9 @@ void engine_struct_dump(struct engine *e, FILE *stream) {
if (e->output_list_stf) output_list_struct_dump(e->output_list_stf, stream);
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
logger_struct_dump(e->logger, stream);
}
#endif
}
......@@ -5146,10 +5203,12 @@ void engine_struct_restore(struct engine *e, FILE *stream) {
}
#ifdef WITH_LOGGER
if (e->policy & engine_policy_logger) {
struct logger_writer *log =
(struct logger_writer *)malloc(sizeof(struct logger_writer));
logger_struct_restore(log, stream);
e->logger = log;
}
#endif
#ifdef EOS_PLANETARY
......
......@@ -77,9 +77,10 @@ enum engine_policy {
engine_policy_black_holes = (1 << 19),
engine_policy_fof = (1 << 20),
engine_policy_timestep_limiter = (1 << 21),
engine_policy_timestep_sync = (1 << 22)
engine_policy_timestep_sync = (1 << 22),
engine_policy_logger = (1 << 23),
};
#define engine_maxpolicy 23
#define engine_maxpolicy 24
extern const char *engine_policy_names[engine_maxpolicy + 1];
/**
......@@ -544,7 +545,7 @@ void engine_rebuild(struct engine *e, int redistributed, int clean_h_values);
void engine_repartition(struct engine *e);
void engine_repartition_trigger(struct engine *e);
void engine_makeproxies(struct engine *e);
void engine_redistribute(struct engine *e, int initial_redistribute);
void engine_redistribute(struct engine *e);
void engine_print_policy(struct engine *e);
int engine_is_done(struct engine *e);
void engine_pin(void);
......
......@@ -831,6 +831,7 @@ void engine_make_hierarchical_tasks_common(struct engine *e, struct cell *c) {
const int with_timestep_limiter =
(e->policy & engine_policy_timestep_limiter);
const int with_timestep_sync = (e->policy & engine_policy_timestep_sync);
const int with_logger = e->policy & engine_policy_logger;
/* Are we at the top-level? */
if (c->top == c && c->nodeID == e->nodeID) {
......@@ -855,6 +856,8 @@ void engine_make_hierarchical_tasks_common(struct engine *e, struct cell *c) {
c, NULL);
#if defined(WITH_LOGGER)
struct task *kick2_or_logger;
if (with_logger) {
/* Add the hydro logger task. */
c->logger = scheduler_addtask(s, task_type_logger, task_subtype_none, 0,
0, c, NULL);
......@@ -863,7 +866,11 @@ void engine_make_hierarchical_tasks_common(struct engine *e, struct cell *c) {
scheduler_addunlock(s, c->kick2, c->logger);
/* Create a variable in order to avoid too many ifdefs */
struct task *kick2_or_logger = c->logger;
kick2_or_logger = c->logger;
}
else {
kick2_or_logger = c->kick2;
}
#else
struct task *kick2_or_logger = c->kick2;
#endif
......@@ -1070,6 +1077,7 @@ void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c,
const int with_cooling = (e->policy & engine_policy_cooling);
const int with_star_formation = (e->policy & engine_policy_star_formation);
const int with_black_holes = (e->policy & engine_policy_black_holes);
const int with_logger = (e->policy & engine_policy_logger);
/* Are we at the level where we create the stars' resort tasks?
* If the tree is shallow, we need to do this at the super-level if the
......@@ -1176,7 +1184,12 @@ void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c,
task_subtype_none, 0, 0, c, NULL);
#ifdef WITH_LOGGER
if (with_logger) {
scheduler_addunlock(s, c->super->logger, c->stars.stars_in);
}
else {
scheduler_addunlock(s, c->super->kick2, c->stars.stars_in);
}
#else
scheduler_addunlock(s, c->super->kick2, c->stars.stars_in);
#endif
......@@ -1209,7 +1222,12 @@ void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c,
s, task_type_bh_swallow_ghost3, task_subtype_none, 0, 0, c, NULL);
#ifdef WITH_LOGGER
if (with_logger) {
scheduler_addunlock(s, c->super->logger, c->black_holes.black_holes_in);
}
else {
scheduler_addunlock(s, c->super->kick2, c->black_holes.black_holes_in);
}
#else
scheduler_addunlock(s, c->super->kick2, c->black_holes.black_holes_in);
#endif
......
......@@ -514,9 +514,8 @@ static void engine_redistribute_relink_mapper(void *map_data, int num_elements,
*
*
* @param e The #engine.
* @param initial_redistribute Is it the initial redistribute (just after reading the particles).
*/
void engine_redistribute(struct engine *e, int initial_redistribute) {
void engine_redistribute(struct engine *e) {
#ifdef WITH_MPI
......@@ -978,7 +977,7 @@ void engine_redistribute(struct engine *e, int initial_redistribute) {
nr_bparts_new += b_counts[k * nr_nodes + nodeID];
#ifdef WITH_LOGGER
if (!initial_redistribute) {
if (e->policy & engine_policy_logger) {
/* Log the particles before sending them out */
const int sending = 1;
logger_log_repartition(e->logger, nr_nodes, sending, s->parts,
......@@ -1042,10 +1041,10 @@ void engine_redistribute(struct engine *e, int initial_redistribute) {
stuff we just received */
#ifdef WITH_LOGGER
if (!initial_redistribute) {
if (e->policy & engine_policy_logger) {
/* Log the received particles */
const int receiving = 0;
logger_log_repartition(e->logger, nr_nodes, receiving, s->parts,
const int sending = 0;
logger_log_repartition(e->logger, nr_nodes, sending, s->parts,
s->xparts, nr_parts_new, counts,
s->gparts, nr_gparts_new, g_counts,
s->sparts, nr_sparts_new, s_counts,
......
......@@ -178,42 +178,29 @@ void logger_log_all(struct logger_writer *log, const struct engine *e) {
/* some constants. */
const struct space *s = e->s;
const unsigned int mask_hydro =
logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask | logger_mask_data[logger_u].mask |
logger_mask_data[logger_h].mask | logger_mask_data[logger_rho].mask |
logger_mask_data[logger_consts].mask;
/* loop over all parts. */
for (size_t i = 0; i < s->nr_parts; i++) {
logger_log_part(log, &s->parts[i], mask_hydro,
logger_log_part(log, &s->parts[i], logger_masks_all_part,
&s->xparts[i].logger_data.last_offset,
/* Special flags */ 0);
s->xparts[i].logger_data.steps_since_last_output = 0;
}
const unsigned int mask_grav =
logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask | logger_mask_data[logger_consts].mask;
/* loop over all gparts */
for (size_t i = 0; i < s->nr_gparts; i++) {
/* Log only the dark matter */
if (s->gparts[i].type != swift_type_dark_matter) continue;
logger_log_gpart(log, &s->gparts[i], mask_grav,
logger_log_gpart(log, &s->gparts[i], logger_masks_all_gpart,
&s->gparts[i].logger_data.last_offset,
/* Special flags */ 0);
s->gparts[i].logger_data.steps_since_last_output = 0;
}
const unsigned int mask_stars = logger_mask_data[logger_x].mask |
logger_mask_data[logger_v].mask |
logger_mask_data[logger_consts].mask;
/* loop over all sparts */
for (size_t i = 0; i < s->nr_sparts; i++) {
logger_log_spart(log, &s->sparts[i], mask_stars,
logger_log_spart(log, &s->sparts[i], logger_masks_all_spart,
&s->sparts[i].logger_data.last_offset,
/* Special flags */ 0);
s->sparts[i].logger_data.steps_since_last_output = 0;
......@@ -229,8 +216,8 @@ void logger_log_all(struct logger_writer *log, const struct engine *e) {
* @param p The #part to dump.
* @param mask The mask of the data to dump.
* @param offset Pointer to the offset of the previous log of this particle;
* @param special_flags The value of the special flag.
* (return) offset of this log.
* @param special_flags The value of the special flag.
*/
void logger_log_part(struct logger_writer *log, const struct part *p,
unsigned int mask, size_t *offset,
......@@ -891,16 +878,13 @@ void logger_log_repartition(
continue;
}
const enum logger_special_flags create_delete =
sending? logger_flag_delete : logger_flag_create;
const enum logger_special_flags receive_or_send =
sending? logger_flag_mpi_exit : logger_flag_mpi_enter;
const int flag = logger_generate_flag(
logger_flag_mpi | create_delete, i);
receive_or_send, i);
const unsigned int mask_hydro =
logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask | logger_mask_data[logger_u].mask |
logger_mask_data[logger_h].mask | logger_mask_data[logger_rho].mask |
logger_mask_data[logger_consts].mask |
logger_masks_all_part |
logger_mask_data[logger_special_flags].mask;
/* Log the hydro parts. */
......@@ -912,9 +896,7 @@ void logger_log_repartition(
xparts[ind].logger_data.steps_since_last_output = 0;
}
const unsigned int mask_stars = logger_mask_data[logger_x].mask |
logger_mask_data[logger_v].mask |
logger_mask_data[logger_consts].mask |
const unsigned int mask_stars = logger_masks_all_spart |
logger_mask_data[logger_special_flags].mask;
/* Log the stellar parts. */
......@@ -927,8 +909,7 @@ void logger_log_repartition(
}
const unsigned int mask_grav =
logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask | logger_mask_data[logger_consts].mask |
logger_masks_all_gpart |
logger_mask_data[logger_special_flags].mask;
/* Log the gparts */
......@@ -955,84 +936,6 @@ void logger_log_repartition(
bpart_offset += b_counts[c_ind];
}
}
/**
* @brief Log all the particles arriving in the current rank.
*
* @param parts The list of #part.
* @param nr_parts The number of parts.
* @param count The number of parts in each ranks.
* @param gparts The list of #gpart.
* @param nr_gparts The number of gparts.
* @param gcount The number of gparts in each ranks.
* @param sparts The list of #spart.
* @param nr_sparts The number of sparts.
* @param s_counts The number of sparts in each ranks.
* @param bparts The list of #bpart.
* @param nr_bparts The number of bparts.
* @param b_counts The number of bparts in each ranks.
*
*/
void logger_log_recv_strays(
struct logger_writer *log,
struct part *parts, struct xpart *xparts, size_t nr_parts,
struct gpart *gparts, size_t nr_gparts,
struct spart *sparts, size_t nr_sparts,
struct bpart *bparts, size_t nr_bparts,
int node_id) {
const int flag = logger_generate_flag(logger_flag_mpi | logger_flag_create,
node_id);
/* Log the gas particles */
const unsigned int mask_hydro =
logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask | logger_mask_data[logger_u].mask |
logger_mask_data[logger_h].mask | logger_mask_data[logger_rho].mask |
logger_mask_data[logger_consts].mask |
logger_mask_data[logger_special_flags].mask;
for(size_t i = 0; i < nr_parts; i++) {
logger_log_part(log, &parts[i], mask_hydro,
&xparts[i].logger_data.last_offset,
flag);
xparts[i].logger_data.steps_since_last_output = 0;
}
/* Log the stellar particles */
const unsigned int mask_stars = logger_mask_data[logger_x].mask |
logger_mask_data[logger_v].mask |
logger_mask_data[logger_consts].mask |
logger_mask_data[logger_special_flags].mask;
for(size_t i = 0; i < nr_sparts; i++) {
logger_log_spart(log, &sparts[i], mask_stars,
&sparts[i].logger_data.last_offset,
/* Special flags */ 0);
sparts[i].logger_data.steps_since_last_output = 0;
}
/* Log the gparts */
const unsigned int mask_grav =
logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask | logg