/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

/* This object's header. */
#include "engine.h"

/* Local headers. */
#include "atomic.h"
#include "cell.h"
#include "clocks.h"
#include "cycle.h"
#include "debug.h"
#include "error.h"
#include "hydro.h"
#include "minmax.h"
#include "parallel_io.h"
#include "part.h"
#include "partition.h"
#include "proxy.h"
#include "runner.h"
#include "serial_io.h"
#include "single_io.h"
#include "timers.h"
#include "units.h"
/* Human-readable names of the engine policies, used in messages.
 * NOTE(review): assumed to mirror the order of the engine_policy flag
 * enum declared in engine.h -- confirm when adding a new policy. */
const char *engine_policy_names[13] = {"none",
                                       "rand",
                                       "steal",
                                       "keep",
                                       "block",
                                       "fix_dt",
                                       "cpu_tight",
                                       "mpi",
                                       "numa_affinity",
                                       "hydro",
                                       "self_gravity",
                                       "external_gravity",
                                       "cosmology_integration"};
/** The rank of the engine as a global variable (for messages). */
int engine_rank;

#ifdef HAVE_SETAFFINITY
/** The initial affinity of the main thread (set by engine_pin()) */
static cpu_set_t entry_affinity;
#endif

91
92
93
94
95
96
97
98
99
/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l The #link.
 * @param t The #task.
 *
 * @return The new #link pointer.
 */
100
struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) {
101

102
103
104
105
106
  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];
107
108
109
110
  res->next = l;
  res->t = t;
  return res;
}
111
112

/**
113
114
 * @brief Generate the gravity hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
Matthieu Schaller's avatar
Matthieu Schaller committed
115
116
 *
 * Tasks are only created here. The dependencies will be added later on.
117
118
119
120
121
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
122
123
void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
                                            struct cell *super) {
124
125

  struct scheduler *s = &e->sched;
126
127
128
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity;
129
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;
130

131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
  /* Is this the super-cell? */
  if (super == NULL && (c->grav != NULL || (c->gcount > 0 && !c->split))) {

    /* This is the super cell, i.e. the first with gravity tasks attached. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task. */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the drift task. */
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      if (is_with_external_gravity)
        c->grav_external = scheduler_addtask(
            s, task_type_grav_external, task_subtype_none, 0, 0, c, NULL, 0);
    }
  }

  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_gravity_hierarchical_tasks(e, c->progeny[k], super);
}

/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c,
                                          struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->density != NULL || (c->count > 0 && !c->split))) {
195

196
    /* This is the super cell, i.e. the first with density tasks attached. */
197
198
199
200
201
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

Matthieu Schaller's avatar
Matthieu Schaller committed
202
      /* Add the init task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
203
204
205
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);
206

Matthieu Schaller's avatar
Matthieu Schaller committed
207
      /* Add the drift task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
208
209
210
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);
211

212
213
      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
214
215
216
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
217
      } else {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
218
219
220
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
221
      }
Tom Theuns's avatar
Tom Theuns committed
222

223
224
225
      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
226
    }
227
  }
228

229
230
231
232
233
  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
234
    for (int k = 0; k < 8; k++)
235
      if (c->progeny[k] != NULL)
236
        engine_make_hydro_hierarchical_tasks(e, c->progeny[k], super);
237
}
/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  /* Allocate the destination index arrays. */
  int *dest, *g_dest;
  if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, parts[k].x[0] * ih[0],
                               parts[k].x[1] * ih[1], parts[k].x[2] * ih[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            parts[k].x[0], parts[k].x[1], parts[k].x[2]);
#endif

    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  if (s->nr_parts > 0) {
    int current_dest = dest[0];
    size_t count_this_dest = 0;
    for (size_t k = 0; k < s->nr_parts; ++k) {
      if (s->parts[k].gpart != NULL) {

        /* As the addresses will be invalidated by the communications, we will
         * instead store the absolute index from the start of the sub-array of
         * particles to be sent to a given node.
         * Recall that gparts without partners have a negative id.
         * We will restore the pointers on the receiving node later on. */
        if (dest[k] != current_dest) {
          current_dest = dest[k];
          count_this_dest = 0;
        }

#ifdef SWIFT_DEBUG_CHECKS
        if (s->parts[k].gpart->id < 0)
          error("Trying to link a partnerless gpart !");
#endif

        s->parts[k].gpart->id = count_this_dest;
        count_this_dest++;
      }
    }
  }

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, gparts[k].x[0] * ih[0],
                               gparts[k].x[1] * ih[1], gparts[k].x[2] * ih[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]);
