engine.c 105 KB
Newer Older
Pedro Gonnet's avatar
Pedro Gonnet committed
1
/*******************************************************************************
2
 * This file is part of SWIFT.
3
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
4
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
Peter W. Draper's avatar
Peter W. Draper committed
5
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
6
7
8
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
9
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
10
11
12
13
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
14
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
15
16
17
18
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
19
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
20
21
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
22
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
23
24
25
26
27
28
29
30
31
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
32
#include <stdbool.h>
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
37

38
39
/* MPI headers. */
#ifdef WITH_MPI
40
#include <mpi.h>
41
42
#endif

Angus Lepper's avatar
Angus Lepper committed
43
44
#ifdef HAVE_LIBNUMA
#include <numa.h>
45
46
#endif

47
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
48
#include "engine.h"
49
50

/* Local headers. */
51
#include "atomic.h"
52
#include "cell.h"
53
#include "clocks.h"
54
55
#include "cycle.h"
#include "debug.h"
56
#include "error.h"
57
#include "hydro.h"
58
#include "minmax.h"
59
#include "parallel_io.h"
60
#include "part.h"
61
#include "partition.h"
62
63
#include "proxy.h"
#include "runner.h"
64
65
#include "serial_io.h"
#include "single_io.h"
66
#include "timers.h"
67
#include "units.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
68

Pedro Gonnet's avatar
Pedro Gonnet committed
69
70
71
72
73
74
75
76
77
78
79
80
81
/* Human-readable names for the engine policies, used in log messages.
 * NOTE(review): the ordering must correspond to the engine_policy flags
 * declared in engine.h — confirm against that enum when adding entries. */
const char *engine_policy_names[13] = {"none",
                                       "rand",
                                       "steal",
                                       "keep",
                                       "block",
                                       "fix_dt",
                                       "cpu_tight",
                                       "mpi",
                                       "numa_affinity",
                                       "hydro",
                                       "self_gravity",
                                       "external_gravity",
                                       "cosmology_integration"};
Pedro Gonnet's avatar
Pedro Gonnet committed
82

83
84
85
/** The rank of the engine as a global variable (for messages). */
int engine_rank;

#ifdef HAVE_SETAFFINITY
/** The initial affinity of the main thread (set by engine_pin()) */
static cpu_set_t entry_affinity;
#endif

91
92
93
94
/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
95
 * @param l A pointer to the #link, will be modified atomically.
96
97
98
99
100
 * @param t The #task.
 *
 * @return The new #link pointer.
 */

101
void engine_addlink(struct engine *e, struct link **l, struct task *t) {
102

103
  /* Get the next free link. */
104
105
106
107
108
  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];
109

110
  /* Set it atomically. */
111
  res->t = t;
112
  res->next = atomic_swap(l, res);
113
}
114
115

/**
116
117
 * @brief Generate the gravity hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
Matthieu Schaller's avatar
Matthieu Schaller committed
118
119
 *
 * Tasks are only created here. The dependencies will be added later on.
120
121
122
123
124
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
125
126
void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
                                            struct cell *super) {
127
128

  struct scheduler *s = &e->sched;
129
130
131
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity;
132
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;
133
134

  /* Am I the super-cell? */
135
136
137
  /* TODO(pedro): Add a condition for gravity tasks as well. */
  if (super == NULL &&
      (c->density != NULL || (!c->split && (c->count > 0 || c->gcount > 0)))) {
138

139
    /* This is the super cell, i.e. the first with density tasks attached. */
140
141
142
143
144
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

Matthieu Schaller's avatar
Matthieu Schaller committed
145
      /* Add the init task. */
146
147
148
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);
149

150
151
      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
152
153
154
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
155
      } else {
156
157
158
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
159
      }
Tom Theuns's avatar
Tom Theuns committed
160

161
162
163
164
165
      if (is_with_external_gravity)
        c->grav_external = scheduler_addtask(
            s, task_type_grav_external, task_subtype_none, 0, 0, c, NULL, 0);
    }
  }
166

167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_gravity_hierarchical_tasks(e, c->progeny[k], super);
}

