diff --git a/examples/main.c b/examples/main.c
index 95784341ab3ad363b90c5a33bed9ffc0885499ce..5da86de221c6edef96f09125d29b039abd30af6f 100644
--- a/examples/main.c
+++ b/examples/main.c
@@ -169,6 +169,7 @@ int main(int argc, char *argv[]) {
   int with_drift_all = 0;
   int with_mpole_reconstruction = 0;
   int with_structure_finding = 0;
+  int with_logger = 0;
   int with_eagle = 0;
   int verbose = 0;
   int nr_threads = 1;
@@ -252,6 +253,8 @@ int main(int argc, char *argv[]) {
                  "Overwrite the CPU "
                  "frequency (Hz) to be used for time measurements.",
                  NULL, 0, 0),
+      OPT_BOOLEAN(0, "logger", &with_logger, "Run with the particle logger.",
+                  NULL, 0, 0),
       OPT_INTEGER('n', "steps", &nsteps,
                   "Execute a fixed number of time steps. When unset use the "
                   "time_end parameter to stop.",
@@ -330,6 +333,15 @@ int main(int argc, char *argv[]) {
   }
 #endif
 
+#if !defined(WITH_LOGGER)
+  if (with_logger) {
+    printf(
+        "Error: the particle logger is not available, please compile with "
+        "--enable-logger.");
+    return 1;
+  }
+#endif
+
 #ifndef HAVE_FE_ENABLE_EXCEPT
   if (with_fp_exceptions) {
     printf("Error: no support for floating point exceptions\n");
@@ -583,12 +595,9 @@ int main(int argc, char *argv[]) {
 #ifdef WITH_MPI
   if (with_mpole_reconstruction && nr_nodes > 1)
     error("Cannot reconstruct m-poles every step over MPI (yet).");
-#ifdef WITH_LOGGER
-  error("Can't run with the particle logger over MPI (yet)");
-#endif
 #endif
 
-  /* Temporary early aborts for modes not supported with hand-vec. */
+    /* Temporary early aborts for modes not supported with hand-vec. */
 #if defined(WITH_VECTORIZATION) && defined(GADGET2_SPH) && \
     !defined(CHEMISTRY_NONE)
   error(
@@ -1143,6 +1152,7 @@ int main(int argc, char *argv[]) {
     if (with_structure_finding)
       engine_policies |= engine_policy_structure_finding;
     if (with_fof) engine_policies |= engine_policy_fof;
+    if (with_logger) engine_policies |= engine_policy_logger;
 
     /* Initialize the engine with the space and policies. */
     if (myrank == 0) clocks_gettime(&tic);
@@ -1205,7 +1215,12 @@ int main(int argc, char *argv[]) {
 #ifdef WITH_MPI
     /* Split the space. */
     engine_split(&e, &initial_partition);
+    /* Turn off the logger to avoid writing the communications */
+    if (with_logger) e.policy &= ~engine_policy_logger;
+
     engine_redistribute(&e);
+    /* Turn it back on */
+    if (with_logger) e.policy |= engine_policy_logger;
 #endif
 
     /* Initialise the particles */
@@ -1213,8 +1228,10 @@ int main(int argc, char *argv[]) {
 
     /* Write the state of the system before starting time integration. */
 #ifdef WITH_LOGGER
-    logger_log_all(e.logger, &e);
-    engine_dump_index(&e);
+    if (e.policy & engine_policy_logger) {
+      logger_log_all(e.logger, &e);
+      engine_dump_index(&e);
+    }
 #endif
     /* Dump initial state snapshot, if not working with an output list */
     if (!e.output_list_snapshots) engine_dump_snapshot(&e);
@@ -1416,11 +1433,16 @@ int main(int argc, char *argv[]) {
       engine_print_stats(&e);
     }
 #ifdef WITH_LOGGER
-    logger_log_all(e.logger, &e);
+    if (e.policy & engine_policy_logger) {
+      logger_log_all(e.logger, &e);
+
+      /* Write a final index file */
+      engine_dump_index(&e);
 
-    /* Write a sentinel timestamp */
-    logger_log_timestamp(e.logger, e.ti_current, e.time,
-                         &e.logger->timestamp_offset);
+      /* Write a sentinel timestamp */
+      logger_log_timestamp(e.logger, e.ti_current, e.time,
+                           &e.logger->timestamp_offset);
+    }
 #endif
 
     /* Write final snapshot? */
diff --git a/src/engine.c b/src/engine.c
index 88ce085f71f3e9d2cd869ac637db8127c0e51c64..c33b38396f48f224d59338a1d873d662679f9100 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -122,7 +122,8 @@ const char *engine_policy_names[] = {"none",
                                      "black holes",
                                      "fof search",
                                      "time-step limiter",
-                                     "time-step sync"};
+                                     "time-step sync",
+                                     "logger"};
 
 /** The rank of the engine as a global variable (for messages). */
 int engine_rank;
@@ -494,7 +495,6 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
                             size_t *Nbpart) {
 
 #ifdef WITH_MPI
-
   struct space *s = e->s;
   ticks tic = getticks();
 
@@ -540,6 +540,19 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
     /* Load the part and xpart into the proxy. */
     proxy_parts_load(&e->proxies[pid], &s->parts[offset_parts + k],
                      &s->xparts[offset_parts + k], 1);
+
+#ifdef WITH_LOGGER
+    if (e->policy & engine_policy_logger) {
+      const uint32_t logger_flag =
+          logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);
+
+      /* Log the particle when leaving a rank. */
+      logger_log_part(
+          e->logger, &s->parts[offset_parts + k], &s->xparts[offset_parts + k],
+          logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
+          logger_flag);
+    }
+#endif
   }
 
   /* Put the sparts into the corresponding proxies. */
@@ -575,6 +588,19 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
 
     /* Load the spart into the proxy */
     proxy_sparts_load(&e->proxies[pid], &s->sparts[offset_sparts + k], 1);
+
+#ifdef WITH_LOGGER
+    if (e->policy & engine_policy_logger) {
+      const uint32_t logger_flag =
+          logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);
+
+      /* Log the particle when leaving a rank. */
+      logger_log_spart(
+          e->logger, &s->sparts[offset_sparts + k],
+          logger_masks_all_spart | logger_mask_data[logger_special_flags].mask,
+          logger_flag);
+    }
+#endif
   }
 
   /* Put the bparts into the corresponding proxies. */
@@ -610,6 +636,12 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
 
     /* Load the bpart into the proxy */
     proxy_bparts_load(&e->proxies[pid], &s->bparts[offset_bparts + k], 1);
+
+#ifdef WITH_LOGGER
+    if (e->policy & engine_policy_logger) {
+      error("Not yet implemented.");
+    }
+#endif
   }
 
   /* Put the gparts into the corresponding proxies. */
@@ -639,6 +671,22 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
 
     /* Load the gpart into the proxy */
     proxy_gparts_load(&e->proxies[pid], &s->gparts[offset_gparts + k], 1);
+
+#ifdef WITH_LOGGER
+    /* Write only the dark matter particles */
+    if ((e->policy & engine_policy_logger) &&
+        s->gparts[offset_gparts + k].type == swift_type_dark_matter) {
+
+      const uint32_t logger_flag =
+          logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);
+
+      /* Log the particle when leaving a rank. */
+      logger_log_gpart(
+          e->logger, &s->gparts[offset_gparts + k],
+          logger_masks_all_gpart | logger_mask_data[logger_special_flags].mask,
+          logger_flag);
+    }
+#endif
   }
 
   /* Launch the proxies. */