#endif

    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(s, g_dest, s->nr_gparts, 0, nr_nodes - 1, e->verbose);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* message("Sending %d part to node %d", counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
                      k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
                      k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k,
                    3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type,
                    k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id >= 0) {

        const size_t partner_index = offset_parts + gparts_new[k].id;

        /* Re-link */
        gparts_new[k].part = &parts_new[partner_index];
        gparts_new[k].part->gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place.
   * (Loop index is size_t to match nr_parts; format fixed to %zi.) */
  for (size_t k = 0; k < nr_parts; k++) {
    const int cid = cell_getid(cdim, parts_new[k].x[0] * ih[0],
                               parts_new[k].x[1] * ih[1],
                               parts_new[k].x[2] * ih[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zi) that does not belong here (nodeID=%i).", k,
            cells[cid].nodeID);
  }

  /* Verify that the links are correct.
   * FIX: was `id > 0`, which skipped the first partnered gpart of each
   * sub-array (id == 0); the relink pass above uses `id >= 0`. */
  for (size_t k = 0; k < nr_gparts; ++k) {

    if (gparts_new[k].id >= 0) {

      if (gparts_new[k].part->gpart != &gparts_new[k])
        error("Linking problem !");

      if (gparts_new[k].x[0] != gparts_new[k].part->x[0] ||
          gparts_new[k].x[1] != gparts_new[k].part->x[1] ||
          gparts_new[k].x[2] != gparts_new[k].part->x[2])
        error("Linked particles are not at the same position !");
    }
  }
  for (size_t k = 0; k < nr_parts; ++k) {

    if (parts_new[k].gpart != NULL) {

      if (parts_new[k].gpart->part != &parts_new[k]) error("Linking problem !");
    }
  }
#endif

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff.
   * FIX: g_counts and g_dest were previously leaked. */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  ticks tic = getticks();

  /* Clear the repartition flag. */
  enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Do the repartitioning. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}
648

649
650
651
652
653
654
655
656
/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
 */
657
658
void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {
659

660
661
662
663
664
665
666
667
668
669
  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}
/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj The receiving #cell
 */
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj) {

#ifdef WITH_MPI
  struct link *l = NULL;
  struct scheduler *s = &e->sched;

  /* Check if any of the density tasks are for the target node. */
  for (l = ci->density; l != NULL; l = l->next)
    if (l->t->ci->nodeID == cj->nodeID ||
        (l->t->cj != NULL && l->t->cj->nodeID == cj->nodeID))
      break;

  /* If so, attach send tasks. */
  if (l != NULL) {

    /* Create the tasks. The tag pairs 2*tag / 2*tag+1 keep the xv and rho
       messages for the same cell distinguishable. */
    struct task *t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
                                          2 * ci->tag, 0, ci, cj, 0);
    struct task *t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
                                           2 * ci->tag + 1, 0, ci, cj, 0);

    /* The send_rho task depends on the cell's ghost task. */
    scheduler_addunlock(s, ci->super->ghost, t_rho);

    /* The send_rho task should unlock the super-cell's kick task. */
    scheduler_addunlock(s, t_rho, ci->super->kick);

    /* The send_xv task should unlock the super-cell's ghost task. */
    scheduler_addunlock(s, t_xv, ci->super->ghost);

  }

  /* Recurse? Only when no density task was found at this level. */
  else if (ci->split)
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL) engine_addtasks_send(e, ci->progeny[k], cj);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 */
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho) {

#ifdef WITH_MPI
  struct scheduler *s = &e->sched;

  /* Do we need to construct a recv task? */
  if (t_xv == NULL && c->nr_density > 0) {

    /* Create the tasks. Tags mirror those used on the send side. */
    t_xv = c->recv_xv = scheduler_addtask(s, task_type_recv, task_subtype_none,
                                          2 * c->tag, 0, c, NULL, 0);
    t_rho = c->recv_rho = scheduler_addtask(
        s, task_type_recv, task_subtype_none, 2 * c->tag + 1, 0, c, NULL, 0);
  }

  /* Add dependencies. */
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->force; l != NULL; l = l->next)
    scheduler_addunlock(s, t_rho, l->t);
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
764
765
766
767
768
769

