engine.c 187 KB
Newer Older
Pedro Gonnet's avatar
Pedro Gonnet committed
1
/*******************************************************************************
2
 * This file is part of SWIFT.
3
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
4
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
Peter W. Draper's avatar
Peter W. Draper committed
5
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
6
7
8
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
9
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
10
11
12
13
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
14
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
15
16
17
18
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
19
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
20
21
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
23
24
25
26
27
28
29
30
31
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
32
#include <stdbool.h>
33
34
35
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
36
#include <sys/stat.h>
37
#include <sys/types.h>
38
#include <unistd.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
39

40
41
/* MPI headers. */
#ifdef WITH_MPI
42

43
#include <mpi.h>
44
45
#endif

Angus Lepper's avatar
Angus Lepper committed
46
47
#ifdef HAVE_LIBNUMA
#include <numa.h>
48
49
#endif

50
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
51
#include "engine.h"
52
53

/* Local headers. */
54
#include "active.h"
55
#include "atomic.h"
56
#include "black_holes_properties.h"
57
#include "cell.h"
58
#include "chemistry.h"
59
#include "clocks.h"
60
#include "cooling.h"
61
#include "cosmology.h"
62
63
#include "cycle.h"
#include "debug.h"
64
#include "distributed_io.h"
65
#include "entropy_floor.h"
66
#include "equation_of_state.h"
67
#include "error.h"
Alexei Borissov's avatar
Alexei Borissov committed
68
#include "feedback.h"
Matthieu Schaller's avatar
Matthieu Schaller committed
69
#include "fof.h"
70
#include "gravity.h"
71
#include "gravity_cache.h"
72
#include "hydro.h"
73
#include "line_of_sight.h"
lhausamm's avatar
lhausamm committed
74
#include "logger.h"
lhausamm's avatar
lhausamm committed
75
#include "logger_io.h"
76
#include "map.h"
77
#include "memuse.h"
78
#include "minmax.h"
79
#include "mpiuse.h"
80
#include "multipole_struct.h"
81
82
#include "output_list.h"
#include "output_options.h"
83
#include "parallel_io.h"
84
#include "part.h"
85
#include "partition.h"
James Willis's avatar
James Willis committed
86
#include "profiler.h"
87
#include "proxy.h"
88
#include "restart.h"
89
#include "runner.h"
90
91
#include "serial_io.h"
#include "single_io.h"
92
#include "sort_part.h"
93
#include "star_formation.h"
94
#include "star_formation_logger.h"
Loic Hausammann's avatar
Loic Hausammann committed
95
#include "stars_io.h"
96
#include "statistics.h"
97
#include "timers.h"
98
#include "tools.h"
99
#include "units.h"
100
#include "velociraptor_interface.h"
101
#include "version.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
102

103
104
105
/* Particle cache size. */
#define CACHE_SIZE 512

106
107
108
109
110
const char *engine_policy_names[] = {"none",
                                     "rand",
                                     "steal",
                                     "keep",
                                     "block",
111
                                     "cpu tight",
112
                                     "mpi",
113
                                     "numa affinity",
114
                                     "hydro",
115
116
117
118
119
                                     "self gravity",
                                     "external gravity",
                                     "cosmological integration",
                                     "drift everything",
                                     "reconstruct multi-poles",
120
                                     "temperature",
121
                                     "cooling",
James Willis's avatar
James Willis committed
122
                                     "stars",
123
                                     "structure finding",
124
                                     "star formation",
125
                                     "feedback",
126
                                     "black holes",
127
                                     "fof search",
128
                                     "time-step limiter",
Loic Hausammann's avatar
Loic Hausammann committed
129
                                     "time-step sync",
130
                                     "logger",
Loic Hausammann's avatar
Loic Hausammann committed
131
132
                                     "line of sight",
                                     "sink"};
Pedro Gonnet's avatar
Pedro Gonnet committed
133

134
135
136
/** The rank of the engine as a global variable (for messages). */
int engine_rank;

137
/** The current step of the engine as a global variable (for messages). */
138
int engine_current_step;
139

140
141
142
extern int engine_max_parts_per_ghost;
extern int engine_max_sparts_per_ghost;

143
144
145
146
/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
147
 * @param l A pointer to the #link, will be modified atomically.
148
149
150
151
 * @param t The #task.
 *
 * @return The new #link pointer.
 */
152
void engine_addlink(struct engine *e, struct link **l, struct task *t) {
153

154
155
156
157
158
159
#ifdef SWIFT_DEBUG_CHECKS
  if (t == NULL) {
    error("Trying to link NULL task.");
  }
#endif

160
  /* Get the next free link. */
161
  const size_t ind = atomic_inc(&e->nr_links);
162
  if (ind >= e->size_links) {
163
164
165
    error(
        "Link table overflow. Increase the value of "
        "`Scheduler:links_per_tasks`.");
166
167
  }
  struct link *res = &e->links[ind];
168

169
  /* Set it atomically. */
170
  res->t = t;
171
  res->next = atomic_swap(l, res);
172
}
173