@@ -858,6 +906,41 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
              sizeof(struct spart) * prox->nr_sparts_in);
       memcpy(&s->bparts[offset_bparts + count_bparts], prox->bparts_in,
              sizeof(struct bpart) * prox->nr_bparts_in);
+
+#ifdef WITH_LOGGER
+      if (e->policy & engine_policy_logger) {
+        const uint32_t flag =
+            logger_pack_flags_and_data(logger_flag_mpi_enter, prox->nodeID);
+
+        struct part *parts = &s->parts[offset_parts + count_parts];
+        struct xpart *xparts = &s->xparts[offset_parts + count_parts];
+        struct spart *sparts = &s->sparts[offset_sparts + count_sparts];
+        struct gpart *gparts = &s->gparts[offset_gparts + count_gparts];
+
+        /* Log the gas particles */
+        logger_log_parts(
+            e->logger, parts, xparts, prox->nr_parts_in,
+            logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
+            flag);
+
+        /* Log the stellar particles */
+        logger_log_sparts(e->logger, sparts, prox->nr_sparts_in,
+                          logger_masks_all_spart |
+                              logger_mask_data[logger_special_flags].mask,
+                          flag);
+
+        /* Log the gparts */
+        logger_log_gparts(e->logger, gparts, prox->nr_gparts_in,
+                          logger_masks_all_gpart |
+                              logger_mask_data[logger_special_flags].mask,
+                          flag);
+
+        /* Log the bparts */
+        if (prox->nr_bparts_in > 0) {
+          error("TODO");
+        }
+      }
+#endif
       /* for (int k = offset; k < offset + count; k++)
          message(
             "received particle %lli, x=[%.3e %.3e %.3e], h=%.3e, from node %i.",
@@ -1482,7 +1565,9 @@ int engine_estimate_nr_tasks(const struct engine *e) {
   }
 #if defined(WITH_LOGGER)
   /* each cell logs its particles */
-  n1 += 1;
+  if (e->policy & engine_policy_logger) {
+    n1 += 1;
+  }
 #endif
 
 #ifdef WITH_MPI
@@ -2073,13 +2158,15 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs,
     cooling_update(e->cosmology, e->cooling_func, e->s);
 
 #ifdef WITH_LOGGER
-  /* Mark the first time step in the particle logger file. */
-  logger_log_timestamp(e->logger, e->ti_current, e->time,
-                       &e->logger->timestamp_offset);
-  /* Make sure that we have enough space in the particle logger file
-   * to store the particles in current time step. */
-  logger_ensure_size(e->logger, s->nr_parts, s->nr_gparts, s->nr_sparts);
-  logger_write_description(e->logger, e);
+  if (e->policy & engine_policy_logger) {
+    /* Mark the first time step in the particle logger file. */
+    logger_log_timestamp(e->logger, e->ti_current, e->time,
+                         &e->logger->timestamp_offset);
+    /* Make sure that we have enough space in the particle logger file
+     * to store the particles in current time step. */
+    logger_ensure_size(e->logger, s->nr_parts, s->nr_gparts, s->nr_sparts);
+    logger_write_description(e->logger, e);
+  }
 #endif
 
   /* Now, launch the calculation */
@@ -2396,13 +2483,15 @@ void engine_step(struct engine *e) {
   }
 
 #ifdef WITH_LOGGER
-  /* Mark the current time step in the particle logger file. */
-  logger_log_timestamp(e->logger, e->ti_current, e->time,
-                       &e->logger->timestamp_offset);
-  /* Make sure that we have enough space in the particle logger file
-   * to store the particles in current time step. */
-  logger_ensure_size(e->logger, e->s->nr_parts, e->s->nr_gparts,
-                     e->s->nr_sparts);
+  if (e->policy & engine_policy_logger) {
+    /* Mark the current time step in the particle logger file. */
+    logger_log_timestamp(e->logger, e->ti_current, e->time,
+                         &e->logger->timestamp_offset);
+    /* Make sure that we have enough space in the particle logger file
+     * to store the particles in current time step. */
+    logger_ensure_size(e->logger, e->s->nr_parts, e->s->nr_gparts,
+                       e->s->nr_sparts);
+  }
 #endif
 
   /* Are we drifting everything (a la Gadget/GIZMO) ? */