/**
 * @brief Exchange cell structures with other nodes.
 *
 * Packs every local cell flagged with `sendto` into a flat #pcell buffer,
 * swaps the buffers with the neighbouring nodes through the proxies, and
 * unpacks the received cells, (re-)allocating and linking the foreign
 * part/gpart arrays of the #space as needed.
 *
 * @param e The #engine.
 */
void engine_exchange_cells(struct engine *e) {

#ifdef WITH_MPI

  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
  ticks tic = getticks();

  /* The offset table holds one int per top-level cell and can therefore be
     large on big runs; keep it on the heap rather than using a stack VLA,
     which risks a stack overflow. */
  int *offset;
  if ((offset = (int *)malloc(sizeof(int) * nr_cells)) == NULL)
    error("Failed to allocate offset buffer.");

  /* Run through the cells and get the size of the ones that will be sent. */
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
    if (cells[k].sendto)
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
  }

  /* Allocate the pcells. */
  struct pcell *pcells;
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
    error("Failed to allocate pcell buffer.");

  /* Pack the cells, keeping a pointer from each cell to its packed copy. */
  cell_next_tag = 0;
  for (int k = 0; k < nr_cells; k++)
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* The offsets are no longer needed once everything is packed. */
  free(offset);

  /* Launch the proxies (first phase: exchange the cell counts). */
  for (int k = 0; k < nr_proxies; k++) {
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid = MPI_UNDEFINED;
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cell data proper. */
  for (int k = 0; k < nr_proxies; k++) {
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies and unpack it. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid = MPI_UNDEFINED;
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
    for (int count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the number of particles we need to import and re-allocate
     the buffer if needed. */
  int count_parts_in = 0, count_gparts_in = 0;
  for (int k = 0; k < nr_proxies; k++)
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      count_parts_in += e->proxies[k].cells_in[j]->count;
      count_gparts_in += e->proxies[k].cells_in[j]->gcount;
    }
  if (count_parts_in > s->size_parts_foreign) {
    if (s->parts_foreign != NULL) free(s->parts_foreign);
    /* Grow with 10% headroom to amortise future re-allocations. */
    s->size_parts_foreign = 1.1 * count_parts_in;
    if (posix_memalign((void **)&s->parts_foreign, part_align,
                       sizeof(struct part) * s->size_parts_foreign) != 0)
      error("Failed to allocate foreign part data.");
  }
  if (count_gparts_in > s->size_gparts_foreign) {
    if (s->gparts_foreign != NULL) free(s->gparts_foreign);
    s->size_gparts_foreign = 1.1 * count_gparts_in;
    if (posix_memalign((void **)&s->gparts_foreign, gpart_align,
                       sizeof(struct gpart) * s->size_gparts_foreign) != 0)
      error("Failed to allocate foreign gpart data.");
  }

  /* Unpack the cells and link to the particle data, walking the foreign
     buffers contiguously cell by cell. */
  struct part *parts = s->parts_foreign;
  struct gpart *gparts = s->gparts_foreign;
  for (int k = 0; k < nr_proxies; k++) {
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      cell_link_parts(e->proxies[k].cells_in[j], parts);
      cell_link_gparts(e->proxies[k].cells_in[j], gparts);
      parts = &parts[e->proxies[k].cells_in[j]->count];
      gparts = &gparts[e->proxies[k].cells_in[j]->gcount];
    }
  }
  s->nr_parts_foreign = parts - s->parts_foreign;
  s->nr_gparts_foreign = gparts - s->gparts_foreign;

  /* Free the pcell buffer. */
  free(pcells);

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
897
898
899
900
901

/**
 * @brief Exchange straying parts with other nodes.
 *
 * @param e The #engine.
Pedro Gonnet's avatar
Pedro Gonnet committed
902
 * @param offset_parts The index in the parts array as of which the foreign
Pedro Gonnet's avatar
Pedro Gonnet committed
903
 *        parts reside.
904
 * @param ind_part The foreign #cell ID of each part.
905
 * @param Npart The number of stray parts, contains the number of parts received
906
 *        on return.
Pedro Gonnet's avatar
Pedro Gonnet committed
907
 * @param offset_gparts The index in the gparts array as of which the foreign
Pedro Gonnet's avatar
Pedro Gonnet committed
908
 *        parts reside.
909
 * @param ind_gpart The foreign #cell ID of each gpart.
Pedro Gonnet's avatar
Pedro Gonnet committed
910
 * @param Ngpart The number of stray gparts, contains the number of gparts
Pedro Gonnet's avatar
Pedro Gonnet committed
911
 *        received on return.
912
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
913
914
 * Note that this function does not mess-up the linkage between parts and
 * gparts, i.e. the received particles have correct linkeage.
915
 */
Pedro Gonnet's avatar
Pedro Gonnet committed
916
917
918
void engine_exchange_strays(struct engine *e, size_t offset_parts,
                            int *ind_part, size_t *Npart, size_t offset_gparts,
                            int *ind_gpart, size_t *Ngpart) {
919
920
921

#ifdef WITH_MPI

922
  struct space *s = e->s;
923
  ticks tic = getticks();
924
925

  /* Re-set the proxies. */
Pedro Gonnet's avatar
Pedro Gonnet committed
926
927
928
929
  for (int k = 0; k < e->nr_proxies; k++) {
    e->proxies[k].nr_parts_out = 0;
    e->proxies[k].nr_gparts_out = 0;
  }
930

931
  /* Put the parts and gparts into the corresponding proxies. */
932
  for (size_t k = 0; k < *Npart; k++) {
933
    /* Get the target node and proxy ID. */
934
    const int node_id = e->s->cells[ind_part[k]].nodeID;
935
936
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
937
    const int pid = e->proxy_ind[node_id];
938
    if (pid < 0) {
939
940
941
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%llu, x=[%e,%e,%e].",
Pedro Gonnet's avatar
Pedro Gonnet committed
942
943
944
          node_id, s->parts[offset_parts + k].id,
          s->parts[offset_parts + k].x[0], s->parts[offset_parts + k].x[1],
          s->parts[offset_parts + k].x[2]);
945
    }
Pedro Gonnet's avatar
Pedro Gonnet committed
946

947
948
    /* Re-link the associated gpart with the buffer offset of the part. */
    if (s->parts[offset_parts + k].gpart != NULL) {
Pedro Gonnet's avatar
Pedro Gonnet committed
949
      s->parts[offset_parts + k].gpart->id = e->proxies[pid].nr_parts_out;
950
    }
Pedro Gonnet's avatar
Pedro Gonnet committed
951

952
    /* Load the part and xpart into the proxy. */
953
954
955
    proxy_parts_load(&e->proxies[pid], &s->parts[offset_parts + k],
                     &s->xparts[offset_parts + k], 1);
  }
956
  for (size_t k = 0; k < *Ngpart; k++) {
957
    const int node_id = e->s->cells[ind_gpart[k]].nodeID;
958
959
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
960
    const int pid = e->proxy_ind[node_id];