174
/**
175
 * @brief Repartition the cells amongst the nodes.
176
177
178
 *
 * @param e The #engine.
 */
179
void engine_repartition(struct engine *e) {
180

181
#if defined(WITH_MPI) && (defined(HAVE_PARMETIS) || defined(HAVE_METIS))
182

183
184
  ticks tic = getticks();

185
#ifdef SWIFT_DEBUG_CHECKS
186
187
188
189
  /* Be verbose about this. */
  if (e->nodeID == 0 || e->verbose) message("repartitioning space");
  fflush(stdout);

190
  /* Check that all cells have been drifted to the current time */
191
  space_check_drift_point(e->s, e->ti_current, /*check_multipoles=*/0);
192
193
#endif

194
  /* Clear the repartition flag. */
195
  e->forcerepart = 0;
196

197
  /* Nothing to do if only using a single node. Also avoids METIS
198
   * bug that doesn't handle this case well. */
199
  if (e->nr_nodes == 1) return;
200

201
202
  /* Generate the fixed costs include file. */
  if (e->step > 3 && e->reparttype->trigger <= 1.f) {
203
204
205
    task_dump_stats("partition_fixed_costs.h", e,
                    /* task_dump_threshold = */ 0.f,
                    /* header = */ 1, /* allranks = */ 1);
206
207
  }

208
  /* Do the repartitioning. */
209
  partition_repartition(e->reparttype, e->nodeID, e->nr_nodes, e->s,
210
                        e->sched.tasks, e->sched.nr_tasks);
211

212
213
  /* Partitioning requires copies of the particles, so we need to reduce the
   * memory in use to the minimum, we can free the sorting indices and the
214
215
   * tasks as these will be regenerated at the next rebuild. Also the foreign
   * particle arrays can go as these will be regenerated in proxy exchange. */
216
217

  /* Sorting indices. */
218
  if (e->s->cells_top != NULL) space_free_cells(e->s);
219

220
221
222
  /* Report the time spent in the different task categories */
  if (e->verbose) scheduler_report_task_times(&e->sched, e->nr_threads);

223
224
  /* Task arrays. */
  scheduler_free_tasks(&e->sched);
225

226
227
228
  /* Foreign parts. (no need to nullify the cell pointers as the cells
   * will be regenerated) */
  space_free_foreign_parts(e->s, /*clear_cell_pointers=*/0);
229

230
231
232
  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
233
     the parts array, and emitting the sends and receives.
234
235
236
237
238
239
240
241
242
243
244
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

245
246
247
  /* Flag that a repartition has taken place */
  e->step_props |= engine_step_prop_repartition;

248
249
250
  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
251
#else
252
  if (e->reparttype->type != REPART_NONE)
253
    error("SWIFT was not compiled with MPI and METIS or ParMETIS support.");
254
255
256

  /* Clear the repartition flag. */
  e->forcerepart = 0;
257
#endif
258
}
259

