/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
32
#include <stdbool.h>
33
34
35
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
36
#include <sys/stat.h>
37
#include <sys/types.h>
38
#include <unistd.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
39

40
41
/* MPI headers. */
#ifdef WITH_MPI
42

43
#include <mpi.h>
44
45
#endif

Angus Lepper's avatar
Angus Lepper committed
46
47
#ifdef HAVE_LIBNUMA
#include <numa.h>
48
49
#endif

50
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
51
#include "engine.h"
52
53

/* Local headers. */
54
#include "active.h"
55
#include "atomic.h"
56
#include "black_holes_properties.h"
57
#include "cell.h"
58
#include "chemistry.h"
59
#include "clocks.h"
60
#include "cooling.h"
61
#include "cosmology.h"
62
63
#include "cycle.h"
#include "debug.h"
64
#include "entropy_floor.h"
65
#include "equation_of_state.h"
66
#include "error.h"
Alexei Borissov's avatar
Alexei Borissov committed
67
#include "feedback.h"
Matthieu Schaller's avatar
Matthieu Schaller committed
68
#include "fof.h"
69
#include "gravity.h"
70
#include "gravity_cache.h"
71
#include "hydro.h"
lhausamm's avatar
lhausamm committed
72
#include "logger.h"
lhausamm's avatar
lhausamm committed
73
#include "logger_io.h"
74
#include "map.h"
75
#include "memuse.h"
76
#include "minmax.h"
77
#include "mpiuse.h"
78
#include "outputlist.h"
79
#include "parallel_io.h"
80
#include "part.h"
81
#include "partition.h"
James Willis's avatar
James Willis committed
82
#include "profiler.h"
83
#include "proxy.h"
84
#include "restart.h"
85
#include "runner.h"
86
87
#include "serial_io.h"
#include "single_io.h"
88
#include "sort_part.h"
89
#include "star_formation.h"
90
#include "star_formation_logger.h"
Loic Hausammann's avatar
Loic Hausammann committed
91
#include "stars_io.h"
92
#include "statistics.h"
93
#include "timers.h"
94
#include "tools.h"
95
#include "units.h"
96
#include "velociraptor_interface.h"
97
#include "version.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
98

99
100
101
/* Particle cache size. */
#define CACHE_SIZE 512

/**
 * @brief Human-readable names of the engine policies, used in messages.
 *
 * NOTE(review): the order of these strings presumably mirrors the
 * engine_policy enumeration (declared elsewhere, likely engine.h) -- confirm
 * before inserting or reordering entries, as any mismatch would mislabel
 * every policy in log output.
 */
const char *engine_policy_names[] = {"none",
                                     "rand",
                                     "steal",
                                     "keep",
                                     "block",
                                     "cpu tight",
                                     "mpi",
                                     "numa affinity",
                                     "hydro",
                                     "self gravity",
                                     "external gravity",
                                     "cosmological integration",
                                     "drift everything",
                                     "reconstruct multi-poles",
                                     "temperature",
                                     "cooling",
                                     "stars",
                                     "structure finding",
                                     "star formation",
                                     "feedback",
                                     "black holes",
                                     "fof search",
                                     "time-step limiter",
                                     "time-step sync"};
Pedro Gonnet's avatar
Pedro Gonnet committed
126

127
128
129
/** The rank of the engine as a global variable (for messages). */
int engine_rank;

/** The current step of the engine as a global variable (for messages). */
int engine_current_step;

/* Ghost-task sizing limits, defined in another translation unit.
 * NOTE(review): presumably tuned elsewhere at runtime -- confirm where these
 * are set before relying on their values here. */
extern int engine_max_parts_per_ghost;
extern int engine_max_sparts_per_ghost;

136
137
138
139
/**
 * @brief Link a density/force task to a cell.
 *
 * Claims the next free slot in the engine's pre-allocated link table and
 * atomically prepends it to the list headed by @c *l, so concurrent callers
 * may add links to the same cell list safely.
 *
 * @param e The #engine.
 * @param l A pointer to the #link, will be modified atomically.
 * @param t The #task.
 *
 * Aborts via error() if the link table is exhausted; the returned value of
 * the old list head is stored in the new link's @c next field. (Note: this
 * function returns nothing; the new link is reachable through @c *l.)
 */
