/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

/* This object's header. */
#include "engine.h"

/* Local headers. */
#include "atomic.h"
#include "cell.h"
#include "clocks.h"
#include "cycle.h"
#include "debug.h"
#include "error.h"
#include "hydro.h"
#include "minmax.h"
#include "parallel_io.h"
#include "part.h"
#include "partition.h"
#include "proxy.h"
#include "runner.h"
#include "serial_io.h"
#include "single_io.h"
#include "timers.h"
#include "units.h"

const char *engine_policy_names[13] = {"none",
                                       "rand",
                                       "steal",
                                       "keep",
                                       "block",
                                       "fix_dt",
                                       "cpu_tight",
                                       "mpi",
                                       "numa_affinity",
                                       "hydro",
                                       "self_gravity",
                                       "external_gravity",
                                       "cosmology_integration"};

/** The rank of the engine as a global variable (for messages). */
int engine_rank;

#ifdef HAVE_SETAFFINITY
/** The initial affinity of the main thread (set by engine_pin()). */
static cpu_set_t entry_affinity;
#endif

/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l The #link.
 * @param t The #task.
 *
 * @return The new #link pointer.
 */
struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) {

  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];
  res->next = l;
  res->t = t;
  return res;
}
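
/* Illustrative usage (not from the original code): prepending a task t to a
   cell's list of density tasks would read
     c->density = engine_addlink(e, c->density, t);
   exactly as done for the send_xv/send_rho link lists further down. The
   links come from the pre-allocated pool e->links, and the atomic increment
   above is what makes concurrent calls from several threads safe. */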

/**
 * @brief Generate the gravity hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
                                            struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Am I the super-cell? */
  /* TODO(pedro): Add a condition for gravity tasks as well. */
  if (super == NULL &&
      (c->density != NULL || (!c->split && (c->count > 0 || c->gcount > 0)))) {

    /* This is the super cell, i.e. the first with density tasks attached. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task. */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the drift task. */
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      if (is_with_external_gravity)
        c->grav_external = scheduler_addtask(
            s, task_type_grav_external, task_subtype_none, 0, 0, c, NULL, 0);
    }
  }

  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_gravity_hierarchical_tasks(e, c->progeny[k], super);
}
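
/* Note on the recursion above: super stays NULL on the way down until the
   first cell with density tasks (or an unsplit, non-empty cell) is reached;
   from that point on every progeny inherits the same super pointer, so any
   leaf can reach its init/drift/kick tasks through c->super. */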

/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c,
                                          struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->density != NULL || (c->count > 0 && !c->split))) {

    /* This is the super cell, i.e. the first with density tasks attached. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task. */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the drift task. */
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
    }
  }

  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_hydro_hierarchical_tasks(e, c->progeny[k], super);
}

/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  /* Allocate the destination index arrays. */
  int *dest, *g_dest;
  if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, parts[k].x[0] * ih[0],
                               parts[k].x[1] * ih[1], parts[k].x[2] * ih[2]);

#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            parts[k].x[0], parts[k].x[1], parts[k].x[2]);
#endif

    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }
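
  /* Worked example of the indexing above (illustration only): with
     nr_nodes == 4, counts[1 * 4 + 2] is the number of parts that rank 1
     sends to rank 2. Each rank fills only its own row here; the
     MPI_Allreduce further down completes the matrix on every rank. */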

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  if (s->nr_parts > 0) {
    int current_dest = dest[0];
    size_t count_this_dest = 0;
    for (size_t k = 0; k < s->nr_parts; ++k) {
      if (s->parts[k].gpart != NULL) {

        /* As the addresses will be invalidated by the communications, we will
         * instead store the absolute index from the start of the sub-array of
         * particles to be sent to a given node.
         * Recall that gparts without partners have a negative id.
         * We will restore the pointers on the receiving node later on. */
        if (dest[k] != current_dest) {
          current_dest = dest[k];
          count_this_dest = 0;
        }

#ifdef SWIFT_DEBUG_CHECKS
        if (s->parts[k].gpart->id_or_neg_offset >= 0)
          error("Trying to link a partnerless gpart !");
#endif

        s->parts[k].gpart->id_or_neg_offset = -count_this_dest;
        count_this_dest++;
      }
    }
  }
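
  /* Illustration of the encoding above: within each destination block, the
     parts that have gpart partners are numbered 0, 1, 2, ... in the order in
     which they will be sent, and each partner gpart stores minus that
     number. The receiving side inverts this in the "restore the part<->gpart
     links" loop further down. */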

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }

    const int cid = cell_getid(cdim, gparts[k].x[0] * ih[0],
                               gparts[k].x[1] * ih[1], gparts[k].x[2] * ih[2]);

#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for gpart %zi at [%.3e,%.3e,%.3e].", cid, k,
            gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]);
#endif

    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(s, g_dest, s->nr_gparts, 0, nr_nodes - 1, e->verbose);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data. */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];
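
  /* In the counts matrix, row nodeID is what this rank sends and column
     nodeID (summed here) is what it is about to receive. */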

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
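
  /* Each rank pairs six requests with every other rank: slots 6*k+0..2 hold
     the part/xpart/gpart sends to rank k, and slots 6*k+3..5 the matching
     receives. Slots that are never used stay MPI_REQUEST_NULL, which
     MPI_Waitall treats as already completed. */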

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* message("Sending %d part to node %d", counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
                      k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
                      k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k,
                    3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type,
                    k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }
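
  /* Note on the tags used above: 3 * ind + {0, 1, 2} encodes the
     (sender, receiver) pair (ind == from * nr_nodes + to) plus the payload
     type (part/xpart/gpart), so every message in the exchange carries a
     unique tag and the sends and recvs pair up unambiguously. */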

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id_or_neg_offset <= 0) {

        const ptrdiff_t partner_index =
            offset_parts - gparts_new[k].id_or_neg_offset;

        /* Re-link */
        gparts_new[k].id_or_neg_offset = -partner_index;
        parts_new[partner_index].gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place. */
  for (size_t k = 0; k < nr_parts; k++) {
    int cid = cell_getid(cdim, parts_new[k].x[0] * ih[0],
                         parts_new[k].x[1] * ih[1], parts_new[k].x[2] * ih[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zi) that does not belong here (nodeID=%i).", k,
            cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  for (size_t k = 0; k < nr_gparts; ++k) {

    if (gparts_new[k].id_or_neg_offset <= 0) {

      struct part *part = &parts_new[-gparts_new[k].id_or_neg_offset];

      if (part->gpart != &gparts_new[k]) error("Linking problem !");

      if (gparts_new[k].x[0] != part->x[0] ||
          gparts_new[k].x[1] != part->x[1] || gparts_new[k].x[2] != part->x[2])
        error("Linked particles are not at the same position !");
    }
  }
  for (size_t k = 0; k < nr_parts; ++k) {

    if (parts_new[k].gpart != NULL &&
        parts_new[k].gpart->id_or_neg_offset != -(ptrdiff_t)k) {
      error("Linking problem !");
    }
  }
#endif

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;
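
  /* The engine_redistribute_alloc_margin factor presumably leaves some
     headroom so that particles drifting over from neighbouring nodes before
     the next redistribution do not force an immediate re-allocation. */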

  /* Clean up the temporary stuff. */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  ticks tic = getticks();

  /* Clear the repartition flag. */
  enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Do the repartitioning. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}

/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
 */
void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {

  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}

/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj Dummy cell containing the nodeID of the receiving node.
 * @param t_xv The send_xv #task, if it has already been created.
 * @param t_rho The send_rho #task, if it has already been created.
 * @param t_ti The send_ti #task, if required and has already been created.
 */
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj,
                          struct task *t_xv, struct task *t_rho,
                          struct task *t_ti) {

#ifdef WITH_MPI
  struct link *l = NULL;
  struct scheduler *s = &e->sched;
  const int nodeID = cj->nodeID;

  /* Check if any of the density tasks are for the target node. */
  for (l = ci->density; l != NULL; l = l->next)
    if (l->t->ci->nodeID == nodeID ||
        (l->t->cj != NULL && l->t->cj->nodeID == nodeID))
      break;

  /* If so, attach send tasks. */
  if (l != NULL) {

    /* Create the tasks and their dependencies? */
    if (t_xv == NULL) {
      t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
                               3 * ci->tag, 0, ci, cj, 0);
      t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
                                3 * ci->tag + 1, 0, ci, cj, 0);
      if (!(e->policy & engine_policy_fixdt))
        t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend,
                                 3 * ci->tag + 2, 0, ci, cj, 0);

      /* The send_rho task depends on the cell's ghost task. */
      scheduler_addunlock(s, ci->super->ghost, t_rho);

      /* The send_rho task should unlock the super-cell's kick task. */
      scheduler_addunlock(s, t_rho, ci->super->kick);

      /* The send_xv task should unlock the super-cell's ghost task. */
      scheduler_addunlock(s, t_xv, ci->super->ghost);

      /* The super-cell's kick task should unlock the send_ti task. */
      if (t_ti != NULL) scheduler_addunlock(s, ci->super->kick, t_ti);
    }

    /* Add them to the local cell. */
    ci->send_xv = engine_addlink(e, ci->send_xv, t_xv);
    ci->send_rho = engine_addlink(e, ci->send_rho, t_rho);
    if (t_ti != NULL) ci->send_ti = engine_addlink(e, ci->send_ti, t_ti);
  }

  /* Recurse? */
  if (ci->split)
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL)
        engine_addtasks_send(e, ci->progeny[k], cj, t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
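
/* The unlocks above chain the send side as
   send_xv -> ghost -> send_rho -> kick -> send_ti: positions go out before
   the ghost runs, densities after it, and the updated time-step information
   only once the kick has computed it. */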

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param c The foreign #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 * @param t_ti The recv_ti #task, if required and has already been created.
 */
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho, struct task *t_ti) {

#ifdef WITH_MPI
  struct scheduler *s = &e->sched;

  /* Do we need to construct a recv task?
     Note that since c is a foreign cell, all its density tasks will involve
     only the current rank, and thus we don't have to check them.*/
  if (t_xv == NULL && c->density != NULL) {

    /* Create the tasks. */
    t_xv = scheduler_addtask(s, task_type_recv, task_subtype_none, 3 * c->tag,
                             0, c, NULL, 0);
    t_rho = scheduler_addtask(s, task_type_recv, task_subtype_none,
                              3 * c->tag + 1, 0, c, NULL, 0);
    if (!(e->policy & engine_policy_fixdt))
      t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend,
                               3 * c->tag + 2, 0, c, NULL, 0);
  }
  c->recv_xv = t_xv;
  c->recv_rho = t_rho;
  c->recv_ti = t_ti;

  /* Add dependencies. */
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->force; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_rho, l->t);
    if (t_ti != NULL) scheduler_addunlock(s, l->t, t_ti);
  }
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
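
/* Mirroring the send side, the receive side chains as
   recv_xv -> (sorts,) density -> recv_rho -> force -> recv_ti: the foreign
   cell's density tasks only run once the positions are in, and its force
   tasks only once the densities are in. */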

/**
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */
void engine_exchange_cells(struct engine *e) {

#ifdef WITH_MPI

  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
  ticks tic = getticks();

  /* Run through the cells and get the size of the ones that will be sent off.
   */
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
    if (cells[k].sendto)
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
  }

  /* Allocate the pcells. */
  struct pcell *pcells;
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
    error("Failed to allocate pcell buffer.");

  /* Pack the cells. */
  cell_next_tag = 0;
  for (int k = 0; k < nr_cells; k++)
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies. */
  for (int k = 0; k < nr_proxies; k++) {
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid = MPI_UNDEFINED;
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }
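
  /* The cell exchange is a two-phase handshake: proxy_cells_exch1() above
     ships the pcell counts, and once a proxy's count has arrived,
     proxy_cells_exch2() presumably posts the matching transfer of the pcell
     array itself, which the next wait loop below then unpacks. */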

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cells. */
  for (int k = 0; k < nr_proxies; k++) {
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid = MPI_UNDEFINED;
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
    for (int count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the number of particles we need to import and re-allocate
     the buffer if needed. */
  int count_parts_in = 0, count_gparts_in = 0;
  for (int k = 0; k < nr_proxies; k++)
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      count_parts_in += e->proxies[k].cells_in[j]->count;
      count_gparts_in += e->proxies[k].cells_in[j]->gcount;
    }
  if (count_parts_in > s->size_parts_foreign) {
    if (s->parts_foreign != NULL) free(s->parts_foreign);
    s->size_parts_foreign = 1.1 * count_parts_in;
    if (posix_memalign((void **)&s->parts_foreign, part_align,
                       sizeof(struct part) * s->size_parts_foreign) != 0)
      error("Failed to allocate foreign part data.");
  }
  if (count_gparts_in > s->size_gparts_foreign) {
    if (s->gparts_foreign != NULL) free(s->gparts_foreign);
    s->size_gparts_foreign = 1.1 * count_gparts_in;
    if (posix_memalign((void **)&s->gparts_foreign, gpart_align,
                       sizeof(struct gpart) * s->size_gparts_foreign) != 0)
      error("Failed to allocate foreign gpart data.");
  }

  /* Unpack the cells and link to the particle data. */
  struct part *parts = s->parts_foreign;
  struct gpart *gparts = s->gparts_foreign;
  for (int k = 0; k < nr_proxies; k++) {
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      cell_link_parts(e->proxies[k].cells_in[j], parts);
      cell_link_gparts(e->proxies[k].cells_in[j], gparts);
      parts = &parts[e->proxies[k].cells_in[j]->count];
      gparts = &gparts[e->proxies[k].cells_in[j]->gcount];
    }
  }
  s->nr_parts_foreign = parts - s->parts_foreign;
  s->nr_gparts_foreign = gparts - s->gparts_foreign;

  /* Free the pcell buffer. */
  free(pcells);