From 6e8bd998e573cf4859fe9728a91ca2d33aabc5f6 Mon Sep 17 00:00:00 2001
From: loikki <loic.hausammann@protonmail.ch>
Date: Wed, 27 Nov 2019 23:35:32 +0100
Subject: [PATCH] Logger: implementing strays communication

---
 src/engine.c              | 56 +++++++++++++++++++++-
 src/engine_redistribute.c | 16 +++++++
 src/logger.c              | 97 +++++++++++++++++++++++++++++++++++++--
 src/logger.h              | 29 ++++++++++++
 src/logger_io.c           |  8 +++-
 src/runner_others.c       |  2 +-
 6 files changed, 199 insertions(+), 9 deletions(-)

diff --git a/src/engine.c b/src/engine.c
index 61c720f34e..1e500b9deb 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -453,6 +453,21 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
     /* Load the part and xpart into the proxy. */
     proxy_parts_load(&e->proxies[pid], &s->parts[offset_parts + k],
                      &s->xparts[offset_parts + k], 1);
+
+#ifdef WITH_LOGGER
+    /* Log the part as it leaves this rank (flag: logger_flag_mpi + node_id). */
+    logger_log_part(e->logger, &s->parts[offset_parts + k],
+                    logger_mask_data[logger_x].mask |
+                    logger_mask_data[logger_v].mask |
+                    logger_mask_data[logger_a].mask |
+                    logger_mask_data[logger_u].mask |
+                    logger_mask_data[logger_h].mask |
+                    logger_mask_data[logger_rho].mask |
+                    logger_mask_data[logger_consts].mask |
+                    logger_mask_data[logger_special_flags].mask,
+                    &s->xparts[offset_parts + k].logger_data.last_offset,
+                    logger_generate_flag(logger_flag_mpi, node_id));
+#endif
   }
 
   /* Put the sparts into the corresponding proxies. */
@@ -488,6 +503,17 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
 
     /* Load the spart into the proxy */
     proxy_sparts_load(&e->proxies[pid], &s->sparts[offset_sparts + k], 1);
+
+#ifdef WITH_LOGGER
+    /* Log the spart as it leaves this rank (flag: logger_flag_mpi + node_id). */
+    logger_log_spart(e->logger, &s->sparts[offset_sparts + k],
+                     logger_mask_data[logger_x].mask |
+                     logger_mask_data[logger_v].mask |
+                     logger_mask_data[logger_consts].mask |
+                     logger_mask_data[logger_special_flags].mask,
+                     &s->sparts[offset_sparts + k].logger_data.last_offset,
+                     logger_generate_flag(logger_flag_mpi, node_id));
+#endif
   }
 
   /* Put the bparts into the corresponding proxies. */
@@ -523,6 +549,10 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
 
     /* Load the bpart into the proxy */
     proxy_bparts_load(&e->proxies[pid], &s->bparts[offset_bparts + k], 1);
+
+#ifdef WITH_LOGGER
+    error("TODO"); /* NOTE(review): placeholder, logging of stray bparts is not implemented. */
+#endif
   }
 
   /* Put the gparts into the corresponding proxies. */
@@ -552,6 +582,22 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
 
     /* Load the gpart into the proxy */
     proxy_gparts_load(&e->proxies[pid], &s->gparts[offset_gparts + k], 1);
+
+#ifdef WITH_LOGGER
+    /* Write only the dark matter particles */
+    if (s->gparts[offset_gparts + k].type == swift_type_dark_matter) {
+
+      /* Log the gpart as it leaves this rank, using its own logger offset. */
+      logger_log_gpart(e->logger, &s->gparts[offset_gparts + k],
+                       logger_mask_data[logger_x].mask |
+                       logger_mask_data[logger_v].mask |
+                       logger_mask_data[logger_a].mask |
+                       logger_mask_data[logger_consts].mask |
+                       logger_mask_data[logger_special_flags].mask,
+                       &s->gparts[offset_gparts + k].logger_data.last_offset,
+                       logger_generate_flag(logger_flag_mpi, node_id));
+    }
+#endif
   }
 
   /* Launch the proxies. */