void engine_addlink(struct engine *e, struct link **l, struct task *t) {

#ifdef SWIFT_DEBUG_CHECKS
  if (t == NULL) {
    error("Trying to link NULL task.");
  }
#endif

  /* Get the next free link. (atomic_inc returns the pre-increment value,
   * i.e. the index this thread owns exclusively.) */
  const size_t ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error(
        "Link table overflow. Increase the value of "
        "`Scheduler:links_per_tasks`.");
  }
  struct link *res = &e->links[ind];

  /* Set it atomically: publish the new node as the list head, keeping the
   * previous head as its successor. */
  res->t = t;
  res->next = atomic_swap(l, res);
}
166

167
/**
 * @brief Repartition the cells amongst the nodes.
 *
 * Runs a METIS/ParMETIS-based repartition of the top-level cells, frees the
 * memory-heavy structures that will be rebuilt anyway (sorted indices, tasks,
 * foreign particles), redistributes the particles to their new owners and
 * rebuilds the proxies. Only available when compiled with MPI and
 * (Par)METIS; otherwise calling it with an active repartition type is a
 * fatal error.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && (defined(HAVE_PARMETIS) || defined(HAVE_METIS))

  ticks tic = getticks();

#ifdef SWIFT_DEBUG_CHECKS
  /* Be verbose about this. */
  if (e->nodeID == 0 || e->verbose) message("repartitioning space");
  fflush(stdout);

  /* Check that all cells have been drifted to the current time */
  space_check_drift_point(e->s, e->ti_current, /*check_multipoles=*/0);
#endif

  /* Clear the repartition flag. */
  e->forcerepart = 0;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Generate the fixed costs include file. */
  if (e->step > 3 && e->reparttype->trigger <= 1.f) {
    task_dump_stats("partition_fixed_costs.h", e, /* header = */ 1,
                    /* allranks = */ 1);
  }

  /* Do the repartitioning. */
  partition_repartition(e->reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Partitioning requires copies of the particles, so we need to reduce the
   * memory in use to the minimum, we can free the sorting indices and the
   * tasks as these will be regenerated at the next rebuild. Also the foreign
   * particle arrays can go as these will be regenerated in proxy exchange. */

  /* Sorting indices. */
  if (e->s->cells_top != NULL) space_free_cells(e->s);

  /* Task arrays. */
  scheduler_free_tasks(&e->sched);

  /* Foreign parts. (no need to nullify the cell pointers as the cells
   * will be regenerated) */
  space_free_foreign_parts(e->s, /*clear_cell_pointers=*/0);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  /* Flag that a repartition has taken place */
  e->step_props |= engine_step_prop_repartition;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  /* Without MPI + (Par)METIS a repartition can never happen; requesting one
   * is a configuration error. */
  if (e->reparttype->type != REPART_NONE)
    error("SWIFT was not compiled with MPI and METIS or ParMETIS support.");

  /* Clear the repartition flag. */
  e->forcerepart = 0;
#endif
}
248

