/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
Matthieu Schaller's avatar
Matthieu Schaller committed
32
#include <stdbool.h>
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
37

38
39
/* MPI headers. */
#ifdef WITH_MPI
40
#include <mpi.h>
41
42
#endif

Angus Lepper's avatar
Angus Lepper committed
43
44
45
46
#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

47
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
48
#include "engine.h"
49
50

/* Local headers. */
51
#include "atomic.h"
52
#include "cell.h"
53
#include "clocks.h"
54
55
#include "cycle.h"
#include "debug.h"
56
#include "error.h"
57
#include "hydro.h"
58
#include "minmax.h"
Matthieu Schaller's avatar
Matthieu Schaller committed
59
#include "parallel_io.h"
60
#include "part.h"
61
#include "partition.h"
62
63
#include "proxy.h"
#include "runner.h"
64
65
#include "serial_io.h"
#include "single_io.h"
66
#include "timers.h"
67
#include "tools.h"
68
#include "units.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
69

Matthieu Schaller's avatar
Matthieu Schaller committed
70
71
72
73
74
75
76
77
78
79
80
81
82
/* Human-readable names for the engine policies, one entry per policy
 * (used when reporting the active policies in log messages). */
const char *engine_policy_names[13] = {
    "none",          "rand",           "steal",
    "keep",          "block",          "fix_dt",
    "cpu_tight",     "mpi",            "numa_affinity",
    "hydro",         "self_gravity",   "external_gravity",
    "cosmology_integration"};

/** The rank of the engine as a global variable (for messages). */
int engine_rank;

#ifdef HAVE_SETAFFINITY
/** The initial affinity of the main thread (saved by engine_pin()). */
static cpu_set_t entry_affinity;
#endif

92
93
94
95
96
97
98
99
100
/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l The #link.
 * @param t The #task.
 *
 * @return The new #link pointer.
 */
101
struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) {
102

103
104
  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
105
    error("Link table overflow.");
106
107
  }
  struct link *res = &e->links[ind];
108
109
110
111
  res->next = l;
  res->t = t;
  return res;
}
112
113

/**
114
115
 * @brief Generate the gravity hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
Matthieu Schaller's avatar
Matthieu Schaller committed
116
117
 *
 * Tasks are only created here. The dependencies will be added later on.
118
119
120
121
122
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
123
124
void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
                                            struct cell *super) {
125
126

  struct scheduler *s = &e->sched;
127
128
129
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity;
130
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;
131

132
133
  /* Is this the super-cell? */
  if (super == NULL && (c->grav != NULL || (c->gcount > 0 && !c->split))) {
134

135
    /* This is the super cell, i.e. the first with gravity tasks attached. */
136
137
138
139
140
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

Matthieu Schaller's avatar
Matthieu Schaller committed
141
      /* Add the init task. */
142
143
144
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);
145

Matthieu Schaller's avatar
Matthieu Schaller committed
146
      /* Add the drift task. */
147
148
149
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);
150

151
152
      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
153
154
155
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
156
      } else {
157
158
159
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
160
      }
Tom Theuns's avatar
Tom Theuns committed
161

162
163
164
165
166
      if (is_with_external_gravity)
        c->grav_external = scheduler_addtask(
            s, task_type_grav_external, task_subtype_none, 0, 0, c, NULL, 0);
    }
  }
167

168
169
170
171
172
173
174
175
176
  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_gravity_hierarchical_tasks(e, c->progeny[k], super);
}
177