260
261
262
263
264
265
266
267
268
/**
 * @brief Decide whether trigger a repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition_trigger(struct engine *e) {

#ifdef WITH_MPI

269
  const ticks tic = getticks();
270
271
  static int opened = 0;
  if (e->restarting) opened = 1;
272

273
274
  /* Do nothing if there have not been enough steps since the last repartition
   * as we don't want to repeat this too often or immediately after a
275
276
277
278
   * repartition step, or also immediately on restart. We check all this
   * even when we are not repartitioning as the balance logs can still be
   * interesting. */
  if (e->step - e->last_repartition >= 2 && !e->restarting) {
279

280
    /* If we have fixed costs available and this is step 2 or we are forcing
281
282
283
284
285
286
287
288
289
290
291
292
293
294
     * repartitioning then we do a forced fixed costs repartition regardless. */
    int forced = 0;
    if (e->reparttype->type != REPART_NONE) {
      if (e->reparttype->trigger > 1 ||
          (e->step == 2 && e->reparttype->use_fixed_costs)) {
        if (e->reparttype->trigger > 1) {
          if ((e->step % (int)e->reparttype->trigger) == 0) e->forcerepart = 1;
        } else {
          e->forcerepart = 1;
        }
        e->reparttype->use_ticks = 0;
        forced = 1;
      }
    }
295

296
297
298
299
300
301
302
303
304
305
306
    /* We only check the CPU loads when we have processed a significant number
     * of all particles as we require all tasks to have timings or are
     * interested in the various balances logs. */
    if ((e->updates > 1 &&
         e->updates >= e->total_nr_parts * e->reparttype->minfrac) ||
        (e->g_updates > 1 &&
         e->g_updates >= e->total_nr_gparts * e->reparttype->minfrac)) {

      /* Are we using the task tick timings or fixed costs? */
      if (e->reparttype->use_fixed_costs > 1) {
        e->reparttype->use_ticks = 0;
307
      } else {
308
        e->reparttype->use_ticks = 1;
309
310
      }

311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
      /* Get the resident size of the process for the memory logs. */
      long size, resident, shared, text, library, data, dirty;
      memuse_use(&size, &resident, &shared, &text, &data, &library, &dirty);

      /* Gather it together with the CPU times used by the tasks in the last
       * step. */
      double timemem[3] = {e->usertime_last_step, e->systime_last_step,
                           (double)resident};
      double timemems[e->nr_nodes * 3];
      MPI_Gather(&timemem, 3, MPI_DOUBLE, timemems, 3, MPI_DOUBLE, 0,
                 MPI_COMM_WORLD);
      if (e->nodeID == 0) {

        /* Get the range and mean of the two CPU times and memory. */
        double umintime = timemems[0];
        double umaxtime = timemems[0];

        double smintime = timemems[1];
        double smaxtime = timemems[1];

        double minmem = timemems[2];
        double maxmem = timemems[2];

        double tmintime = umintime + smintime;
        double tmaxtime = umaxtime + smaxtime;

        double usum = timemems[0];
        double ssum = timemems[1];
        double tsum = usum + ssum;

        double msum = timemems[2];

        for (int k = 3; k < e->nr_nodes * 3; k += 3) {
          if (timemems[k] > umaxtime) umaxtime = timemems[k];
          if (timemems[k] < umintime) umintime = timemems[k];

          if (timemems[k + 1] > smaxtime) smaxtime = timemems[k + 1];
          if (timemems[k + 1] < smintime) smintime = timemems[k + 1];

          double total = timemems[k] + timemems[k + 1];
          if (total > tmaxtime) tmaxtime = total;
          if (total < tmintime) tmintime = total;

          usum += timemems[k];
          ssum += timemems[k + 1];
          tsum += total;

          if (timemems[k + 2] > maxmem) maxmem = timemems[k + 2];
          if (timemems[k + 2] < minmem) minmem = timemems[k + 2];
          msum += timemems[k + 2];
        }
        double umean = usum / (double)e->nr_nodes;
        double smean = ssum / (double)e->nr_nodes;
        double tmean = tsum / (double)e->nr_nodes;
        double mmean = msum / (double)e->nr_nodes;

        /* Are we out of balance and need to repartition? */
        /* ---------------------------------------------- */
        double abs_trigger = fabs(e->reparttype->trigger);
        double balance = (umaxtime - umintime) / umean;
        if (e->reparttype->type != REPART_NONE) {

          /* When forced we don't care about the balance. */
          if (!forced) {
            if (balance > abs_trigger) {
              if (e->verbose)
                message("trigger fraction %.3f > %.3f will repartition",
                        balance, abs_trigger);
              e->forcerepart = 1;
            } else {
              if (e->verbose && e->reparttype->type != REPART_NONE)
                message("trigger fraction %.3f =< %.3f will not repartition",
                        balance, abs_trigger);
            }
          }
386

387
        } else {
388
          /* Not repartitioning, would that have been done otherwise? */
389
          if (e->verbose) {
390
391
392
393
394
            if (balance > abs_trigger) {
              message("trigger fraction %.3f > %.3f would have repartitioned",
                      balance, abs_trigger);
            } else {
              message(
395
                  "trigger fraction %.3f =< %.3f would not have repartitioned",
396
397
                  balance, abs_trigger);
            }
398
          }
399
        }
400

401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
        /* Keep logs of all CPU times and resident memory size for debugging
         * load issues. */
        FILE *timelog = NULL;
        FILE *memlog = NULL;
        if (!opened) {
          timelog = fopen("rank_cpu_balance.log", "w");
          fprintf(timelog, "# step rank user sys sum\n");

          memlog = fopen("rank_memory_balance.log", "w");
          fprintf(memlog, "# step rank resident\n");

          opened = 1;
        } else {
          timelog = fopen("rank_cpu_balance.log", "a");
          memlog = fopen("rank_memory_balance.log", "a");
        }

        for (int k = 0; k < e->nr_nodes * 3; k += 3) {

          fprintf(timelog, "%d %d %f %f %f\n", e->step, k / 3, timemems[k],
                  timemems[k + 1], timemems[k] + timemems[k + 1]);

          fprintf(memlog, "%d %d %ld\n", e->step, k / 3, (long)timemems[k + 2]);
424
        }
425
426
427
428
429
430
431
432
433
434
435
436
437
438

        fprintf(timelog, "# %d mean times: %f %f %f\n", e->step, umean, smean,
                tmean);
        if (abs_trigger > 1.f) abs_trigger = 0.f; /* Not relevant. */
        fprintf(timelog,
                "# %d balance: %f, expected: %f (sys: %f, total: %f)\n",
                e->step, balance, abs_trigger, (smaxtime - smintime) / smean,
                (tmaxtime - tmintime) / tmean);

        fclose(timelog);

        fprintf(memlog, "# %d mean resident memory: %f, balance: %f\n", e->step,
                mmean, (maxmem - minmem) / mmean);
        fclose(memlog);
439
      }
440

441
      /* All nodes do this together, so send to other ranks. */
442
      MPI_Bcast(&e->forcerepart, 1, MPI_INT, 0, MPI_COMM_WORLD);
443
444
445
    }

    /* Remember we did this. */
Peter W. Draper's avatar
Peter W. Draper committed
446
    if (e->forcerepart) e->last_repartition = e->step;
447
448
  }

