/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
32
#include <stdbool.h>
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
37

38
39
/* MPI headers. */
#ifdef WITH_MPI
40
#include <mpi.h>
41
42
#endif

Angus Lepper's avatar
Angus Lepper committed
43
44
#ifdef HAVE_LIBNUMA
#include <numa.h>
45
46
#endif

47
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
48
#include "engine.h"
49
50

/* Local headers. */
51
#include "atomic.h"
52
#include "cell.h"
53
#include "clocks.h"
54
55
#include "cycle.h"
#include "debug.h"
56
#include "error.h"
57
#include "hydro.h"
58
#include "minmax.h"
59
#include "parallel_io.h"
60
#include "part.h"
61
#include "partition.h"
62
63
#include "proxy.h"
#include "runner.h"
64
65
#include "serial_io.h"
#include "single_io.h"
66
#include "timers.h"
67
#include "units.h"
68
#include "version.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
69

Peter W. Draper's avatar
Peter W. Draper committed
70
71
72
73
74
75
76
77
78
79
80
81
82
/** Human-readable names for the engine policies, used in log messages. */
const char *engine_policy_names[13] = {"none",
                                       "rand",
                                       "steal",
                                       "keep",
                                       "block",
                                       "fix_dt",
                                       "cpu_tight",
                                       "mpi",
                                       "numa_affinity",
                                       "hydro",
                                       "self_gravity",
                                       "external_gravity",
                                       "cosmology_integration"};
Pedro Gonnet's avatar
Pedro Gonnet committed
83

84
85
86
/** The rank of the engine as a global variable (for messages). */
int engine_rank;

#ifdef HAVE_SETAFFINITY
/** The initial affinity of the main thread (set by engine_pin()) */
static cpu_set_t entry_affinity;
#endif
/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l The #link.
 * @param t The #task.
 *
 * @return The new #link pointer.
 */
101
struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) {
102

103
104
105
106
107
  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];
108
109
110
111
  res->next = l;
  res->t = t;
  return res;
}
112
113

/**
 * @brief Generate the gravity hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * Walks the cell tree top-down; the first cell that carries density tasks
 * (or is an unsplit leaf with particles) becomes the super-cell for its
 * whole sub-tree, and the per-particle tasks are attached to it.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell of the branch being walked, or NULL if it
 *        has not been identified yet.
 */
void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
                                            struct cell *super) {

  struct scheduler *s = &e->sched;

  /* Decode the relevant policy bits once. */
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Am I the super-cell? */
  /* TODO(pedro): Add a condition for gravity tasks as well. */
  if (super == NULL &&
      (c->density != NULL || (!c->split && (c->count > 0 || c->gcount > 0)))) {

    /* This is the super cell, i.e. the first with density tasks attached. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task (if not already created by another pass). */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the drift task. */
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      /* External-gravity task, only when that policy is active. */
      if (is_with_external_gravity)
        c->grav_external = scheduler_addtask(
            s, task_type_grav_external, task_subtype_none, 0, 0, c, NULL, 0);
    }
  }

  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_gravity_hierarchical_tasks(e, c->progeny[k], super);
}

/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c,
                                          struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->density != NULL || (c->count > 0 && !c->split))) {
198

199
    /* This is the super cell, i.e. the first with density tasks attached. */
200
201
202
203
204
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

Matthieu Schaller's avatar
Matthieu Schaller committed
205
      /* Add the init task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
206
207
208
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);
209

Matthieu Schaller's avatar
Matthieu Schaller committed
210
      /* Add the drift task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
211
212
213
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);
214

215
216
      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
217
218
219
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
220
      } else {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
221
222
223
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
224
      }
Tom Theuns's avatar
Tom Theuns committed
225

226
227
228
      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
229
    }
230
  }
231

232
233
234
235
236
  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
237
    for (int k = 0; k < 8; k++)
238
      if (c->progeny[k] != NULL)
239
        engine_make_hydro_hierarchical_tasks(e, c->progeny[k], super);
240
}
241

242
/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * Fix over previous version: the temporary buffers g_counts and g_dest were
 * allocated but never freed, leaking memory on every redistribution.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  /* Allocate the destination index arrays. */
  int *dest, *g_dest;
  if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, parts[k].x[0] * ih[0],
                               parts[k].x[1] * ih[1], parts[k].x[2] * ih[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            parts[k].x[0], parts[k].x[1], parts[k].x[2]);
#endif

    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  if (s->nr_parts > 0) {
    int current_dest = dest[0];
    size_t count_this_dest = 0;
    for (size_t k = 0; k < s->nr_parts; ++k) {
      if (s->parts[k].gpart != NULL) {

        /* As the addresses will be invalidated by the communications, we will
         * instead store the absolute index from the start of the sub-array of
         * particles to be sent to a given node.
         * Recall that gparts without partners have a negative id.
         * We will restore the pointers on the receiving node later on. */
        if (dest[k] != current_dest) {
          current_dest = dest[k];
          count_this_dest = 0;
        }

#ifdef SWIFT_DEBUG_CHECKS
        if (s->parts[k].gpart->id_or_neg_offset >= 0)
          error("Trying to link a partnerless gpart !");
#endif

        s->parts[k].gpart->id_or_neg_offset = -count_this_dest;
        count_this_dest++;
      }
    }
  }

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, gparts[k].x[0] * ih[0],
                               gparts[k].x[1] * ih[1], gparts[k].x[2] * ih[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]);