249
250
251
252
253
254
255
256
257
/**
 * @brief Decide whether trigger a repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition_trigger(struct engine *e) {

#ifdef WITH_MPI

258
  const ticks tic = getticks();
259
260
  static int opened = 0;
  if (e->restarting) opened = 1;
261

262
263
  /* Do nothing if there have not been enough steps since the last repartition
   * as we don't want to repeat this too often or immediately after a
264
265
266
   * repartition step. We attempt all this even when we are not repartitioning
   * as the balance logs can still be interesting. */
  if (e->step - e->last_repartition >= 2) {
267

268
    /* If we have fixed costs available and this is step 2 or we are forcing
269
270
271
272
273
274
275
276
277
278
279
280
281
282
     * repartitioning then we do a forced fixed costs repartition regardless. */
    int forced = 0;
    if (e->reparttype->type != REPART_NONE) {
      if (e->reparttype->trigger > 1 ||
          (e->step == 2 && e->reparttype->use_fixed_costs)) {
        if (e->reparttype->trigger > 1) {
          if ((e->step % (int)e->reparttype->trigger) == 0) e->forcerepart = 1;
        } else {
          e->forcerepart = 1;
        }
        e->reparttype->use_ticks = 0;
        forced = 1;
      }
    }
283

284
285
286
287
288
289
290
291
292
293
294
    /* We only check the CPU loads when we have processed a significant number
     * of all particles as we require all tasks to have timings or are
     * interested in the various balances logs. */
    if ((e->updates > 1 &&
         e->updates >= e->total_nr_parts * e->reparttype->minfrac) ||
        (e->g_updates > 1 &&
         e->g_updates >= e->total_nr_gparts * e->reparttype->minfrac)) {

      /* Are we using the task tick timings or fixed costs? */
      if (e->reparttype->use_fixed_costs > 1) {
        e->reparttype->use_ticks = 0;
295
      } else {
296
        e->reparttype->use_ticks = 1;
297
298
      }

299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
      /* Get the resident size of the process for the memory logs. */
      long size, resident, shared, text, library, data, dirty;
      memuse_use(&size, &resident, &shared, &text, &data, &library, &dirty);

      /* Gather it together with the CPU times used by the tasks in the last
       * step. */
      double timemem[3] = {e->usertime_last_step, e->systime_last_step,
                           (double)resident};
      double timemems[e->nr_nodes * 3];
      MPI_Gather(&timemem, 3, MPI_DOUBLE, timemems, 3, MPI_DOUBLE, 0,
                 MPI_COMM_WORLD);
      if (e->nodeID == 0) {

        /* Get the range and mean of the two CPU times and memory. */
        double umintime = timemems[0];
        double umaxtime = timemems[0];

        double smintime = timemems[1];
        double smaxtime = timemems[1];

        double minmem = timemems[2];
        double maxmem = timemems[2];

        double tmintime = umintime + smintime;
        double tmaxtime = umaxtime + smaxtime;

        double usum = timemems[0];
        double ssum = timemems[1];
        double tsum = usum + ssum;

        double msum = timemems[2];

        for (int k = 3; k < e->nr_nodes * 3; k += 3) {
          if (timemems[k] > umaxtime) umaxtime = timemems[k];
          if (timemems[k] < umintime) umintime = timemems[k];

          if (timemems[k + 1] > smaxtime) smaxtime = timemems[k + 1];
          if (timemems[k + 1] < smintime) smintime = timemems[k + 1];

          double total = timemems[k] + timemems[k + 1];
          if (total > tmaxtime) tmaxtime = total;
          if (total < tmintime) tmintime = total;

          usum += timemems[k];
          ssum += timemems[k + 1];
          tsum += total;

          if (timemems[k + 2] > maxmem) maxmem = timemems[k + 2];
          if (timemems[k + 2] < minmem) minmem = timemems[k + 2];
          msum += timemems[k + 2];
        }
        double umean = usum / (double)e->nr_nodes;
        double smean = ssum / (double)e->nr_nodes;
        double tmean = tsum / (double)e->nr_nodes;
        double mmean = msum / (double)e->nr_nodes;

        /* Are we out of balance and need to repartition? */
        /* ---------------------------------------------- */
        double abs_trigger = fabs(e->reparttype->trigger);
        double balance = (umaxtime - umintime) / umean;
        if (e->reparttype->type != REPART_NONE) {

          /* When forced we don't care about the balance. */
          if (!forced) {
            if (balance > abs_trigger) {
              if (e->verbose)
                message("trigger fraction %.3f > %.3f will repartition",
                        balance, abs_trigger);
              e->forcerepart = 1;
            } else {
              if (e->verbose && e->reparttype->type != REPART_NONE)
                message("trigger fraction %.3f =< %.3f will not repartition",
                        balance, abs_trigger);
            }
          }
374

375
        } else {
376
377
378
379
          /* Not repartitioning, would that have been done otherwise? */
          if (e->verbose)
            message("trigger fraction %.3f > %.3f would have repartitioned",
                    balance, abs_trigger);
380
        }
381

382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
        /* Keep logs of all CPU times and resident memory size for debugging
         * load issues. */
        FILE *timelog = NULL;
        FILE *memlog = NULL;
        if (!opened) {
          timelog = fopen("rank_cpu_balance.log", "w");
          fprintf(timelog, "# step rank user sys sum\n");

          memlog = fopen("rank_memory_balance.log", "w");
          fprintf(memlog, "# step rank resident\n");

          opened = 1;
        } else {
          timelog = fopen("rank_cpu_balance.log", "a");
          memlog = fopen("rank_memory_balance.log", "a");
        }

        for (int k = 0; k < e->nr_nodes * 3; k += 3) {

          fprintf(timelog, "%d %d %f %f %f\n", e->step, k / 3, timemems[k],
                  timemems[k + 1], timemems[k] + timemems[k + 1]);

          fprintf(memlog, "%d %d %ld\n", e->step, k / 3, (long)timemems[k + 2]);
405
        }
406
407
408
409
410
411
412
413
414
415
416
417
418
419

        fprintf(timelog, "# %d mean times: %f %f %f\n", e->step, umean, smean,
                tmean);
        if (abs_trigger > 1.f) abs_trigger = 0.f; /* Not relevant. */
        fprintf(timelog,
                "# %d balance: %f, expected: %f (sys: %f, total: %f)\n",
                e->step, balance, abs_trigger, (smaxtime - smintime) / smean,
                (tmaxtime - tmintime) / tmean);

        fclose(timelog);

        fprintf(memlog, "# %d mean resident memory: %f, balance: %f\n", e->step,
                mmean, (maxmem - minmem) / mmean);
        fclose(memlog);
420
      }
421

422
      /* All nodes do this together, so send to other ranks. */
423
      MPI_Bcast(&e->forcerepart, 1, MPI_INT, 0, MPI_COMM_WORLD);
424
425
426
    }

    /* Remember we did this. */
Peter W. Draper's avatar
Peter W. Draper committed
427
    if (e->forcerepart) e->last_repartition = e->step;
428
429
  }