@@ -2523,7 +2612,9 @@ void engine_step(struct engine *e) {
 
   engine_check_for_dumps(e);
 #ifdef WITH_LOGGER
-  engine_check_for_index_dump(e);
+  if (e->policy & engine_policy_logger) {
+    engine_check_for_index_dump(e);
+  }
 #endif
 
   TIMER_TOC2(timer_step);
@@ -3424,7 +3515,7 @@ void engine_dump_snapshot(struct engine *e) {
  */
 void engine_dump_index(struct engine *e) {
 
-#if defined(WITH_LOGGER) && !defined(WITH_MPI)
+#if defined(WITH_LOGGER)
   struct clocks_time time1, time2;
   clocks_gettime(&time1);
 
@@ -3710,8 +3801,10 @@ void engine_init(struct engine *e, struct space *s, struct swift_params *params,
   e->total_nr_tasks = 0;
 
 #if defined(WITH_LOGGER)
-  e->logger = (struct logger_writer *)malloc(sizeof(struct logger_writer));
-  logger_init(e->logger, params);
+  if (e->policy & engine_policy_logger) {
+    e->logger = (struct logger_writer *)malloc(sizeof(struct logger_writer));
+    logger_init(e->logger, params);
+  }
 #endif
 
   /* Make the space link back to the engine. */
@@ -4233,7 +4326,7 @@ void engine_config(int restart, int fof, struct engine *e,
 #endif
 
 #if defined(WITH_LOGGER)
-    if (e->nodeID == 0)
+    if ((e->policy & engine_policy_logger) && e->nodeID == 0)
       message(
           "WARNING: There is currently no way of predicting the output "
           "size, please use it carefully");
@@ -4456,7 +4549,7 @@ void engine_config(int restart, int fof, struct engine *e,
   }
 
 #ifdef WITH_LOGGER
-  if (!restart) {
+  if ((e->policy & engine_policy_logger) && !restart) {
     /* Write the particle logger header */
     logger_write_file_header(e->logger);
   }
@@ -4982,8 +5075,10 @@ void engine_clean(struct engine *e, const int fof) {
 
   swift_free("links", e->links);
 #if defined(WITH_LOGGER)
-  logger_free(e->logger);
-  free(e->logger);
+  if (e->policy & engine_policy_logger) {
+    logger_free(e->logger);
+    free(e->logger);
+  }
 #endif
   scheduler_clean(&e->sched);
   space_clean(e->s);
@@ -5064,7 +5159,9 @@ void engine_struct_dump(struct engine *e, FILE *stream) {
   if (e->output_list_stf) output_list_struct_dump(e->output_list_stf, stream);
 
 #ifdef WITH_LOGGER
-  logger_struct_dump(e->logger, stream);
+  if (e->policy & engine_policy_logger) {
+    logger_struct_dump(e->logger, stream);
+  }
 #endif
 }
 
@@ -5211,10 +5308,12 @@ void engine_struct_restore(struct engine *e, FILE *stream) {
   }
 
 #ifdef WITH_LOGGER
-  struct logger_writer *log =
-      (struct logger_writer *)malloc(sizeof(struct logger_writer));
-  logger_struct_restore(log, stream);
-  e->logger = log;
+  if (e->policy & engine_policy_logger) {
+    struct logger_writer *log =
+        (struct logger_writer *)malloc(sizeof(struct logger_writer));
+    logger_struct_restore(log, stream);
+    e->logger = log;
+  }
 #endif
 
 #ifdef EOS_PLANETARY
diff --git a/src/engine.h b/src/engine.h
index f4de5d6a76205bda533e97790e4954a1d801e08b..ecf9979b57dc4b382b8772ca9a34002387bc8c3e 100644
--- a/src/engine.h
+++ b/src/engine.h
@@ -77,9 +77,10 @@ enum engine_policy {
   engine_policy_black_holes = (1 << 19),
   engine_policy_fof = (1 << 20),
   engine_policy_timestep_limiter = (1 << 21),
-  engine_policy_timestep_sync = (1 << 22)
+  engine_policy_timestep_sync = (1 << 22),
+  engine_policy_logger = (1 << 23),
 };
-#define engine_maxpolicy 23
+#define engine_maxpolicy 24
 extern const char *engine_policy_names[engine_maxpolicy + 1];
 
 /**
diff --git a/src/engine_maketasks.c b/src/engine_maketasks.c
index 9f907f33b4b0173a32292c58e9b1a1fd8bad3312..6b96ad9d8b256e7aff756cf06568ab8cb546d24f 100644
--- a/src/engine_maketasks.c
+++ b/src/engine_maketasks.c
@@ -831,6 +831,9 @@ void engine_make_hierarchical_tasks_common(struct engine *e, struct cell *c) {
   const int with_timestep_limiter =
       (e->policy & engine_policy_timestep_limiter);
   const int with_timestep_sync = (e->policy & engine_policy_timestep_sync);
+#ifdef WITH_LOGGER
+  const int with_logger = e->policy & engine_policy_logger;
+#endif
 
   /* Are we at the top-level? */
   if (c->top == c && c->nodeID == e->nodeID) {
@@ -855,15 +858,20 @@ void engine_make_hierarchical_tasks_common(struct engine *e, struct cell *c) {
                                    c, NULL);
 
 #if defined(WITH_LOGGER)
-      /* Add the hydro logger task. */
-      c->logger = scheduler_addtask(s, task_type_logger, task_subtype_none, 0,
-                                    0, c, NULL);
+      struct task *kick2_or_logger;
+      if (with_logger) {
+        /* Add the hydro logger task. */
+        c->logger = scheduler_addtask(s, task_type_logger, task_subtype_none, 0,
+                                      0, c, NULL);
 
-      /* Add the kick2 dependency */
-      scheduler_addunlock(s, c->kick2, c->logger);
+        /* Add the kick2 dependency */
+        scheduler_addunlock(s, c->kick2, c->logger);
 
-      /* Create a variable in order to avoid to many ifdef */
-      struct task *kick2_or_logger = c->logger;
+        /* Create a variable in order to avoid to many ifdef */
+        kick2_or_logger = c->logger;
+      } else {
+        kick2_or_logger = c->kick2;
+      }
 #else
       struct task *kick2_or_logger = c->kick2;
 #endif
@@ -1070,6 +1078,9 @@ void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c,
   const int with_cooling = (e->policy & engine_policy_cooling);
   const int with_star_formation = (e->policy & engine_policy_star_formation);
   const int with_black_holes = (e->policy & engine_policy_black_holes);
+#ifdef WITH_LOGGER
+  const int with_logger = (e->policy & engine_policy_logger);
+#endif
 
   /* Are we are the level where we create the stars' resort tasks?
    * If the tree is shallow, we need to do this at the super-level if the
@@ -1176,7 +1187,11 @@ void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c,
                                            task_subtype_none, 0, 0, c, NULL);
 
 #ifdef WITH_LOGGER
-        scheduler_addunlock(s, c->super->logger, c->stars.stars_in);
+        if (with_logger) {
+          scheduler_addunlock(s, c->super->logger, c->stars.stars_in);
+        } else {
+          scheduler_addunlock(s, c->super->kick2, c->stars.stars_in);
+        }
 #else
         scheduler_addunlock(s, c->super->kick2, c->stars.stars_in);
 #endif
@@ -1209,7 +1224,13 @@ void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c,
             s, task_type_bh_swallow_ghost3, task_subtype_none, 0, 0, c, NULL);
 
 #ifdef WITH_LOGGER
-        scheduler_addunlock(s, c->super->logger, c->black_holes.black_holes_in);
+        if (with_logger) {
+          scheduler_addunlock(s, c->super->logger,
+                              c->black_holes.black_holes_in);
+        } else {
+          scheduler_addunlock(s, c->super->kick2,
+                              c->black_holes.black_holes_in);
+        }
 #else
         scheduler_addunlock(s, c->super->kick2, c->black_holes.black_holes_in);
 #endif
diff --git a/src/engine_redistribute.c b/src/engine_redistribute.c
index 9e9ce4842a88a65f446f12a74de5039912247516..a312ba4e11c963828c582165fb5ff6308991baa6 100644
--- a/src/engine_redistribute.c
+++ b/src/engine_redistribute.c
@@ -976,6 +976,59 @@ void engine_redistribute(struct engine *e) {
   for (int k = 0; k < nr_nodes; k++)
     nr_bparts_new += b_counts[k * nr_nodes + nodeID];
 
+#ifdef WITH_LOGGER
+  if (e->policy & engine_policy_logger) {
+    /* Log the particles before sending them out */
+    size_t part_offset = 0;
+    size_t spart_offset = 0;
+    size_t gpart_offset = 0;
+    size_t bpart_offset = 0;
+
+    for (int i = 0; i < nr_nodes; i++) {
+      const size_t c_ind = engine_rank * nr_nodes + i;
+
+      /* No need to log the local particles. */
+      if (i == engine_rank) {
+        part_offset += counts[c_ind];
+        spart_offset += s_counts[c_ind];
+        gpart_offset += g_counts[c_ind];
+        bpart_offset += b_counts[c_ind];
+        continue;
+      }
+      const uint32_t flag = logger_pack_flags_and_data(logger_flag_mpi_exit, i);
+
+      /* Log the hydro parts. */
+      logger_log_parts(
+          e->logger, &parts[part_offset], &xparts[part_offset], counts[c_ind],
+          logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
+          flag);
+
+      /* Log the stellar parts. */
+      logger_log_sparts(
+          e->logger, &sparts[spart_offset], s_counts[c_ind],
+          logger_masks_all_spart | logger_mask_data[logger_special_flags].mask,
+          flag);
+
+      /* Log the gparts */
+      logger_log_gparts(
+          e->logger, &gparts[gpart_offset], g_counts[c_ind],
+          logger_masks_all_gpart | logger_mask_data[logger_special_flags].mask,
+          flag);
+
+      /* Log the bparts */
+      if (b_counts[c_ind] > 0) {
+        error("TODO");
+      }
+
+      /* Update the counters */
+      part_offset += counts[c_ind];
+      spart_offset += s_counts[c_ind];
+      gpart_offset += g_counts[c_ind];
+      bpart_offset += b_counts[c_ind];
+    }
+  }
+#endif
+
   /* Now exchange the particles, type by type to keep the memory required
    * under control. */
 
@@ -1028,6 +1081,60 @@ void engine_redistribute(struct engine *e) {
   /* All particles have now arrived. Time for some final operations on the
      stuff we just received */
 
+#ifdef WITH_LOGGER
+  if (e->policy & engine_policy_logger) {
+    size_t part_offset = 0;
+    size_t spart_offset = 0;
+    size_t gpart_offset = 0;
+    size_t bpart_offset = 0;
+
+    for (int i = 0; i < nr_nodes; i++) {
+      const size_t c_ind = i * nr_nodes + engine_rank;
+
+      /* No need to log the local particles. */
+      if (i == engine_rank) {
+        part_offset += counts[c_ind];
+        spart_offset += s_counts[c_ind];
+        gpart_offset += g_counts[c_ind];
+        bpart_offset += b_counts[c_ind];
+        continue;
+      }
+
+      const uint32_t flag =
+          logger_pack_flags_and_data(logger_flag_mpi_enter, i);
+
+      /* Log the hydro parts. */
+      logger_log_parts(
+          e->logger, &s->parts[part_offset], &s->xparts[part_offset], counts[c_ind],
+          logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
+          flag);
+
+      /* Log the stellar parts. */
+      logger_log_sparts(
+          e->logger, &s->sparts[spart_offset], s_counts[c_ind],
+          logger_masks_all_spart | logger_mask_data[logger_special_flags].mask,
+          flag);
+
+      /* Log the gparts */
+      logger_log_gparts(
+          e->logger, &s->gparts[gpart_offset], g_counts[c_ind],
+          logger_masks_all_gpart | logger_mask_data[logger_special_flags].mask,
+          flag);
+
+      /* Log the bparts */
+      if (b_counts[c_ind] > 0) {
+        error("TODO");
+      }
+
+      /* Update the counters */
+      part_offset += counts[c_ind];
+      spart_offset += s_counts[c_ind];
+      gpart_offset += g_counts[c_ind];
+      bpart_offset += b_counts[c_ind];
+    }
+  }
+#endif
+
   /* Restore the part<->gpart and spart<->gpart links.
    * Generate indices and counts for threadpool tasks. Note we process a node
    * at a time. */
diff --git a/src/logger.c b/src/logger.c
index d31c5b58b1e0bdcdc7cafc953b0e9dbd4fb649d4..38056dd680a3b6feeb7291196897722aad760f34 100644
--- a/src/logger.c
+++ b/src/logger.c
@@ -78,7 +78,7 @@ const struct mask_data logger_mask_data[logger_count_mask] = {
     /* Particle's constants: mass (float) and ID (long long). */
     {sizeof(float) + sizeof(long long), 1 << logger_consts, "consts"},
     /* Flag for special cases (e.g. change of MPI rank, star formation, ...) */
-    {sizeof(int), 1 << logger_special_flags, "special flags"},
+    {sizeof(uint32_t), 1 << logger_special_flags, "special flags"},
     /* Simulation time stamp: integertime and double time (e.g. scale
        factor or time). */
     {sizeof(integertime_t) + sizeof(double), 1 << logger_timestamp,
@@ -178,74 +178,44 @@ void logger_log_all(struct logger_writer *log, const struct engine *e) {
 
   /* some constants. */
   const struct space *s = e->s;
-  const unsigned int mask_hydro =
-      logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
-      logger_mask_data[logger_a].mask | logger_mask_data[logger_u].mask |
-      logger_mask_data[logger_h].mask | logger_mask_data[logger_rho].mask |
-      logger_mask_data[logger_consts].mask;
 
   /* loop over all parts. */
   for (size_t i = 0; i < s->nr_parts; i++) {
-    logger_log_part(log, &s->parts[i], mask_hydro,
-                    &s->xparts[i].logger_data.last_offset,
+    logger_log_part(log, &s->parts[i], &s->xparts[i], logger_masks_all_part,
                     /* Special flags */ 0);
-    s->xparts[i].logger_data.steps_since_last_output = 0;
   }
 
-  const unsigned int mask_grav =
-      logger_mask_data[logger_x].mask | logger_mask_data[logger_v].mask |
-      logger_mask_data[logger_a].mask | logger_mask_data[logger_consts].mask;
-
   /* loop over all gparts */
   for (size_t i = 0; i < s->nr_gparts; i++) {
     /* Log only the dark matter */
     if (s->gparts[i].type != swift_type_dark_matter) continue;
 
-    logger_log_gpart(log, &s->gparts[i], mask_grav,
-                     &s->gparts[i].logger_data.last_offset,
+    logger_log_gpart(log, &s->gparts[i], logger_masks_all_gpart,
                      /* Special flags */ 0);
-    s->gparts[i].logger_data.steps_since_last_output = 0;
   }
 
-  const unsigned int mask_stars = logger_mask_data[logger_x].mask |
-                                  logger_mask_data[logger_v].mask |
-                                  logger_mask_data[logger_consts].mask;
-
   /* loop over all sparts */
   for (size_t i = 0; i < s->nr_sparts; i++) {
-    logger_log_spart(log, &s->sparts[i], mask_stars,
-                     &s->sparts[i].logger_data.last_offset,
+    logger_log_spart(log, &s->sparts[i], logger_masks_all_spart,
                      /* Special flags */ 0);
-    s->sparts[i].logger_data.steps_since_last_output = 0;
   }
 
   if (e->total_nr_bparts > 0) error("Not implemented");
 }
 
 /**
- * @brief Dump a #part to the log.
+ * @brief Copy the particle fields into a given buffer.
  *
- * @param log The #logger_writer
- * @param p The #part to dump.
- * @param mask The mask of the data to dump.
- * @param offset Pointer to the offset of the previous log of this particle;
- * @param special_flags The value of the special flag.
- * (return) offset of this log.
+ * @param p The #part to copy.
+ * @param mask The mask for the fields to write.
+ * @param offset The offset to the previous log.
+ * @param offset_new The offset of the current record.
+ * @param buff The buffer to use when writing.
+ * @param special_flags The data for the special flags.
  */
-void logger_log_part(struct logger_writer *log, const struct part *p,
-                     unsigned int mask, size_t *offset,
-                     const int special_flags) {
-
-  /* Make sure we're not writing a timestamp. */
-  if (mask & logger_mask_data[logger_timestamp].mask)
-    error("You should not log particles as timestamps.");
-
-  /* Start by computing the size of the message. */
-  const int size = logger_compute_chunk_size(mask);
-
-  /* Allocate a chunk of memory in the dump of the right size. */
-  size_t offset_new;
-  char *buff = (char *)dump_get(&log->dump, size, &offset_new);
+void logger_copy_part_fields(const struct part *p, unsigned int mask,
+                             size_t *offset, size_t offset_new, char *buff,
+                             const uint32_t special_flags) {
 
   /* Write the header. */
   buff = logger_write_chunk_header(buff, &mask, offset, offset_new);
@@ -254,18 +224,21 @@ void logger_log_part(struct logger_writer *log, const struct part *p,
   if (mask & logger_mask_data[logger_x].mask) {
     memcpy(buff, p->x, logger_mask_data[logger_x].size);
     buff += logger_mask_data[logger_x].size;
+    mask &= ~logger_mask_data[logger_x].mask;
   }
 
   /* Particle velocity as three floats. */
   if (mask & logger_mask_data[logger_v].mask) {
     memcpy(buff, p->v, logger_mask_data[logger_v].size);
     buff += logger_mask_data[logger_v].size;
+    mask &= ~logger_mask_data[logger_v].mask;
   }
 
   /* Particle accelleration as three floats. */
   if (mask & logger_mask_data[logger_a].mask) {
     memcpy(buff, p->a_hydro, logger_mask_data[logger_a].size);
     buff += logger_mask_data[logger_a].size;
+    mask &= ~logger_mask_data[logger_a].mask;
   }
 
 #if defined(GADGET2_SPH)
@@ -274,18 +247,21 @@ void logger_log_part(struct logger_writer *log, const struct part *p,
   if (mask & logger_mask_data[logger_u].mask) {
     memcpy(buff, &p->entropy, logger_mask_data[logger_u].size);
     buff += logger_mask_data[logger_u].size;
+    mask &= ~logger_mask_data[logger_u].mask;
   }
 
   /* Particle smoothing length as a single float. */
   if (mask & logger_mask_data[logger_h].mask) {
     memcpy(buff, &p->h, logger_mask_data[logger_h].size);
     buff += logger_mask_data[logger_h].size;
+    mask &= ~logger_mask_data[logger_h].mask;
   }
 
   /* Particle density as a single float. */
   if (mask & logger_mask_data[logger_rho].mask) {
     memcpy(buff, &p->rho, logger_mask_data[logger_rho].size);
     buff += logger_mask_data[logger_rho].size;
+    mask &= ~logger_mask_data[logger_rho].mask;
   }
 
   /* Particle constants, which is a bit more complicated. */
@@ -296,6 +272,7 @@ void logger_log_part(struct logger_writer *log, const struct part *p,
     const int64_t id = p->id;
     memcpy(buff, &id, sizeof(int64_t));
     buff += sizeof(int64_t);
+    mask &= ~logger_mask_data[logger_consts].mask;
   }
 
 #endif
@@ -304,42 +281,77 @@ void logger_log_part(struct logger_writer *log, const struct part *p,
   if (mask & logger_mask_data[logger_special_flags].mask) {
     memcpy(buff, &special_flags, logger_mask_data[logger_special_flags].size);
     buff += logger_mask_data[logger_special_flags].size;
+    mask &= ~logger_mask_data[logger_special_flags].mask;
   }
 
-  /* Update the log message offset. */
-  *offset = offset_new;
+#ifdef SWIFT_DEBUG_CHECKS
+  if (mask) {
+    error("Requested logging of values not present in parts. %u", mask);
+  }
+#endif
 }
 
 /**
- * @brief Dump a #spart to the log.
+ * @brief Dump a #part to the log.
  *
  * @param log The #logger_writer
- * @param sp The #spart to dump.
+ * @param p The #part to dump.
+ * @param xp The #xpart to dump.
  * @param mask The mask of the data to dump.
- * @param offset Pointer to the offset of the previous log of this particle;
  * @param special_flags The value of the special flag.
- * (return) offset of this log.
  */
-void logger_log_spart(struct logger_writer *log, const struct spart *sp,
-                      unsigned int mask, size_t *offset,
-                      const int special_flags) {
-
-  /* Make sure we're not writing a timestamp. */
-  if (mask & logger_mask_data[logger_timestamp].mask)
-    error("You should not log particles as timestamps.");
+void logger_log_part(struct logger_writer *log, const struct part *p,
+                     struct xpart *xp, unsigned int mask,
+                     const uint32_t special_flags) {
 
-  /* Make sure we're not looging fields not supported by gparts. */
-  if (mask &
-      (logger_mask_data[logger_u].mask | logger_mask_data[logger_rho].mask |
-       logger_mask_data[logger_a].mask))
-    error("Can't log SPH quantities for sparts.");
+  logger_log_parts(log, p, xp, /* count */ 1, mask, special_flags);
+}
 
+/**
+ * @brief Dump a group of #part to the log.
+ *
+ * @param log The #logger_writer
+ * @param sp The #part to dump.
+ * @param mask The mask of the data to dump.
+ * @param count The number of particle to dump.
+ * @param special_flags The value of the special flags.
+ */
+void logger_log_parts(struct logger_writer *log, const struct part *p,
+                      struct xpart *xp, int count, unsigned int mask,
+                      const uint32_t special_flags) {
   /* Start by computing the size of the message. */
   const int size = logger_compute_chunk_size(mask);
 
   /* Allocate a chunk of memory in the dump of the right size. */
   size_t offset_new;
-  char *buff = (char *)dump_get(&log->dump, size, &offset_new);
+  char *buff = (char *)dump_get(&log->dump, count * size, &offset_new);
+
+  for (int i = 0; i < count; i++) {
+    /* Copy everything into the buffer */
+    logger_copy_part_fields(&p[i], mask, &xp[i].logger_data.last_offset,
+                            offset_new, buff, special_flags);
+
+    /* Update the pointers */
+    xp[i].logger_data.last_offset = offset_new;
+    xp[i].logger_data.steps_since_last_output = 0;
+    buff += size;
+    offset_new += size;
+  }
+}
+
+/**
+ * @brief Copy the particle fields into a given buffer.
+ *
+ * @param sp The #spart to copy.
+ * @param mask The mask for the fields to write.
+ * @param offset The offset to the previous log.
+ * @param offset_new The offset of the current record.
+ * @param buff The buffer to use when writing.
+ * @param special_flags The data for the special flags.
+ */
+void logger_copy_spart_fields(const struct spart *sp, unsigned int mask,
+                              size_t *offset, size_t offset_new, char *buff,
+                              const uint32_t special_flags) {
 
   /* Write the header. */
   buff = logger_write_chunk_header(buff, &mask, offset, offset_new);
@@ -348,12 +360,14 @@ void logger_log_spart(struct logger_writer *log, const struct spart *sp,
   if (mask & logger_mask_data[logger_x].mask) {
     memcpy(buff, sp->x, logger_mask_data[logger_x].size);
     buff += logger_mask_data[logger_x].size;
+    mask &= ~logger_mask_data[logger_x].mask;
   }
 
   /* Particle velocity as three floats. */
   if (mask & logger_mask_data[logger_v].mask) {
     memcpy(buff, sp->v, logger_mask_data[logger_v].size);
     buff += logger_mask_data[logger_v].size;
+    mask &= ~logger_mask_data[logger_v].mask;
   }
 
   /* Particle constants, which is a bit more complicated. */
@@ -364,93 +378,176 @@ void logger_log_spart(struct logger_writer *log, const struct spart *sp,
     const int64_t id = sp->id;
     memcpy(buff, &id, sizeof(int64_t));
     buff += sizeof(int64_t);
+    mask &= ~logger_mask_data[logger_consts].mask;
   }
 
   /* Special flags */
   if (mask & logger_mask_data[logger_special_flags].mask) {
     memcpy(buff, &special_flags, logger_mask_data[logger_special_flags].size);
     buff += logger_mask_data[logger_special_flags].size;
+    mask &= ~logger_mask_data[logger_special_flags].mask;
   }
 
-  /* Update the log message offset. */
-  *offset = offset_new;
+#ifdef SWIFT_DEBUG_CHECKS
+  if (mask) {
+    error("Requested logging of values not present in sparts. %u", mask);
+  }
+#endif
 }
 
 /**
- * @brief Dump a #gpart to the log.
+ * @brief Dump a #spart to the log.
  *
  * @param log The #logger_writer
- * @param p The #gpart to dump.
+ * @param sp The #spart to dump.
  * @param mask The mask of the data to dump.
- * @param offset Pointer to the offset of the previous log of this particle;
- * @param special_flags The value of the special flags.
- * (return) offset of this log.
+ * @param special_flags The value of the special flag.
  */
-void logger_log_gpart(struct logger_writer *log, const struct gpart *p,
-                      unsigned int mask, size_t *offset,
-                      const int special_flags) {
+void logger_log_spart(struct logger_writer *log, struct spart *sp,
+                      unsigned int mask, const uint32_t special_flags) {
 
-#ifdef SWIFT_DEBUG_CHECKS
-  if (p->id_or_neg_offset < 0) {
-    error("Cannot log a gpart attached to another particle");
-  }
-#endif
-
-  /* Make sure we're not writing a timestamp. */
-  if (mask & logger_mask_data[logger_timestamp].mask)
-    error("You should not log particles as timestamps.");
-
-  /* Make sure we're not looging fields not supported by gparts. */
-  if (mask &
-      (logger_mask_data[logger_u].mask | logger_mask_data[logger_rho].mask))
-    error("Can't log SPH quantities for gparts.");
+  logger_log_sparts(log, sp, /* count */ 1, mask, special_flags);
+}
 
+/**
+ * @brief Dump a group of #spart to the log.
+ *
+ * @param log The #logger_writer
+ * @param sp The #spart to dump.
+ * @param mask The mask of the data to dump.
+ * @param count The number of particle to dump.
+ * @param special_flags The value of the special flags.
+ */
+void logger_log_sparts(struct logger_writer *log, struct spart *sp, int count,
+                       unsigned int mask, const uint32_t special_flags) {
   /* Start by computing the size of the message. */
   const int size = logger_compute_chunk_size(mask);
 
   /* Allocate a chunk of memory in the dump of the right size. */
   size_t offset_new;
-  char *buff = (char *)dump_get(&log->dump, size, &offset_new);
+  char *buff = (char *)dump_get(&log->dump, count * size, &offset_new);
+
+  for (int i = 0; i < count; i++) {
+    /* Copy everything into the buffer */
+    logger_copy_spart_fields(&sp[i], mask, &sp[i].logger_data.last_offset,
+                             offset_new, buff, special_flags);
+
+    /* Update the pointers */
+    sp[i].logger_data.last_offset = offset_new;
+    sp[i].logger_data.steps_since_last_output = 0;
+    buff += size;
+    offset_new += size;
+  }
+}
+
+/**
+ * @brief Copy the particle fields into a given buffer.
+ *
+ * @param gp The #gpart to copy.
+ * @param mask The mask for the fields to write.
+ * @param offset The offset to the previous log.
+ * @param offset_new The offset of the current record.
+ * @param buff The buffer to use when writing.
+ * @param special_flags The data of the special flag.
+ */
+void logger_copy_gpart_fields(const struct gpart *gp, unsigned int mask,
+                              size_t *offset, size_t offset_new, char *buff,
+                              const uint32_t special_flags) {
 
   /* Write the header. */
   buff = logger_write_chunk_header(buff, &mask, offset, offset_new);
 
   /* Particle position as three doubles. */
   if (mask & logger_mask_data[logger_x].mask) {
-    memcpy(buff, p->x, logger_mask_data[logger_x].size);
+    memcpy(buff, gp->x, logger_mask_data[logger_x].size);
     buff += logger_mask_data[logger_x].size;
+    mask &= ~logger_mask_data[logger_x].mask;
   }
 
   /* Particle velocity as three floats. */
   if (mask & logger_mask_data[logger_v].mask) {
-    memcpy(buff, p->v_full, logger_mask_data[logger_v].size);
+    memcpy(buff, gp->v_full, logger_mask_data[logger_v].size);
     buff += logger_mask_data[logger_v].size;
+    mask &= ~logger_mask_data[logger_v].mask;
   }
 
   /* Particle accelleration as three floats. */
   if (mask & logger_mask_data[logger_a].mask) {
-    memcpy(buff, p->a_grav, logger_mask_data[logger_a].size);
+    memcpy(buff, gp->a_grav, logger_mask_data[logger_a].size);
     buff += logger_mask_data[logger_a].size;
+    mask &= ~logger_mask_data[logger_a].mask;
   }
 
   /* Particle constants, which is a bit more complicated. */
   if (mask & logger_mask_data[logger_consts].mask) {
     // TODO make it dependent of logger_mask_data.
-    memcpy(buff, &p->mass, sizeof(float));
+    memcpy(buff, &gp->mass, sizeof(float));
     buff += sizeof(float);
-    const int64_t id = p->id_or_neg_offset;
+    const int64_t id = gp->id_or_neg_offset;
     memcpy(buff, &id, sizeof(int64_t));
     buff += sizeof(int64_t);
+    mask &= ~logger_mask_data[logger_consts].mask;
   }
 
   /* Special flags */
   if (mask & logger_mask_data[logger_special_flags].mask) {
     memcpy(buff, &special_flags, logger_mask_data[logger_special_flags].size);
     buff += logger_mask_data[logger_special_flags].size;
+    mask &= ~logger_mask_data[logger_special_flags].mask;
   }
 
-  /* Update the log message offset. */
-  *offset = offset_new;
+#ifdef SWIFT_DEBUG_CHECKS
+  if (mask) {
+    error("Requested logging of values not present in gparts. %u", mask);
+  }
+#endif
+}
+
+/**
+ * @brief Dump a #gpart to the log.
+ *
+ * @param log The #logger_writer
+ * @param p The #gpart to dump.
+ * @param mask The mask of the data to dump.
+ * @param special_flags The value of the special flags.
+ */
+void logger_log_gpart(struct logger_writer *log, struct gpart *p,
+                      unsigned int mask, const uint32_t special_flags) {
+  logger_log_gparts(log, p, /* count */ 1, mask, special_flags);
+}
+
+/**
+ * @brief Dump a group of #gpart to the log.
+ *
+ * @param log The #logger_writer
+ * @param p The #gpart to dump.
+ * @param mask The mask of the data to dump.
+ * @param count The number of particle to dump.
+ * @param special_flags The value of the special flags.
+ */
+void logger_log_gparts(struct logger_writer *log, struct gpart *p, int count,
+                       unsigned int mask, const uint32_t special_flags) {
+  /* Start by computing the size of the message. */
+  const int size = logger_compute_chunk_size(mask);
+
+  /* Allocate a chunk of memory in the dump of the right size. */
+  size_t offset_new;
+  char *buff = (char *)dump_get(&log->dump, count * size, &offset_new);
+
+  for (int i = 0; i < count; i++) {
+    /* Log only the dark matter */
+    if (p[i].type != swift_type_dark_matter) continue;
+
+    /* Copy everything into the buffer */
+    logger_copy_gpart_fields(&p[i], mask, &p[i].logger_data.last_offset,
+                             offset_new, buff, special_flags);
+
+    /* Update the pointers */
+    p[i].logger_data.last_offset = offset_new;
+    p[i].logger_data.steps_since_last_output = 0;
+    buff += size;
+    offset_new += size;
+  }
 }
 
 /**
@@ -523,6 +620,15 @@ void logger_ensure_size(struct logger_writer *log, size_t total_nr_parts,
   dump_ensure(&log->dump, limit, log->buffer_scale * limit);
 }
 
+/** @brief Generate the name of the dump files
+ *
+ * @param log The #logger_writer.
+ * @param filename The filename of the dump file.
+ */
+void logger_get_dump_name(struct logger_writer *log, char *filename) {
+  sprintf(filename, "%s_%04i.dump", log->base_name, engine_rank);
+}
+
 /**
  * @brief intialize the logger structure
  *
@@ -548,8 +654,7 @@ void logger_init(struct logger_writer *log, struct swift_params *params) {
 
   /* generate dump filename. */
   char logger_name_file[PARSER_MAX_LINE_SIZE];
-  strcpy(logger_name_file, log->base_name);
-  strcat(logger_name_file, ".dump");
+  logger_get_dump_name(log, logger_name_file);
 
   /* Compute max size for a particle chunk. */
   int max_size = logger_offset_size + logger_mask_size;
@@ -861,8 +966,7 @@ void logger_struct_restore(struct logger_writer *log, FILE *stream) {
 
   /* generate dump filename */
   char logger_name_file[PARSER_MAX_LINE_SIZE];
-  strcpy(logger_name_file, log->base_name);
-  strcat(logger_name_file, ".dump");
+  logger_get_dump_name(log, logger_name_file);
 
   dump_restart(&log->dump, logger_name_file);
 }
diff --git a/src/logger.h b/src/logger.h
index 54c19758163a2595913b907323af9c81c804be96..036106cead6ef2c9057ff785348fc1c6fcbab386 100644
--- a/src/logger.h
+++ b/src/logger.h
@@ -27,6 +27,7 @@
 /* Includes. */
 #include "common_io.h"
 #include "dump.h"
+#include "error.h"
 #include "inline.h"
 #include "timeline.h"
 #include "units.h"
@@ -38,7 +39,7 @@ struct part;
 struct engine;
 
 #define logger_major_version 0
-#define logger_minor_version 2
+#define logger_minor_version 3
 
 /**
  * Logger entries contain messages representing the particle data at a given
@@ -87,11 +88,29 @@ enum logger_masks_number {
   logger_h = 4,
   logger_rho = 5,
   logger_consts = 6,
-  logger_special_flags =
-      7,                 /* < 0 for MPI rank changes, 0 for none,
-                            > 0 for particle type changes, > part_type for deletion */
-  logger_timestamp = 8,  /* expect it to be before count. */
-  logger_count_mask = 9, /* Need to be the last. */
+  logger_special_flags = 7, /* Flag for special cases */
+  logger_timestamp = 8,     /* expect it to be before count. */
+  logger_count_mask = 9,    /* Need to be the last. */
+} __attribute__((packed));
+
+/* Defines some mask for logging all the fields */
+enum logger_masks_all {
+  logger_masks_all_part = (1 << logger_x) | (1 << logger_v) | (1 << logger_a) |
+                          (1 << logger_u) | (1 << logger_h) |
+                          (1 << logger_rho) | (1 << logger_consts),
+  logger_masks_all_gpart = (1 << logger_x) | (1 << logger_v) | (1 << logger_a) |
+                           (1 << logger_consts),
+  logger_masks_all_spart =
+      (1 << logger_x) | (1 << logger_v) | (1 << logger_consts),
+} __attribute__((packed));
+
+enum logger_special_flags {
+  logger_flag_change_type = 1, /* Flag for a change of particle type */
+  logger_flag_mpi_enter, /* Flag for a particle received from another  MPI rank
+                          */
+  logger_flag_mpi_exit,  /* Flag for a particle sent to another MPI rank */
+  logger_flag_delete,    /* Flag for a deleted particle */
+  logger_flag_create,    /* Flag for a created particle */
 } __attribute__((packed));
 
 struct mask_data {
@@ -156,14 +175,19 @@ struct logger_part_data {
 int logger_compute_chunk_size(unsigned int mask);
 void logger_log_all(struct logger_writer *log, const struct engine *e);
 void logger_log_part(struct logger_writer *log, const struct part *p,
-                     unsigned int mask, size_t *offset,
-                     const int special_flags);
-void logger_log_spart(struct logger_writer *log, const struct spart *p,
-                      unsigned int mask, size_t *offset,
-                      const int special_flags);
-void logger_log_gpart(struct logger_writer *log, const struct gpart *p,
-                      unsigned int mask, size_t *offset,
-                      const int special_flags);
+                     struct xpart *xp, unsigned int mask,
+                     const uint32_t special_flags);
+void logger_log_parts(struct logger_writer *log, const struct part *p,
+                      struct xpart *xp, int count, unsigned int mask,
+                      const uint32_t special_flags);
+void logger_log_spart(struct logger_writer *log, struct spart *p,
+                      unsigned int mask, const uint32_t special_flags);
+void logger_log_sparts(struct logger_writer *log, struct spart *sp, int count,
+                       unsigned int mask, const uint32_t special_flags);
+void logger_log_gpart(struct logger_writer *log, struct gpart *p,
+                      unsigned int mask, const uint32_t special_flags);
+void logger_log_gparts(struct logger_writer *log, struct gpart *gp, int count,
+                       unsigned int mask, const uint32_t special_flags);
 void logger_init(struct logger_writer *log, struct swift_params *params);
 void logger_free(struct logger_writer *log);
 void logger_log_timestamp(struct logger_writer *log, integertime_t t,
@@ -179,6 +203,29 @@ int logger_read_timestamp(unsigned long long int *t, double *time,
 void logger_struct_dump(const struct logger_writer *log, FILE *stream);
 void logger_struct_restore(struct logger_writer *log, FILE *stream);
 
+/**
+ * @brief Generate the data for the special flags.
+ *
+ * @param flag The special flag to use.
+ * @param data The data to write in the .
+ */
+INLINE static uint32_t logger_pack_flags_and_data(
+    enum logger_special_flags flag, int data) {
+#ifdef SWIFT_DEBUG_CHECKS
+  if (flag & 0xFFFFFF00) {
+    error(
+        "The special flag in the particle logger cannot be larger than 1 "
+        "byte.");
+  }
+  if (data & ~0xFFFFFF) {
+    error(
+        "The data for the special flag in the particle logger cannot be larger "
+        "than 3 bytes.");
+  }
+#endif
+  return ((uint32_t)flag << (3 * 8)) | (data & 0xFFFFFF);
+}
+
 /**
  * @brief Initialize the logger data for a particle.
  *
diff --git a/src/logger_io.c b/src/logger_io.c
index 8921304c062b75b67344d06ef3c4203a931eba13..2b4deb407bb72c6c144a396bdf587688cbc1c5c6 100644
--- a/src/logger_io.c
+++ b/src/logger_io.c
@@ -190,8 +190,8 @@ void logger_write_index_file(struct logger_writer* log, struct engine* e) {
 
   /* File name */
   char fileName[FILENAME_BUFFER_SIZE];
-  snprintf(fileName, FILENAME_BUFFER_SIZE, "%.100s_%04i.index",
-           e->logger->base_name, outputCount);
+  snprintf(fileName, FILENAME_BUFFER_SIZE, "%.100s_%04i_%04i.index",
+           e->logger->base_name, engine_rank, outputCount);
 
   /* Open file */
   FILE* f = NULL;
@@ -366,6 +366,10 @@ void logger_write_index_file(struct logger_writer* log, struct engine* e) {
  * @params e The #engine.
  */
 void logger_write_description(struct logger_writer* log, struct engine* e) {
+  /* Only the master writes the description */
+  if (engine_rank != 0) {
+    return;
+  }
   /* const struct unit_system *internal_units = e->internal_units; */
   /* const struct unit_system *snapshot_units = e->snapshot_units; */
 
diff --git a/src/runner_others.c b/src/runner_others.c
index 6dcc92d8a492621346f92e455bbb02fae88e1fb0..0ae254b2c08a5f2208bc365fdfdb79605215f123 100644
--- a/src/runner_others.c
+++ b/src/runner_others.c
@@ -315,7 +315,7 @@ void runner_do_star_formation(struct runner *r, struct cell *c, int timer) {
             /* Write the particle */
             /* Logs all the fields request by the user */
             // TODO select only the requested fields
-            logger_log_part(e->logger, p,
+            logger_log_part(e->logger, p, xp,
                             logger_mask_data[logger_x].mask |
                                 logger_mask_data[logger_v].mask |
                                 logger_mask_data[logger_a].mask |
@@ -324,8 +324,8 @@ void runner_do_star_formation(struct runner *r, struct cell *c, int timer) {
                                 logger_mask_data[logger_rho].mask |
                                 logger_mask_data[logger_consts].mask |
                                 logger_mask_data[logger_special_flags].mask,
-                            &xp->logger_data.last_offset,
-                            /* special flags */ swift_type_stars);
+                            logger_pack_flags_and_data(logger_flag_change_type,
+                                                       swift_type_stars));
 #endif
 
             /* Convert the gas particle to a star particle */
@@ -354,11 +354,7 @@ void runner_do_star_formation(struct runner *r, struct cell *c, int timer) {
                                logger_mask_data[logger_x].mask |
                                    logger_mask_data[logger_v].mask |
                                    logger_mask_data[logger_consts].mask,
-                               &sp->logger_data.last_offset,
                                /* special flags */ 0);
-
-              /* Set counter back to zero */
-              sp->logger_data.steps_since_last_output = 0;
 #endif
             }
           }
@@ -610,6 +606,10 @@ void runner_do_logger(struct runner *r, struct cell *c, int timer) {
   const int gcount = c->grav.count;
   const int scount = c->stars.count;
 
+  if (c->black_holes.count != 0) {
+    error("Black holes are not implemented in the logger.");
+  }
+
   /* Anything to do here? */
   if (!cell_is_active_hydro(c, e) && !cell_is_active_gravity(c, e) &&
       !cell_is_active_stars(c, e))
@@ -634,7 +634,7 @@ void runner_do_logger(struct runner *r, struct cell *c, int timer) {
         if (logger_should_write(&xp->logger_data, e->logger)) {
           /* Write particle */
           /* Currently writing everything, should adapt it through time */
-          logger_log_part(e->logger, p,
+          logger_log_part(e->logger, p, xp,
                           logger_mask_data[logger_x].mask |
                               logger_mask_data[logger_v].mask |
                               logger_mask_data[logger_a].mask |
@@ -642,11 +642,7 @@ void runner_do_logger(struct runner *r, struct cell *c, int timer) {
                               logger_mask_data[logger_h].mask |
                               logger_mask_data[logger_rho].mask |
                               logger_mask_data[logger_consts].mask,
-                          &xp->logger_data.last_offset,
                           /* special flags */ 0);
-
-          /* Set counter back to zero */
-          xp->logger_data.steps_since_last_output = 0;
         } else
           /* Update counter */
           xp->logger_data.steps_since_last_output += 1;
@@ -673,11 +669,8 @@ void runner_do_logger(struct runner *r, struct cell *c, int timer) {
                                logger_mask_data[logger_v].mask |
                                logger_mask_data[logger_a].mask |
                                logger_mask_data[logger_consts].mask,
-                           &gp->logger_data.last_offset,
                            /* Special flags */ 0);
 
-          /* Set counter back to zero */
-          gp->logger_data.steps_since_last_output = 0;
         } else
           /* Update counter */
           gp->logger_data.steps_since_last_output += 1;
@@ -700,11 +693,7 @@ void runner_do_logger(struct runner *r, struct cell *c, int timer) {
                            logger_mask_data[logger_x].mask |
                                logger_mask_data[logger_v].mask |
                                logger_mask_data[logger_consts].mask,
-                           &sp->logger_data.last_offset,
                            /* Special flags */ 0);
-
-          /* Set counter back to zero */
-          sp->logger_data.steps_since_last_output = 0;
         } else
           /* Update counter */
           sp->logger_data.steps_since_last_output += 1;