/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c,
                                          struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->density != NULL || (c->count > 0 && !c->split))) {
195

196
    /* This is the super cell, i.e. the first with density tasks attached. */
197
198
199
200
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {
201

Matthieu Schaller's avatar
Matthieu Schaller committed
202
      /* Add the init task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
203
204
205
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);
206

207
208
      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
209
210
211
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
212
      } else {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
213
214
215
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
216
      }
Tom Theuns's avatar
Tom Theuns committed
217

218
219
220
      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
221
    }
222
  }
223

224
225
226
227
228
  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
229
    for (int k = 0; k < 8; k++)
230
      if (c->progeny[k] != NULL)
231
        engine_make_hydro_hierarchical_tasks(e, c->progeny[k], super);
232
}
233

234
/**
 * @brief Redistribute the particles amongst the nodes according
 *        to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  /* Allocate the destination index arrays. */
  int *dest, *g_dest;
  if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, parts[k].x[0] * ih[0],
                               parts[k].x[1] * ih[1], parts[k].x[2] * ih[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            parts[k].x[0], parts[k].x[1], parts[k].x[2]);
#endif
    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  if (s->nr_parts > 0) {
    int current_dest = dest[0];
    size_t count_this_dest = 0;
    for (size_t k = 0; k < s->nr_parts; ++k) {
      if (s->parts[k].gpart != NULL) {

        /* As the addresses will be invalidated by the communications, we will
         * instead store the absolute index from the start of the sub-array of
         * particles to be sent to a given node.
         * Recall that gparts without partners have a negative id.
         * We will restore the pointers on the receiving node later on. */
        if (dest[k] != current_dest) {
          current_dest = dest[k];
          count_this_dest = 0;
        }

#ifdef SWIFT_DEBUG_CHECKS
        if (s->parts[k].gpart->id_or_neg_offset >= 0)
          error("Trying to link a partnerless gpart !");
#endif

        s->parts[k].gpart->id_or_neg_offset = -count_this_dest;
        count_this_dest++;
      }
    }
  }

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, gparts[k].x[0] * ih[0],
                               gparts[k].x[1] * ih[1], gparts[k].x[2] * ih[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]);
#endif
    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(s, g_dest, s->nr_gparts, 0, nr_nodes - 1, e->verbose);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* message("Sending %d part to node %d", counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
                      k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
                      k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k,
                    3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type,
                    k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id_or_neg_offset <= 0) {

        const ptrdiff_t partner_index =
            offset_parts - gparts_new[k].id_or_neg_offset;

        /* Re-link */
        gparts_new[k].id_or_neg_offset = -partner_index;
        parts_new[partner_index].gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place.
     (size_t index avoids the signed/unsigned comparison of the old int k
     and matches the %zi format specifier.) */
  for (size_t k = 0; k < nr_parts; k++) {
    const int cid = cell_getid(cdim, parts_new[k].x[0] * ih[0],
                               parts_new[k].x[1] * ih[1],
                               parts_new[k].x[2] * ih[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zi) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  for (size_t k = 0; k < nr_gparts; ++k) {

    if (gparts_new[k].id_or_neg_offset <= 0) {

      struct part *part = &parts_new[-gparts_new[k].id_or_neg_offset];

      if (part->gpart != &gparts_new[k]) error("Linking problem !");

      if (gparts_new[k].x[0] != part->x[0] ||
          gparts_new[k].x[1] != part->x[1] || gparts_new[k].x[2] != part->x[2])
        error("Linked particles are not at the same position !");
    }
  }
  for (size_t k = 0; k < nr_parts; ++k) {

    /* Cast k to a signed type before negating: -k on a size_t wraps around
       and the comparison would silently promote. */
    if (parts_new[k].gpart != NULL &&
        parts_new[k].gpart->id_or_neg_offset != -(ptrdiff_t)k) {
      error("Linking problem !");
    }
  }
#endif

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff. Note: g_counts and g_dest were previously
     leaked here — all five temporaries must be released. */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
598

599
/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  const ticks tic = getticks();

  /* Grab the requested repartition mode and clear the flag. */
  const enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Do the repartitioning. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}
644

645
646
647
648
649
650
651
652
/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
 */
653
654
void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {
655

656
657
658
659
660
661
662
663
664
665
  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}
666

667
668
669
670
/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj Dummy cell containing the nodeID of the receiving node.
 * @param t_xv The send_xv #task, if it has already been created.
 * @param t_rho The send_rho #task, if it has already been created.
 * @param t_ti The send_ti #task, if required and has already been created.
 */
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj,
                          struct task *t_xv, struct task *t_rho,
                          struct task *t_ti) {

#ifdef WITH_MPI
  struct link *l = NULL;
  struct scheduler *s = &e->sched;
  const int nodeID = cj->nodeID;

  /* Check if any of the density tasks are for the target node. */
  for (l = ci->density; l != NULL; l = l->next)
    if (l->t->ci->nodeID == nodeID ||
        (l->t->cj != NULL && l->t->cj->nodeID == nodeID))
      break;

  /* If so, attach send tasks. */
  if (l != NULL) {

    /* Create the tasks and their dependencies?
       (Only on the first qualifying cell of this branch; deeper recursion
       levels reuse the tasks passed down through the parameters.) */
    if (t_xv == NULL) {
      /* Tags 3*tag, 3*tag+1, 3*tag+2 distinguish the xv/rho/ti messages
         for the same cell. */
      t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
                               3 * ci->tag, 0, ci, cj, 0);
      t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
                                3 * ci->tag + 1, 0, ci, cj, 0);
      /* The end-of-timestep send is only needed for variable time-steps. */
      if (!(e->policy & engine_policy_fixdt))
        t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend,
                                 3 * ci->tag + 2, 0, ci, cj, 0);

      /* The send_rho task depends on the cell's ghost task. */
      scheduler_addunlock(s, ci->super->ghost, t_rho);

      /* The send_rho task should unlock the super-cell's kick task. */
      scheduler_addunlock(s, t_rho, ci->super->kick);

      /* The send_xv task should unlock the super-cell's ghost task. */
      scheduler_addunlock(s, t_xv, ci->super->ghost);

      /* The super-cell's kick task should unlock the send_ti task. */
      if (t_ti != NULL) scheduler_addunlock(s, ci->super->kick, t_ti);
    }

    /* Add them to the local cell. */
    engine_addlink(e, &ci->send_xv, t_xv);
    engine_addlink(e, &ci->send_rho, t_rho);
    if (t_ti != NULL) engine_addlink(e, &ci->send_ti, t_ti);
  }

  /* Recurse? */
  if (ci->split)
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL)
        engine_addtasks_send(e, ci->progeny[k], cj, t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
734
735
736
737
738

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param c The foreign #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 * @param t_ti The recv_ti #task, if required and has already been created.
 */
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho, struct task *t_ti) {

#ifdef WITH_MPI
  struct scheduler *s = &e->sched;

  /* Do we need to construct a recv task?
     Note that since c is a foreign cell, all its density tasks will involve
     only the current rank, and thus we don't have to check them.*/
  if (t_xv == NULL && c->density != NULL) {

    /* Create the tasks. Tags 3*tag, 3*tag+1, 3*tag+2 mirror the ones used
       on the sending side. */
    t_xv = scheduler_addtask(s, task_type_recv, task_subtype_none, 3 * c->tag,
                             0, c, NULL, 0);
    t_rho = scheduler_addtask(s, task_type_recv, task_subtype_none,
                              3 * c->tag + 1, 0, c, NULL, 0);
    /* The end-of-timestep recv is only needed for variable time-steps. */
    if (!(e->policy & engine_policy_fixdt))
      t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend,
                               3 * c->tag + 2, 0, c, NULL, 0);
  }
  /* Store (possibly inherited) recv tasks on every cell of the sub-tree. */
  c->recv_xv = t_xv;
  c->recv_rho = t_rho;
  c->recv_ti = t_ti;

  /* Add dependencies: recv_xv before any density task, all density tasks
     before recv_rho; recv_rho before any force task, all force tasks before
     recv_ti. */
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->force; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_rho, l->t);
    if (t_ti != NULL) scheduler_addunlock(s, l->t, t_ti);
  }
  /* Sorting also needs the received positions. */
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
790
791
792
793
794
795

