/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
32
#include <stdbool.h>
33
34
35
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
36
#include <sys/stat.h>
37
#include <sys/types.h>
38
#include <unistd.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
39

40
41
/* MPI headers. */
#ifdef WITH_MPI
42

43
#include <mpi.h>
44
45
#endif

Angus Lepper's avatar
Angus Lepper committed
46
47
#ifdef HAVE_LIBNUMA
#include <numa.h>
48
49
#endif

50
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
51
#include "engine.h"
52
53

/* Local headers. */
54
#include "active.h"
55
#include "atomic.h"
56
#include "black_holes_properties.h"
57
#include "cell.h"
58
#include "chemistry.h"
59
#include "clocks.h"
60
#include "cooling.h"
61
#include "cosmology.h"
62
63
#include "cycle.h"
#include "debug.h"
64
#include "entropy_floor.h"
65
#include "equation_of_state.h"
66
#include "error.h"
Alexei Borissov's avatar
Alexei Borissov committed
67
#include "feedback.h"
Matthieu Schaller's avatar
Matthieu Schaller committed
68
#include "fof.h"
69
#include "gravity.h"
70
#include "gravity_cache.h"
71
#include "hydro.h"
lhausamm's avatar
lhausamm committed
72
#include "logger.h"
lhausamm's avatar
lhausamm committed
73
#include "logger_io.h"
74
#include "map.h"
75
#include "memuse.h"
76
#include "minmax.h"
77
#include "mpiuse.h"
78
#include "outputlist.h"
79
#include "parallel_io.h"
80
#include "part.h"
81
#include "partition.h"
James Willis's avatar
James Willis committed
82
#include "profiler.h"
83
#include "proxy.h"
84
#include "restart.h"
85
#include "runner.h"
86
87
#include "serial_io.h"
#include "single_io.h"
88
#include "sort_part.h"
89
#include "star_formation.h"
90
#include "star_formation_logger.h"
Loic Hausammann's avatar
Loic Hausammann committed
91
#include "stars_io.h"
92
#include "statistics.h"
93
#include "timers.h"
94
#include "tools.h"
95
#include "units.h"
96
#include "velociraptor_interface.h"
97
#include "version.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
98

99
100
101
/* Particle cache size. */
#define CACHE_SIZE 512

/* Human-readable names of the engine policies, used when printing the
 * active policies.  NOTE(review): the order here presumably mirrors the
 * engine_policy bit-flag enum declared in engine.h — confirm before
 * inserting or reordering entries. */
const char *engine_policy_names[] = {"none",
                                     "rand",
                                     "steal",
                                     "keep",
                                     "block",
                                     "cpu tight",
                                     "mpi",
                                     "numa affinity",
                                     "hydro",
                                     "self gravity",
                                     "external gravity",
                                     "cosmological integration",
                                     "drift everything",
                                     "reconstruct multi-poles",
                                     "temperature",
                                     "cooling",
                                     "stars",
                                     "structure finding",
                                     "star formation",
                                     "feedback",
                                     "black holes",
                                     "fof search",
                                     "time-step limiter",
                                     "time-step sync"};

/** The rank of the engine as a global variable (for messages). */
int engine_rank;

/** The current step of the engine as a global variable (for messages). */
int engine_current_step;

/* Ghost-task size limits defined elsewhere in the project. */
extern int engine_max_parts_per_ghost;
extern int engine_max_sparts_per_ghost;

136
137
138
139
/**
 * @brief Link a density/force task to a cell.
 *
 * Claims the next free slot in the engine's pre-allocated link table and
 * atomically pushes it onto the front of the list headed by @c *l.
 *
 * @param e The #engine.
 * @param l A pointer to the head of the #link list, will be modified
 *          atomically.
 * @param t The #task to link.
 */
void engine_addlink(struct engine *e, struct link **l, struct task *t) {

#ifdef SWIFT_DEBUG_CHECKS
  if (t == NULL) {
    error("Trying to link NULL task.");
  }
#endif

  /* Get the next free link. */
  const size_t ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error(
        "Link table overflow. Increase the value of "
        "`Scheduler:links_per_tasks`.");
  }
  struct link *res = &e->links[ind];

  /* Set it atomically.  The task pointer must be written before the swap
   * publishes the node to other threads walking the list. */
  res->t = t;
  res->next = atomic_swap(l, res);
}
166