@@ -771,6 +817,14 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
              sizeof(struct spart) * prox->nr_sparts_in);
       memcpy(&s->bparts[offset_bparts + count_bparts], prox->bparts_in,
              sizeof(struct bpart) * prox->nr_bparts_in);
+#ifdef WITH_LOGGER
+      /* Log the stray particles just copied in from this proxy. */
+      logger_log_recv_strays(
+          e->logger, &s->parts[offset_parts + count_parts], prox->nr_parts_in,
+          &s->gparts[offset_gparts + count_gparts], prox->nr_gparts_in,
+          &s->sparts[offset_sparts + count_sparts], prox->nr_sparts_in,
+          &s->bparts[offset_bparts + count_bparts], prox->nr_bparts_in, prox->nodeID);
+#endif
       /* for (int k = offset; k < offset + count; k++)
          message(
             "received particle %lli, x=[%.3e %.3e %.3e], h=%.3e, from node %i.",
@@ -3297,7 +3351,7 @@ void engine_dump_snapshot(struct engine *e) {
  */
 void engine_dump_index(struct engine *e) {
 
-#if defined(WITH_LOGGER) && !defined(WITH_MPI)
+#if defined(WITH_LOGGER)
   struct clocks_time time1, time2;
   clocks_gettime(&time1);
 
diff --git a/src/engine_redistribute.c b/src/engine_redistribute.c
index 9e9ce4842a..e7d4bee5eb 100644
--- a/src/engine_redistribute.c
+++ b/src/engine_redistribute.c
@@ -976,6 +976,14 @@ void engine_redistribute(struct engine *e) {
   for (int k = 0; k < nr_nodes; k++)
     nr_bparts_new += b_counts[k * nr_nodes + nodeID];
 
+#ifdef WITH_LOGGER
+  /* Log the particles before sending them out */
+  logger_log_before_communcations(s->parts, nr_parts_new, counts,
+                                  s->gparts, nr_gparts_new, g_counts,
+                                  s->sparts, nr_sparts_new, s_counts,
+                                  s->bparts, nr_bparts_new, b_counts);
+#endif
+
   /* Now exchange the particles, type by type to keep the memory required
    * under control. */
 
@@ -1028,6 +1036,14 @@ void engine_redistribute(struct engine *e) {
   /* All particles have now arrived. Time for some final operations on the
      stuff we just received */
 
+#ifdef WITH_LOGGER
+  /* Log the received particles (same count arrays as the pre-send call). */
+  logger_log_after_communcations(s->parts, s->nr_parts, counts,
+                                 s->gparts, s->nr_gparts, g_counts,
+                                 s->sparts, s->nr_sparts, s_counts,
+                                 s->bparts, s->nr_bparts, b_counts);
+#endif
+
   /* Restore the part<->gpart and spart<->gpart links.
    * Generate indices and counts for threadpool tasks. Note we process a node
    * at a time. */
diff --git a/src/logger.c b/src/logger.c
index d31c5b58b1..75005095ab 100644
--- a/src/logger.c
+++ b/src/logger.c
@@ -523,6 +523,15 @@ void logger_ensure_size(struct logger_writer *log, size_t total_nr_parts,
   dump_ensure(&log->dump, limit, log->buffer_scale * limit);
 }
 
+/** @brief Write the per-rank dump file name ("<base_name>_<rank>.dump").
+ *
+ * @param log The #logger_writer whose base_name is used as prefix.
+ * @param filename (output) Buffer of at least PARSER_MAX_LINE_SIZE bytes.
+ */
+void logger_get_dump_name(struct logger_writer *log, char *filename) {
+  snprintf(filename, PARSER_MAX_LINE_SIZE, "%s_%04i.dump", log->base_name, engine_rank);
+}
+
 /**
  * @brief intialize the logger structure
  *
@@ -548,8 +557,7 @@ void logger_init(struct logger_writer *log, struct swift_params *params) {
 
   /* generate dump filename. */
   char logger_name_file[PARSER_MAX_LINE_SIZE];
-  strcpy(logger_name_file, log->base_name);
-  strcat(logger_name_file, ".dump");
+  logger_get_dump_name(log, logger_name_file);
 
   /* Compute max size for a particle chunk. */
   int max_size = logger_offset_size + logger_mask_size;
@@ -836,6 +844,86 @@ int logger_read_timestamp(unsigned long long int *t, double *time,
   return mask;
 }
 
+
+#ifdef WITH_MPI
+/**
+ * @brief Log all the particles leaving the current rank.
+ *
+ * @param parts The list of #part.
+ * @param nr_parts The number of parts.
+ * @param counts The number of parts for each rank.
+ * @param gparts The list of #gpart.
+ * @param nr_gparts The number of gparts.
+ * @param g_counts The number of gparts for each rank.
+ * @param sparts The list of #spart.
+ * @param nr_sparts The number of sparts.
+ * @param s_counts The number of sparts for each rank.
+ * @param bparts The list of #bpart.
+ * @param nr_bparts The number of bparts.
+ * @param b_counts The number of bparts for each rank.
+ * @note Not implemented yet: the body only calls error("TODO").
+ */
+void logger_log_before_communcations(
+    struct part *parts, size_t nr_parts, int *counts,
+    struct gpart *gparts, size_t nr_gparts, int *g_counts,
+    struct spart *sparts, size_t nr_sparts, int *s_counts,
+    struct bpart *bparts, size_t nr_bparts, int *b_counts) {
+  error("TODO");
+}
+
+/**
+ * @brief Log all the particles arriving in the current rank.
+ *
+ * @param parts The list of #part.
+ * @param nr_parts The number of parts.
+ * @param counts The number of parts for each rank.
+ * @param gparts The list of #gpart.
+ * @param nr_gparts The number of gparts.
+ * @param g_counts The number of gparts for each rank.
+ * @param sparts The list of #spart.
+ * @param nr_sparts The number of sparts.
+ * @param s_counts The number of sparts for each rank.
+ * @param bparts The list of #bpart.
+ * @param nr_bparts The number of bparts.
+ * @param b_counts The number of bparts for each rank.
+ * @note Not implemented yet: the body only calls error("TODO").
+ */
+void logger_log_after_communcations(
+    struct part *parts, size_t nr_parts, int *counts,
+    struct gpart *gparts, size_t nr_gparts, int *g_counts,
+    struct spart *sparts, size_t nr_sparts, int *s_counts,
+    struct bpart *bparts, size_t nr_bparts, int *b_counts) {
+  error("TODO");
+}
+
+/**
+ * @brief Log the stray particles received from another rank.
+ *
+ * @param log The #logger_writer.
+ * @param parts The first received #part.
+ * @param nr_parts The number of received parts.
+ * @param gparts The first received #gpart.
+ * @param nr_gparts The number of received gparts.
+ * @param sparts The first received #spart.
+ * @param nr_sparts The number of received sparts.
+ * @param bparts The first received #bpart.
+ * @param nr_bparts The number of received bparts.
+ * @param node_id The rank the particles were received from.
+ *
+ * Not implemented yet: the body only calls error("TODO").
+ * NOTE(review): this patch adds no prototype for it to logger.h.
+ */
+void logger_log_recv_strays(
+    struct logger_writer *log,
+    struct part *parts, size_t nr_parts,
+    struct gpart *gparts, size_t nr_gparts,
+    struct spart *sparts, size_t nr_sparts,
+    struct bpart *bparts, size_t nr_bparts, int node_id) {
+  error("TODO");
+}
+
+#endif
+
 /**
  * @brief Write a swift_params struct to the given FILE as a stream of bytes.
  *
@@ -861,10 +949,9 @@ void logger_struct_restore(struct logger_writer *log, FILE *stream) {
 
   /* generate dump filename */
   char logger_name_file[PARSER_MAX_LINE_SIZE];
-  strcpy(logger_name_file, log->base_name);
-  strcat(logger_name_file, ".dump");
+  logger_get_dump_name(log, logger_name_file);
 
-  dump_restart(&log->dump, logger_name_file);
+  dump_restart(&log->dump, logger_name_file, log->dump.size);
 }
 
 #endif /* WITH_LOGGER */
diff --git a/src/logger.h b/src/logger.h
index 54c1975816..12fc5db62b 100644
--- a/src/logger.h
+++ b/src/logger.h
@@ -94,6 +94,11 @@ enum logger_masks_number {
   logger_count_mask = 9, /* Need to be the last. */
 } __attribute__((packed));
 
+enum logger_special_flags {
+  logger_flag_change_type = 0, /* Particle changes type (data: the new type). */
+  logger_flag_mpi = 1,         /* Particle sent over MPI (data: a node id). */
+} __attribute__((packed));
+
 struct mask_data {
   /* Number of bytes for a mask. */
   int size;
@@ -179,6 +184,30 @@ int logger_read_timestamp(unsigned long long int *t, double *time,
 void logger_struct_dump(const struct logger_writer *log, FILE *stream);
 void logger_struct_restore(struct logger_writer *log, FILE *stream);
 
+/* Pack a special flag (bits 24-31) and 24 bits of data (bits 0-23) in an int. */
+static inline int logger_generate_flag(enum logger_special_flags flag, int data) {
+#ifdef SWIFT_DEBUG_CHECKS
+  if (flag & 0xFFFFFF00) {
+    error("The special flag in the logger cannot be larger than 1 byte.");
+  }
+#endif
+  return (int)(((unsigned int)flag << (3 * 8)) | ((unsigned int)data & 0xFFFFFFu));
+}
+
+#ifdef WITH_MPI
+void logger_log_before_communcations(
+  struct part *parts, size_t nr_parts, int *counts,
+  struct gpart *gparts, size_t nr_gparts, int *g_counts,
+  struct spart *sparts, size_t nr_sparts, int *s_counts,
+  struct bpart *bparts, size_t nr_bparts, int *b_counts);
+
+void logger_log_after_communcations(
+  struct part *parts, size_t nr_parts, int *counts,
+  struct gpart *gparts, size_t nr_gparts, int *g_counts,
+  struct spart *sparts, size_t nr_sparts, int *s_counts,
+  struct bpart *bparts, size_t nr_bparts, int *b_counts);
+#endif
+
 /**
  * @brief Initialize the logger data for a particle.
  *
diff --git a/src/logger_io.c b/src/logger_io.c
index 8921304c06..2b4deb407b 100644
--- a/src/logger_io.c
+++ b/src/logger_io.c
@@ -190,8 +190,8 @@ void logger_write_index_file(struct logger_writer* log, struct engine* e) {
 
   /* File name */
   char fileName[FILENAME_BUFFER_SIZE];
-  snprintf(fileName, FILENAME_BUFFER_SIZE, "%.100s_%04i.index",
-           e->logger->base_name, outputCount);
+  snprintf(fileName, FILENAME_BUFFER_SIZE, "%.100s_%04i_%04i.index",
+           e->logger->base_name, engine_rank, outputCount);
 
   /* Open file */
   FILE* f = NULL;
@@ -366,6 +366,10 @@ void logger_write_index_file(struct logger_writer* log, struct engine* e) {
  * @params e The #engine.
  */
 void logger_write_description(struct logger_writer* log, struct engine* e) {
+  /* Only the master writes the description */
+  if (engine_rank != 0) {
+    return;
+  }
   /* const struct unit_system *internal_units = e->internal_units; */
   /* const struct unit_system *snapshot_units = e->snapshot_units; */
 
diff --git a/src/runner_others.c b/src/runner_others.c
index 6dcc92d8a4..d451ba6f10 100644
--- a/src/runner_others.c
+++ b/src/runner_others.c
@@ -325,7 +325,7 @@ void runner_do_star_formation(struct runner *r, struct cell *c, int timer) {
                                 logger_mask_data[logger_consts].mask |
                                 logger_mask_data[logger_special_flags].mask,
                             &xp->logger_data.last_offset,
-                            /* special flags */ swift_type_stars);
+                            logger_generate_flag(logger_flag_change_type, swift_type_stars));
 #endif
 
             /* Convert the gas particle to a star particle */
-- 
GitLab