/**
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */
796
void engine_exchange_cells(struct engine *e) {
797
798
799

#ifdef WITH_MPI

800
801
  struct space *s = e->s;
  struct cell *cells = s->cells;
802
803
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
804
805
806
807
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
808
  ticks tic = getticks();
809
810
811

  /* Run through the cells and get the size of the ones that will be sent off.
   */
812
813
814
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
815
    if (cells[k].sendto)
816
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
817
  }
818

819
  /* Allocate the pcells. */
820
  struct pcell *pcells;
821
822
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
823
824
825
826
    error("Failed to allocate pcell buffer.");

  /* Pack the cells. */
  cell_next_tag = 0;
827
  for (int k = 0; k < nr_cells; k++)
828
829
830
831
832
833
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies. */
834
  for (int k = 0; k < nr_proxies; k++) {
835
836
837
838
839
840
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
841
  for (int k = 0; k < nr_proxies; k++) {
842
    int pid = MPI_UNDEFINED;
843
844
845
846
847
848
849
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }

850
  /* Wait for all the sends to have finished too. */
851
852
853
854
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cells. */
855
  for (int k = 0; k < nr_proxies; k++) {
856
857
858
859
860
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies. */
861
  for (int k = 0; k < nr_proxies; k++) {
862
    int pid = MPI_UNDEFINED;
863
864
865
866
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
867
    for (int count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
868
869
870
871
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }

872
  /* Wait for all the sends to have finished too. */
873
874
875
876
877
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the number of particles we need to import and re-allocate
     the buffer if needed. */
878
  int count_parts_in = 0, count_gparts_in = 0;
879
  for (int k = 0; k < nr_proxies; k++)
880
881
882
883
884
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      count_parts_in += e->proxies[k].cells_in[j]->count;
      count_gparts_in += e->proxies[k].cells_in[j]->gcount;
    }
  if (count_parts_in > s->size_parts_foreign) {
885
    if (s->parts_foreign != NULL) free(s->parts_foreign);
886
    s->size_parts_foreign = 1.1 * count_parts_in;
887
888
889
890
    if (posix_memalign((void **)&s->parts_foreign, part_align,
                       sizeof(struct part) * s->size_parts_foreign) != 0)
      error("Failed to allocate foreign part data.");
  }
891
892
893
894
895
896
897
  if (count_gparts_in > s->size_gparts_foreign) {
    if (s->gparts_foreign != NULL) free(s->gparts_foreign);
    s->size_gparts_foreign = 1.1 * count_gparts_in;
    if (posix_memalign((void **)&s->gparts_foreign, gpart_align,
                       sizeof(struct gpart) * s->size_gparts_foreign) != 0)
      error("Failed to allocate foreign gpart data.");
  }
898