#endif

    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(s, g_dest, s->nr_gparts, 0, nr_nodes - 1, e->verbose);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* message("Sending %d part to node %d", counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
                      k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
                      k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k,
                    3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type,
                    k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id_or_neg_offset <= 0) {

        const ptrdiff_t partner_index =
            offset_parts - gparts_new[k].id_or_neg_offset;

        /* Re-link */
        gparts_new[k].id_or_neg_offset = -partner_index;
        parts_new[partner_index].gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place. Note: loop index is size_t
     to match nr_parts (was int, causing a signed/unsigned comparison). */
  for (size_t k = 0; k < nr_parts; k++) {
    const int cid = cell_getid(cdim, parts_new[k].x[0] * ih[0],
                               parts_new[k].x[1] * ih[1],
                               parts_new[k].x[2] * ih[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zu) that does not belong here (nodeID=%i).", k,
            cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  for (size_t k = 0; k < nr_gparts; ++k) {

    if (gparts_new[k].id_or_neg_offset <= 0) {

      struct part *part = &parts_new[-gparts_new[k].id_or_neg_offset];

      if (part->gpart != &gparts_new[k]) error("Linking problem !");

      if (gparts_new[k].x[0] != part->x[0] ||
          gparts_new[k].x[1] != part->x[1] || gparts_new[k].x[2] != part->x[2])
        error("Linked particles are not at the same position !");
    }
  }
  for (size_t k = 0; k < nr_parts; ++k) {

    if (parts_new[k].gpart != NULL &&
        parts_new[k].gpart->id_or_neg_offset != -k) {
      error("Linking problem !");
    }
  }
#endif

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff. Previously g_counts and g_dest were
     leaked here. */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
606

607
/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  const ticks tic = getticks();

  /* Remember why we were asked to repartition, then clear the flag. */
  const enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* A single node has nothing to balance. Returning early also avoids a
   * METIS bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Do the repartitioning. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}
652

653
654
655
656
657
658
659
660
/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
 */
661
662
void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {
663

664
665
666
667
668
669
670
671
672
673
  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}
674

675
676
677
678
/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * Recurses over the cell tree of ci; wherever a density task involves the
 * target node, send tasks (and their dependencies on the super-cell's ghost
 * and kick tasks) are created once and then linked to every cell below.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj Dummy cell containing the nodeID of the receiving node.
 * @param t_xv The send_xv #task, if it has already been created.
 * @param t_rho The send_rho #task, if it has already been created.
 * @param t_ti The send_ti #task, if required and has already been created.
 */
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj,
                          struct task *t_xv, struct task *t_rho,
                          struct task *t_ti) {

#ifdef WITH_MPI
  struct link *l = NULL;
  struct scheduler *s = &e->sched;
  const int nodeID = cj->nodeID;

  /* Check if any of the density tasks are for the target node. */
  for (l = ci->density; l != NULL; l = l->next)
    if (l->t->ci->nodeID == nodeID ||
        (l->t->cj != NULL && l->t->cj->nodeID == nodeID))
      break;

  /* If so, attach send tasks. */
  if (l != NULL) {

    /* Create the tasks and their dependencies? (only on the first cell of
       the branch that needs them; deeper cells reuse the same tasks) */
    if (t_xv == NULL) {
      t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
                               3 * ci->tag, 0, ci, cj, 0);
      t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
                                3 * ci->tag + 1, 0, ci, cj, 0);
      /* The time-step send is not needed in fixed-dt mode. */
      if (!(e->policy & engine_policy_fixdt))
        t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend,
                                 3 * ci->tag + 2, 0, ci, cj, 0);

      /* The send_rho task depends on the cell's ghost task. */
      scheduler_addunlock(s, ci->super->ghost, t_rho);

      /* The send_rho task should unlock the super-cell's kick task. */
      scheduler_addunlock(s, t_rho, ci->super->kick);

      /* The send_xv task should unlock the super-cell's ghost task. */
      scheduler_addunlock(s, t_xv, ci->super->ghost);

      /* The super-cell's kick task should unlock the send_ti task. */
      if (t_ti != NULL) scheduler_addunlock(s, ci->super->kick, t_ti);
    }

    /* Add them to the local cell. */
    ci->send_xv = engine_addlink(e, ci->send_xv, t_xv);
    ci->send_rho = engine_addlink(e, ci->send_rho, t_rho);
    if (t_ti != NULL) ci->send_ti = engine_addlink(e, ci->send_ti, t_ti);
  }

  /* Recurse? */
  if (ci->split)
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL)
        engine_addtasks_send(e, ci->progeny[k], cj, t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
742
743
744
745
746

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * Recurses over the foreign cell tree; the recv tasks are created on the
 * first cell that carries density tasks and shared with all its progeny.
 *
 * @param e The #engine.
 * @param c The foreign #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 * @param t_ti The recv_ti #task, if required and has already been created.
 */
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho, struct task *t_ti) {

#ifdef WITH_MPI
  struct scheduler *s = &e->sched;

  /* Do we need to construct a recv task?
     Note that since c is a foreign cell, all its density tasks will involve
     only the current rank, and thus we don't have to check them.*/
  if (t_xv == NULL && c->density != NULL) {

    /* Create the tasks. */
    t_xv = scheduler_addtask(s, task_type_recv, task_subtype_none, 3 * c->tag,
                             0, c, NULL, 0);
    t_rho = scheduler_addtask(s, task_type_recv, task_subtype_none,
                              3 * c->tag + 1, 0, c, NULL, 0);
    /* The time-step recv is not needed in fixed-dt mode. */
    if (!(e->policy & engine_policy_fixdt))
      t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend,
                               3 * c->tag + 2, 0, c, NULL, 0);
  }
  /* Record the (possibly inherited) recv tasks on this cell. */
  c->recv_xv = t_xv;
  c->recv_rho = t_rho;
  c->recv_ti = t_ti;

  /* Add dependencies. */
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->force; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_rho, l->t);
    if (t_ti != NULL) scheduler_addunlock(s, l->t, t_ti);
  }
  /* Sorting this cell requires the received xv data first. */
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */
void engine_exchange_cells(struct engine *e) {
804
805
806

#ifdef WITH_MPI

807
808
  struct space *s = e->s;
  struct cell *cells = s->cells;
809
810
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
811
812
813
814
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
815
  ticks tic = getticks();
816
817
818

  /* Run through the cells and get the size of the ones that will be sent off.
   */
819
820
821
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
822
    if (cells[k].sendto)
823
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
824
  }
