Commit 33129ea4 authored by Loic Hausammann's avatar Loic Hausammann
Browse files

Logger: implement writing before/after communication

parent a3c1f0f9
...@@ -583,9 +583,10 @@ int main(int argc, char *argv[]) { ...@@ -583,9 +583,10 @@ int main(int argc, char *argv[]) {
#ifdef WITH_MPI #ifdef WITH_MPI
if (with_mpole_reconstruction && nr_nodes > 1) if (with_mpole_reconstruction && nr_nodes > 1)
error("Cannot reconstruct m-poles every step over MPI (yet)."); error("Cannot reconstruct m-poles every step over MPI (yet).");
#ifdef WITH_LOGGER if (with_timestep_limiter)
error("Can't run with the particle logger over MPI (yet)"); error("Can't run with time-step limiter over MPI (yet)");
#endif if (with_timestep_sync)
error("Can't run with time-step synchronization over MPI (yet)");
#endif #endif
/* Temporary early aborts for modes not supported with hand-vec. */ /* Temporary early aborts for modes not supported with hand-vec. */
...@@ -1205,7 +1206,7 @@ int main(int argc, char *argv[]) { ...@@ -1205,7 +1206,7 @@ int main(int argc, char *argv[]) {
#ifdef WITH_MPI #ifdef WITH_MPI
/* Split the space. */ /* Split the space. */
engine_split(&e, &initial_partition); engine_split(&e, &initial_partition);
engine_redistribute(&e); engine_redistribute(&e, /* initial_redistribute */ 1);
#endif #endif
/* Initialise the particles */ /* Initialise the particles */
...@@ -1418,6 +1419,9 @@ int main(int argc, char *argv[]) { ...@@ -1418,6 +1419,9 @@ int main(int argc, char *argv[]) {
#ifdef WITH_LOGGER #ifdef WITH_LOGGER
logger_log_all(e.logger, &e); logger_log_all(e.logger, &e);
/* Write a final index file */
engine_dump_index(&e);
/* Write a sentinel timestamp */ /* Write a sentinel timestamp */
logger_log_timestamp(e.logger, e.ti_current, e.time, logger_log_timestamp(e.logger, e.ti_current, e.time,
&e.logger->timestamp_offset); &e.logger->timestamp_offset);
......
...@@ -223,7 +223,7 @@ void engine_repartition(struct engine *e) { ...@@ -223,7 +223,7 @@ void engine_repartition(struct engine *e) {
Finally, the space, tasks, and proxies need to be rebuilt. */ Finally, the space, tasks, and proxies need to be rebuilt. */
/* Redistribute the particles between the nodes. */ /* Redistribute the particles between the nodes. */
engine_redistribute(e); engine_redistribute(e, /* initial_redistribute */ 0);
/* Make the proxies. */ /* Make the proxies. */
engine_makeproxies(e); engine_makeproxies(e);
......
...@@ -544,7 +544,7 @@ void engine_rebuild(struct engine *e, int redistributed, int clean_h_values); ...@@ -544,7 +544,7 @@ void engine_rebuild(struct engine *e, int redistributed, int clean_h_values);
void engine_repartition(struct engine *e); void engine_repartition(struct engine *e);
void engine_repartition_trigger(struct engine *e); void engine_repartition_trigger(struct engine *e);
void engine_makeproxies(struct engine *e); void engine_makeproxies(struct engine *e);
void engine_redistribute(struct engine *e); void engine_redistribute(struct engine *e, int initial_redistribute);
void engine_print_policy(struct engine *e); void engine_print_policy(struct engine *e);
int engine_is_done(struct engine *e); int engine_is_done(struct engine *e);
void engine_pin(void); void engine_pin(void);
......
...@@ -514,8 +514,9 @@ static void engine_redistribute_relink_mapper(void *map_data, int num_elements, ...@@ -514,8 +514,9 @@ static void engine_redistribute_relink_mapper(void *map_data, int num_elements,
* *
* *
* @param e The #engine. * @param e The #engine.
* @param initial_redistribute Is it the initial redistribute (just after reading the particles).
*/ */
void engine_redistribute(struct engine *e) { void engine_redistribute(struct engine *e, int initial_redistribute) {
#ifdef WITH_MPI #ifdef WITH_MPI
...@@ -977,11 +978,15 @@ void engine_redistribute(struct engine *e) { ...@@ -977,11 +978,15 @@ void engine_redistribute(struct engine *e) {
nr_bparts_new += b_counts[k * nr_nodes + nodeID]; nr_bparts_new += b_counts[k * nr_nodes + nodeID];
#ifdef WITH_LOGGER #ifdef WITH_LOGGER
/* Log the particles before sending them out */ if (!initial_redistribute) {
logger_log_before_communcations(s->parts, nr_parts_new, counts, /* Log the particles before sending them out */
s->gparts, nr_gparts_new, g_counts, const int sending = 1;
s->sparts, nr_sparts_new, s_counts, logger_log_repartition(e->logger, nr_nodes, sending, s->parts,
s->bparts, nr_bparts_new, b_counts); s->xparts, nr_parts_new, counts,
s->gparts, nr_gparts_new, g_counts,
s->sparts, nr_sparts_new, s_counts,
s->bparts, nr_bparts_new, b_counts);
}
#endif #endif
/* Now exchange the particles, type by type to keep the memory required /* Now exchange the particles, type by type to keep the memory required
...@@ -1037,11 +1042,15 @@ void engine_redistribute(struct engine *e) { ...@@ -1037,11 +1042,15 @@ void engine_redistribute(struct engine *e) {
stuff we just received */ stuff we just received */
#ifdef WITH_LOGGER #ifdef WITH_LOGGER
/* Log the received particles */ if (!initial_redistribute) {
logger_log_after_communcations(s->parts, s->nr_parts, counts, /* Log the received particles */
s->gparts, s->nr_gparts, g_counts, const int receiving = 0;
s->sparts, s->nr_sparts, s_counts, logger_log_repartition(e->logger, nr_nodes, receiving, s->parts,
s->bparts, s->nr_bparts, b_counts); s->xparts, nr_parts_new, counts,
s->gparts, nr_gparts_new, g_counts,
s->sparts, nr_sparts_new, s_counts,
s->bparts, nr_bparts_new, b_counts);
}
#endif #endif
/* Restore the part<->gpart and spart<->gpart links. /* Restore the part<->gpart and spart<->gpart links.
......
...@@ -849,6 +849,9 @@ int logger_read_timestamp(unsigned long long int *t, double *time, ...@@ -849,6 +849,9 @@ int logger_read_timestamp(unsigned long long int *t, double *time,
/** /**
* @brief Log all the particles leaving the current rank. * @brief Log all the particles leaving the current rank.
* *
* @param log The #logger_writer.
* @param nr_nodes Number of nodes used in the simulation.
* @param sneding Are we sending the particles (or receiving)?
* @param parts The list of #part. * @param parts The list of #part.
* @param nr_parts The number of parts. * @param nr_parts The number of parts.
* @param count The number of parts in each ranks. * @param count The number of parts in each ranks.
...@@ -863,37 +866,93 @@ int logger_read_timestamp(unsigned long long int *t, double *time, ...@@ -863,37 +866,93 @@ int logger_read_timestamp(unsigned long long int *t, double *time,
* @param b_counts The number of bparts in each ranks. * @param b_counts The number of bparts in each ranks.
* *
*/ */
void logger_log_before_communcations( void logger_log_repartition(
struct part *parts, size_t nr_parts, int *counts, struct logger_writer *log, int nr_nodes, int sending, struct part *parts,
struct xpart *xparts, size_t nr_parts, int *counts,
struct gpart *gparts, size_t nr_gparts, int *g_counts, struct gpart *gparts, size_t nr_gparts, int *g_counts,
struct spart *sparts, size_t nr_sparts, int *s_counts, struct spart *sparts, size_t nr_sparts, int *s_counts,
struct bpart *bparts, size_t nr_bparts, int *b_counts) { struct bpart *bparts, size_t nr_bparts, int *b_counts) {
error("TODO");
}
/** size_t part_offset = 0;
* @brief Log all the particles arriving in the current rank. size_t spart_offset = 0;
* size_t gpart_offset = 0;
* @param parts The list of #part. size_t bpart_offset = 0;
* @param nr_parts The number of parts.
* @param count The number of parts in each ranks. for(int i = 0; i < nr_nodes; i++) {
* @param gparts The list of #gpart. const size_t c_ind = sending ? engine_rank * nr_nodes + i:
* @param nr_gparts The number of gparts. i * nr_nodes + engine_rank;
* @param gcount The number of gparts in each ranks.
* @param sparts The list of #spart. /* No need to log the local particles. */
* @param nr_sparts The number of sparts. if (i == engine_rank) {
* @param s_counts The number of sparts in each ranks. part_offset += counts[c_ind];
* @param bparts The list of #bpart. spart_offset += s_counts[c_ind];
* @param nr_bparts The number of bparts. gpart_offset += g_counts[c_ind];
* @param b_counts The number of bparts in each ranks. bpart_offset += b_counts[c_ind];
* continue;
*/ }
void logger_log_after_communcations(
struct part *parts, size_t nr_parts, int *counts, const int flag = logger_generate_flag(
struct gpart *gparts, size_t nr_gparts, int *g_counts, logger_flag_mpi | logger_flag_delete, i);
struct spart *sparts, size_t nr_sparts, int *s_counts,
struct bpart *bparts, size_t nr_bparts, int *b_counts) { const unsigned int mask_hydro =
error("TODO"); logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask | logger_mask_data[logger_u].mask |
logger_mask_data[logger_h].mask | logger_mask_data[logger_rho].mask |
logger_mask_data[logger_consts].mask |
logger_mask_data[logger_special_flags].mask;
/* Log the hydro parts. */
for(int j = 0; j < counts[c_ind]; j++) {
size_t ind = part_offset + j;
message("%i: %lli", sending, parts[ind].id);
logger_log_part(log, &parts[ind], mask_hydro,
&xparts[ind].logger_data.last_offset,
flag);
xparts[ind].logger_data.steps_since_last_output = 0;
}
const unsigned int mask_stars = logger_mask_data[logger_x].mask |
logger_mask_data[logger_v].mask |
logger_mask_data[logger_consts].mask |
logger_mask_data[logger_special_flags].mask;
/* Log the stellar parts. */
for(int j = 0; j < s_counts[c_ind]; j++) {
size_t ind = spart_offset + j;
logger_log_spart(log, &sparts[ind], mask_stars,
&sparts[ind].logger_data.last_offset,
flag);
sparts[ind].logger_data.steps_since_last_output = 0;
}
const unsigned int mask_grav =
logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask | logger_mask_data[logger_consts].mask |
logger_mask_data[logger_special_flags].mask;
/* Log the gparts */
for(int j = 0; j < g_counts[c_ind]; j++) {
size_t ind = gpart_offset + j;
/* Log only the dark matter */
if (gparts[ind].type != swift_type_dark_matter) continue;
logger_log_gpart(log, &gparts[ind], mask_grav,
&gparts[ind].logger_data.last_offset,
flag);
gparts[ind].logger_data.steps_since_last_output = 0;
}
/* Log the bparts */
if (b_counts[c_ind] > 0) {
error("TODO");
}
/* Update the counters */
part_offset += counts[c_ind];
spart_offset += s_counts[c_ind];
gpart_offset += g_counts[c_ind];
bpart_offset += b_counts[c_ind];
}
} }
/** /**
......
...@@ -197,14 +197,9 @@ INLINE static int logger_generate_flag(enum logger_special_flags flag, int data) ...@@ -197,14 +197,9 @@ INLINE static int logger_generate_flag(enum logger_special_flags flag, int data)
} }
#ifdef WITH_MPI #ifdef WITH_MPI
void logger_log_before_communcations( void logger_log_repartition(
struct part *parts, size_t nr_parts, int *counts, struct logger_writer *log, int nr_nodes, int sending, struct part *parts,
struct gpart *gparts, size_t nr_gparts, int *g_counts, struct xpart *xparts, size_t nr_parts, int *counts,
struct spart *sparts, size_t nr_sparts, int *s_counts,
struct bpart *bparts, size_t nr_bparts, int *b_counts);
void logger_log_after_communcations(
struct part *parts, size_t nr_parts, int *counts,
struct gpart *gparts, size_t nr_gparts, int *g_counts, struct gpart *gparts, size_t nr_gparts, int *g_counts,
struct spart *sparts, size_t nr_sparts, int *s_counts, struct spart *sparts, size_t nr_sparts, int *s_counts,
struct bpart *bparts, size_t nr_bparts, int *b_counts); struct bpart *bparts, size_t nr_bparts, int *b_counts);
......
...@@ -654,7 +654,7 @@ void space_regrid(struct space *s, int verbose) { ...@@ -654,7 +654,7 @@ void space_regrid(struct space *s, int verbose) {
} }
/* Re-distribute the particles to their new nodes. */ /* Re-distribute the particles to their new nodes. */
engine_redistribute(s->e); engine_redistribute(s->e, /* initial_redistribute */ 0);
/* Make the proxies. */ /* Make the proxies. */
engine_makeproxies(s->e); engine_makeproxies(s->e);
...@@ -672,7 +672,7 @@ void space_regrid(struct space *s, int verbose) { ...@@ -672,7 +672,7 @@ void space_regrid(struct space *s, int verbose) {
partition_restore_celllist(s, s->e->reparttype); partition_restore_celllist(s, s->e->reparttype);
/* Now re-distribute the particles, should just add to cells? */ /* Now re-distribute the particles, should just add to cells? */
engine_redistribute(s->e); engine_redistribute(s->e, /* initial_redistribute */ 0);
/* Make the proxies. */ /* Make the proxies. */
engine_makeproxies(s->e); engine_makeproxies(s->e);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment