/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

/* Load the profiler header, if needed. */
#ifdef WITH_PROFILER
#include <gperftools/profiler.h>
#endif

/* This object's header. */
#include "engine.h"

/* Local headers. */
#include "active.h"
#include "atomic.h"
#include "cell.h"
#include "chemistry.h"
#include "clocks.h"
#include "cooling.h"
#include "cosmology.h"
#include "cycle.h"
#include "debug.h"
#include "equation_of_state.h"
#include "error.h"
#include "gravity.h"
#include "gravity_cache.h"
#include "hydro.h"
#include "logger.h"
#include "logger_io.h"
#include "map.h"
#include "memswap.h"
#include "minmax.h"
#include "outputlist.h"
#include "parallel_io.h"
#include "part.h"
#include "partition.h"
#include "profiler.h"
#include "proxy.h"
#include "restart.h"
#include "runner.h"
#include "serial_io.h"
#include "single_io.h"
#include "sort_part.h"
#include "sourceterms.h"
#include "stars_io.h"
#include "statistics.h"
#include "timers.h"
#include "tools.h"
#include "units.h"
#include "velociraptor_interface.h"
#include "version.h"

/* Particle cache size. */
#define CACHE_SIZE 512

const char *engine_policy_names[] = {"none",
                                     "rand",
                                     "steal",
                                     "keep",
                                     "block",
                                     "cpu tight",
                                     "mpi",
                                     "numa affinity",
                                     "hydro",
                                     "self gravity",
                                     "external gravity",
                                     "cosmological integration",
                                     "drift everything",
                                     "reconstruct multi-poles",
                                     "cooling",
                                     "sourceterms",
                                     "stars",
                                     "structure finding",
                                     "star formation",
                                     "feedback"};

/** The rank of the engine as a global variable (for messages). */
int engine_rank;

/**
 * @brief Data collected from the cells at the end of a time-step
 */
struct end_of_step_data {

  size_t updated, g_updated, s_updated;
  size_t inhibited, g_inhibited, s_inhibited;
  integertime_t ti_hydro_end_min, ti_hydro_end_max, ti_hydro_beg_max;
  integertime_t ti_gravity_end_min, ti_gravity_end_max, ti_gravity_beg_max;
  struct engine *e;
};

/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l A pointer to the #link, will be modified atomically.
 * @param t The #task.
 */
void engine_addlink(struct engine *e, struct link **l, struct task *t) {

  /* Get the next free link. */
  const size_t ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];

  /* Set it atomically. */
  res->t = t;
  res->next = atomic_swap(l, res);
}
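
/* Illustrative usage sketch (not part of the build): task-construction code
 * keeps a per-cell list head and prepends each new task to it, e.g.
 *
 *   engine_addlink(e, &ci->density, t);  // t: a density task for cell ci
 *
 * Because the head is updated with an atomic swap, several threads can
 * safely prepend to the same list concurrently; the resulting list order is
 * therefore arbitrary. */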

#ifdef WITH_MPI
/**
 * Do the exchange of one type of particles with all the other nodes.
 *
 * @param counts 2D array with the counts of particles to exchange with
 *               each other node.
 * @param parts the particle data to exchange
 * @param new_nr_parts the number of particles this node will have after all
 *                     exchanges have completed.
 * @param sizeofparts sizeof the particle struct.
 * @param alignsize the memory alignment required for this particle type.
 * @param mpi_type the MPI_Datatype for these particles.
 * @param nr_nodes the number of nodes to exchange with.
 * @param nodeID the id of this node.
 *
 * @return new particle data constructed from all the exchanges with the
 *         given alignment.
 */
static void *engine_do_redistribute(int *counts, char *parts,
                                    size_t new_nr_parts, size_t sizeofparts,
                                    size_t alignsize, MPI_Datatype mpi_type,
                                    int nr_nodes, int nodeID) {

  /* Allocate a new particle array with some extra margin */
  char *parts_new = NULL;
  if (posix_memalign(
          (void **)&parts_new, alignsize,
          sizeofparts * new_nr_parts * engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new particle data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 2 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");

  /* Send and receive only "chunk" particles per request, so we need to loop
   * as many times as necessary here. Using INT_MAX / sizeofparts keeps each
   * message below ~2GB. */
  const int chunk = INT_MAX / sizeofparts;
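
  /* Worked example (illustrative numbers): if sizeofparts were ~800 bytes,
   * chunk would be INT_MAX / 800, roughly 2.7 million particles, so a single
   * message carries at most ~2GB and larger transfers are split across
   * several iterations of the loop below. */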
  int sent = 0;
  int recvd = 0;

  int activenodes = 1;
  while (activenodes) {

    for (int k = 0; k < 2 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

    /* Emit the sends and recvs for the data. */
    size_t offset_send = sent;
    size_t offset_recv = recvd;
    activenodes = 0;

    for (int k = 0; k < nr_nodes; k++) {

      /* Indices in the count arrays of the node of interest */
      const int ind_send = nodeID * nr_nodes + k;
      const int ind_recv = k * nr_nodes + nodeID;

      /* Are we sending any data this loop? */
      int sending = counts[ind_send] - sent;
      if (sending > 0) {
        activenodes++;
        if (sending > chunk) sending = chunk;

        /* If the send and receive is local then just copy. */
        if (k == nodeID) {
          int receiving = counts[ind_recv] - recvd;
          if (receiving > chunk) receiving = chunk;
          memcpy(&parts_new[offset_recv * sizeofparts],
                 &parts[offset_send * sizeofparts], sizeofparts * receiving);
        } else {
          /* Otherwise send it. */
          int res =
              MPI_Isend(&parts[offset_send * sizeofparts], sending, mpi_type, k,
                        ind_send, MPI_COMM_WORLD, &reqs[2 * k + 0]);
          if (res != MPI_SUCCESS)
            mpi_error(res, "Failed to isend parts to node %i.", k);
        }
      }

      /* If we're sending to this node, then move past it to next. */
      if (counts[ind_send] > 0) offset_send += counts[ind_send];

      /* Are we receiving any data from this node? Note already done if coming
       * from this node. */
      if (k != nodeID) {
        int receiving = counts[ind_recv] - recvd;
        if (receiving > 0) {
          activenodes++;
          if (receiving > chunk) receiving = chunk;
          int res = MPI_Irecv(&parts_new[offset_recv * sizeofparts], receiving,
                              mpi_type, k, ind_recv, MPI_COMM_WORLD,
                              &reqs[2 * k + 1]);
          if (res != MPI_SUCCESS)
            mpi_error(res, "Failed to emit irecv of parts from node %i.", k);
        }
      }

      /* If we're receiving from this node, then move past it to next. */
      if (counts[ind_recv] > 0) offset_recv += counts[ind_recv];
    }

    /* Wait for all the sends and recvs to tumble in. */
    MPI_Status stats[2 * nr_nodes];
    int res;
    if ((res = MPI_Waitall(2 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
      for (int k = 0; k < 2 * nr_nodes; k++) {
        char buff[MPI_MAX_ERROR_STRING];
        MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
        message("request from source %i, tag %i has error '%s'.",
                stats[k].MPI_SOURCE, stats[k].MPI_TAG, buff);
      }
      error("Failed during waitall for part data.");
    }

    /* Move to next chunks. */
    sent += chunk;
    recvd += chunk;
  }

  /* Free temps. */
  free(reqs);

  /* And return new memory. */
  return parts_new;
}
#endif

#ifdef WITH_MPI /* redist_mapper */

/* Support for engine_redistribute threadpool dest mappers. */
struct redist_mapper_data {
  int *counts;
  int *dest;
  int nodeID;
  int nr_nodes;
  struct cell *cells;
  struct space *s;
  void *base;
};

/* Generic function for accumulating counts for TYPE parts. Note
 * we use a local counts array to avoid the atomic_add in the parts
 * loop. */
#define ENGINE_REDISTRIBUTE_DEST_MAPPER(TYPE)                              \
  engine_redistribute_dest_mapper_##TYPE(void *map_data, int num_elements, \
                                         void *extra_data) {               \
    struct TYPE *parts = (struct TYPE *)map_data;                          \
    struct redist_mapper_data *mydata =                                    \
        (struct redist_mapper_data *)extra_data;                           \
    struct space *s = mydata->s;                                           \
    int *dest =                                                            \
        mydata->dest + (ptrdiff_t)(parts - (struct TYPE *)mydata->base);   \
    int *lcounts = NULL;                                                   \
    if ((lcounts = (int *)calloc(                                          \
             sizeof(int), mydata->nr_nodes * mydata->nr_nodes)) == NULL)   \
      error("Failed to allocate counts thread-specific buffer");           \
    for (int k = 0; k < num_elements; k++) {                               \
      for (int j = 0; j < 3; j++) {                                        \
        if (parts[k].x[j] < 0.0)                                           \
          parts[k].x[j] += s->dim[j];                                      \
        else if (parts[k].x[j] >= s->dim[j])                               \
          parts[k].x[j] -= s->dim[j];                                      \
      }                                                                    \
      const int cid = cell_getid(s->cdim, parts[k].x[0] * s->iwidth[0],    \
                                 parts[k].x[1] * s->iwidth[1],             \
                                 parts[k].x[2] * s->iwidth[2]);            \
      dest[k] = s->cells_top[cid].nodeID;                                  \
      size_t ind = mydata->nodeID * mydata->nr_nodes + dest[k];            \
      lcounts[ind] += 1;                                                   \
    }                                                                      \
    for (int k = 0; k < (mydata->nr_nodes * mydata->nr_nodes); k++)        \
      atomic_add(&mydata->counts[k], lcounts[k]);                          \
    free(lcounts);                                                         \
  }
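
/* For reference: ENGINE_REDISTRIBUTE_DEST_MAPPER(part), together with the
 * "static void" prefix at its use below, expands to a threadpool mapper
 * named engine_redistribute_dest_mapper_part with the usual
 * (void *map_data, int num_elements, void *extra_data) signature, operating
 * on a chunk of the struct part array. */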
/**
 * @brief Accumulate the counts of particles per cell.
 * Threadpool helper for accumulating the counts of particles per cell.
 *
 * part version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(part);
/**
 * @brief Accumulate the counts of star particles per cell.
 * Threadpool helper for accumulating the counts of particles per cell.
 *
 * spart version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(spart);
/**
 * @brief Accumulate the counts of gravity particles per cell.
 * Threadpool helper for accumulating the counts of particles per cell.
 *
 * gpart version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(gpart);

#endif /* redist_mapper */

#ifdef WITH_MPI /* savelink_mapper_data */

/* Support for saving the linkage between gparts and parts/sparts. */
struct savelink_mapper_data {
  int nr_nodes;
  int *counts;
  void *parts;
  int nodeID;
};

/**
 * @brief Save the offset of each gravity partner of a part or spart.
 *
 * The offset is from the start of the sorted particles to be sent to a node.
 * This is possible as parts without gravity partners have a positive id.
 * These offsets are used to restore the pointers on the receiving node.
 *
 * CHECKS should be eliminated as dead code when optimizing.
 */
#define ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(TYPE, CHECKS)                      \
  engine_redistribute_savelink_mapper_##TYPE(void *map_data, int num_elements, \
                                             void *extra_data) {               \
    int *nodes = (int *)map_data;                                              \
    struct savelink_mapper_data *mydata =                                      \
        (struct savelink_mapper_data *)extra_data;                             \
    int nodeID = mydata->nodeID;                                               \
    int nr_nodes = mydata->nr_nodes;                                           \
    int *counts = mydata->counts;                                              \
    struct TYPE *parts = (struct TYPE *)mydata->parts;                         \
                                                                               \
    for (int j = 0; j < num_elements; j++) {                                   \
      int node = nodes[j];                                                     \
      int count = 0;                                                           \
      size_t offset = 0;                                                       \
      for (int i = 0; i < node; i++) offset += counts[nodeID * nr_nodes + i];  \
                                                                               \
      for (int k = 0; k < counts[nodeID * nr_nodes + node]; k++) {             \
        if (parts[k + offset].gpart != NULL) {                                 \
          if (CHECKS)                                                          \
            if (parts[k + offset].gpart->id_or_neg_offset > 0)                 \
              error("Trying to link a partnerless " #TYPE "!");                \
          parts[k + offset].gpart->id_or_neg_offset = -count;                  \
          count++;                                                             \
        }                                                                      \
      }                                                                        \
    }                                                                          \
  }
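
/* Convention used above (and by the relink mapper below): a gpart with no
 * hydro/star partner keeps its (positive) particle ID in id_or_neg_offset,
 * while a partnered gpart stores minus its partner's index. The mapper
 * rewrites that index as -count, i.e. the partner's position within the
 * block of particles sent to a given node, so the receiver can rebuild the
 * pointers from its local receive offsets. */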

/**
 * @brief Save position of part-gpart links.
 * Threadpool helper for saving the positions of the part-gpart links.
 */
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 0);
#endif

/**
 * @brief Save position of spart-gpart links.
 * Threadpool helper for saving the positions of the spart-gpart links.
 */
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 0);
#endif

#endif /* savelink_mapper_data */

#ifdef WITH_MPI /* relink_mapper_data */
/* Support for relinking parts, gparts and sparts after moving between nodes. */
struct relink_mapper_data {
  int nodeID;
  int nr_nodes;
  int *counts;
  int *s_counts;
  int *g_counts;
  struct space *s;
};

/**
 * @brief Restore the part/gpart and spart/gpart links for a list of nodes.
 *
 * @param map_data address of nodes to process.
 * @param num_elements the number of nodes to process.
 * @param extra_data additional data defining the context (a
 * relink_mapper_data).
 */
static void engine_redistribute_relink_mapper(void *map_data, int num_elements,
                                              void *extra_data) {

  int *nodes = (int *)map_data;
  struct relink_mapper_data *mydata = (struct relink_mapper_data *)extra_data;

  int nodeID = mydata->nodeID;
  int nr_nodes = mydata->nr_nodes;
  int *counts = mydata->counts;
  int *g_counts = mydata->g_counts;
  int *s_counts = mydata->s_counts;
  struct space *s = mydata->s;

  for (int i = 0; i < num_elements; i++) {

    int node = nodes[i];

    /* Get offsets to correct parts of the counts arrays for this node. */
    size_t offset_parts = 0;
    size_t offset_gparts = 0;
    size_t offset_sparts = 0;
    for (int n = 0; n < node; n++) {
      int ind_recv = n * nr_nodes + nodeID;
      offset_parts += counts[ind_recv];
      offset_gparts += g_counts[ind_recv];
      offset_sparts += s_counts[ind_recv];
    }

    /* Number of gparts sent from this node. */
    int ind_recv = node * nr_nodes + nodeID;
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from this node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; k++) {

      /* Does this gpart have a gas partner ? */
      if (s->gparts[k].type == swift_type_gas) {
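        /* The sender stored minus the partner's position within the block it
         * shipped to us (see the savelink mappers), so adding this node's
         * receive offset for that sender recovers the partner's new local
         * index. */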

        const ptrdiff_t partner_index =
            offset_parts - s->gparts[k].id_or_neg_offset;

        /* Re-link */
        s->gparts[k].id_or_neg_offset = -partner_index;
        s->parts[partner_index].gpart = &s->gparts[k];
      }

      /* Does this gpart have a star partner ? */
      else if (s->gparts[k].type == swift_type_stars) {

        const ptrdiff_t partner_index =
            offset_sparts - s->gparts[k].id_or_neg_offset;

        /* Re-link */
        s->gparts[k].id_or_neg_offset = -partner_index;
        s->sparts[partner_index].gpart = &s->gparts[k];
      }
    }
  }
}

#endif /* relink_mapper_data */

/**
 * @brief Redistribute the particles amongst the nodes according
 *        to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells_top;
  const int nr_cells = s->nr_cells;
  struct xpart *xparts = s->xparts;
  struct part *parts = s->parts;
  struct gpart *gparts = s->gparts;
  struct spart *sparts = s->sparts;
  ticks tic = getticks();

  size_t nr_parts = s->nr_parts;
  size_t nr_gparts = s->nr_gparts;
  size_t nr_sparts = s->nr_sparts;

  /* Start by moving inhibited particles to the end of the arrays */
  for (size_t k = 0; k < nr_parts; /* void */) {
    if (parts[k].time_bin == time_bin_inhibited) {
      nr_parts -= 1;

      /* Swap the particle */
      memswap(&parts[k], &parts[nr_parts], sizeof(struct part));

      /* Swap the xpart */
      memswap(&xparts[k], &xparts[nr_parts], sizeof(struct xpart));

      /* Swap the link with the gpart */
      if (parts[k].gpart != NULL) {
        parts[k].gpart->id_or_neg_offset = -k;
      }
      if (parts[nr_parts].gpart != NULL) {
        parts[nr_parts].gpart->id_or_neg_offset = -nr_parts;
      }
    } else {
      k++;
    }
  }

  /* Now move inhibited star particles to the end of the arrays */
  for (size_t k = 0; k < nr_sparts; /* void */) {
    if (sparts[k].time_bin == time_bin_inhibited) {
      nr_sparts -= 1;

      /* Swap the particle */
      memswap(&s->sparts[k], &s->sparts[nr_sparts], sizeof(struct spart));

      /* Swap the link with the gpart */
      if (s->sparts[k].gpart != NULL) {
        s->sparts[k].gpart->id_or_neg_offset = -k;
      }
      if (s->sparts[nr_sparts].gpart != NULL) {
        s->sparts[nr_sparts].gpart->id_or_neg_offset = -nr_sparts;
      }
    } else {
      k++;
    }
  }

  /* Finally do the same with the gravity particles */
  for (size_t k = 0; k < nr_gparts; /* void */) {
    if (gparts[k].time_bin == time_bin_inhibited) {
      nr_gparts -= 1;

      /* Swap the particle */
      memswap(&s->gparts[k], &s->gparts[nr_gparts], sizeof(struct gpart));

      /* Swap the link with part/spart */
      if (s->gparts[k].type == swift_type_gas) {
        s->parts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      } else if (s->gparts[k].type == swift_type_stars) {
        s->sparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      }
      if (s->gparts[nr_gparts].type == swift_type_gas) {
        s->parts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
            &s->gparts[nr_gparts];
      } else if (s->gparts[nr_gparts].type == swift_type_stars) {
        s->sparts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
            &s->gparts[nr_gparts];
      }
    } else {
      k++;
    }
  }
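
  /* After these three passes the live particles occupy the first nr_parts,
   * nr_sparts and nr_gparts entries of their arrays; the inhibited ones sit
   * beyond those counts and are dropped when the arrays are reallocated
   * below. */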

  /* Now we are ready to deal with real particles and can start the exchange. */

  /* Allocate temporary arrays to store the counts of particles to be sent
   * and the destination of each particle */
  int *counts;
  if ((counts = (int *)calloc(sizeof(int), nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate counts temporary buffer.");

  int *dest;
  if ((dest = (int *)malloc(sizeof(int) * nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");

  /* Simple index of node IDs, used for mappers over nodes. */
  int *nodes = NULL;
  if ((nodes = (int *)malloc(sizeof(int) * nr_nodes)) == NULL)
    error("Failed to allocate nodes temporary buffer.");
  for (int k = 0; k < nr_nodes; k++) nodes[k] = k;

  /* Get destination of each particle */
  struct redist_mapper_data redist_data;
  redist_data.s = s;
  redist_data.nodeID = nodeID;
  redist_data.nr_nodes = nr_nodes;
  redist_data.counts = counts;
  redist_data.dest = dest;
  redist_data.base = (void *)parts;

  threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_part, parts,
                 nr_parts, sizeof(struct part), 0, &redist_data);

  /* Sort the particles according to their cell index. */
  if (nr_parts > 0)
    space_parts_sort(s->parts, s->xparts, dest, &counts[nodeID * nr_nodes],
                     nr_nodes, 0);

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that the part have been sorted correctly. */
  for (size_t k = 0; k < nr_parts; k++) {

    const struct part *p = &s->parts[k];

    /* New cell index */
    const int new_cid =
        cell_getid(s->cdim, p->x[0] * s->iwidth[0], p->x[1] * s->iwidth[1],
                   p->x[2] * s->iwidth[2]);

    /* New cell of this part */
    const struct cell *c = &s->cells_top[new_cid];
    const int new_node = c->nodeID;

    if (dest[k] != new_node)
      error("part's new node index not matching sorted index.");

    if (p->x[0] < c->loc[0] || p->x[0] > c->loc[0] + c->width[0] ||
        p->x[1] < c->loc[1] || p->x[1] > c->loc[1] + c->width[1] ||
        p->x[2] < c->loc[2] || p->x[2] > c->loc[2] + c->width[2])
      error("part not sorted into the right top-level cell!");
  }
#endif

  /* We will need to re-link the gpart partners of parts, so save their
   * relative positions in the sent lists. */
  if (nr_parts > 0 && nr_gparts > 0) {

    struct savelink_mapper_data savelink_data;
    savelink_data.nr_nodes = nr_nodes;
    savelink_data.counts = counts;
    savelink_data.parts = (void *)parts;
    savelink_data.nodeID = nodeID;
    threadpool_map(&e->threadpool, engine_redistribute_savelink_mapper_part,
                   nodes, nr_nodes, sizeof(int), 0, &savelink_data);
  }
  free(dest);

  /* Get destination of each s-particle */
  int *s_counts;
  if ((s_counts = (int *)calloc(sizeof(int), nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate s_counts temporary buffer.");

  int *s_dest;
  if ((s_dest = (int *)malloc(sizeof(int) * nr_sparts)) == NULL)
    error("Failed to allocate s_dest temporary buffer.");

  redist_data.counts = s_counts;
  redist_data.dest = s_dest;
  redist_data.base = (void *)sparts;

  threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_spart, sparts,
                 nr_sparts, sizeof(struct spart), 0, &redist_data);

  /* Sort the particles according to their cell index. */
  if (nr_sparts > 0)
    space_sparts_sort(s->sparts, s_dest, &s_counts[nodeID * nr_nodes], nr_nodes,
                      0);

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that the spart have been sorted correctly. */
  for (size_t k = 0; k < nr_sparts; k++) {

    const struct spart *sp = &s->sparts[k];

    /* New cell index */
    const int new_cid =
        cell_getid(s->cdim, sp->x[0] * s->iwidth[0], sp->x[1] * s->iwidth[1],
                   sp->x[2] * s->iwidth[2]);

    /* New cell of this spart */
    const struct cell *c = &s->cells_top[new_cid];
    const int new_node = c->nodeID;

    if (s_dest[k] != new_node)
      error("spart's new node index not matching sorted index.");

    if (sp->x[0] < c->loc[0] || sp->x[0] > c->loc[0] + c->width[0] ||
        sp->x[1] < c->loc[1] || sp->x[1] > c->loc[1] + c->width[1] ||
        sp->x[2] < c->loc[2] || sp->x[2] > c->loc[2] + c->width[2])
      error("spart not sorted into the right top-level cell!");
  }
#endif

  /* We need to re-link the gpart partners of sparts. */
  if (nr_sparts > 0) {

    struct savelink_mapper_data savelink_data;
    savelink_data.nr_nodes = nr_nodes;
    savelink_data.counts = s_counts;
    savelink_data.parts = (void *)sparts;
    savelink_data.nodeID = nodeID;
    threadpool_map(&e->threadpool, engine_redistribute_savelink_mapper_spart,
                   nodes, nr_nodes, sizeof(int), 0, &savelink_data);
  }
  free(s_dest);

  /* Get destination of each g-particle */
  int *g_counts;
  if ((g_counts = (int *)calloc(sizeof(int), nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate g_counts temporary buffer.");

  int *g_dest;
  if ((g_dest = (int *)malloc(sizeof(int) * nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  redist_data.counts = g_counts;
  redist_data.dest = g_dest;
  redist_data.base = (void *)gparts;

  threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_gpart, gparts,
                 nr_gparts, sizeof(struct gpart), 0, &redist_data);

  /* Sort the gparticles according to their cell index. */
  if (nr_gparts > 0)
    space_gparts_sort(s->gparts, s->parts, s->sparts, g_dest,
                      &g_counts[nodeID * nr_nodes], nr_nodes);

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that the gpart have been sorted correctly. */
  for (size_t k = 0; k < nr_gparts; k++) {

    const struct gpart *gp = &s->gparts[k];

    /* New cell index */
    const int new_cid =
        cell_getid(s->cdim, gp->x[0] * s->iwidth[0], gp->x[1] * s->iwidth[1],
                   gp->x[2] * s->iwidth[2]);

    /* New cell of this gpart */
    const struct cell *c = &s->cells_top[new_cid];
    const int new_node = c->nodeID;

    if (g_dest[k] != new_node)
      error("gpart's new node index not matching sorted index (%d != %d).",
            g_dest[k], new_node);

    if (gp->x[0] < c->loc[0] || gp->x[0] > c->loc[0] + c->width[0] ||
        gp->x[1] < c->loc[1] || gp->x[1] > c->loc[1] + c->width[1] ||
        gp->x[2] < c->loc[2] || gp->x[2] > c->loc[2] + c->width[2])
      error("gpart not sorted into the right top-level cell!");
  }
#endif

  free(g_dest);

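  /* Up to here each node has only filled its own row of the counts matrices;
   * the following reductions give every node the complete matrices. */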
  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Get all the s_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, s_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce sparticle transfer counts.");

  /* Report how many particles will be moved. */
  if (e->verbose) {
    if (e->nodeID == 0) {
      size_t total = 0, g_total = 0, s_total = 0;
      size_t unmoved = 0, g_unmoved = 0, s_unmoved = 0;
      for (int p = 0, r = 0; p < nr_nodes; p++) {
        for (int n = 0; n < nr_nodes; n++) {
          total += counts[r];
          g_total += g_counts[r];
          s_total += s_counts[r];
          if (p == n) {
            unmoved += counts[r];
            g_unmoved += g_counts[r];
            s_unmoved += s_counts[r];
          }
          r++;
        }
      }
      if (total > 0)
        message("%zu of %zu (%.2f%%) of particles moved", total - unmoved,
                total, 100.0 * (double)(total - unmoved) / (double)total);
      if (g_total > 0)
        message("%zu of %zu (%.2f%%) of g-particles moved", g_total - g_unmoved,
                g_total,
                100.0 * (double)(g_total - g_unmoved) / (double)g_total);
      if (s_total > 0)
        message("%zu of %zu (%.2f%%) of s-particles moved", s_total - s_unmoved,
                s_total,
                100.0 * (double)(s_total - s_unmoved) / (double)s_total);
    }
  }

  /* Now each node knows how many parts, sparts and gparts will be transferred
   * to every other node.
   * Get the new numbers of particles for this node. */
  size_t nr_parts_new = 0, nr_gparts_new = 0, nr_sparts_new = 0;
  for (int k = 0; k < nr_nodes; k++)
    nr_parts_new += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts_new += g_counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_sparts_new += s_counts[k * nr_nodes + nodeID];

  /* Now exchange the particles, type by type to keep the memory required
   * under control. */

  /* SPH particles. */
  void *new_parts = engine_do_redistribute(
      counts, (char *)s->parts, nr_parts_new, sizeof(struct part), part_align,
      part_mpi_type, nr_nodes, nodeID);
  free(s->parts);
  s->parts = (struct part *)new_parts;
  s->nr_parts = nr_parts_new;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts_new;

  /* Extra SPH particle properties. */
  new_parts = engine_do_redistribute(counts, (char *)s->xparts, nr_parts_new,
                                     sizeof(struct xpart), xpart_align,
                                     xpart_mpi_type, nr_nodes, nodeID);
  free(s->xparts);
  s->xparts = (struct xpart *)new_parts;

  /* Gravity particles. */
  new_parts = engine_do_redistribute(g_counts, (char *)s->gparts, nr_gparts_new,
                                     sizeof(struct gpart), gpart_align,
                                     gpart_mpi_type, nr_nodes, nodeID);
  free(s->gparts);
  s->gparts = (struct gpart *)new_parts;
  s->nr_gparts = nr_gparts_new;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts_new;

  /* Star particles. */
  new_parts = engine_do_redistribute(s_counts, (char *)s->sparts, nr_sparts_new,
                                     sizeof(struct spart), spart_align,
                                     spart_mpi_type, nr_nodes, nodeID);
  free(s->sparts);
  s->sparts = (struct spart *)new_parts;
  s->nr_sparts = nr_sparts_new;
  s->size_sparts = engine_redistribute_alloc_margin * nr_sparts_new;

  /* All particles have now arrived. Time for some final operations on the
     stuff we just received */

  /* Restore the part<->gpart and spart<->gpart links.
   * Generate indices and counts for threadpool tasks. Note we process a node
   * at a time. */
  struct relink_mapper_data relink_data;
  relink_data.s = s;
  relink_data.counts = counts;
  relink_data.g_counts = g_counts;
  relink_data.s_counts = s_counts;
  relink_data.nodeID = nodeID;
  relink_data.nr_nodes = nr_nodes;

  threadpool_map(&e->threadpool, engine_redistribute_relink_mapper, nodes,
                 nr_nodes, sizeof(int), 1, &relink_data);
  free(nodes);

  /* Clean up the counts now we are done. */
  free(counts);
  free(g_counts);
  free(s_counts);

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place. */
  for (size_t k = 0; k < nr_parts_new; k++) {
    const int cid = cell_getid(s->cdim, s->parts[k].x[0] * s->iwidth[0],
                               s->parts[k].x[1] * s->iwidth[1],
                               s->parts[k].x[2] * s->iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zu) that does not belong here (nodeID=%i).", k,
            cells[cid].nodeID);
  }
  for (size_t k = 0; k < nr_gparts_new; k++) {
    const int cid = cell_getid(s->cdim, s->gparts[k].x[0] * s->iwidth[0],
                               s->gparts[k].x[1] * s->iwidth[1],
                               s->gparts[k].x[2] * s->iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received g-particle (%zu) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }
  for (size_t k = 0; k < nr_sparts_new; k++) {
    const int cid = cell_getid(s->cdim, s->sparts[k].x[0] * s->iwidth[0],
                               s->sparts[k].x[1] * s->iwidth[1],
                               s->sparts[k].x[2] * s->iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received s-particle (%zu) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  part_verify_links(s->parts, s->gparts, s->sparts, nr_parts_new, nr_gparts_new,
                    nr_sparts_new, e->verbose);
#endif

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zu parts, %zu sparts and %zu gparts in %i cells.",
            nodeID, nr_parts_new, nr_sparts_new, nr_gparts_new, my_cells);
  }

  /* Flag that a redistribute has taken place */
  e->step_props |= engine_step_prop_redistribute;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Repartition the cells amongst the nodes.