178
179
180
181
182
183
184
185
186
187
188
189
/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c,
                                          struct cell *super) {
190

191
192
193
194
195
  struct scheduler *s = &e->sched;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->density != NULL || (c->count > 0 && !c->split))) {
196

197
    /* This is the super cell, i.e. the first with density tasks attached. */
198
199
200
201
202
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

Matthieu Schaller's avatar
Matthieu Schaller committed
203
      /* Add the init task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
204
205
206
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);
207

Matthieu Schaller's avatar
Matthieu Schaller committed
208
      /* Add the drift task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
209
210
211
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);
212

213
214
      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
215
216
217
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
218
      } else {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
219
220
221
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
222
      }
Tom Theuns's avatar
Tom Theuns committed
223

224
225
226
      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
227
    }
228
  }
229

230
231
232
233
234
  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
235
    for (int k = 0; k < 8; k++)
236
      if (c->progeny[k] != NULL)
237
        engine_make_hydro_hierarchical_tasks(e, c->progeny[k], super);
238
}
239

240
/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  /* Allocate the destination index arrays. */
  int *dest, *g_dest;
  if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, parts[k].x[0] * ih[0],
                               parts[k].x[1] * ih[1], parts[k].x[2] * ih[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            parts[k].x[0], parts[k].x[1], parts[k].x[2]);
#endif

    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  if (s->nr_parts > 0) {
    int current_dest = dest[0];
    size_t count_this_dest = 0;
    for (size_t k = 0; k < s->nr_parts; ++k) {
      if (s->parts[k].gpart != NULL) {

        /* As the addresses will be invalidated by the communications, we will
         * instead store the absolute index from the start of the sub-array of
         * particles to be sent to a given node.
         * Recall that gparts without partners have a negative id.
         * We will restore the pointers on the receiving node later on. */
        if (dest[k] != current_dest) {
          current_dest = dest[k];
          count_this_dest = 0;
        }

#ifdef SWIFT_DEBUG_CHECKS
        if (s->parts[k].gpart->id < 0)
          error("Trying to link a partnerless gpart !");
#endif

        s->parts[k].gpart->id = count_this_dest;
        count_this_dest++;
      }
    }
  }

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, gparts[k].x[0] * ih[0],
                               gparts[k].x[1] * ih[1], gparts[k].x[2] * ih[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]);