167
/**
 * @brief Repartition the cells amongst the nodes.
 *
 * Frees the cell sorting indices, the tasks and the foreign particle arrays
 * (all regenerated at the next rebuild), asks the partitioner for a new
 * domain decomposition, redistributes the particles accordingly, rebuilds
 * the proxies and flags the engine for a rebuild.  A no-op (beyond error
 * checking) when compiled without MPI + (Par)METIS support.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && (defined(HAVE_PARMETIS) || defined(HAVE_METIS))

  ticks tic = getticks();

#ifdef SWIFT_DEBUG_CHECKS
  /* Be verbose about this. */
  if (e->nodeID == 0 || e->verbose) message("repartitioning space");
  fflush(stdout);

  /* Check that all cells have been drifted to the current time */
  space_check_drift_point(e->s, e->ti_current, /*check_multipoles=*/0);
#endif

  /* Clear the repartition flag. */
  e->forcerepart = 0;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Generate the fixed costs include file. */
  if (e->step > 3 && e->reparttype->trigger <= 1.f) {
    task_dump_stats("partition_fixed_costs.h", e, /* header = */ 1,
                    /* allranks = */ 1);
  }

  /* Do the repartitioning. */
  partition_repartition(e->reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Partitioning requires copies of the particles, so we need to reduce the
   * memory in use to the minimum, we can free the sorting indices and the
   * tasks as these will be regenerated at the next rebuild. Also the foreign
   * particle arrays can go as these will be regenerated in proxy exchange. */

  /* Sorting indices. */
  if (e->s->cells_top != NULL) space_free_cells(e->s);

  /* Task arrays. */
  scheduler_free_tasks(&e->sched);

  /* Foreign parts. (no need to nullify the cell pointers as the cells
   * will be regenerated) */
  space_free_foreign_parts(e->s, /*clear_cell_pointers=*/0);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  /* Flag that a repartition has taken place */
  e->step_props |= engine_step_prop_repartition;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  if (e->reparttype->type != REPART_NONE)
    error("SWIFT was not compiled with MPI and METIS or ParMETIS support.");

  /* Clear the repartition flag. */
  e->forcerepart = 0;
#endif
}
248

249
250
251
252
253
254
255
256
257
/**
 * @brief Decide whether trigger a repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition_trigger(struct engine *e) {

#ifdef WITH_MPI

258
259
  const ticks tic = getticks();

260
261
262
  /* Do nothing if there have not been enough steps since the last repartition
   * as we don't want to repeat this too often or immediately after a
   * repartition step. Also nothing to do when requested. */