825

826
  /* Allocate the pcells. */
827
  struct pcell *pcells;
828
829
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
830
831
832
833
    error("Failed to allocate pcell buffer.");

  /* Pack the cells. */
  cell_next_tag = 0;
834
  for (int k = 0; k < nr_cells; k++)
835
836
837
838
839
840
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies. */
841
  for (int k = 0; k < nr_proxies; k++) {
842
843
844
845
846
847
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
848
  for (int k = 0; k < nr_proxies; k++) {
849
    int pid = MPI_UNDEFINED;
850
851
852
853
854
855
856
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }

857
  /* Wait for all the sends to have finished too. */
858
859
860
861
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cells. */
862
  for (int k = 0; k < nr_proxies; k++) {
863
864
865
866
867
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies. */
868
  for (int k = 0; k < nr_proxies; k++) {
869
    int pid = MPI_UNDEFINED;
870
871
872
873
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
874
    for (int count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
875
876
877
878
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }

879
  /* Wait for all the sends to have finished too. */
880
881
882
883
884
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the number of particles we need to import and re-allocate
     the buffer if needed. */
885
  int count_parts_in = 0, count_gparts_in = 0;
886
  for (int k = 0; k < nr_proxies; k++)
887
888
889
890
891
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      count_parts_in += e->proxies[k].cells_in[j]->count;
      count_gparts_in += e->proxies[k].cells_in[j]->gcount;
    }
  if (count_parts_in > s->size_parts_foreign) {
892
    if (s->parts_foreign != NULL) free(s->parts_foreign);
893
    s->size_parts_foreign = 1.1 * count_parts_in;
894
895
896
897
    if (posix_memalign((void **)&s->parts_foreign, part_align,
                       sizeof(struct part) * s->size_parts_foreign) != 0)
      error("Failed to allocate foreign part data.");
  }
898
899
900
901
902
903
904
  if (count_gparts_in > s->size_gparts_foreign) {
    if (s->gparts_foreign != NULL) free(s->gparts_foreign);
    s->size_gparts_foreign = 1.1 * count_gparts_in;
    if (posix_memalign((void **)&s->gparts_foreign, gpart_align,
                       sizeof(struct gpart) * s->size_gparts_foreign) != 0)
      error("Failed to allocate foreign gpart data.");
  }
905