#endif

    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(s, g_dest, s->nr_gparts, 0, nr_nodes - 1, e->verbose);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* message("Sending %d part to node %d", counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
                      k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
                      k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k,
                    3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type,
                    k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id >= 0) {

        const size_t partner_index = offset_parts + gparts_new[k].id;

        /* Re-link */
        gparts_new[k].part = &parts_new[partner_index];
        gparts_new[k].part->gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place.
     (size_t loop index: nr_parts is a size_t, avoid signed/unsigned
      comparison.) */
  for (size_t k = 0; k < nr_parts; k++) {
    int cid = cell_getid(cdim, parts_new[k].x[0] * ih[0],
                         parts_new[k].x[1] * ih[1], parts_new[k].x[2] * ih[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zi) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  for (size_t k = 0; k < nr_gparts; ++k) {

    if (gparts_new[k].id > 0) {

      if (gparts_new[k].part->gpart != &gparts_new[k])
        error("Linking problem !");

      if (gparts_new[k].x[0] != gparts_new[k].part->x[0] ||
          gparts_new[k].x[1] != gparts_new[k].part->x[1] ||
          gparts_new[k].x[2] != gparts_new[k].part->x[2])
        error("Linked particles are not at the same position !");
    }
  }
  for (size_t k = 0; k < nr_parts; ++k) {

    if (parts_new[k].gpart != NULL) {

      if (parts_new[k].gpart->part != &parts_new[k]) error("Linking problem !");
    }
  }
#endif

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff. Note: g_counts and g_dest were
     previously leaked here. */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
603

604
/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  const ticks tic = getticks();

  /* Grab the requested repartition mode and clear the flag. */
  const enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* A single node has nothing to balance. Bailing out here also avoids a
   * METIS bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Compute the new partition. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Ship the particles to their new homes... */
  engine_redistribute(e);

  /* ...rebuild the proxies for the new domain boundaries... */
  engine_makeproxies(e);

  /* ...and ask for a full rebuild at the next opportunity. */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}
649

650
651
652
653
654
655
656
/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
Matthieu Schaller's avatar
Matthieu Schaller committed
657
 */
658
659
void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {
Matthieu Schaller's avatar
Matthieu Schaller committed
660

661
662
663
664
665
666
667
668
669
670
  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}
671

672
673
674
675
/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj The receiving #cell
 */
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj) {

#ifdef WITH_MPI
  struct scheduler *s = &e->sched;

  /* Look for a density task that involves the target node. */
  struct link *l = ci->density;
  while (l != NULL) {
    const struct task *t = l->t;
    if (t->ci->nodeID == cj->nodeID ||
        (t->cj != NULL && t->cj->nodeID == cj->nodeID))
      break;
    l = l->next;
  }

  if (l != NULL) {

    /* Found one: create the two send tasks (positions/velocities and
       densities), tagged so the receiver can match them. */
    struct task *t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
                                          2 * ci->tag, 0, ci, cj, 0);
    struct task *t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
                                           2 * ci->tag + 1, 0, ci, cj, 0);

    /* Densities can only be sent once the ghost has run... */
    scheduler_addunlock(s, ci->super->ghost, t_rho);

    /* ...and must be on their way before the kick. */
    scheduler_addunlock(s, t_rho, ci->super->kick);

    /* Positions must be sent before the ghost can run. */
    scheduler_addunlock(s, t_xv, ci->super->ghost);

  } else if (ci->split) {

    /* Nothing at this level: try the children instead. */
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL) engine_addtasks_send(e, ci->progeny[k], cj);
  }

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
720
721
722
723
724
725
726
727
728

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 */
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho) {

#ifdef WITH_MPI
  struct scheduler *s = &e->sched;

  /* First cell on this branch with density tasks? Then it owns the recv
     tasks; everything below shares them. */
  if (t_xv == NULL && c->nr_density > 0) {
    t_xv = c->recv_xv = scheduler_addtask(s, task_type_recv, task_subtype_none,
                                          2 * c->tag, 0, c, NULL, 0);
    t_rho = c->recv_rho = scheduler_addtask(
        s, task_type_recv, task_subtype_none, 2 * c->tag + 1, 0, c, NULL, 0);
  }

  /* Density tasks run after recv_xv and before recv_rho... */
  for (struct link *link = c->density; link != NULL; link = link->next) {
    scheduler_addunlock(s, t_xv, link->t);
    scheduler_addunlock(s, link->t, t_rho);
  }

  /* ...force tasks only once the densities have arrived... */
  for (struct link *link = c->force; link != NULL; link = link->next)
    scheduler_addunlock(s, t_rho, link->t);

  /* ...and sorting needs the positions from recv_xv. */
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);

  /* Pass the (possibly just-created) recv tasks down the hierarchy. */
  if (c->split) {
    for (int k = 0; k < 8; k++) {
      struct cell *child = c->progeny[k];
      if (child != NULL) engine_addtasks_recv(e, child, t_xv, t_rho);
    }
  }

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
764
765
766
767
768
769

/**
 * @brief Exchange cell structures with other nodes.
 *
 * Packs every local cell flagged for sending, swaps the packed cell trees
 * with all proxies, unpacks the incoming trees and (re-)allocates the
 * foreign particle buffers the received cells will point into.
 *
 * @param e The #engine.
 */