263
264
  if (e->step - e->last_repartition >= 2 &&
      e->reparttype->type != REPART_NONE) {
265

266
267
268
269
270
    /* If we have fixed costs available and this is step 2 or we are forcing
     * repartitioning then we do a fixed costs one now. */
    if (e->reparttype->trigger > 1 ||
        (e->step == 2 && e->reparttype->use_fixed_costs)) {

271
      if (e->reparttype->trigger > 1) {
Matthieu Schaller's avatar
Matthieu Schaller committed
272
        if ((e->step % (int)e->reparttype->trigger) == 0) e->forcerepart = 1;
273
274
275
      } else {
        e->forcerepart = 1;
      }
276
      e->reparttype->use_ticks = 0;
277
278
279

    } else {

280
281
282
      /* It is only worth checking the CPU loads when we have processed a
       * significant number of all particles as we require all tasks to have
       * timings. */
283
284
285
286
287
      if ((e->updates > 1 &&
           e->updates >= e->total_nr_parts * e->reparttype->minfrac) ||
          (e->g_updates > 1 &&
           e->g_updates >= e->total_nr_gparts * e->reparttype->minfrac)) {

288
289
290
291
292
293
        /* Should we are use the task timings or fixed costs. */
        if (e->reparttype->use_fixed_costs > 1) {
          e->reparttype->use_ticks = 0;
        } else {
          e->reparttype->use_ticks = 1;
        }
294

295
        /* Get CPU time used since the last call to this function. */
Peter W. Draper's avatar
Peter W. Draper committed
296
297
        double elapsed_cputime =
            clocks_get_cputime_used() - e->cputime_last_step;
298
299
300

        /* Gather the elapsed CPU times from all ranks for the last step. */
        double elapsed_cputimes[e->nr_nodes];
Peter W. Draper's avatar
Peter W. Draper committed
301
302
        MPI_Gather(&elapsed_cputime, 1, MPI_DOUBLE, elapsed_cputimes, 1,
                   MPI_DOUBLE, 0, MPI_COMM_WORLD);
303
304
        if (e->nodeID == 0) {

305
          /* Get the range and mean of cputimes. */
306
307
          double mintime = elapsed_cputimes[0];
          double maxtime = elapsed_cputimes[0];
308
          double sum = elapsed_cputimes[0];
309
310
311
          for (int k = 1; k < e->nr_nodes; k++) {
            if (elapsed_cputimes[k] > maxtime) maxtime = elapsed_cputimes[k];
            if (elapsed_cputimes[k] < mintime) mintime = elapsed_cputimes[k];
312
            sum += elapsed_cputimes[k];
313
          }
314
          double mean = sum / (double)e->nr_nodes;
315
316

          /* Are we out of balance? */
317
318
          double abs_trigger = fabs(e->reparttype->trigger);
          if (((maxtime - mintime) / mean) > abs_trigger) {
319
            if (e->verbose)
Peter W. Draper's avatar
Peter W. Draper committed
320
321
              message("trigger fraction %.3f > %.3f will repartition",
                      (maxtime - mintime) / mean, abs_trigger);
322
            e->forcerepart = 1;
323
          } else {
Peter W. Draper's avatar
Peter W. Draper committed
324
            if (e->verbose)
Peter W. Draper's avatar
Peter W. Draper committed
325
326
              message("trigger fraction %.3f =< %.3f will not repartition",
                      (maxtime - mintime) / mean, abs_trigger);
327
328
329
          }
        }
      }
330
331
332

      /* All nodes do this together. */
      MPI_Bcast(&e->forcerepart, 1, MPI_INT, 0, MPI_COMM_WORLD);
333
334
335
    }

    /* Remember we did this. */
Peter W. Draper's avatar
Peter W. Draper committed
336
    if (e->forcerepart) e->last_repartition = e->step;
337
338
  }

339
340
341
  /* We always reset CPU time for next check, unless it will not be used. */
  if (e->reparttype->type != REPART_NONE)
    e->cputime_last_step = clocks_get_cputime_used();
342
343
344
345

  if (e->verbose)
    message("took %.3f %s", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
346
347
348
#endif
}

349
/**
 * @brief Exchange cell structures with other nodes.
 *
 * Thin wrapper around proxy_cells_exchange() that times the operation and
 * forwards the self-gravity policy flag.
 *
 * @param e The #engine.
 */