430
431
432
  if (e->verbose)
    message("took %.3f %s", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
433
434
435
#endif
}

436
/**
 * @brief Exchange cell structures with other nodes.
 *
 * Thin timing wrapper around proxy_cells_exchange(): ships the top-level
 * cell structure to every neighbouring rank via the engine's proxies,
 * including gravity multipole data when self-gravity is enabled.
 * Fatal error if SWIFT was built without MPI.
 *
 * @param e The #engine.
 */
void engine_exchange_cells(struct engine *e) {

#ifdef WITH_MPI

  const ticks time_start = getticks();
  const int send_gravity = e->policy & engine_policy_self_gravity;

  /* Hand the whole exchange off to the proxy layer. */
  proxy_cells_exchange(e->proxies, e->nr_proxies, e->s, send_gravity);

  /* Report how long the exchange took. */
  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - time_start),
            clocks_getunit());

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
461
 * @brief Exchange straying particles with other nodes.
462
463
 *
 * @param e The #engine.
Pedro Gonnet's avatar
Pedro Gonnet committed
464
 * @param offset_parts The index in the parts array as of which the foreign
465
 *        parts reside (i.e. the current number of local #part).
466
 * @param ind_part The foreign #cell ID of each part.
467
 * @param Npart The number of stray parts, contains the number of parts received
468
 *        on return.
Pedro Gonnet's avatar
Pedro Gonnet committed
469
 * @param offset_gparts The index in the gparts array as of which the foreign
470
 *        parts reside (i.e. the current number of local #gpart).
471
 * @param ind_gpart The foreign #cell ID of each gpart.
Pedro Gonnet's avatar
Pedro Gonnet committed
472
 * @param Ngpart The number of stray gparts, contains the number of gparts
Pedro Gonnet's avatar
Pedro Gonnet committed
473
 *        received on return.
474
 * @param offset_sparts The index in the sparts array as of which the foreign
475
 *        parts reside (i.e. the current number of local #spart).
476
477
478
 * @param ind_spart The foreign #cell ID of each spart.
 * @param Nspart The number of stray sparts, contains the number of sparts
 *        received on return.
479
480
481
482
483
 * @param offset_bparts The index in the bparts array as of which the foreign
 *        parts reside (i.e. the current number of local #bpart).
 * @param ind_bpart The foreign #cell ID of each bpart.
 * @param Nbpart The number of stray bparts, contains the number of bparts
 *        received on return.
484
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
485
486
 * Note that this function does not mess-up the linkage between parts and
 * gparts, i.e. the received particles have correct linkeage.
487
 */
488
void engine_exchange_strays(struct engine *e, const size_t offset_parts,
489
490
491
492
493
494
495
                            const int *restrict ind_part, size_t *Npart,
                            const size_t offset_gparts,
                            const int *restrict ind_gpart, size_t *Ngpart,
                            const size_t offset_sparts,
                            const int *restrict ind_spart, size_t *Nspart,
                            const size_t offset_bparts,
                            const int *restrict ind_bpart, size_t *Nbpart) {
496
497
498

#ifdef WITH_MPI

499
  struct space *s = e->s;
500
  ticks tic = getticks();
501
502

  /* Re-set the proxies. */
Pedro Gonnet's avatar
Pedro Gonnet committed
503
504
505
  for (int k = 0; k < e->nr_proxies; k++) {
    e->proxies[k].nr_parts_out = 0;
    e->proxies[k].nr_gparts_out = 0;
506
    e->proxies[k].nr_sparts_out = 0;
507
    e->proxies[k].nr_bparts_out = 0;
Pedro Gonnet's avatar
Pedro Gonnet committed
508
  }
509

510
  /* Put the parts into the corresponding proxies. */
511
  for (size_t k = 0; k < *Npart; k++) {
512
513
514
515

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_part[k] == -1) continue;

516
    /* Get the target node and proxy ID. */
517
    const int node_id = e->s->cells_top[ind_part[k]].nodeID;
518
519
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
520
    const int pid = e->proxy_ind[node_id];
521
    if (pid < 0) {
522
523
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
524
          "id=%lld, x=[%e,%e,%e].",
Pedro Gonnet's avatar
Pedro Gonnet committed
525
526
527
          node_id, s->parts[offset_parts + k].id,
          s->parts[offset_parts + k].x[0], s->parts[offset_parts + k].x[1],
          s->parts[offset_parts + k].x[2]);
528
    }
Pedro Gonnet's avatar
Pedro Gonnet committed
529

530
531
    /* Re-link the associated gpart with the buffer offset of the part. */
    if (s->parts[offset_parts + k].gpart != NULL) {
532
533
      s->parts[offset_parts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_parts_out;
534
    }
Pedro Gonnet's avatar
Pedro Gonnet committed
535

536
537
538
539
540
#ifdef SWIFT_DEBUG_CHECKS
    if (s->parts[offset_parts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

541
    /* Load the part and xpart into the proxy. */
542
543
544
    proxy_parts_load(&e->proxies[pid], &s->parts[offset_parts + k],
                     &s->xparts[offset_parts + k], 1);
  }
545

546
  /* Put the sparts into the corresponding proxies. */
547
  for (size_t k = 0; k < *Nspart; k++) {
548
549
550
551
552

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_spart[k] == -1) continue;

    /* Get the target node and proxy ID. */
553
554
555
556
    const int node_id = e->s->cells_top[ind_spart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
557
    if (pid < 0) {
558
559
560
561
562
563
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lld, x=[%e,%e,%e].",
          node_id, s->sparts[offset_sparts + k].id,
          s->sparts[offset_sparts + k].x[0], s->sparts[offset_sparts + k].x[1],
          s->sparts[offset_sparts + k].x[2]);
564
    }
565

566
    /* Re-link the associated gpart with the buffer offset of the spart. */
567
568
569
570
571
    if (s->sparts[offset_sparts + k].gpart != NULL) {
      s->sparts[offset_sparts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_sparts_out;
    }

572
573
574
575
576
#ifdef SWIFT_DEBUG_CHECKS
    if (s->sparts[offset_sparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

577
578
579
    /* Load the spart into the proxy */
    proxy_sparts_load(&e->proxies[pid], &s->sparts[offset_sparts + k], 1);
  }
580

581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
  /* Put the bparts into the corresponding proxies. */
  for (size_t k = 0; k < *Nbpart; k++) {

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_bpart[k] == -1) continue;

    /* Get the target node and proxy ID. */
    const int node_id = e->s->cells_top[ind_bpart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
    if (pid < 0) {
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lld, x=[%e,%e,%e].",
          node_id, s->bparts[offset_bparts + k].id,
          s->bparts[offset_bparts + k].x[0], s->bparts[offset_bparts + k].x[1],
          s->bparts[offset_bparts + k].x[2]);
    }

    /* Re-link the associated gpart with the buffer offset of the bpart. */
    if (s->bparts[offset_bparts + k].gpart != NULL) {
      s->bparts[offset_bparts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_bparts_out;
    }

#ifdef SWIFT_DEBUG_CHECKS
    if (s->bparts[offset_bparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

    /* Load the bpart into the proxy */
    proxy_bparts_load(&e->proxies[pid], &s->bparts[offset_bparts + k], 1);
  }
615

616
617
  /* Put the gparts into the corresponding proxies. */
  for (size_t k = 0; k < *Ngpart; k++) {
618
619
620
621
622

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_gpart[k] == -1) continue;

    /* Get the target node and proxy ID. */
623
624
625
626
    const int node_id = e->s->cells_top[ind_gpart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
627
    if (pid < 0) {
628
629
630
631
632
633
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lli, x=[%e,%e,%e].",
          node_id, s->gparts[offset_gparts + k].id_or_neg_offset,
          s->gparts[offset_gparts + k].x[0], s->gparts[offset_gparts + k].x[1],
          s->gparts[offset_gparts + k].x[2]);
634
635
636
637
638
639
    }

#ifdef SWIFT_DEBUG_CHECKS
    if (s->gparts[offset_gparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

    /* Load the gpart into the proxy */
    proxy_gparts_load(&e->proxies[pid], &s->gparts[offset_gparts + k], 1);
  }

  /* Launch the proxies. */
  MPI_Request reqs_in[5 * engine_maxproxies];
  MPI_Request reqs_out[5 * engine_maxproxies];
  for (int k = 0; k < e->nr_proxies; k++) {
    proxy_parts_exchange_first(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_parts_count_in;
    reqs_out[k] = e->proxies[k].req_parts_count_out;
  }

  /* Wait for each count to come in and start the recv. */
  for (int k = 0; k < e->nr_proxies; k++) {
    int pid = MPI_UNDEFINED;
    if (MPI_Waitany(e->nr_proxies, reqs_in, &pid, MPI_STATUS_IGNORE) !=
            MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_parts_exchange_second(&e->proxies[pid]);
  }
  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the total number of incoming particles and make sure we have
     enough space to accommodate them. */
  int count_parts_in = 0;
  int count_gparts_in = 0;
  int count_sparts_in = 0;
  int count_bparts_in = 0;
  for (int k = 0; k < e->nr_proxies; k++) {
    count_parts_in += e->proxies[k].nr_parts_in;
    count_gparts_in += e->proxies[k].nr_gparts_in;
    count_sparts_in += e->proxies[k].nr_sparts_in;
    count_bparts_in += e->proxies[k].nr_bparts_in;
  }
  if (e->verbose) {
    message(
        "sent out %zu/%zu/%zu/%zu parts/gparts/sparts/bparts, got %i/%i/%i/%i "
        "back.",
        *Npart, *Ngpart, *Nspart, *Nbpart, count_parts_in, count_gparts_in,
        count_sparts_in, count_bparts_in);
  }
  /* Reallocate the particle arrays if necessary */
  if (offset_parts + count_parts_in > s->size_parts) {
    s->size_parts = (offset_parts + count_parts_in) * engine_parts_size_grow;
    struct part *parts_new = NULL;
    struct xpart *xparts_new = NULL;
    if (swift_memalign("parts", (void **)&parts_new, part_align,
                       sizeof(struct part) * s->size_parts) != 0 ||
        swift_memalign("xparts", (void **)&xparts_new, xpart_align,
                       sizeof(struct xpart) * s->size_parts) != 0)
      error("Failed to allocate new part data.");
    memcpy(parts_new, s->parts, sizeof(struct part) * offset_parts);
    memcpy(xparts_new, s->xparts, sizeof(struct xpart) * offset_parts);
    swift_free("parts", s->parts);
    swift_free("xparts", s->xparts);
    s->parts = parts_new;
    s->xparts = xparts_new;
    /* Reset the links */
    for (size_t k = 0; k < offset_parts; k++) {
      if (s->parts[k].gpart != NULL) {
        s->parts[k].gpart->id_or_neg_offset = -k;
      }
    }
  }
  if (offset_sparts + count_sparts_in > s->size_sparts) {
    s->size_sparts = (offset_sparts + count_sparts_in) * engine_parts_size_grow;
    struct spart *sparts_new = NULL;
    if (swift_memalign("sparts", (void **)&sparts_new, spart_align,
                       sizeof(struct spart) * s->size_sparts) != 0)
      error("Failed to allocate new spart data.");
    memcpy(sparts_new, s->sparts, sizeof(struct spart) * offset_sparts);
    swift_free("sparts", s->sparts);
    s->sparts = sparts_new;
    /* Reset the links */
    for (size_t k = 0; k < offset_sparts; k++) {
      if (s->sparts[k].gpart != NULL) {
        s->sparts[k].gpart->id_or_neg_offset = -k;
      }
    }
  }
  if (offset_bparts + count_bparts_in > s->size_bparts) {
    s->size_bparts = (offset_bparts + count_bparts_in) * engine_parts_size_grow;
    struct bpart *bparts_new = NULL;
    if (swift_memalign("bparts", (void **)&bparts_new, bpart_align,
                       sizeof(struct bpart) * s->size_bparts) != 0)
      error("Failed to allocate new bpart data.");
    memcpy(bparts_new, s->bparts, sizeof(struct bpart) * offset_bparts);
    swift_free("bparts", s->bparts);
    s->bparts = bparts_new;

    /* Reset the links */
    for (size_t k = 0; k < offset_bparts; k++) {
      if (s->bparts[k].gpart != NULL) {
        s->bparts[k].gpart->id_or_neg_offset = -k;
      }
    }
  }
  if (offset_gparts + count_gparts_in > s->size_gparts) {
    s->size_gparts = (offset_gparts + count_gparts_in) * engine_parts_size_grow;
    struct gpart *gparts_new = NULL;
    if (swift_memalign("gparts", (void **)&gparts_new, gpart_align,
                       sizeof(struct gpart) * s->size_gparts) != 0)
      error("Failed to allocate new gpart data.");
    memcpy(gparts_new, s->gparts, sizeof(struct gpart) * offset_gparts);
    swift_free("gparts", s->gparts);
    s->gparts = gparts_new;
    /* Reset the links */
    for (size_t k = 0; k < offset_gparts; k++) {
      if (s->gparts[k].type == swift_type_gas) {
        s->parts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      } else if (s->gparts[k].type == swift_type_stars) {
        s->sparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      } else if (s->gparts[k].type == swift_type_black_hole) {
        s->bparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      }
    }
  }
  /* Collect the requests for the particle data from the proxies. */
  int nr_in = 0, nr_out = 0;
  for (int k = 0; k < e->nr_proxies; k++) {
    if (e->proxies[k].nr_parts_in > 0) {
      reqs_in[5 * k] = e->proxies[k].req_parts_in;
      reqs_in[5 * k + 1] = e->proxies[k].req_xparts_in;
      nr_in += 2;
    } else {
      reqs_in[5 * k] = reqs_in[5 * k + 1] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_gparts_in > 0) {
      reqs_in[5 * k + 2] = e->proxies[k].req_gparts_in;
      nr_in += 1;
    } else {
      reqs_in[5 * k + 2] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_sparts_in > 0) {
      reqs_in[5 * k + 3] = e->proxies[k].req_sparts_in;
      nr_in += 1;
    } else {
      reqs_in[5 * k + 3] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_bparts_in > 0) {
      reqs_in[5 * k + 4] = e->proxies[k].req_bparts_in;
      nr_in += 1;
    } else {
      reqs_in[5 * k + 4] = MPI_REQUEST_NULL;
    }

    if (e->proxies[k].nr_parts_out > 0) {
      reqs_out[5 * k] = e->proxies[k].req_parts_out;
      reqs_out[5 * k + 1] = e->proxies[k].req_xparts_out;
      nr_out += 2;
    } else {
      reqs_out[5 * k] = reqs_out[5 * k + 1] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_gparts_out > 0) {
      reqs_out[5 * k + 2] = e->proxies[k].req_gparts_out;
      nr_out += 1;
    } else {
      reqs_out[5 * k + 2] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_sparts_out > 0) {
      reqs_out[5 * k + 3] = e->proxies[k].req_sparts_out;
      nr_out += 1;
    } else {
      reqs_out[5 * k + 3] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_bparts_out > 0) {
      reqs_out[5 * k + 4] = e->proxies[k].req_bparts_out;
      nr_out += 1;
    } else {
      reqs_out[5 * k + 4] = MPI_REQUEST_NULL;
    }
  }

  /* Wait for each part array to come in and collect the new
     parts from the proxies. */
  int count_parts = 0, count_gparts = 0, count_sparts = 0, count_bparts = 0;
  for (int k = 0; k < nr_in; k++) {
    int err, pid;
    if ((err = MPI_Waitany(5 * e->nr_proxies, reqs_in, &pid,
                           MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
      char buff[MPI_MAX_ERROR_STRING];
      int res;
      MPI_Error_string(err, buff, &res);
      error("MPI_Waitany failed (%s).", buff);
    }
    if (pid == MPI_UNDEFINED) break;
    // message( "request from proxy %i has arrived." , pid / 5 );
    pid = 5 * (pid / 5);
    /* If all the requests for a given proxy have arrived... */
    if (reqs_in[pid + 0] == MPI_REQUEST_NULL &&
        reqs_in[pid + 1] == MPI_REQUEST_NULL &&
        reqs_in[pid + 2] == MPI_REQUEST_NULL &&
        reqs_in[pid + 3] == MPI_REQUEST_NULL &&
        reqs_in[pid + 4] == MPI_REQUEST_NULL) {
      /* Copy the particle data to the part/xpart/gpart arrays. */
      struct proxy *prox = &e->proxies[pid / 5];
      memcpy(&s->parts[offset_parts + count_parts], prox->parts_in,
             sizeof(struct part) * prox->nr_parts_in);
      memcpy(&s->xparts[offset_parts + count_parts], prox->xparts_in,
             sizeof(struct xpart) * prox->nr_parts_in);
      memcpy(&s->gparts[offset_gparts + count_gparts], prox->gparts_in,
             sizeof(struct gpart) * prox->nr_gparts_in);
      memcpy(&s->sparts[offset_sparts + count_sparts], prox->sparts_in,
             sizeof(struct spart) * prox->nr_sparts_in);
      memcpy(&s->bparts[offset_bparts + count_bparts], prox->bparts_in,
             sizeof(struct bpart) * prox->nr_bparts_in);
      /* for (int k = offset; k < offset + count; k++)
         message(
            "received particle %lli, x=[%.3e %.3e %.3e], h=%.3e, from node %i.",
            s->parts[k].id, s->parts[k].x[0], s->parts[k].x[1],
            s->parts[k].x[2], s->parts[k].h, p->nodeID); */
      /* Re-link the gparts. */
      for (int kk = 0; kk < prox->nr_gparts_in; kk++) {
        struct gpart *gp = &s->gparts[offset_gparts + count_gparts + kk];
        if (gp->type == swift_type_gas) {
          struct part *p =
              &s->parts[offset_parts + count_parts - gp->id_or_neg_offset];
          gp->id_or_neg_offset = s->parts - p;
          p->gpart = gp;
        } else if (gp->type == swift_type_stars) {
          struct spart *sp =
              &s->sparts[offset_sparts + count_sparts - gp->id_or_neg_offset];
          gp->id_or_neg_offset = s->sparts - sp;
          sp->gpart = gp;
        } else if (gp->type == swift_type_black_hole) {
          struct bpart *bp =
              &s->bparts[offset_bparts + count_bparts - gp->id_or_neg_offset];
          gp->id_or_neg_offset = s->bparts - bp;
          bp->gpart = gp;
        }
      }
      /* Advance the counters. */
      count_parts += prox->nr_parts_in;
      count_gparts += prox->nr_gparts_in;
      count_sparts += prox->nr_sparts_in;
      count_bparts += prox->nr_bparts_in;
    }
  }
  /* Wait for all the sends to have finished too. */
  if (nr_out > 0)
    if (MPI_Waitall(5 * e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) !=
        MPI_SUCCESS)
      error("MPI_Waitall on sends failed.");

  /* Free the proxy memory */
  for (int k = 0; k < e->nr_proxies; k++) {
    struct proxy *prox = &e->proxies[k];
    if (prox->gparts_out) {
      swift_free("gparts_out", prox->gparts_out);
      prox->gparts_out = NULL;
      prox->size_gparts_out = 0;
    }
    if (prox->gparts_in) {
      swift_free("gparts_in", prox->gparts_in);
      prox->gparts_in = NULL;
      prox->size_gparts_in = 0;
    }
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

  /* Return the number of harvested parts. */
  *Npart = count_parts;
  *Ngpart = count_gparts;
  *Nspart = count_sparts;
  *Nbpart = count_bparts;
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}