449
450
451
  if (e->verbose)
    message("took %.3f %s", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
452
453
454
#endif
}

455
/**
456
457
458
459
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */
460
void engine_exchange_cells(struct engine *e) {
461

462
#ifdef WITH_MPI
463

464
  const int with_gravity = e->policy & engine_policy_self_gravity;
465
  const ticks tic = getticks();
466

467
  /* Exchange the cell structure with neighbouring ranks. */
468
  proxy_cells_exchange(e->proxies, e->nr_proxies, e->s, with_gravity);
469

470
471
472
  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
473
474
475
476
477
478
479

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
480
 * @brief Exchange straying particles with other nodes.
481
482
 *
 * @param e The #engine.
Pedro Gonnet's avatar
Pedro Gonnet committed
483
 * @param offset_parts The index in the parts array as of which the foreign
484
 *        parts reside (i.e. the current number of local #part).
485
 * @param ind_part The foreign #cell ID of each part.
486
 * @param Npart The number of stray parts, contains the number of parts received
487
 *        on return.
Pedro Gonnet's avatar
Pedro Gonnet committed
488
 * @param offset_gparts The index in the gparts array as of which the foreign
489
 *        parts reside (i.e. the current number of local #gpart).
490
 * @param ind_gpart The foreign #cell ID of each gpart.
Pedro Gonnet's avatar
Pedro Gonnet committed
491
 * @param Ngpart The number of stray gparts, contains the number of gparts
Pedro Gonnet's avatar
Pedro Gonnet committed
492
 *        received on return.
493
 * @param offset_sparts The index in the sparts array as of which the foreign
494
 *        parts reside (i.e. the current number of local #spart).
495
496
497
 * @param ind_spart The foreign #cell ID of each spart.
 * @param Nspart The number of stray sparts, contains the number of sparts
 *        received on return.
498
499
500
501
502
 * @param offset_bparts The index in the bparts array as of which the foreign
 *        parts reside (i.e. the current number of local #bpart).
 * @param ind_bpart The foreign #cell ID of each bpart.
 * @param Nbpart The number of stray bparts, contains the number of bparts
 *        received on return.
503
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
504
505
 * Note that this function does not mess-up the linkage between parts and
 * gparts, i.e. the received particles have correct linkeage.
506
 */
507
void engine_exchange_strays(struct engine *e, const size_t offset_parts,
508
509
510
511
512
513
514
                            const int *restrict ind_part, size_t *Npart,
                            const size_t offset_gparts,
                            const int *restrict ind_gpart, size_t *Ngpart,
                            const size_t offset_sparts,
                            const int *restrict ind_spart, size_t *Nspart,
                            const size_t offset_bparts,
                            const int *restrict ind_bpart, size_t *Nbpart) {
515
516

#ifdef WITH_MPI
517
  struct space *s = e->s;
518
  ticks tic = getticks();
519
520

  /* Re-set the proxies. */
Pedro Gonnet's avatar
Pedro Gonnet committed
521
522
523
  for (int k = 0; k < e->nr_proxies; k++) {
    e->proxies[k].nr_parts_out = 0;
    e->proxies[k].nr_gparts_out = 0;
524
    e->proxies[k].nr_sparts_out = 0;
525
    e->proxies[k].nr_bparts_out = 0;
Pedro Gonnet's avatar
Pedro Gonnet committed
526
  }
527

528
  /* Put the parts into the corresponding proxies. */
529
  for (size_t k = 0; k < *Npart; k++) {
530
531
532
533

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_part[k] == -1) continue;

534
    /* Get the target node and proxy ID. */
535
    const int node_id = e->s->cells_top[ind_part[k]].nodeID;
536
537
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
538
    const int pid = e->proxy_ind[node_id];
539
    if (pid < 0) {
540
541
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
542
          "id=%lld, x=[%e,%e,%e].",
Pedro Gonnet's avatar
Pedro Gonnet committed
543
544
545
          node_id, s->parts[offset_parts + k].id,
          s->parts[offset_parts + k].x[0], s->parts[offset_parts + k].x[1],
          s->parts[offset_parts + k].x[2]);
546
    }
Pedro Gonnet's avatar
Pedro Gonnet committed
547

548
549
    /* Re-link the associated gpart with the buffer offset of the part. */
    if (s->parts[offset_parts + k].gpart != NULL) {
550
551
      s->parts[offset_parts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_parts_out;
552
    }
Pedro Gonnet's avatar
Pedro Gonnet committed
553

554
555
556
557
558
#ifdef SWIFT_DEBUG_CHECKS
    if (s->parts[offset_parts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

559
    /* Load the part and xpart into the proxy. */
560
561
    proxy_parts_load(&e->proxies[pid], &s->parts[offset_parts + k],
                     &s->xparts[offset_parts + k], 1);
562
563

#ifdef WITH_LOGGER
Loic Hausammann's avatar
Loic Hausammann committed
564
    if (e->policy & engine_policy_logger) {
Loic Hausammann's avatar
Format    
Loic Hausammann committed
565
566
      const uint32_t logger_flag =
          logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);
Loic Hausammann's avatar
Loic Hausammann committed
567
568

      /* Log the particle when leaving a rank. */
Loic Hausammann's avatar
Format    
Loic Hausammann committed
569
570
571
572
      logger_log_part(
          e->logger, &s->parts[offset_parts + k], &s->xparts[offset_parts + k],
          logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
          logger_flag);
Loic Hausammann's avatar
Loic Hausammann committed
573
    }
574
#endif
575
  }
576

577
  /* Put the sparts into the corresponding proxies. */
578
  for (size_t k = 0; k < *Nspart; k++) {
579
580
581
582
583

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_spart[k] == -1) continue;

    /* Get the target node and proxy ID. */
584
585
586
587
    const int node_id = e->s->cells_top[ind_spart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
588
    if (pid < 0) {
589
590
591
592
593
594
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lld, x=[%e,%e,%e].",
          node_id, s->sparts[offset_sparts + k].id,
          s->sparts[offset_sparts + k].x[0], s->sparts[offset_sparts + k].x[1],
          s->sparts[offset_sparts + k].x[2]);
595
    }
596

597
    /* Re-link the associated gpart with the buffer offset of the spart. */
598
599
600
601
602
    if (s->sparts[offset_sparts + k].gpart != NULL) {
      s->sparts[offset_sparts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_sparts_out;
    }

603
604
605
606
607
#ifdef SWIFT_DEBUG_CHECKS
    if (s->sparts[offset_sparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

608
609
    /* Load the spart into the proxy */
    proxy_sparts_load(&e->proxies[pid], &s->sparts[offset_sparts + k], 1);
610
611

#ifdef WITH_LOGGER
Loic Hausammann's avatar
Loic Hausammann committed
612
    if (e->policy & engine_policy_logger) {
Loic Hausammann's avatar
Format    
Loic Hausammann committed
613
614
      const uint32_t logger_flag =
          logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);
Loic Hausammann's avatar
Loic Hausammann committed
615
616

      /* Log the particle when leaving a rank. */
Loic Hausammann's avatar
Format    
Loic Hausammann committed
617
618
619
620
      logger_log_spart(
          e->logger, &s->sparts[offset_sparts + k],
          logger_masks_all_spart | logger_mask_data[logger_special_flags].mask,
          logger_flag);
Loic Hausammann's avatar
Loic Hausammann committed
621
    }
622
#endif
623
  }
624

625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
  /* Put the bparts into the corresponding proxies. */
  for (size_t k = 0; k < *Nbpart; k++) {

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_bpart[k] == -1) continue;

    /* Get the target node and proxy ID. */
    const int node_id = e->s->cells_top[ind_bpart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
    if (pid < 0) {
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lld, x=[%e,%e,%e].",
          node_id, s->bparts[offset_bparts + k].id,
          s->bparts[offset_bparts + k].x[0], s->bparts[offset_bparts + k].x[1],
          s->bparts[offset_bparts + k].x[2]);
    }

    /* Re-link the associated gpart with the buffer offset of the bpart. */
    if (s->bparts[offset_bparts + k].gpart != NULL) {
      s->bparts[offset_bparts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_bparts_out;
    }

#ifdef SWIFT_DEBUG_CHECKS
    if (s->bparts[offset_bparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

    /* Load the bpart into the proxy */
    proxy_bparts_load(&e->proxies[pid], &s->bparts[offset_bparts + k], 1);
658
659

#ifdef WITH_LOGGER
Loic Hausammann's avatar
Loic Hausammann committed
660
    if (e->policy & engine_policy_logger) {
Loic Hausammann's avatar
Loic Hausammann committed
661
      error("Not yet implemented.");
Loic Hausammann's avatar
Loic Hausammann committed
662
    }
663
#endif
664
  }
665

666
667
  /* Put the gparts into the corresponding proxies. */
  for (size_t k = 0; k < *Ngpart; k++) {
668
669
670
671
672

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_gpart[k] == -1) continue;

    /* Get the target node and proxy ID. */
673
674
675
676
    const int node_id = e->s->cells_top[ind_gpart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
677
    if (pid < 0) {
678
679
680
681
682
683
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lli, x=[%e,%e,%e].",
          node_id, s->gparts[offset_gparts + k].id_or_neg_offset,
          s->gparts[offset_gparts + k].x[0], s->gparts[offset_gparts + k].x[1],
          s->gparts[offset_gparts + k].x[2]);
684
685
686
687
688
689
    }

#ifdef SWIFT_DEBUG_CHECKS
    if (s->gparts[offset_gparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif
690
691
692

    /* Load the gpart into the proxy */
    proxy_gparts_load(&e->proxies[pid], &s->gparts[offset_gparts + k], 1);
693
694
695

#ifdef WITH_LOGGER
    /* Write only the dark matter particles */
Loic Hausammann's avatar
Loic Hausammann committed
696
697
    if ((e->policy & engine_policy_logger) &&
        s->gparts[offset_gparts + k].type == swift_type_dark_matter) {
Loic Hausammann's avatar
Loic Hausammann committed
698

Loic Hausammann's avatar
Format    
Loic Hausammann committed
699
700
      const uint32_t logger_flag =
          logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);
701
702

      /* Log the particle when leaving a rank. */
Loic Hausammann's avatar
Format    
Loic Hausammann committed
703
704
705
706
      logger_log_gpart(
          e->logger, &s->gparts[offset_gparts + k],
          logger_masks_all_gpart | logger_mask_data[logger_special_flags].mask,
          logger_flag);
707
708
    }
#endif
709
710
  }

711
  /* Launch the proxies. */
712
713
  MPI_Request reqs_in[5 * engine_maxproxies];
  MPI_Request reqs_out[5 * engine_maxproxies];
714
  for (int k = 0; k < e->nr_proxies; k++) {
715
    proxy_parts_exchange_first(&e->proxies[k]);
716
717
718
719
720
    reqs_in[k] = e->proxies[k].req_parts_count_in;
    reqs_out[k] = e->proxies[k].req_parts_count_out;
  }

  /* Wait for each count to come in and start the recv. */
721
  for (int k = 0; k < e->nr_proxies; k++) {
722
    int pid = MPI_UNDEFINED;
723
724
    if (MPI_Waitany(e->nr_proxies, reqs_in, &pid, MPI_STATUS_IGNORE) !=
            MPI_SUCCESS ||
725
726
727
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
728
    proxy_parts_exchange_second(&e->proxies[pid]);
729
  }
730

731
  /* Wait for all the sends to have finished too. */
732
733
734
  if (MPI_Waitall(e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

735
  /* Count the total number of incoming particles and make sure we have
736
     enough space to accommodate them. */
737
738
  int count_parts_in = 0;
  int count_gparts_in = 0;
739
  int count_sparts_in = 0;
740
  int count_bparts_in = 0;
741
742
743
  for (int k = 0; k < e->nr_proxies; k++) {
    count_parts_in += e->proxies[k].nr_parts_in;
    count_gparts_in += e->proxies[k].nr_gparts_in;
744
    count_sparts_in += e->proxies[k].nr_sparts_in;
745
    count_bparts_in += e->proxies[k].nr_bparts_in;
746
747
  }
  if (e->verbose) {
748
749
750
751
752
    message(
        "sent out %zu/%zu/%zu/%zu parts/gparts/sparts/bparts, got %i/%i/%i/%i "
        "back.",
        *Npart, *Ngpart, *Nspart, *Nbpart, count_parts_in, count_gparts_in,
        count_sparts_in, count_bparts_in);
753
  }
754
755

  /* Reallocate the particle arrays if necessary */
756
  if (offset_parts + count_parts_in > s->size_parts) {
757
    s->size_parts = (offset_parts + count_parts_in) * engine_parts_size_grow;
758
759
    struct part *parts_new = NULL;
    struct xpart *xparts_new = NULL;
760
    if (swift_memalign("parts", (void **)&parts_new, part_align,
761
                       sizeof(struct part) * s->size_parts) != 0 ||
762
        swift_memalign("xparts", (void **)&xparts_new, xpart_align,
763
764
                       sizeof(struct xpart) * s->size_parts) != 0)
      error("Failed to allocate new part data.");
765
766
    memcpy(parts_new, s->parts, sizeof(struct part) * offset_parts);
    memcpy(xparts_new, s->xparts, sizeof(struct xpart) * offset_parts);
767
768
    swift_free("parts", s->parts);
    swift_free("xparts", s->xparts);
769
770
    s->parts = parts_new;
    s->xparts = xparts_new;
771
772

    /* Reset the links */
Pedro Gonnet's avatar
Pedro Gonnet committed
773
774
    for (size_t k = 0; k < offset_parts; k++) {
      if (s->parts[k].gpart != NULL) {
775
        s->parts[k].gpart->id_or_neg_offset = -k;
Pedro Gonnet's avatar
Pedro Gonnet committed
776
777
      }
    }
778
  }
779

780
781
782
  if (offset_sparts + count_sparts_in > s->size_sparts) {
    s->size_sparts = (offset_sparts + count_sparts_in) * engine_parts_size_grow;
    struct spart *sparts_new = NULL;
783
    if (swift_memalign("sparts", (void **)&sparts_new, spart_align,
784
785
786
                       sizeof(struct spart) * s->size_sparts) != 0)
      error("Failed to allocate new spart data.");
    memcpy(sparts_new, s->sparts, sizeof(struct spart) * offset_sparts);
787
    swift_free("sparts", s->sparts);
788
    s->sparts = sparts_new;
789
790

    /* Reset the links */
791
    for (size_t k = 0; k < offset_sparts; k++) {
792
793
794
795
796
      if (s->sparts[k].gpart != NULL) {
        s->sparts[k].gpart->id_or_neg_offset = -k;
      }
    }
  }
797

798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
  if (offset_bparts + count_bparts_in > s->size_bparts) {
    s->size_bparts = (offset_bparts + count_bparts_in) * engine_parts_size_grow;
    struct bpart *bparts_new = NULL;
    if (swift_memalign("bparts", (void **)&bparts_new, bpart_align,
                       sizeof(struct bpart) * s->size_bparts) != 0)
      error("Failed to allocate new bpart data.");
    memcpy(bparts_new, s->bparts, sizeof(struct bpart) * offset_bparts);
    swift_free("bparts", s->bparts);
    s->bparts = bparts_new;

    /* Reset the links */
    for (size_t k = 0; k < offset_bparts; k++) {
      if (s->bparts[k].gpart != NULL) {
        s->bparts[k].gpart->id_or_neg_offset = -k;
      }
    }
  }
815

816
  if (offset_gparts + count_gparts_in > s->size_gparts) {
817
    s->size_gparts = (offset_gparts + count_gparts_in) * engine_parts_size_grow;
818
    struct gpart *gparts_new = NULL;
819
    if (swift_memalign("gparts", (void **)&gparts_new, gpart_align,
820
821
822
                       sizeof(struct gpart) * s->size_gparts) != 0)
      error("Failed to allocate new gpart data.");
    memcpy(gparts_new, s->gparts, sizeof(struct gpart) * offset_gparts);
823
    swift_free("gparts", s->gparts);
824
    s->gparts = gparts_new;
825

826
    /* Reset the links */
Pedro Gonnet's avatar
Pedro Gonnet committed
827
    for (size_t k = 0; k < offset_gparts; k++) {
828
      if (s->gparts[k].type == swift_type_gas) {
829
        s->parts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
830
      } else if (s->gparts[k].type == swift_type_stars) {
831
        s->sparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
832
833
      } else if (s->gparts[k].type == swift_type_black_hole) {
        s->bparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
Pedro Gonnet's avatar
Pedro Gonnet committed
834
835
      }
    }
836
  }
837
838

  /* Collect the requests for the particle data from the proxies. */
839
840
  int nr_in = 0, nr_out = 0;
  for (int k = 0; k < e->nr_proxies; k++) {
841
    if (e->proxies[k].nr_parts_in > 0) {
842
843
      reqs_in[5 * k] = e->proxies[k].req_parts_in;
      reqs_in[5 * k + 1] = e->proxies[k].req_xparts_in;
844
845
      nr_in += 2;
    } else {
846
      reqs_in[5 * k] = reqs_in[5 * k + 1] = MPI_REQUEST_NULL;
847
848
    }
    if (e->proxies[k].nr_gparts_in > 0) {
849
      reqs_in[5 * k + 2] = e->proxies[k].req_gparts_in;
850
      nr_in += 1;
851
    } else {
852
      reqs_in[5 * k + 2] = MPI_REQUEST_NULL;
853
    }
854
    if (e->proxies[k].nr_sparts_in > 0) {
855
      reqs_in[5 * k + 3] = e->proxies[k].req_sparts_in;
856
857
      nr_in += 1;
    } else {
858
859
860
861
862
863
864
      reqs_in[5 * k + 3] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_bparts_in > 0) {
      reqs_in[5 * k + 4] = e->proxies[k].req_bparts_in;
      nr_in += 1;
    } else {
      reqs_in[5 * k + 4] = MPI_REQUEST_NULL;
865
866
    }

867
    if (e->proxies[k].nr_parts_out > 0) {
868
869
      reqs_out[5 * k] = e->proxies[k].req_parts_out;
      reqs_out[5 * k + 1] = e->proxies[k].req_xparts_out;
870
871
      nr_out += 2;
    } else {
872
      reqs_out[5 * k] = reqs_out[5 * k + 1] = MPI_REQUEST_NULL;
873
874
    }
    if (e->proxies[k].nr_gparts_out > 0) {
875
      reqs_out[5 * k + 2] = e->proxies[k].req_gparts_out;
876
877
      nr_out += 1;
    } else {
878
      reqs_out[5 * k + 2] = MPI_REQUEST_NULL;
879
880
    }
    if (e->proxies[k].nr_sparts_out > 0) {
881
882
883
884
885
886
887
      reqs_out[5 * k + 3] = e->proxies[k].req_sparts_out;
      nr_out += 1;
    } else {
      reqs_out[5 * k + 3] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_bparts_out > 0) {
      reqs_out[5 * k + 4] = e->proxies[k].req_bparts_out;
888
      nr_out += 1;
889
    } else {
890
      reqs_out[5 * k + 4] = MPI_REQUEST_NULL;
891
    }
892
893
894
895
  }

  /* Wait for each part array to come in and collect the new
     parts from the proxies. */
896
  int count_parts = 0, count_gparts = 0, count_sparts = 0, count_bparts = 0;
897
  for (int k = 0; k < nr_in; k++) {
898
    int err, pid;
899
    if ((err = MPI_Waitany(5 * e->nr_proxies, reqs_in, &pid,
900
                           MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
901
902
903
904
905
906
      char buff[MPI_MAX_ERROR_STRING];
      int res;
      MPI_Error_string(err, buff, &res);
      error("MPI_Waitany failed (%s).", buff);
    }
    if (pid == MPI_UNDEFINED) break;
907
908
    // message( "request from proxy %i has arrived." , pid / 5 );
    pid = 5 * (pid / 5);
Pedro Gonnet's avatar
Pedro Gonnet committed
909

910
    /* If all the requests for a given proxy have arrived... */
911
912
    if (reqs_in[pid + 0] == MPI_REQUEST_NULL &&
        reqs_in[pid + 1] == MPI_REQUEST_NULL &&
913
        reqs_in[pid + 2] == MPI_REQUEST_NULL &&
914
        reqs_in[pid + 3] == MPI_REQUEST_NULL &&
915
        reqs_in[pid + 4] == MPI_REQUEST_NULL) {
916
      /* Copy the particle data to the part/xpart/gpart arrays. */
917
      struct proxy *prox = &e->proxies[pid / 5];
918
919
920
921
922
923
      memcpy(&s->parts[offset_parts + count_parts], prox->parts_in,
             sizeof(struct part) * prox->nr_parts_in);
      memcpy(&s->xparts[offset_parts + count_parts], prox->xparts_in,
             sizeof(struct xpart) * prox->nr_parts_in);
      memcpy(&s->gparts[offset_gparts + count_gparts], prox->gparts_in,
             sizeof(struct gpart) * prox->nr_gparts_in);
924
925
      memcpy(&s->sparts[offset_sparts + count_sparts], prox->sparts_in,
             sizeof(struct spart) * prox->nr_sparts_in);
926
927
      memcpy(&s->bparts[offset_bparts + count_bparts], prox->bparts_in,
             sizeof(struct bpart) * prox->nr_bparts_in);
928
929

#ifdef WITH_LOGGER
Loic Hausammann's avatar
Loic Hausammann committed
930
      if (e->policy & engine_policy_logger) {
Loic Hausammann's avatar
Format    
Loic Hausammann committed
931
932
        const uint32_t flag =
            logger_pack_flags_and_data(logger_flag_mpi_enter, prox->nodeID);
Loic Hausammann's avatar
Loic Hausammann committed
933
934
935
936
937
938
939

        struct part *parts = &s->parts[offset_parts + count_parts];
        struct xpart *xparts = &s->xparts[offset_parts + count_parts];
        struct spart *sparts = &s->sparts[offset_sparts + count_sparts];
        struct gpart *gparts = &s->gparts[offset_gparts + count_gparts];

        /* Log the gas particles */
Loic Hausammann's avatar
Format    
Loic Hausammann committed
940
941
942
943
        logger_log_parts(
            e->logger, parts, xparts, prox->nr_parts_in,
            logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
            flag);
Loic Hausammann's avatar
Loic Hausammann committed
944
945

        /* Log the stellar particles */
946
947
        logger_log_sparts(e->logger, sparts, prox->nr_sparts_in,
                          logger_masks_all_spart |
Loic Hausammann's avatar
Format    
Loic Hausammann committed
948
                              logger_mask_data[logger_special_flags].mask,
949
                          flag);
Loic Hausammann's avatar
Loic Hausammann committed
950
951

        /* Log the gparts */
952
953
        logger_log_gparts(e->logger, gparts, prox->nr_gparts_in,
                          logger_masks_all_gpart |
Loic Hausammann's avatar
Format    
Loic Hausammann committed
954
                              logger_mask_data[logger_special_flags].mask,