void engine_exchange_cells(struct engine *e) {

#ifdef WITH_MPI

  const ticks tic = getticks();

  /* Does this run use self-gravity? The proxy exchange needs to know. */
  const int with_gravity = e->policy & engine_policy_self_gravity;

  /* Exchange the cell structure with neighbouring ranks. */
  proxy_cells_exchange(e->proxies, e->nr_proxies, e->s, with_gravity);

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
374
 * @brief Exchange straying particles with other nodes.
375
376
 *
 * @param e The #engine.
Pedro Gonnet's avatar
Pedro Gonnet committed
377
 * @param offset_parts The index in the parts array as of which the foreign
378
 *        parts reside (i.e. the current number of local #part).
379
 * @param ind_part The foreign #cell ID of each part.
380
 * @param Npart The number of stray parts, contains the number of parts received
381
 *        on return.
Pedro Gonnet's avatar
Pedro Gonnet committed
382
 * @param offset_gparts The index in the gparts array as of which the foreign
383
 *        parts reside (i.e. the current number of local #gpart).
384
 * @param ind_gpart The foreign #cell ID of each gpart.
Pedro Gonnet's avatar
Pedro Gonnet committed
385
 * @param Ngpart The number of stray gparts, contains the number of gparts
Pedro Gonnet's avatar
Pedro Gonnet committed
386
 *        received on return.
387
 * @param offset_sparts The index in the sparts array as of which the foreign
388
 *        parts reside (i.e. the current number of local #spart).
389
390
391
 * @param ind_spart The foreign #cell ID of each spart.
 * @param Nspart The number of stray sparts, contains the number of sparts
 *        received on return.
392
393
394
395
396
 * @param offset_bparts The index in the bparts array as of which the foreign
 *        parts reside (i.e. the current number of local #bpart).
 * @param ind_bpart The foreign #cell ID of each bpart.
 * @param Nbpart The number of stray bparts, contains the number of bparts
 *        received on return.
397
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
398
399
 * Note that this function does not mess-up the linkage between parts and
 * gparts, i.e. the received particles have correct linkeage.
400
 */
401
402
403
404
void engine_exchange_strays(struct engine *e, const size_t offset_parts,
                            const int *ind_part, size_t *Npart,
                            const size_t offset_gparts, const int *ind_gpart,
                            size_t *Ngpart, const size_t offset_sparts,
405
406
407
                            const int *ind_spart, size_t *Nspart,
                            const size_t offset_bparts, const int *ind_bpart,
                            size_t *Nbpart) {
408
409
410

#ifdef WITH_MPI

411
  struct space *s = e->s;
412
  ticks tic = getticks();
413
414

  /* Re-set the proxies. */
Pedro Gonnet's avatar
Pedro Gonnet committed
415
416
417
  for (int k = 0; k < e->nr_proxies; k++) {
    e->proxies[k].nr_parts_out = 0;
    e->proxies[k].nr_gparts_out = 0;
418
    e->proxies[k].nr_sparts_out = 0;
419
    e->proxies[k].nr_bparts_out = 0;
Pedro Gonnet's avatar
Pedro Gonnet committed
420
  }
421

422
  /* Put the parts into the corresponding proxies. */
423
  for (size_t k = 0; k < *Npart; k++) {
424
425
426
427

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_part[k] == -1) continue;

428
    /* Get the target node and proxy ID. */
429
    const int node_id = e->s->cells_top[ind_part[k]].nodeID;
430
431
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
432
    const int pid = e->proxy_ind[node_id];
433
    if (pid < 0) {
434
435
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
436
          "id=%lld, x=[%e,%e,%e].",
Pedro Gonnet's avatar
Pedro Gonnet committed
437
438
439
          node_id, s->parts[offset_parts + k].id,
          s->parts[offset_parts + k].x[0], s->parts[offset_parts + k].x[1],
          s->parts[offset_parts + k].x[2]);
440
    }
Pedro Gonnet's avatar
Pedro Gonnet committed
441

442
443
    /* Re-link the associated gpart with the buffer offset of the part. */
    if (s->parts[offset_parts + k].gpart != NULL) {
444
445
      s->parts[offset_parts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_parts_out;
446
    }
Pedro Gonnet's avatar
Pedro Gonnet committed
447

448
449
450
451
452
#ifdef SWIFT_DEBUG_CHECKS
    if (s->parts[offset_parts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

453
    /* Load the part and xpart into the proxy. */
454
455
    proxy_parts_load(&e->proxies[pid], &s->parts[offset_parts + k],
                     &s->xparts[offset_parts + k], 1);
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470

#ifdef WITH_LOGGER
    /* Log the particle when leaving a rank. */
    logger_log_part(e->log, s->parts[offset_parts + k],
                    logger_mask_data[logger_x].mask |
                    logger_mask_data[logger_v].mask |
                    logger_mask_data[logger_a].mask |
                    logger_mask_data[logger_u].mask |
                    logger_mask_data[logger_h].mask |
                    logger_mask_data[logger_rho].mask |
                    logger_mask_data[logger_consts].mask |
                    logger_mask_data[logger_special_flags].mask,
                    s->xparts[offset_parts + k].logger_data.last_offset,
                    logger_generate_flag(logger_flag_mpi, node_id));
#endif
471
  }
472

473
  /* Put the sparts into the corresponding proxies. */
474
  for (size_t k = 0; k < *Nspart; k++) {
475
476
477
478
479

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_spart[k] == -1) continue;

    /* Get the target node and proxy ID. */
480
481
482
483
    const int node_id = e->s->cells_top[ind_spart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
484
    if (pid < 0) {
485
486
487
488
489
490
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lld, x=[%e,%e,%e].",
          node_id, s->sparts[offset_sparts + k].id,
          s->sparts[offset_sparts + k].x[0], s->sparts[offset_sparts + k].x[1],
          s->sparts[offset_sparts + k].x[2]);
491
    }
492

493
    /* Re-link the associated gpart with the buffer offset of the spart. */
494
495
496
497
498
    if (s->sparts[offset_sparts + k].gpart != NULL) {
      s->sparts[offset_sparts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_sparts_out;
    }

499
500
501
502
503
#ifdef SWIFT_DEBUG_CHECKS
    if (s->sparts[offset_sparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

504
505
    /* Load the spart into the proxy */
    proxy_sparts_load(&e->proxies[pid], &s->sparts[offset_sparts + k], 1);
506
507
508
509
510
511
512
513
514
515
516

#ifdef WITH_LOGGER
    /* Log the particle when leaving a rank. */
    logger_log_spart(e->log, s->sparts[offset_sparts + k],
                     logger_mask_data[logger_x].mask |
                     logger_mask_data[logger_v].mask |
                     logger_mask_data[logger_consts].mask |
                     logger_mask_data[logger_special_flags].mask,
                     s->sparts[offset_parts + k].logger_data.last_offset,
                     logger_generate_flag(logger_flag_mpi, node_id));
#endif
517
  }
518

519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
  /* Put the bparts into the corresponding proxies. */
  for (size_t k = 0; k < *Nbpart; k++) {

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_bpart[k] == -1) continue;

    /* Get the target node and proxy ID. */
    const int node_id = e->s->cells_top[ind_bpart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
    if (pid < 0) {
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lld, x=[%e,%e,%e].",
          node_id, s->bparts[offset_bparts + k].id,
          s->bparts[offset_bparts + k].x[0], s->bparts[offset_bparts + k].x[1],
          s->bparts[offset_bparts + k].x[2]);
    }

    /* Re-link the associated gpart with the buffer offset of the bpart. */
    if (s->bparts[offset_bparts + k].gpart != NULL) {
      s->bparts[offset_bparts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_bparts_out;
    }

#ifdef SWIFT_DEBUG_CHECKS
    if (s->bparts[offset_bparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

    /* Load the bpart into the proxy */
    proxy_bparts_load(&e->proxies[pid], &s->bparts[offset_bparts + k], 1);
552
553
554
555

#ifdef WITH_LOGGER
    error("TODO");
#endif
556
  }
557

558
559
  /* Put the gparts into the corresponding proxies. */
  for (size_t k = 0; k < *Ngpart; k++) {
560
561
562
563
564

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_gpart[k] == -1) continue;

    /* Get the target node and proxy ID. */
565
566
567
568
    const int node_id = e->s->cells_top[ind_gpart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
569
    if (pid < 0) {
570
571
572
573
574
575
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lli, x=[%e,%e,%e].",
          node_id, s->gparts[offset_gparts + k].id_or_neg_offset,
          s->gparts[offset_gparts + k].x[0], s->gparts[offset_gparts + k].x[1],
          s->gparts[offset_gparts + k].x[2]);
576
577
578
579
580
581
    }

#ifdef SWIFT_DEBUG_CHECKS
    if (s->gparts[offset_gparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif
582
583
584

    /* Load the gpart into the proxy */
    proxy_gparts_load(&e->proxies[pid], &s->gparts[offset_gparts + k], 1);
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600

#ifdef WITH_LOGGER
    /* Write only the dark matter particles */
    if (gp->type == swift_type_dark_matter) {

      /* Log the particle when leaving a rank. */
      logger_log_gpart(e->log, s->gparts[offset_gparts + k],
                       logger_mask_data[logger_x].mask |
                       logger_mask_data[logger_v].mask |
                       logger_mask_data[logger_a].mask |
                       logger_mask_data[logger_consts].mask |
                       logger_mask_data[logger_special_flags].mask,
                       s->sparts[offset_parts + k].logger_data.last_offset,
                       logger_generate_flag(logger_flag_mpi, node_id));
    }
#endif
601
602
  }

603
  /* Launch the proxies. */
604
605
  MPI_Request reqs_in[5 * engine_maxproxies];
  MPI_Request reqs_out[5 * engine_maxproxies];
606
  for (int k = 0; k < e->nr_proxies; k++) {
607
    proxy_parts_exchange_first(&e->proxies[k]);
608
609
610
611
612
    reqs_in[k] = e->proxies[k].req_parts_count_in;
    reqs_out[k] = e->proxies[k].req_parts_count_out;
  }

  /* Wait for each count to come in and start the recv. */
613
  for (int k = 0; k < e->nr_proxies; k++) {
614
    int pid = MPI_UNDEFINED;
615
616
    if (MPI_Waitany(e->nr_proxies, reqs_in, &pid, MPI_STATUS_IGNORE) !=
            MPI_SUCCESS ||
617
618
619
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
620
    proxy_parts_exchange_second(&e->proxies[pid]);
621
  }
622

623
  /* Wait for all the sends to have finished too. */
624
625
626
  if (MPI_Waitall(e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

627
  /* Count the total number of incoming particles and make sure we have
628
     enough space to accommodate them. */
629
630
  int count_parts_in = 0;
  int count_gparts_in = 0;
631
  int count_sparts_in = 0;
632
  int count_bparts_in = 0;
633
634
635
  for (int k = 0; k < e->nr_proxies; k++) {
    count_parts_in += e->proxies[k].nr_parts_in;
    count_gparts_in += e->proxies[k].nr_gparts_in;
636
    count_sparts_in += e->proxies[k].nr_sparts_in;
637
    count_bparts_in += e->proxies[k].nr_bparts_in;
638
639
  }
  if (e->verbose) {
640
641
642
643
644
    message(
        "sent out %zu/%zu/%zu/%zu parts/gparts/sparts/bparts, got %i/%i/%i/%i "
        "back.",
        *Npart, *Ngpart, *Nspart, *Nbpart, count_parts_in, count_gparts_in,
        count_sparts_in, count_bparts_in);
645
  }
646
647

  /* Reallocate the particle arrays if necessary */
648
  if (offset_parts + count_parts_in > s->size_parts) {
649
    s->size_parts = (offset_parts + count_parts_in) * engine_parts_size_grow;
650
651
    struct part *parts_new = NULL;
    struct xpart *xparts_new = NULL;
652
    if (swift_memalign("parts", (void **)&parts_new, part_align,
653
                       sizeof(struct part) * s->size_parts) != 0 ||
654
        swift_memalign("xparts", (void **)&xparts_new, xpart_align,
655
656
                       sizeof(struct xpart) * s->size_parts) != 0)
      error("Failed to allocate new part data.");
657
658
    memcpy(parts_new, s->parts, sizeof(struct part) * offset_parts);
    memcpy(xparts_new, s->xparts, sizeof(struct xpart) * offset_parts);
659
660
    swift_free("parts", s->parts);
    swift_free("xparts", s->xparts);
661
662
    s->parts = parts_new;
    s->xparts = xparts_new;
663
664

    /* Reset the links */
Pedro Gonnet's avatar
Pedro Gonnet committed
665
666
    for (size_t k = 0; k < offset_parts; k++) {
      if (s->parts[k].gpart != NULL) {
667
        s->parts[k].gpart->id_or_neg_offset = -k;
Pedro Gonnet's avatar
Pedro Gonnet committed
668
669
      }
    }
670
  }
671

672
673
674
  if (offset_sparts + count_sparts_in > s->size_sparts) {
    s->size_sparts = (offset_sparts + count_sparts_in) * engine_parts_size_grow;
    struct spart *sparts_new = NULL;
675
    if (swift_memalign("sparts", (void **)&sparts_new, spart_align,
676
677
678
                       sizeof(struct spart) * s->size_sparts) != 0)
      error("Failed to allocate new spart data.");
    memcpy(sparts_new, s->sparts, sizeof(struct spart) * offset_sparts);
679
    swift_free("sparts", s->sparts);
680
    s->sparts = sparts_new;
681
682

    /* Reset the links */
683
    for (size_t k = 0; k < offset_sparts; k++) {
684
685
686
687
688
      if (s->sparts[k].gpart != NULL) {
        s->sparts[k].gpart->id_or_neg_offset = -k;
      }
    }
  }
689

690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
  if (offset_bparts + count_bparts_in > s->size_bparts) {
    s->size_bparts = (offset_bparts + count_bparts_in) * engine_parts_size_grow;
    struct bpart *bparts_new = NULL;
    if (swift_memalign("bparts", (void **)&bparts_new, bpart_align,
                       sizeof(struct bpart) * s->size_bparts) != 0)
      error("Failed to allocate new bpart data.");
    memcpy(bparts_new, s->bparts, sizeof(struct bpart) * offset_bparts);
    swift_free("bparts", s->bparts);
    s->bparts = bparts_new;

    /* Reset the links */
    for (size_t k = 0; k < offset_bparts; k++) {
      if (s->bparts[k].gpart != NULL) {
        s->bparts[k].gpart->id_or_neg_offset = -k;
      }
    }
  }
707

708
  if (offset_gparts + count_gparts_in > s->size_gparts) {
709
    s->size_gparts = (offset_gparts + count_gparts_in) * engine_parts_size_grow;
710
    struct gpart *gparts_new = NULL;
711
    if (swift_memalign("gparts", (void **)&gparts_new, gpart_align,
712
713
714
                       sizeof(struct gpart) * s->size_gparts) != 0)
      error("Failed to allocate new gpart data.");
    memcpy(gparts_new, s->gparts, sizeof(struct gpart) * offset_gparts);
715
    swift_free("gparts", s->gparts);
716
    s->gparts = gparts_new;
717

718
    /* Reset the links */
Pedro Gonnet's avatar
Pedro Gonnet committed
719
    for (size_t k = 0; k < offset_gparts; k++) {
720
      if (s->gparts[k].type == swift_type_gas) {
721
        s->parts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
722
      } else if (s->gparts[k].type == swift_type_stars) {
723
        s->sparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
724
725
      } else if (s->gparts[k].type == swift_type_black_hole) {
        s->bparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
Pedro Gonnet's avatar
Pedro Gonnet committed
726
727
      }
    }
728
  }
729
730

  /* Collect the requests for the particle data from the proxies. */
731
732
  int nr_in = 0, nr_out = 0;
  for (int k = 0; k < e->nr_proxies; k++) {
733
    if (e->proxies[k].nr_parts_in > 0) {
734
735
      reqs_in[5 * k] = e->proxies[k].req_parts_in;
      reqs_in[5 * k + 1] = e->proxies[k].req_xparts_in;
736
737
      nr_in += 2;
    } else {
738
      reqs_in[5 * k] = reqs_in[5 * k + 1] = MPI_REQUEST_NULL;
739
740
    }
    if (e->proxies[k].nr_gparts_in > 0) {
741
      reqs_in[5 * k + 2] = e->proxies[k].req_gparts_in;
742
      nr_in += 1;
743
    } else {
744
      reqs_in[5 * k + 2] = MPI_REQUEST_NULL;
745
    }
746
    if (e->proxies[k].nr_sparts_in > 0) {
747
      reqs_in[5 * k + 3] = e->proxies[k].req_sparts_in;
748
749
      nr_in += 1;
    } else {
750
751
752
753
754
755
756
      reqs_in[5 * k + 3] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_bparts_in > 0) {
      reqs_in[5 * k + 4] = e->proxies[k].req_bparts_in;
      nr_in += 1;
    } else {
      reqs_in[5 * k + 4] = MPI_REQUEST_NULL;
757
758
    }

759
    if (e->proxies[k].nr_parts_out > 0) {
760
761
      reqs_out[5 * k] = e->proxies[k].req_parts_out;
      reqs_out[5 * k + 1] = e->proxies[k].req_xparts_out;
762
763
      nr_out += 2;
    } else {
764
      reqs_out[5 * k] = reqs_out[5 * k + 1] = MPI_REQUEST_NULL;
765
766
    }
    if (e->proxies[k].nr_gparts_out > 0) {
767
      reqs_out[5 * k + 2] = e->proxies[k].req_gparts_out;
768
769
      nr_out += 1;
    } else {
770
      reqs_out[5 * k + 2] = MPI_REQUEST_NULL;
771
772
    }
    if (e->proxies[k].nr_sparts_out > 0) {
773
774
775
776
777
778
779
      reqs_out[5 * k + 3] = e->proxies[k].req_sparts_out;
      nr_out += 1;
    } else {
      reqs_out[5 * k + 3] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_bparts_out > 0) {
      reqs_out[5 * k + 4] = e->proxies[k].req_bparts_out;
780
      nr_out += 1;
    } else {
      reqs_out[5 * k + 4] = MPI_REQUEST_NULL;
    }
  }

  /* Wait for each part array to come in and collect the new
     parts from the proxies. */
  int count_parts = 0, count_gparts = 0, count_sparts = 0, count_bparts = 0;
  for (int k = 0; k < nr_in; k++) {
    int err, pid;
    if ((err = MPI_Waitany(5 * e->nr_proxies, reqs_in, &pid,
                           MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
      char buff[MPI_MAX_ERROR_STRING];
      int res;
      MPI_Error_string(err, buff, &res);
      error("MPI_Waitany failed (%s).", buff);
    }
    if (pid == MPI_UNDEFINED) break;
    // message( "request from proxy %i has arrived." , pid / 5 );
    pid = 5 * (pid / 5);

    /* If all the requests for a given proxy have arrived... */
    if (reqs_in[pid + 0] == MPI_REQUEST_NULL &&
        reqs_in[pid + 1] == MPI_REQUEST_NULL &&
        reqs_in[pid + 2] == MPI_REQUEST_NULL &&
        reqs_in[pid + 3] == MPI_REQUEST_NULL &&
        reqs_in[pid + 4] == MPI_REQUEST_NULL) {

      /* Copy the particle data to the part/xpart/gpart arrays. */
      struct proxy *prox = &e->proxies[pid / 5];
      memcpy(&s->parts[offset_parts + count_parts], prox->parts_in,
             sizeof(struct part) * prox->nr_parts_in);
      memcpy(&s->xparts[offset_parts + count_parts], prox->xparts_in,
             sizeof(struct xpart) * prox->nr_parts_in);
      memcpy(&s->gparts[offset_gparts + count_gparts], prox->gparts_in,
             sizeof(struct gpart) * prox->nr_gparts_in);
      memcpy(&s->sparts[offset_sparts + count_sparts], prox->sparts_in,
             sizeof(struct spart) * prox->nr_sparts_in);
      memcpy(&s->bparts[offset_bparts + count_bparts], prox->bparts_in,
             sizeof(struct bpart) * prox->nr_bparts_in);

#ifdef WITH_LOGGER
      logger_log_recv_strays(e->log, &s->parts[offset_parts + count_parts], prox->nr_parts_in,
                             &s->gparts[offset_gparts + count_gparts], prox->nr_gparts_in,
                             &s->sparts[offset_sparts + count_sparts], prox->nr_sparts_in,
                             &s->bparts[offset_bparts + count_bparts], prox->nr_bparts_in,
                             prox->nodeID);
#endif

      /* for (int k = offset; k < offset + count; k++)
         message(
            "received particle %lli, x=[%.3e %.3e %.3e], h=%.3e, from node %i.",
            s->parts[k].id, s->parts[k].x[0], s->parts[k].x[1],
            s->parts[k].x[2], s->parts[k].h, p->nodeID); */

      /* Re-link the gparts. */
      for (int kk = 0; kk < prox->nr_gparts_in; kk++) {
        struct gpart *gp = &s->gparts[offset_gparts + count_gparts + kk];

        if (gp->type == swift_type_gas) {
          struct part *p =
              &s->parts[offset_parts + count_parts - gp->id_or_neg_offset];
          gp->id_or_neg_offset = s->parts - p;
          p->gpart = gp;
        } else if (gp->type == swift_type_stars) {
          struct spart *sp =
              &s->sparts[offset_sparts + count_sparts - gp->id_or_neg_offset];
          gp->id_or_neg_offset = s->sparts - sp;
          sp->gpart = gp;
        } else if (gp->type == swift_type_black_hole) {
          struct bpart *bp =
              &s->bparts[offset_bparts + count_bparts - gp->id_or_neg_offset];
          gp->id_or_neg_offset = s->bparts - bp;
          bp->gpart = gp;
        }
      }

      /* Advance the counters. */
      count_parts += prox->nr_parts_in;
      count_gparts += prox->nr_gparts_in;
      count_sparts += prox->nr_sparts_in;
      count_bparts += prox->nr_bparts_in;
    }
  }

  /* Wait for all the sends to have finished too. */
  if (nr_out > 0)
    if (MPI_Waitall(5 * e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) !=
        MPI_SUCCESS)
      error("MPI_Waitall on sends failed.");

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

  /* Return the number of harvested parts. */
  *Npart = count_parts;
  *Ngpart = count_gparts;
  *Nspart = count_sparts;
  *Nbpart = count_bparts;