void engine_exchange_cells(struct engine *e) {

#ifdef WITH_MPI

  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
  ticks tic = getticks();

  /* The request arrays above have a fixed compile-time size; guard against
     overrunning them (every loop below iterates up to nr_proxies). */
  if (nr_proxies > engine_maxproxies)
    error("Too many proxies (%i) for the request buffers (%i).", nr_proxies,
          engine_maxproxies);

  /* Run through the cells and get the size of the ones that will be sent
     off. offset[k] is the index of cell k's pcells in the packed buffer. */
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
    if (cells[k].sendto)
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
  }

  /* Allocate the pcells. */
  struct pcell *pcells;
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
    error("Failed to allocate pcell buffer.");

  /* Pack the cells, handing out fresh communication tags as we go. */
  cell_next_tag = 0;
  for (int k = 0; k < nr_cells; k++)
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies (first phase: exchange the cell counts). */
  for (int k = 0; k < nr_proxies; k++) {
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv of the cell data. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid = MPI_UNDEFINED;
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }

  /* Wait for all the count sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cell data proper. */
  for (int k = 0; k < nr_proxies; k++) {
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies and unpack it. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid = MPI_UNDEFINED;
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
    for (int count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }

  /* Wait for all the cell-data sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the number of particles we need to import and re-allocate
     the buffer if needed. */
  int count_parts_in = 0, count_gparts_in = 0;
  for (int k = 0; k < nr_proxies; k++)
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      count_parts_in += e->proxies[k].cells_in[j]->count;
      count_gparts_in += e->proxies[k].cells_in[j]->gcount;
    }
  if (count_parts_in > s->size_parts_foreign) {
    free(s->parts_foreign); /* free(NULL) is a no-op. */
    /* Grow by 10% to avoid re-allocating on every small increase. */
    s->size_parts_foreign = 1.1 * count_parts_in;
    if (posix_memalign((void **)&s->parts_foreign, part_align,
                       sizeof(struct part) * s->size_parts_foreign) != 0)
      error("Failed to allocate foreign part data.");
  }
  if (count_gparts_in > s->size_gparts_foreign) {
    free(s->gparts_foreign);
    s->size_gparts_foreign = 1.1 * count_gparts_in;
    if (posix_memalign((void **)&s->gparts_foreign, gpart_align,
                       sizeof(struct gpart) * s->size_gparts_foreign) != 0)
      error("Failed to allocate foreign gpart data.");
  }

  /* Unpack the cells and link them to slices of the foreign particle
     buffers, advancing the cursors by each cell's particle counts. */
  struct part *parts = s->parts_foreign;
  struct gpart *gparts = s->gparts_foreign;
  for (int k = 0; k < nr_proxies; k++) {
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      cell_link_parts(e->proxies[k].cells_in[j], parts);
      cell_link_gparts(e->proxies[k].cells_in[j], gparts);
      parts = &parts[e->proxies[k].cells_in[j]->count];
      gparts = &gparts[e->proxies[k].cells_in[j]->gcount];
    }
  }
  s->nr_parts_foreign = parts - s->parts_foreign;
  s->nr_gparts_foreign = gparts - s->gparts_foreign;

  /* Free the pcell buffer. */
  free(pcells);

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
898
899
900
901
902

/**
 * @brief Exchange straying parts with other nodes.
 *
 * @param e The #engine.
Pedro Gonnet's avatar
Pedro Gonnet committed
903
 * @param offset_parts The index in the parts array as of which the foreign
Pedro Gonnet's avatar
Pedro Gonnet committed
904
 *        parts reside.
905
906
907
 * @param ind_part The foreign #cell ID of each part.
 * @param Npart The number of stray parts, contains the number of parts received
 *        on return.
Pedro Gonnet's avatar
Pedro Gonnet committed
908
 * @param offset_gparts The index in the gparts array as of which the foreign
Pedro Gonnet's avatar
Pedro Gonnet committed
909
 *        parts reside.
910
 * @param ind_gpart The foreign #cell ID of each gpart.
Pedro Gonnet's avatar
Pedro Gonnet committed
911
 * @param Ngpart The number of stray gparts, contains the number of gparts
Pedro Gonnet's avatar
Pedro Gonnet committed
912
 *        received on return.
913
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
914
915
 * Note that this function does not mess-up the linkage between parts and
 * gparts, i.e. the received particles have correct linkeage.
916
 */
Pedro Gonnet's avatar
Pedro Gonnet committed
917
918
919
void engine_exchange_strays(struct engine *e, size_t offset_parts,
                            int *ind_part, size_t *Npart, size_t offset_gparts,
                            int *ind_gpart, size_t *Ngpart) {
920
921
922

#ifdef WITH_MPI

923
  struct space *s = e->s;
924
  ticks tic = getticks();
925
926

  /* Re-set the proxies. */
Pedro Gonnet's avatar
Pedro Gonnet committed
927
928
929
930
  for (int k = 0; k < e->nr_proxies; k++) {
    e->proxies[k].nr_parts_out = 0;
    e->proxies[k].nr_gparts_out = 0;
  }
931

932
  /* Put the parts and gparts into the corresponding proxies. */
933
  for (size_t k = 0; k < *Npart; k++) {
934
    /* Get the target node and proxy ID. */
935
    const int node_id = e->s->cells[ind_part[k]].nodeID;
936
937
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
938
    const int pid = e->proxy_ind[node_id];
Pedro Gonnet's avatar