/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

/* This object's header. */
#include "engine.h"

/* Local headers. */
#include "active.h"
#include "atomic.h"
#include "black_holes_properties.h"
#include "cell.h"
#include "chemistry.h"
#include "clocks.h"
#include "cooling.h"
#include "cosmology.h"
#include "cycle.h"
#include "debug.h"
#include "entropy_floor.h"
#include "equation_of_state.h"
#include "error.h"
#include "feedback.h"
#include "fof.h"
#include "gravity.h"
#include "gravity_cache.h"
#include "hydro.h"
#include "logger.h"
#include "logger_io.h"
#include "map.h"
#include "memuse.h"
#include "minmax.h"
#include "mpiuse.h"
#include "multipole_struct.h"
#include "outputlist.h"
#include "parallel_io.h"
#include "part.h"
#include "partition.h"
#include "profiler.h"
#include "proxy.h"
#include "restart.h"
#include "runner.h"
#include "serial_io.h"
#include "single_io.h"
#include "sort_part.h"
#include "star_formation.h"
#include "star_formation_logger.h"
#include "stars_io.h"
#include "statistics.h"
#include "timers.h"
#include "tools.h"
#include "units.h"
#include "velociraptor_interface.h"
#include "version.h"

/* Particle cache size. */
#define CACHE_SIZE 512

const char *engine_policy_names[] = {"none",
                                     "rand",
                                     "steal",
                                     "keep",
                                     "block",
                                     "cpu tight",
                                     "mpi",
                                     "numa affinity",
                                     "hydro",
                                     "self gravity",
                                     "external gravity",
                                     "cosmological integration",
                                     "drift everything",
                                     "reconstruct multi-poles",
                                     "temperature",
                                     "cooling",
                                     "stars",
                                     "structure finding",
                                     "star formation",
                                     "feedback",
                                     "black holes",
                                     "fof search",
                                     "time-step limiter",
                                     "time-step sync",
                                     "logger"};

/** The rank of the engine as a global variable (for messages). */
int engine_rank;

/** The current step of the engine as a global variable (for messages). */
int engine_current_step;

extern int engine_max_parts_per_ghost;
extern int engine_max_sparts_per_ghost;

/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l A pointer to the #link, will be modified atomically.
 * @param t The #task.
 *
 * @return The new #link pointer.
 */
void engine_addlink(struct engine *e, struct link **l, struct task *t) {

#ifdef SWIFT_DEBUG_CHECKS
  if (t == NULL) {
    error("Trying to link NULL task.");
  }
#endif

  /* Get the next free link. */
  const size_t ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error(
        "Link table overflow. Increase the value of "
        "`Scheduler:links_per_tasks`.");
  }
  struct link *res = &e->links[ind];

  /* Set it atomically. */
  res->t = t;
  res->next = atomic_swap(l, res);
}
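
/* Usage sketch (illustrative): during task construction, a freshly created
 * hydro density task `t` would typically be linked to its cell `ci` with
 *
 *   engine_addlink(e, &ci->hydro.density, t);
 *
 * Because the list head is swapped atomically, several threads can append
 * links to the same cell concurrently while the task graph is being built. */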

/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && (defined(HAVE_PARMETIS) || defined(HAVE_METIS))

  ticks tic = getticks();

#ifdef SWIFT_DEBUG_CHECKS
  /* Be verbose about this. */
  if (e->nodeID == 0 || e->verbose) message("repartitioning space");
  fflush(stdout);

  /* Check that all cells have been drifted to the current time */
  space_check_drift_point(e->s, e->ti_current, /*check_multipoles=*/0);
#endif

  /* Clear the repartition flag. */
  e->forcerepart = 0;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Generate the fixed costs include file. */
  if (e->step > 3 && e->reparttype->trigger <= 1.f) {
    task_dump_stats("partition_fixed_costs.h", e, /* header = */ 1,
                    /* allranks = */ 1);
  }

  /* Do the repartitioning. */
  partition_repartition(e->reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Partitioning requires copies of the particles, so we need to reduce the
   * memory in use to the minimum, we can free the sorting indices and the
   * tasks as these will be regenerated at the next rebuild. Also the foreign
   * particle arrays can go as these will be regenerated in proxy exchange. */

  /* Sorting indices. */
  if (e->s->cells_top != NULL) space_free_cells(e->s);

  /* Report the time spent in the different task categories */
  if (e->verbose) scheduler_report_task_times(&e->sched, e->nr_threads);

  /* Task arrays. */
  scheduler_free_tasks(&e->sched);

  /* Foreign parts. (no need to nullify the cell pointers as the cells
   * will be regenerated) */
  space_free_foreign_parts(e->s, /*clear_cell_pointers=*/0);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  /* Flag that a repartition has taken place */
  e->step_props |= engine_step_prop_repartition;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  if (e->reparttype->type != REPART_NONE)
    error("SWIFT was not compiled with MPI and METIS or ParMETIS support.");

  /* Clear the repartition flag. */
  e->forcerepart = 0;
#endif
}

/**
 * @brief Decide whether to trigger a repartition of the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition_trigger(struct engine *e) {

#ifdef WITH_MPI

  const ticks tic = getticks();
  static int opened = 0;
  if (e->restarting) opened = 1;

  /* Do nothing if there have not been enough steps since the last repartition
   * as we don't want to repeat this too often or immediately after a
   * repartition step, or also immediately on restart. We check all this
   * even when we are not repartitioning as the balance logs can still be
   * interesting. */
  if (e->step - e->last_repartition >= 2 && !e->restarting) {

    /* If we have fixed costs available and this is step 2 or we are forcing
     * repartitioning then we do a forced fixed costs repartition regardless. */
    int forced = 0;
    if (e->reparttype->type != REPART_NONE) {
      if (e->reparttype->trigger > 1 ||
          (e->step == 2 && e->reparttype->use_fixed_costs)) {
        if (e->reparttype->trigger > 1) {
          if ((e->step % (int)e->reparttype->trigger) == 0) e->forcerepart = 1;
        } else {
          e->forcerepart = 1;
        }
        e->reparttype->use_ticks = 0;
        forced = 1;
      }
    }

    /* We only check the CPU loads when we have processed a significant number
     * of all particles as we require all tasks to have timings or are
     * interested in the various balances logs. */
    if ((e->updates > 1 &&
         e->updates >= e->total_nr_parts * e->reparttype->minfrac) ||
        (e->g_updates > 1 &&
         e->g_updates >= e->total_nr_gparts * e->reparttype->minfrac)) {

      /* Are we using the task tick timings or fixed costs? */
      if (e->reparttype->use_fixed_costs > 1) {
        e->reparttype->use_ticks = 0;
      } else {
        e->reparttype->use_ticks = 1;
      }

      /* Get the resident size of the process for the memory logs. */
      long size, resident, shared, text, library, data, dirty;
      memuse_use(&size, &resident, &shared, &text, &data, &library, &dirty);

      /* Gather it together with the CPU times used by the tasks in the last
       * step. */
      double timemem[3] = {e->usertime_last_step, e->systime_last_step,
                           (double)resident};
      double timemems[e->nr_nodes * 3];
      MPI_Gather(&timemem, 3, MPI_DOUBLE, timemems, 3, MPI_DOUBLE, 0,
                 MPI_COMM_WORLD);
      if (e->nodeID == 0) {

        /* Get the range and mean of the two CPU times and memory. */
        double umintime = timemems[0];
        double umaxtime = timemems[0];

        double smintime = timemems[1];
        double smaxtime = timemems[1];

        double minmem = timemems[2];
        double maxmem = timemems[2];

        double tmintime = umintime + smintime;
        double tmaxtime = umaxtime + smaxtime;

        double usum = timemems[0];
        double ssum = timemems[1];
        double tsum = usum + ssum;

        double msum = timemems[2];

        for (int k = 3; k < e->nr_nodes * 3; k += 3) {
          if (timemems[k] > umaxtime) umaxtime = timemems[k];
          if (timemems[k] < umintime) umintime = timemems[k];

          if (timemems[k + 1] > smaxtime) smaxtime = timemems[k + 1];
          if (timemems[k + 1] < smintime) smintime = timemems[k + 1];

          double total = timemems[k] + timemems[k + 1];
          if (total > tmaxtime) tmaxtime = total;
          if (total < tmintime) tmintime = total;

          usum += timemems[k];
          ssum += timemems[k + 1];
          tsum += total;

          if (timemems[k + 2] > maxmem) maxmem = timemems[k + 2];
          if (timemems[k + 2] < minmem) minmem = timemems[k + 2];
          msum += timemems[k + 2];
        }
        double umean = usum / (double)e->nr_nodes;
        double smean = ssum / (double)e->nr_nodes;
        double tmean = tsum / (double)e->nr_nodes;
        double mmean = msum / (double)e->nr_nodes;

        /* Are we out of balance and need to repartition? */
        /* ---------------------------------------------- */
        double abs_trigger = fabs(e->reparttype->trigger);
        double balance = (umaxtime - umintime) / umean;
        if (e->reparttype->type != REPART_NONE) {

          /* When forced we don't care about the balance. */
          if (!forced) {
            if (balance > abs_trigger) {
              if (e->verbose)
                message("trigger fraction %.3f > %.3f will repartition",
                        balance, abs_trigger);
              e->forcerepart = 1;
            } else {
              if (e->verbose && e->reparttype->type != REPART_NONE)
                message("trigger fraction %.3f =< %.3f will not repartition",
                        balance, abs_trigger);
            }
          }

        } else {
          /* Not repartitioning, would that have been done otherwise? */
          if (e->verbose) {
            if (balance > abs_trigger) {
              message("trigger fraction %.3f > %.3f would have repartitioned",
                      balance, abs_trigger);
            } else {
              message(
                  "trigger fraction %.3f =< %.3f would not have repartitioned",
                  balance, abs_trigger);
            }
          }
        }

        /* Keep logs of all CPU times and resident memory size for debugging
         * load issues. */
        FILE *timelog = NULL;
        FILE *memlog = NULL;
        if (!opened) {
          timelog = fopen("rank_cpu_balance.log", "w");
          fprintf(timelog, "# step rank user sys sum\n");

          memlog = fopen("rank_memory_balance.log", "w");
          fprintf(memlog, "# step rank resident\n");

          opened = 1;
        } else {
          timelog = fopen("rank_cpu_balance.log", "a");
          memlog = fopen("rank_memory_balance.log", "a");
        }

        for (int k = 0; k < e->nr_nodes * 3; k += 3) {

          fprintf(timelog, "%d %d %f %f %f\n", e->step, k / 3, timemems[k],
                  timemems[k + 1], timemems[k] + timemems[k + 1]);

          fprintf(memlog, "%d %d %ld\n", e->step, k / 3, (long)timemems[k + 2]);
        }

        fprintf(timelog, "# %d mean times: %f %f %f\n", e->step, umean, smean,
                tmean);
        if (abs_trigger > 1.f) abs_trigger = 0.f; /* Not relevant. */
        fprintf(timelog,
                "# %d balance: %f, expected: %f (sys: %f, total: %f)\n",
                e->step, balance, abs_trigger, (smaxtime - smintime) / smean,
                (tmaxtime - tmintime) / tmean);

        fclose(timelog);

        fprintf(memlog, "# %d mean resident memory: %f, balance: %f\n", e->step,
                mmean, (maxmem - minmem) / mmean);
        fclose(memlog);
      }

      /* All nodes do this together, so send to other ranks. */
      MPI_Bcast(&e->forcerepart, 1, MPI_INT, 0, MPI_COMM_WORLD);
    }

    /* Remember we did this. */
    if (e->forcerepart) e->last_repartition = e->step;
  }

  if (e->verbose)
    message("took %.3f %s", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#endif
}
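
/* Worked example (illustrative): with user CPU times of 10 s, 12 s and 14 s
 * across three ranks, umean = 12 s and the measured imbalance is
 * (umaxtime - umintime) / umean = (14 - 10) / 12 ~= 0.33, so a trigger
 * fraction (e->reparttype->trigger) of 0.25 would flag a repartition at the
 * next opportunity while a value of 0.4 would not. */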

/**
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */
void engine_exchange_cells(struct engine *e) {

#ifdef WITH_MPI

  const int with_gravity = e->policy & engine_policy_self_gravity;
  const ticks tic = getticks();

  /* Exchange the cell structure with neighbouring ranks. */
  proxy_cells_exchange(e->proxies, e->nr_proxies, e->s, with_gravity);

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
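
/* Call-order sketch (illustrative): on an MPI run the proxies must exist
 * before the cell structure can be mirrored across ranks, so a rebuild would
 * typically do
 *
 *   engine_makeproxies(e);
 *   engine_exchange_cells(e);
 *
 * before any foreign particle data is pulled over. */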

/**
 * @brief Exchange straying particles with other nodes.
 *
 * @param e The #engine.
 * @param offset_parts The index in the parts array as of which the foreign
 *        parts reside (i.e. the current number of local #part).
 * @param ind_part The foreign #cell ID of each part.
 * @param Npart The number of stray parts, contains the number of parts
 *        received on return.
 * @param offset_gparts The index in the gparts array as of which the foreign
 *        parts reside (i.e. the current number of local #gpart).
 * @param ind_gpart The foreign #cell ID of each gpart.
 * @param Ngpart The number of stray gparts, contains the number of gparts
 *        received on return.
 * @param offset_sparts The index in the sparts array as of which the foreign
 *        parts reside (i.e. the current number of local #spart).
 * @param ind_spart The foreign #cell ID of each spart.
 * @param Nspart The number of stray sparts, contains the number of sparts
 *        received on return.
 * @param offset_bparts The index in the bparts array as of which the foreign
 *        parts reside (i.e. the current number of local #bpart).
 * @param ind_bpart The foreign #cell ID of each bpart.
 * @param Nbpart The number of stray bparts, contains the number of bparts
 *        received on return.
 *
 * Note that this function does not mess up the linkage between parts and
 * gparts, i.e. the received particles have correct linkage.
 */
void engine_exchange_strays(struct engine *e, const size_t offset_parts,
                            const int *restrict ind_part, size_t *Npart,
                            const size_t offset_gparts,
                            const int *restrict ind_gpart, size_t *Ngpart,
                            const size_t offset_sparts,
                            const int *restrict ind_spart, size_t *Nspart,
                            const size_t offset_bparts,
                            const int *restrict ind_bpart, size_t *Nbpart) {

#ifdef WITH_MPI
  struct space *s = e->s;
  ticks tic = getticks();

  /* Re-set the proxies. */
  for (int k = 0; k < e->nr_proxies; k++) {
    e->proxies[k].nr_parts_out = 0;
    e->proxies[k].nr_gparts_out = 0;
    e->proxies[k].nr_sparts_out = 0;
    e->proxies[k].nr_bparts_out = 0;
  }

  /* Put the parts into the corresponding proxies. */
  for (size_t k = 0; k < *Npart; k++) {

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_part[k] == -1) continue;

    /* Get the target node and proxy ID. */
    const int node_id = e->s->cells_top[ind_part[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
    if (pid < 0) {
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lld, x=[%e,%e,%e].",
          node_id, s->parts[offset_parts + k].id,
          s->parts[offset_parts + k].x[0], s->parts[offset_parts + k].x[1],
          s->parts[offset_parts + k].x[2]);
    }

    /* Re-link the associated gpart with the buffer offset of the part. */
    if (s->parts[offset_parts + k].gpart != NULL) {
      s->parts[offset_parts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_parts_out;
    }

#ifdef SWIFT_DEBUG_CHECKS
    if (s->parts[offset_parts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

    /* Load the part and xpart into the proxy. */
    proxy_parts_load(&e->proxies[pid], &s->parts[offset_parts + k],
                     &s->xparts[offset_parts + k], 1);

#ifdef WITH_LOGGER
    if (e->policy & engine_policy_logger) {
      const uint32_t logger_flag =
          logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);

      /* Log the particle when leaving a rank. */
      logger_log_part(
          e->logger, &s->parts[offset_parts + k], &s->xparts[offset_parts + k],
          logger_masks_all_part | logger_mask_data[logger_special_flags].mask,
          logger_flag);
    }
#endif
  }

  /* Put the sparts into the corresponding proxies. */
  for (size_t k = 0; k < *Nspart; k++) {

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_spart[k] == -1) continue;

    /* Get the target node and proxy ID. */
    const int node_id = e->s->cells_top[ind_spart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
    if (pid < 0) {
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lld, x=[%e,%e,%e].",
          node_id, s->sparts[offset_sparts + k].id,
          s->sparts[offset_sparts + k].x[0], s->sparts[offset_sparts + k].x[1],
          s->sparts[offset_sparts + k].x[2]);
    }

    /* Re-link the associated gpart with the buffer offset of the spart. */
    if (s->sparts[offset_sparts + k].gpart != NULL) {
      s->sparts[offset_sparts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_sparts_out;
    }

#ifdef SWIFT_DEBUG_CHECKS
    if (s->sparts[offset_sparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

    /* Load the spart into the proxy */
    proxy_sparts_load(&e->proxies[pid], &s->sparts[offset_sparts + k], 1);

#ifdef WITH_LOGGER
    if (e->policy & engine_policy_logger) {
      const uint32_t logger_flag =
          logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);

      /* Log the particle when leaving a rank. */
      logger_log_spart(
          e->logger, &s->sparts[offset_sparts + k],
          logger_masks_all_spart | logger_mask_data[logger_special_flags].mask,
          logger_flag);
    }
#endif
  }

  /* Put the bparts into the corresponding proxies. */
  for (size_t k = 0; k < *Nbpart; k++) {

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_bpart[k] == -1) continue;

    /* Get the target node and proxy ID. */
    const int node_id = e->s->cells_top[ind_bpart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
    if (pid < 0) {
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lld, x=[%e,%e,%e].",
          node_id, s->bparts[offset_bparts + k].id,
          s->bparts[offset_bparts + k].x[0], s->bparts[offset_bparts + k].x[1],
          s->bparts[offset_bparts + k].x[2]);
    }

    /* Re-link the associated gpart with the buffer offset of the bpart. */
    if (s->bparts[offset_bparts + k].gpart != NULL) {
      s->bparts[offset_bparts + k].gpart->id_or_neg_offset =
          -e->proxies[pid].nr_bparts_out;
    }

#ifdef SWIFT_DEBUG_CHECKS
    if (s->bparts[offset_bparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

    /* Load the bpart into the proxy */
    proxy_bparts_load(&e->proxies[pid], &s->bparts[offset_bparts + k], 1);

#ifdef WITH_LOGGER
    if (e->policy & engine_policy_logger) {
      error("Not yet implemented.");
    }
#endif
  }

  /* Put the gparts into the corresponding proxies. */
  for (size_t k = 0; k < *Ngpart; k++) {

    /* Ignore the particles we want to get rid of (inhibited, ...). */
    if (ind_gpart[k] == -1) continue;

    /* Get the target node and proxy ID. */
    const int node_id = e->s->cells_top[ind_gpart[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
    if (pid < 0) {
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%lli, x=[%e,%e,%e].",
          node_id, s->gparts[offset_gparts + k].id_or_neg_offset,
          s->gparts[offset_gparts + k].x[0], s->gparts[offset_gparts + k].x[1],
          s->gparts[offset_gparts + k].x[2]);
    }

#ifdef SWIFT_DEBUG_CHECKS
    if (s->gparts[offset_gparts + k].time_bin == time_bin_inhibited)
      error("Attempting to exchange an inhibited particle");
#endif

    /* Load the gpart into the proxy */
    proxy_gparts_load(&e->proxies[pid], &s->gparts[offset_gparts + k], 1);

#ifdef WITH_LOGGER
    /* Write only the dark matter particles */
    if ((e->policy & engine_policy_logger) &&
        s->gparts[offset_gparts + k].type == swift_type_dark_matter) {

      const uint32_t logger_flag =
          logger_pack_flags_and_data(logger_flag_mpi_exit, node_id);

      /* Log the particle when leaving a rank. */
      logger_log_gpart(
          e->logger, &s->gparts[offset_gparts + k],
          logger_masks_all_gpart | logger_mask_data[logger_special_flags].mask,
          logger_flag);
    }
#endif
  }

  /* Launch the proxies. */
  MPI_Request reqs_in[5 * engine_maxproxies];
  MPI_Request reqs_out[5 * engine_maxproxies];
  for (int k = 0; k < e->nr_proxies; k++) {
    proxy_parts_exchange_first(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_parts_count_in;
    reqs_out[k] = e->proxies[k].req_parts_count_out;
  }

  /* Wait for each count to come in and start the recv. */
  for (int k = 0; k < e->nr_proxies; k++) {
    int pid = MPI_UNDEFINED;
    if (MPI_Waitany(e->nr_proxies, reqs_in, &pid, MPI_STATUS_IGNORE) !=
            MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_parts_exchange_second(&e->proxies[pid]);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the total number of incoming particles and make sure we have
     enough space to accommodate them. */
  int count_parts_in = 0;
  int count_gparts_in = 0;
  int count_sparts_in = 0;
  int count_bparts_in = 0;
  for (int k = 0; k < e->nr_proxies; k++) {
    count_parts_in += e->proxies[k].nr_parts_in;
    count_gparts_in += e->proxies[k].nr_gparts_in;
    count_sparts_in += e->proxies[k].nr_sparts_in;
    count_bparts_in += e->proxies[k].nr_bparts_in;
  }
  if (e->verbose) {
    message(
        "sent out %zu/%zu/%zu/%zu parts/gparts/sparts/bparts, got %i/%i/%i/%i "
        "back.",
        *Npart, *Ngpart, *Nspart, *Nbpart, count_parts_in, count_gparts_in,
        count_sparts_in, count_bparts_in);
  }

  /* Reallocate the particle arrays if necessary */
  if (offset_parts + count_parts_in > s->size_parts) {
    s->size_parts = (offset_parts + count_parts_in) * engine_parts_size_grow;
    struct part *parts_new = NULL;
    struct xpart *xparts_new = NULL;
    if (swift_memalign("parts", (void **)&parts_new, part_align,
                       sizeof(struct part) * s->size_parts) != 0 ||
        swift_memalign("xparts", (void **)&xparts_new, xpart_align,
                       sizeof(struct xpart) * s->size_parts) != 0)
      error("Failed to allocate new part data.");
    memcpy(parts_new, s->parts, sizeof(struct part) * offset_parts);
    memcpy(xparts_new, s->xparts, sizeof(struct xpart) * offset_parts);
    swift_free("parts", s->parts);
    swift_free("xparts", s->xparts);
    s->parts = parts_new;
    s->xparts = xparts_new;

    /* Reset the links */
    for (size_t k = 0; k < offset_parts; k++) {
      if (s->parts[k].gpart != NULL) {
        s->parts[k].gpart->id_or_neg_offset = -k;
      }
    }
  }

  if (offset_sparts + count_sparts_in > s->size_sparts) {
    s->size_sparts = (offset_sparts + count_sparts_in) * engine_parts_size_grow;
    struct spart *sparts_new = NULL;
    if (swift_memalign("sparts", (void **)&sparts_new, spart_align,
                       sizeof(struct spart) * s->size_sparts) != 0)
      error("Failed to allocate new spart data.");
    memcpy(sparts_new, s->sparts, sizeof(struct spart) * offset_sparts);
    swift_free("sparts", s->sparts);
    s->sparts = sparts_new;

    /* Reset the links */
    for (size_t k = 0; k < offset_sparts; k++) {
      if (s->sparts[k].gpart != NULL) {
        s->sparts[k].gpart->id_or_neg_offset = -k;
      }
    }
  }

  if (offset_bparts + count_bparts_in > s->size_bparts) {
    s->size_bparts = (offset_bparts + count_bparts_in) * engine_parts_size_grow;
    struct bpart *bparts_new = NULL;
    if (swift_memalign("bparts", (void **)&bparts_new, bpart_align,
                       sizeof(struct bpart) * s->size_bparts) != 0)
      error("Failed to allocate new bpart data.");
    memcpy(bparts_new, s->bparts, sizeof(struct bpart) * offset_bparts);
    swift_free("bparts", s->bparts);
    s->bparts = bparts_new;

    /* Reset the links */
    for (size_t k = 0; k < offset_bparts; k++) {
      if (s->bparts[k].gpart != NULL) {
        s->bparts[k].gpart->id_or_neg_offset = -k;
      }
    }
  }

  if (offset_gparts + count_gparts_in > s->size_gparts) {
    s->size_gparts = (offset_gparts + count_gparts_in) * engine_parts_size_grow;
    struct gpart *gparts_new = NULL;
    if (swift_memalign("gparts", (void **)&gparts_new, gpart_align,
                       sizeof(struct gpart) * s->size_gparts) != 0)
      error("Failed to allocate new gpart data.");
    memcpy(gparts_new, s->gparts, sizeof(struct gpart) * offset_gparts);
    swift_free("gparts", s->gparts);
    s->gparts = gparts_new;

    /* Reset the links */
    for (size_t k = 0; k < offset_gparts; k++) {
      if (s->gparts[k].type == swift_type_gas) {
        s->parts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      } else if (s->gparts[k].type == swift_type_stars) {
        s->sparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      } else if (s->gparts[k].type == swift_type_black_hole) {
        s->bparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      }
    }
  }

  /* Collect the requests for the particle data from the proxies. */
  int nr_in = 0, nr_out = 0;
  for (int k = 0; k < e->nr_proxies; k++) {
    if (e->proxies[k].nr_parts_in > 0) {
      reqs_in[5 * k] = e->proxies[k].req_parts_in;
      reqs_in[5 * k + 1] = e->proxies[k].req_xparts_in;
      nr_in += 2;
    } else {
      reqs_in[5 * k] = reqs_in[5 * k + 1] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_gparts_in > 0) {
      reqs_in[5 * k + 2] = e->proxies[k].req_gparts_in;
      nr_in += 1;
    } else {
      reqs_in[5 * k + 2] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_sparts_in > 0) {
      reqs_in[5 * k + 3] = e->proxies[k].req_sparts_in;
      nr_in += 1;
    } else {
      reqs_in[5 * k + 3] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_bparts_in > 0) {
      reqs_in[5 * k + 4] = e->proxies[k].req_bparts_in;
      nr_in += 1;
    } else {
      reqs_in[5 * k + 4] = MPI_REQUEST_NULL;
    }

    if (e->proxies[k].nr_parts_out > 0) {
      reqs_out[5 * k] = e->proxies[k].req_parts_out;
      reqs_out[5 * k + 1] = e->proxies[k].req_xparts_out;
      nr_out += 2;
    } else {
      reqs_out[5 * k] = reqs_out[5 * k + 1] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_gparts_out > 0) {
      reqs_out[5 * k + 2] = e->proxies[k].req_gparts_out;
      nr_out += 1;
    } else {
      reqs_out[5 * k + 2] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_sparts_out > 0) {
      reqs_out[5 * k + 3] = e->proxies[k].req_sparts_out;
      nr_out += 1;
    } else {
      reqs_out[5 * k + 3] = MPI_REQUEST_NULL;
    }
    if (e->proxies[k].nr_bparts_out > 0) {
      reqs_out[5 * k + 4] = e->proxies[k].req_bparts_out;
      nr_out += 1;
    } else {
      reqs_out[5 * k + 4] = MPI_REQUEST_NULL;
    }
  }

  /* Wait for each part array to come in and collect the new
     parts from the proxies. */
  int count_parts = 0, count_gparts = 0, count_sparts = 0, count_bparts = 0;
  for (int k = 0; k < nr_in; k++) {
    int err, pid;
    if ((err = MPI_Waitany(5 * e->nr_proxies, reqs_in, &pid,
                           MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
      char buff[MPI_MAX_ERROR_STRING];
      int res;
      MPI_Error_string(err, buff, &res);
      error("MPI_Waitany failed (%s).", buff);
    }
    if (pid == MPI_UNDEFINED) break;
    // message( "request from proxy %i has arrived." , pid / 5 );
    pid = 5 * (pid / 5);

    /* If all the requests for a given proxy have arrived... */
    if (reqs_in[pid + 0] == MPI_REQUEST_NULL &&
        reqs_in[pid + 1] == MPI_REQUEST_NULL &&
        reqs_in[pid + 2] == MPI_REQUEST_NULL &&
        reqs_in[pid + 3] == MPI_REQUEST_NULL &&
        reqs_in[pid + 4] == MPI_REQUEST_NULL) {
      /* Copy the particle data to the part/xpart/gpart arrays. */
      struct proxy *prox = &e->proxies[pid / 5];
      memcpy(&s->parts[offset_parts + count_parts], prox->parts_in,
             sizeof(struct part) * prox->nr_parts_in);
      memcpy(&s->xparts[offset_parts + count_parts], prox->xparts_in,
             sizeof(struct xpart) * prox->nr_parts_in);
      memcpy(&s->gparts[offset_gparts + count_gparts], prox->gparts_in,
             sizeof(struct gpart) * prox->nr_gparts_in);
      memcpy(&s->sparts[offset_sparts + count_sparts], prox->sparts_in,
             sizeof(struct spart) * prox