/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

/* Load the profiler header, if needed. */
#ifdef WITH_PROFILER
#include <gperftools/profiler.h>
#endif

/* This object's header. */
#include "engine.h"

/* Local headers. */
#include "active.h"
#include "atomic.h"
#include "cell.h"
#include "chemistry.h"
#include "clocks.h"
#include "cooling.h"
#include "cosmology.h"
#include "cycle.h"
#include "debug.h"
#include "entropy_floor.h"
#include "equation_of_state.h"
#include "error.h"
#include "gravity.h"
#include "gravity_cache.h"
#include "hydro.h"
#include "logger.h"
#include "logger_io.h"
#include "map.h"
#include "memswap.h"
#include "minmax.h"
#include "outputlist.h"
#include "parallel_io.h"
#include "part.h"
#include "partition.h"
#include "profiler.h"
#include "proxy.h"
#include "restart.h"
#include "runner.h"
#include "serial_io.h"
#include "single_io.h"
#include "sort_part.h"
#include "star_formation.h"
#include "stars_io.h"
#include "statistics.h"
#include "timers.h"
#include "tools.h"
#include "units.h"
#include "velociraptor_interface.h"
#include "version.h"

/* Particle cache size. */
#define CACHE_SIZE 512

const char *engine_policy_names[] = {"none",
                                     "rand",
                                     "steal",
                                     "keep",
                                     "block",
                                     "cpu tight",
                                     "mpi",
                                     "numa affinity",
                                     "hydro",
                                     "self gravity",
                                     "external gravity",
                                     "cosmological integration",
                                     "drift everything",
                                     "reconstruct multi-poles",
                                     "temperature",
                                     "cooling",
                                     "stars",
                                     "structure finding",
                                     "star formation",
                                     "feedback",
                                     "time-step limiter"};

/** The rank of the engine as a global variable (for messages). */
int engine_rank;

/**
 * @brief Data collected from the cells at the end of a time-step
 */
struct end_of_step_data {

  size_t updated, g_updated, s_updated;
  size_t inhibited, g_inhibited, s_inhibited;
  integertime_t ti_hydro_end_min, ti_hydro_end_max, ti_hydro_beg_max;
  integertime_t ti_gravity_end_min, ti_gravity_end_max, ti_gravity_beg_max;
  integertime_t ti_stars_end_min;
  struct engine *e;
};

/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l A pointer to the #link, will be modified atomically.
 * @param t The #task.
 */
void engine_addlink(struct engine *e, struct link **l, struct task *t) {

  /* Get the next free link. */
  const size_t ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error(
        "Link table overflow. Increase the value of "
        "`Scheduler:links_per_tasks`.");
  }
  struct link *res = &e->links[ind];

  /* Set it atomically. */
  res->t = t;
  res->next = atomic_swap(l, res);
}
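
/* Illustrative usage (the cell field name here is hypothetical and varies
 * with the task type): once a new density task t has been created for cell
 * ci, it is appended to that cell's task list with
 *
 *   engine_addlink(e, &ci->density, t);
 *
 * The atomic increment and swap above make this safe to call concurrently
 * from several threads while the task graph is being built. */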

#ifdef WITH_MPI
/**
 * @brief Do the exchange of one type of particle with all the other nodes.
 *
 * @param counts 2D array with the counts of particles to exchange with
 *               each other node.
 * @param parts the particle data to exchange.
 * @param new_nr_parts the number of particles this node will have after all
 *                     exchanges have completed.
 * @param sizeofparts sizeof the particle struct.
 * @param alignsize the memory alignment required for this particle type.
 * @param mpi_type the MPI_Datatype for these particles.
 * @param nr_nodes the number of nodes to exchange with.
 * @param nodeID the id of this node.
 *
 * @return new particle data constructed from all the exchanges with the
 *         given alignment.
 */
static void *engine_do_redistribute(int *counts, char *parts,
                                    size_t new_nr_parts, size_t sizeofparts,
                                    size_t alignsize, MPI_Datatype mpi_type,
                                    int nr_nodes, int nodeID) {

  /* Allocate a new particle array with some extra margin */
  char *parts_new = NULL;
  if (posix_memalign(
          (void **)&parts_new, alignsize,
          sizeofparts * new_nr_parts * engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new particle data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 2 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");

  /* Send and receive only "chunk" particles per request, looping as many
   * times as necessary. The chunk is capped at INT_MAX / sizeofparts
   * elements, so a single message never exceeds 2GB. */
  const int chunk = INT_MAX / sizeofparts;
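
  /* For instance, if sizeofparts were 512 bytes (an illustrative value),
   * chunk = INT_MAX / 512 = 4194303 particles, i.e. just under 2GiB of
   * particle data per message; MPI_Isend/MPI_Irecv take an int count,
   * which is what imposes this cap. */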
  int sent = 0;
  int recvd = 0;

  int activenodes = 1;
  while (activenodes) {

    for (int k = 0; k < 2 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

    /* Emit the sends and recvs for the data. */
    size_t offset_send = sent;
    size_t offset_recv = recvd;
    activenodes = 0;

    for (int k = 0; k < nr_nodes; k++) {

      /* Indices in the count arrays of the node of interest */
      const int ind_send = nodeID * nr_nodes + k;
      const int ind_recv = k * nr_nodes + nodeID;

      /* Are we sending any data this loop? */
      int sending = counts[ind_send] - sent;
      if (sending > 0) {
        activenodes++;
        if (sending > chunk) sending = chunk;

        /* If the send and receive is local then just copy. */
        if (k == nodeID) {
          int receiving = counts[ind_recv] - recvd;
          if (receiving > chunk) receiving = chunk;
          memcpy(&parts_new[offset_recv * sizeofparts],
                 &parts[offset_send * sizeofparts], sizeofparts * receiving);
        } else {
          /* Otherwise send it. */
          int res =
              MPI_Isend(&parts[offset_send * sizeofparts], sending, mpi_type, k,
                        ind_send, MPI_COMM_WORLD, &reqs[2 * k + 0]);
          if (res != MPI_SUCCESS)
            mpi_error(res, "Failed to isend parts to node %i.", k);
        }
      }

      /* If we're sending to this node, then move past it to next. */
      if (counts[ind_send] > 0) offset_send += counts[ind_send];

      /* Are we receiving any data from this node? Note this was already
       * handled above when the data is local (k == nodeID). */
      if (k != nodeID) {
        int receiving = counts[ind_recv] - recvd;
        if (receiving > 0) {
          activenodes++;
          if (receiving > chunk) receiving = chunk;
          int res = MPI_Irecv(&parts_new[offset_recv * sizeofparts], receiving,
                              mpi_type, k, ind_recv, MPI_COMM_WORLD,
                              &reqs[2 * k + 1]);
          if (res != MPI_SUCCESS)
            mpi_error(res, "Failed to emit irecv of parts from node %i.", k);
        }
      }

      /* If we're receiving from this node, then move past it to next. */
      if (counts[ind_recv] > 0) offset_recv += counts[ind_recv];
    }

    /* Wait for all the sends and recvs to tumble in. */
    MPI_Status stats[2 * nr_nodes];
    int res;
    if ((res = MPI_Waitall(2 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
      for (int k = 0; k < 2 * nr_nodes; k++) {
        char buff[MPI_MAX_ERROR_STRING];
        MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
        message("request from source %i, tag %i has error '%s'.",
                stats[k].MPI_SOURCE, stats[k].MPI_TAG, buff);
      }
      error("Failed during waitall for part data.");
    }

    /* Move to next chunks. */
    sent += chunk;
    recvd += chunk;
  }

  /* Free temps. */
  free(reqs);

  /* And return new memory. */
  return parts_new;
}
#endif

#ifdef WITH_MPI /* redist_mapper_data */

/* Support for engine_redistribute threadpool dest mappers. */
struct redist_mapper_data {
  int *counts;
  int *dest;
  int nodeID;
  int nr_nodes;
  struct cell *cells;
  struct space *s;
  void *base;
};

/* Generic function for accumulating counts for TYPE parts. Note
 * we use a local counts array to avoid the atomic_add in the parts
 * loop. */
#define ENGINE_REDISTRIBUTE_DEST_MAPPER(TYPE)                              \
  engine_redistribute_dest_mapper_##TYPE(void *map_data, int num_elements, \
                                         void *extra_data) {               \
    struct TYPE *parts = (struct TYPE *)map_data;                          \
    struct redist_mapper_data *mydata =                                    \
        (struct redist_mapper_data *)extra_data;                           \
    struct space *s = mydata->s;                                           \
    int *dest =                                                            \
        mydata->dest + (ptrdiff_t)(parts - (struct TYPE *)mydata->base);   \
    int *lcounts = NULL;                                                   \
    if ((lcounts = (int *)calloc(                                          \
             sizeof(int), mydata->nr_nodes * mydata->nr_nodes)) == NULL)   \
      error("Failed to allocate counts thread-specific buffer");           \
    for (int k = 0; k < num_elements; k++) {                               \
      for (int j = 0; j < 3; j++) {                                        \
        if (parts[k].x[j] < 0.0)                                           \
          parts[k].x[j] += s->dim[j];                                      \
        else if (parts[k].x[j] >= s->dim[j])                               \
          parts[k].x[j] -= s->dim[j];                                      \
      }                                                                    \
      const int cid = cell_getid(s->cdim, parts[k].x[0] * s->iwidth[0],    \
                                 parts[k].x[1] * s->iwidth[1],             \
                                 parts[k].x[2] * s->iwidth[2]);            \
      dest[k] = s->cells_top[cid].nodeID;                                  \
      size_t ind = mydata->nodeID * mydata->nr_nodes + dest[k];            \
      lcounts[ind] += 1;                                                   \
    }                                                                      \
    for (int k = 0; k < (mydata->nr_nodes * mydata->nr_nodes); k++)        \
      atomic_add(&mydata->counts[k], lcounts[k]);                          \
    free(lcounts);                                                         \
  }
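
/* As a concrete illustration, the declaration
 *
 *   static void ENGINE_REDISTRIBUTE_DEST_MAPPER(part);
 *
 * below expands to a full function definition,
 *
 *   static void engine_redistribute_dest_mapper_part(
 *       void *map_data, int num_elements, void *extra_data) { ... }
 *
 * i.e. one threadpool mapper per particle type, each applying the periodic
 * box wrapping, the top-level cell lookup and the per-node count. */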

/**
 * @brief Accumulate the counts of particles per destination node.
 * Threadpool helper for counting where each particle must be sent.
 *
 * part version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(part);

/**
 * @brief Accumulate the counts of star particles per destination node.
 * Threadpool helper for counting where each particle must be sent.
 *
 * spart version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(spart);

/**
 * @brief Accumulate the counts of gravity particles per destination node.
 * Threadpool helper for counting where each particle must be sent.
 *
 * gpart version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(gpart);

#endif /* redist_mapper_data */

#ifdef WITH_MPI /* savelink_mapper_data */

/* Support for saving the linkage between gparts and parts/sparts. */
struct savelink_mapper_data {
  int nr_nodes;
  int *counts;
  void *parts;
  int nodeID;
};

/**
 * @brief Save the offset of each gravity partner of a part or spart.
 *
 * The offset is from the start of the sorted particles to be sent to a node.
 * This is possible as parts without gravity partners have a positive id.
 * These offsets are used to restore the pointers on the receiving node.
 *
 * CHECKS should be eliminated as dead code when optimizing.
 */
#define ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(TYPE, CHECKS)                      \
  engine_redistribute_savelink_mapper_##TYPE(void *map_data, int num_elements, \
                                             void *extra_data) {               \
    int *nodes = (int *)map_data;                                              \
    struct savelink_mapper_data *mydata =                                      \
        (struct savelink_mapper_data *)extra_data;                             \
    int nodeID = mydata->nodeID;                                               \
    int nr_nodes = mydata->nr_nodes;                                           \
    int *counts = mydata->counts;                                              \
    struct TYPE *parts = (struct TYPE *)mydata->parts;                         \
                                                                               \
    for (int j = 0; j < num_elements; j++) {                                   \
      int node = nodes[j];                                                     \
      int count = 0;                                                           \
      size_t offset = 0;                                                       \
      for (int i = 0; i < node; i++) offset += counts[nodeID * nr_nodes + i];  \
                                                                               \
      for (int k = 0; k < counts[nodeID * nr_nodes + node]; k++) {             \
        if (parts[k + offset].gpart != NULL) {                                 \
          if (CHECKS)                                                          \
            if (parts[k + offset].gpart->id_or_neg_offset > 0)                 \
              error("Trying to link a partnerless " #TYPE "!");                \
          parts[k + offset].gpart->id_or_neg_offset = -count;                  \
          count++;                                                             \
        }                                                                      \
      }                                                                        \
    }                                                                          \
  }
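
/* Worked example (illustrative numbers): if this node sends four parts to
 * node 2 and the first and third of them have gravity partners, the loop
 * above stores the negated running count of partnered particles, setting
 * parts[offset + 0].gpart->id_or_neg_offset = 0 and
 * parts[offset + 2].gpart->id_or_neg_offset = -1. The relink mapper below
 * turns these saved ranks back into real pointers after the exchange. */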

/**
 * @brief Save position of part-gpart links.
 * Threadpool helper for saving the links before the parts are exchanged.
 */
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 0);
#endif

/**
 * @brief Save position of spart-gpart links.
 * Threadpool helper for saving the links before the sparts are exchanged.
 */
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 0);
#endif

#endif /* savelink_mapper_data */

#ifdef WITH_MPI /* relink_mapper_data */

/* Support for relinking parts, gparts and sparts after moving between nodes. */
struct relink_mapper_data {
  int nodeID;
  int nr_nodes;
  int *counts;
  int *s_counts;
  int *g_counts;
  struct space *s;
};

/**
 * @brief Restore the part/gpart and spart/gpart links for a list of nodes.
 *
 * @param map_data address of nodes to process.
 * @param num_elements the number of nodes to process.
 * @param extra_data additional data defining the context (a
 * relink_mapper_data).
 */
static void engine_redistribute_relink_mapper(void *map_data, int num_elements,
                                              void *extra_data) {

  int *nodes = (int *)map_data;
  struct relink_mapper_data *mydata = (struct relink_mapper_data *)extra_data;

  int nodeID = mydata->nodeID;
  int nr_nodes = mydata->nr_nodes;
  int *counts = mydata->counts;
  int *g_counts = mydata->g_counts;
  int *s_counts = mydata->s_counts;
  struct space *s = mydata->s;

  for (int i = 0; i < num_elements; i++) {

    int node = nodes[i];

    /* Get offsets to correct parts of the counts arrays for this node. */
    size_t offset_parts = 0;
    size_t offset_gparts = 0;
    size_t offset_sparts = 0;
    for (int n = 0; n < node; n++) {
      int ind_recv = n * nr_nodes + nodeID;
      offset_parts += counts[ind_recv];
      offset_gparts += g_counts[ind_recv];
      offset_sparts += s_counts[ind_recv];
    }

    /* Number of gparts sent from this node. */
    int ind_recv = node * nr_nodes + nodeID;
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from this node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; k++) {

      /* Does this gpart have a gas partner ? */
      if (s->gparts[k].type == swift_type_gas) {

        const ptrdiff_t partner_index =
            offset_parts - s->gparts[k].id_or_neg_offset;

        /* Re-link */
        s->gparts[k].id_or_neg_offset = -partner_index;
        s->parts[partner_index].gpart = &s->gparts[k];
      }

      /* Does this gpart have a star partner ? */
      else if (s->gparts[k].type == swift_type_stars) {

        const ptrdiff_t partner_index =
            offset_sparts - s->gparts[k].id_or_neg_offset;

        /* Re-link */
        s->gparts[k].id_or_neg_offset = -partner_index;
        s->sparts[partner_index].gpart = &s->gparts[k];
      }
    }
  }
}

#endif /* relink_mapper_data */

/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells_top;
  const int nr_cells = s->nr_cells;
  struct xpart *xparts = s->xparts;
  struct part *parts = s->parts;
  struct gpart *gparts = s->gparts;
  struct spart *sparts = s->sparts;
  ticks tic = getticks();

  size_t nr_parts = s->nr_parts;
  size_t nr_gparts = s->nr_gparts;
  size_t nr_sparts = s->nr_sparts;
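
  /* Inhibited (i.e. removed) particles are first compacted towards the ends
   * of the arrays so that only live particles take part in the exchange; the
   * local counters nr_parts, nr_gparts and nr_sparts shrink accordingly while
   * the part<->gpart links are kept consistent through the swaps below. */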

  /* Start by moving inhibited particles to the end of the arrays */
  for (size_t k = 0; k < nr_parts; /* void */) {
    if (parts[k].time_bin == time_bin_inhibited) {
      nr_parts -= 1;

      /* Swap the particle */
      memswap(&parts[k], &parts[nr_parts], sizeof(struct part));

      /* Swap the xpart */
      memswap(&xparts[k], &xparts[nr_parts], sizeof(struct xpart));

      /* Swap the link with the gpart */
      if (parts[k].gpart != NULL) {
        parts[k].gpart->id_or_neg_offset = -k;
      }
      if (parts[nr_parts].gpart != NULL) {
        parts[nr_parts].gpart->id_or_neg_offset = -nr_parts;
      }
    } else {
      k++;
    }
  }

  /* Now move inhibited star particles to the end of the arrays */
  for (size_t k = 0; k < nr_sparts; /* void */) {
    if (sparts[k].time_bin == time_bin_inhibited) {
      nr_sparts -= 1;

      /* Swap the particle */
      memswap(&s->sparts[k], &s->sparts[nr_sparts], sizeof(struct spart));

      /* Swap the link with the gpart */
      if (s->sparts[k].gpart != NULL) {
        s->sparts[k].gpart->id_or_neg_offset = -k;
      }
      if (s->sparts[nr_sparts].gpart != NULL) {
        s->sparts[nr_sparts].gpart->id_or_neg_offset = -nr_sparts;
      }
    } else {
      k++;
    }
  }

  /* Finally do the same with the gravity particles */
  for (size_t k = 0; k < nr_gparts; /* void */) {
    if (gparts[k].time_bin == time_bin_inhibited) {
      nr_gparts -= 1;

      /* Swap the particle */
      memswap(&s->gparts[k], &s->gparts[nr_gparts], sizeof(struct gpart));

      /* Swap the link with part/spart */
      if (s->gparts[k].type == swift_type_gas) {
        s->parts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      } else if (s->gparts[k].type == swift_type_stars) {
        s->sparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      }
      if (s->gparts[nr_gparts].type == swift_type_gas) {
        s->parts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
            &s->gparts[nr_gparts];
      } else if (s->gparts[nr_gparts].type == swift_type_stars) {
        s->sparts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
            &s->gparts[nr_gparts];
      }
    } else {
      k++;
    }
  }

  /* Now we are ready to deal with real particles and can start the exchange. */

  /* Allocate temporary arrays to store the counts of particles to be sent
   * and the destination of each particle */
  int *counts;
  if ((counts = (int *)calloc(sizeof(int), nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate counts temporary buffer.");

  int *dest;
  if ((dest = (int *)malloc(sizeof(int) * nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");

  /* Simple index of node IDs, used for mappers over nodes. */
  int *nodes = NULL;
  if ((nodes = (int *)malloc(sizeof(int) * nr_nodes)) == NULL)
    error("Failed to allocate nodes temporary buffer.");
  for (int k = 0; k < nr_nodes; k++) nodes[k] = k;

  /* Get destination of each particle */
  struct redist_mapper_data redist_data;
  redist_data.s = s;
  redist_data.nodeID = nodeID;
  redist_data.nr_nodes = nr_nodes;

  redist_data.counts = counts;
  redist_data.dest = dest;
  redist_data.base = (void *)parts;

  threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_part, parts,
                 nr_parts, sizeof(struct part), 0, &redist_data);

  /* Sort the particles according to their cell index. */
  if (nr_parts > 0)
    space_parts_sort(s->parts, s->xparts, dest, &counts[nodeID * nr_nodes],
                     nr_nodes, 0);

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that the parts have been sorted correctly. */
  for (size_t k = 0; k < nr_parts; k++) {
    const struct part *p = &s->parts[k];

    /* New cell index */
    const int new_cid =
        cell_getid(s->cdim, p->x[0] * s->iwidth[0], p->x[1] * s->iwidth[1],
                   p->x[2] * s->iwidth[2]);

    /* New cell of this part */
    const struct cell *c = &s->cells_top[new_cid];
    const int new_node = c->nodeID;

    if (dest[k] != new_node)
      error("part's new node index not matching sorted index.");

    if (p->x[0] < c->loc[0] || p->x[0] > c->loc[0] + c->width[0] ||
        p->x[1] < c->loc[1] || p->x[1] > c->loc[1] + c->width[1] ||
        p->x[2] < c->loc[2] || p->x[2] > c->loc[2] + c->width[2])
      error("part not sorted into the right top-level cell!");
  }
#endif

  /* We will need to re-link the gpart partners of parts, so save their
   * relative positions in the sent lists. */
  if (nr_parts > 0 && nr_gparts > 0) {

    struct savelink_mapper_data savelink_data;
    savelink_data.nr_nodes = nr_nodes;
    savelink_data.counts = counts;
    savelink_data.parts = (void *)parts;
    savelink_data.nodeID = nodeID;
    threadpool_map(&e->threadpool, engine_redistribute_savelink_mapper_part,
                   nodes, nr_nodes, sizeof(int), 0, &savelink_data);
  }
  free(dest);

  /* Get destination of each s-particle */
  int *s_counts;
  if ((s_counts = (int *)calloc(sizeof(int), nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate s_counts temporary buffer.");

  int *s_dest;
  if ((s_dest = (int *)malloc(sizeof(int) * nr_sparts)) == NULL)
    error("Failed to allocate s_dest temporary buffer.");

  redist_data.counts = s_counts;
  redist_data.dest = s_dest;
  redist_data.base = (void *)sparts;

  threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_spart, sparts,
                 nr_sparts, sizeof(struct spart), 0, &redist_data);

  /* Sort the particles according to their cell index. */
  if (nr_sparts > 0)
    space_sparts_sort(s->sparts, s_dest, &s_counts[nodeID * nr_nodes], nr_nodes,
                      0);

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that the sparts have been sorted correctly. */
  for (size_t k = 0; k < nr_sparts; k++) {
    const struct spart *sp = &s->sparts[k];

    /* New cell index */
    const int new_cid =
        cell_getid(s->cdim, sp->x[0] * s->iwidth[0], sp->x[1] * s->iwidth[1],
                   sp->x[2] * s->iwidth[2]);

    /* New cell of this spart */
    const struct cell *c = &s->cells_top[new_cid];
    const int new_node = c->nodeID;

    if (s_dest[k] != new_node)
      error("spart's new node index not matching sorted index.");

    if (sp->x[0] < c->loc[0] || sp->x[0] > c->loc[0] + c->width[0] ||
        sp->x[1] < c->loc[1] || sp->x[1] > c->loc[1] + c->width[1] ||
        sp->x[2] < c->loc[2] || sp->x[2] > c->loc[2] + c->width[2])
      error("spart not sorted into the right top-level cell!");
  }
#endif

  /* We need to re-link the gpart partners of sparts. */
  if (nr_sparts > 0) {

    struct savelink_mapper_data savelink_data;
    savelink_data.nr_nodes = nr_nodes;
    savelink_data.counts = s_counts;
    savelink_data.parts = (void *)sparts;
    savelink_data.nodeID = nodeID;
    threadpool_map(&e->threadpool, engine_redistribute_savelink_mapper_spart,
                   nodes, nr_nodes, sizeof(int), 0, &savelink_data);
  }
  free(s_dest);

  /* Get destination of each g-particle */
  int *g_counts;
  if ((g_counts = (int *)calloc(sizeof(int), nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate g_counts temporary buffer.");

  int *g_dest;
  if ((g_dest = (int *)malloc(sizeof(int) * nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  redist_data.counts = g_counts;
  redist_data.dest = g_dest;
  redist_data.base = (void *)gparts;

  threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_gpart, gparts,
                 nr_gparts, sizeof(struct gpart), 0, &redist_data);

  /* Sort the gparticles according to their cell index. */
  if (nr_gparts > 0)
    space_gparts_sort(s->gparts, s->parts, s->sparts, g_dest,
                      &g_counts[nodeID * nr_nodes], nr_nodes);

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that the gparts have been sorted correctly. */
  for (size_t k = 0; k < nr_gparts; k++) {
    const struct gpart *gp = &s->gparts[k];

    /* New cell index */
    const int new_cid =
        cell_getid(s->cdim, gp->x[0] * s->iwidth[0], gp->x[1] * s->iwidth[1],
                   gp->x[2] * s->iwidth[2]);

    /* New cell of this gpart */
    const struct cell *c = &s->cells_top[new_cid];
    const int new_node = c->nodeID;

    if (g_dest[k] != new_node)
      error("gpart's new node index not matching sorted index (%d != %d).",
            g_dest[k], new_node);

    if (gp->x[0] < c->loc[0] || gp->x[0] > c->loc[0] + c->width[0] ||
        gp->x[1] < c->loc[1] || gp->x[1] > c->loc[1] + c->width[1] ||
        gp->x[2] < c->loc[2] || gp->x[2] > c->loc[2] + c->width[2])
      error("gpart not sorted into the right top-level cell!");
  }
#endif

  free(g_dest);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Get all the s_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, s_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce sparticle transfer counts.");

  /* Report how many particles will be moved. */
  if (e->verbose) {
    if (e->nodeID == 0) {
      size_t total = 0, g_total = 0, s_total = 0;
      size_t unmoved = 0, g_unmoved = 0, s_unmoved = 0;
      for (int p = 0, r = 0; p < nr_nodes; p++) {
        for (int n = 0; n < nr_nodes; n++) {
          total += counts[r];
          g_total += g_counts[r];
          s_total += s_counts[r];
          if (p == n) {
            unmoved += counts[r];
            g_unmoved += g_counts[r];
            s_unmoved += s_counts[r];
          }
          r++;
        }
      }
      if (total > 0)
        message("%ld of %ld (%.2f%%) of particles moved", total - unmoved,
                total, 100.0 * (double)(total - unmoved) / (double)total);
      if (g_total > 0)
        message("%ld of %ld (%.2f%%) of g-particles moved", g_total - g_unmoved,
                g_total,
                100.0 * (double)(g_total - g_unmoved) / (double)g_total);
      if (s_total > 0)
        message("%ld of %ld (%.2f%%) of s-particles moved", s_total - s_unmoved,
                s_total,
                100.0 * (double)(s_total - s_unmoved) / (double)s_total);
    }
  }

  /* Now each node knows how many parts, sparts and gparts will be transferred
   * to every other node.
   * Get the new numbers of particles for this node. */
  size_t nr_parts_new = 0, nr_gparts_new = 0, nr_sparts_new = 0;
  for (int k = 0; k < nr_nodes; k++)
    nr_parts_new += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts_new += g_counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_sparts_new += s_counts[k * nr_nodes + nodeID];

  /* Now exchange the particles, type by type to keep the memory required
   * under control. */

  /* SPH particles. */
  void *new_parts = engine_do_redistribute(
      counts, (char *)s->parts, nr_parts_new, sizeof(struct part), part_align,
      part_mpi_type, nr_nodes, nodeID);
  free(s->parts);
  s->parts = (struct part *)new_parts;
  s->nr_parts = nr_parts_new;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts_new;

  /* Extra SPH particle properties. */
  new_parts = engine_do_redistribute(counts, (char *)s->xparts, nr_parts_new,
                                     sizeof(struct xpart), xpart_align,
                                     xpart_mpi_type, nr_nodes, nodeID);
  free(s->xparts);
  s->xparts = (struct xpart *)new_parts;

  /* Gravity particles. */
  new_parts = engine_do_redistribute(g_counts, (char *)s->gparts, nr_gparts_new,
                                     sizeof(struct gpart), gpart_align,
                                     gpart_mpi_type, nr_nodes, nodeID);
  free(s->gparts);
  s->gparts = (struct gpart *)new_parts;
  s->nr_gparts = nr_gparts_new;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts_new;

  /* Star particles. */
  new_parts = engine_do_redistribute(s_counts, (char *)s->sparts, nr_sparts_new,
                                     sizeof(struct spart), spart_align,
                                     spart_mpi_type, nr_nodes, nodeID);
  free(s->sparts);
  s->sparts = (struct spart *)new_parts;
  s->nr_sparts = nr_sparts_new;
  s->size_sparts = engine_redistribute_alloc_margin * nr_sparts_new;

  /* All particles have now arrived. Time for some final operations on the
     stuff we just received */

  /* Restore the part<->gpart and spart<->gpart links.
   * Generate indices and counts for threadpool tasks. Note we process a node
   * at a time. */
  struct relink_mapper_data relink_data;
  relink_data.s = s;
  relink_data.counts = counts;
  relink_data.g_counts = g_counts;
  relink_data.s_counts = s_counts;
  relink_data.nodeID = nodeID;
  relink_data.nr_nodes = nr_nodes;

  threadpool_map(&e->threadpool, engine_redistribute_relink_mapper, nodes,
                 nr_nodes, sizeof(int), 1, &relink_data);
  free(nodes);

  /* Clean up the counts now we are done. */
  free(counts);
  free(g_counts);
  free(s_counts);

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place. */
  for (size_t k = 0; k < nr_parts_new; k++) {
    const int cid = cell_getid(s->cdim, s->parts[k].x[0] * s->iwidth[0],
                               s->parts[k].x[1] * s->iwidth[1],
                               s->parts[k].x[2] * s->iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zu) that does not belong here (nodeID=%i).", k,
            cells[cid].nodeID);
  }
  for (size_t k = 0; k < nr_gparts_new; k++) {
    const int cid = cell_getid(s->cdim, s->gparts[k].x[0] * s->iwidth[0],
                               s->gparts[k].x[1] * s->iwidth[1],
                               s->gparts[k].x[2] * s->iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received g-particle (%zu) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }
  for (size_t k = 0; k < nr_sparts_new; k++) {
    const int cid = cell_getid(s->cdim, s->sparts[k].x[0] * s->iwidth[0],
                               s->sparts[k].x[1] * s->iwidth[1],
                               s->sparts[k].x[2] * s->iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received s-particle (%zu) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  part_verify_links(s->parts, s->gparts, s->sparts, nr_parts_new, nr_gparts_new,
                    nr_sparts_new, e->verbose);
#endif

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zu parts, %zu sparts and %zu gparts in %i cells.",
            nodeID, nr_parts_new, nr_sparts_new, nr_gparts_new, my_cells);
  }

  /* Flag that a redistribute has taken place */
  e->step_props |= engine_step_prop_redistribute;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */