/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

/* This object's header. */
#include "engine.h"

/* Local headers. */
#include "atomic.h"
#include "cell.h"
#include "clocks.h"
#include "cycle.h"
#include "debug.h"
#include "error.h"
#include "hydro.h"
#include "minmax.h"
#include "parallel_io.h"
#include "part.h"
#include "partition.h"
#include "proxy.h"
#include "runner.h"
#include "serial_io.h"
#include "single_io.h"
#include "timers.h"
#include "tools.h"
#include "units.h"
#include "version.h"

const char *engine_policy_names[13] = {"none",
                                       "rand",
                                       "steal",
                                       "keep",
                                       "block",
                                       "fix_dt",
                                       "cpu_tight",
                                       "mpi",
                                       "numa_affinity",
                                       "hydro",
                                       "self_gravity",
                                       "external_gravity",
                                       "cosmology_integration"};

/** The rank of the engine as a global variable (for messages). */
int engine_rank;

#ifdef HAVE_SETAFFINITY
/** The initial affinity of the main thread (set by engine_pin()) */
static cpu_set_t entry_affinity;
#endif

/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l A pointer to the #link, will be modified atomically.
 * @param t The #task.
 */
void engine_addlink(struct engine *e, struct link **l, struct task *t) {

  /* Get the next free link. */
  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];

  /* Set it atomically. */
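  /* (atomic_swap() is expected to return the previous head here, making this
   * a lock-free prepend: each concurrent caller links its new entry ahead of
   * whatever head it displaced.) */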
  res->t = t;
  res->next = atomic_swap(l, res);
}

/**
 * @brief Generate the gravity hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
                                            struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->grav != NULL || (!c->split && c->gcount > 0))) {

    /* This is the super cell, i.e. the first with gravity tasks attached. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task. */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      if (is_with_external_gravity)
        c->grav_external = scheduler_addtask(
            s, task_type_grav_external, task_subtype_none, 0, 0, c, NULL, 0);
    }
  }

  /* Set the super-cell. */
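  /* (Every progeny below this point records the same super-cell, so the
   * tasks attached there can be reached from any cell in the hierarchy.) */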
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_gravity_hierarchical_tasks(e, c->progeny[k], super);
}

/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c,
                                          struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->density != NULL || (c->count > 0 && !c->split))) {

    /* This is the super cell, i.e. the first with density tasks attached. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task. */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
    }
  }

  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_hydro_hierarchical_tasks(e, c->progeny[k], super);
}

/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double iwidth[3] = {s->iwidth[0], s->iwidth[1], s->iwidth[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);
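
  /* Note: counts is a flattened nr_nodes x nr_nodes matrix indexed as
   * counts[from * nr_nodes + to]; with e.g. 3 ranks, counts[1 * 3 + 2] is
   * the number of parts rank 1 sends to rank 2. Each rank fills in its own
   * row here and the allreduce further down completes the matrix. */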

  /* Allocate the destination index arrays. */
  int *dest, *g_dest;
  if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid =
        cell_getid(cdim, parts[k].x[0] * iwidth[0], parts[k].x[1] * iwidth[1],
                   parts[k].x[2] * iwidth[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            parts[k].x[0], parts[k].x[1], parts[k].x[2]);
#endif

    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  if (s->nr_parts > 0) {
    int current_dest = dest[0];
    size_t count_this_dest = 0;
    for (size_t k = 0; k < s->nr_parts; ++k) {
      if (s->parts[k].gpart != NULL) {

        /* As the addresses will be invalidated by the communications, we will
         * instead store the absolute index from the start of the sub-array of
         * particles to be sent to a given node.
         * Recall that gparts without partners have a negative id.
         * We will restore the pointers on the receiving node later on. */
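        /* (E.g. the third partnered part in a batch headed to some node is
         * sent with id_or_neg_offset = -2; the receiver adds its running
         * offset for that batch to turn this back into an index into the
         * new parts array.) */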
        if (dest[k] != current_dest) {
          current_dest = dest[k];
          count_this_dest = 0;
        }

#ifdef SWIFT_DEBUG_CHECKS
        if (s->parts[k].gpart->id_or_neg_offset >= 0)
          error("Trying to link a partnerless gpart !");
#endif

        s->parts[k].gpart->id_or_neg_offset = -count_this_dest;
        count_this_dest++;
      }
    }
  }

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid =
        cell_getid(cdim, gparts[k].x[0] * iwidth[0], gparts[k].x[1] * iwidth[1],
                   gparts[k].x[2] * iwidth[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]);
#endif

    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(s, g_dest, s->nr_gparts, 0, nr_nodes - 1, e->verbose);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

  /* Emit the sends and recvs for the particle and gparticle data. */
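  /* Layout: six requests per peer k, slots 6k..6k+2 for the part, xpart and
   * gpart sends and slots 6k+3..6k+5 for the matching receives; the tags
   * 3 * ind + {0, 1, 2} keep the three particle streams between any pair of
   * ranks from colliding. */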
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* message("Sending %d part to node %d", counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
                      k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
                      k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k,
                    3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type,
                    k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id_or_neg_offset <= 0) {
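
        /* The negative id was set before the exchange to this gpart's
         * position within the batch of parts shipped from its old node;
         * adding the running offset of parts already received from that
         * node recovers the partner's index in the new parts array. */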
        const ptrdiff_t partner_index =
            offset_parts - gparts_new[k].id_or_neg_offset;

        /* Re-link */
        gparts_new[k].id_or_neg_offset = -partner_index;
        parts_new[partner_index].gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place. */
  for (size_t k = 0; k < nr_parts; k++) {
    int cid = cell_getid(cdim, parts_new[k].x[0] * iwidth[0],
                         parts_new[k].x[1] * iwidth[1],
                         parts_new[k].x[2] * iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zi) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  for (size_t k = 0; k < nr_gparts; ++k) {

    if (gparts_new[k].id_or_neg_offset <= 0) {

      struct part *part = &parts_new[-gparts_new[k].id_or_neg_offset];

      if (part->gpart != &gparts_new[k]) error("Linking problem !");

      if (gparts_new[k].x[0] != part->x[0] ||
          gparts_new[k].x[1] != part->x[1] || gparts_new[k].x[2] != part->x[2])
        error("Linked particles are not at the same position !");
    }
  }
  for (size_t k = 0; k < nr_parts; ++k) {

    if (parts_new[k].gpart != NULL &&
        parts_new[k].gpart->id_or_neg_offset != -(ptrdiff_t)k) {
      error("Linking problem !");
    }
  }
#endif

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff. */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  ticks tic = getticks();

  /* Clear the repartition flag. */
  enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Do the repartitioning. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}

/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
 */
void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {

  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}

/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj Dummy cell containing the nodeID of the receiving node.
 * @param t_xv The send_xv #task, if it has already been created.
 * @param t_rho The send_rho #task, if it has already been created.
 * @param t_ti The send_ti #task, if required and has already been created.
 */
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj,
                          struct task *t_xv, struct task *t_rho,
                          struct task *t_ti) {

#ifdef WITH_MPI
  struct link *l = NULL;
  struct scheduler *s = &e->sched;
  const int nodeID = cj->nodeID;

  /* Check if any of the density tasks are for the target node. */
  for (l = ci->density; l != NULL; l = l->next)
    if (l->t->ci->nodeID == nodeID ||
        (l->t->cj != NULL && l->t->cj->nodeID == nodeID))
      break;

  /* If so, attach send tasks. */
  if (l != NULL) {

    /* Create the tasks and their dependencies? */
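    /* (The tasks are created only at the highest cell that has density
     * interactions with the target node; deeper cells are handed the same
     * three tasks through the recursion at the bottom of this function.) */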
    if (t_xv == NULL) {
      t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
                               3 * ci->tag, 0, ci, cj, 0);
      t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
                                3 * ci->tag + 1, 0, ci, cj, 0);
      if (!(e->policy & engine_policy_fixdt))
        t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend,
                                 3 * ci->tag + 2, 0, ci, cj, 0);

      /* The send_rho task depends on the cell's ghost task. */
      scheduler_addunlock(s, ci->super->ghost, t_rho);

      /* The send_rho task should unlock the super-cell's kick task. */
      scheduler_addunlock(s, t_rho, ci->super->kick);

      /* The send_xv task should unlock the super-cell's ghost task. */
      scheduler_addunlock(s, t_xv, ci->super->ghost);

      /* The super-cell's kick task should unlock the send_ti task. */
      if (t_ti != NULL) scheduler_addunlock(s, ci->super->kick, t_ti);
    }

    /* Add them to the local cell. */
    engine_addlink(e, &ci->send_xv, t_xv);
    engine_addlink(e, &ci->send_rho, t_rho);
    if (t_ti != NULL) engine_addlink(e, &ci->send_ti, t_ti);
  }

  /* Recurse? */
  if (ci->split)
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL)
        engine_addtasks_send(e, ci->progeny[k], cj, t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param c The foreign #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 * @param t_ti The recv_ti #task, if required and has already been created.
 */
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho, struct task *t_ti) {

#ifdef WITH_MPI
  struct scheduler *s = &e->sched;

  /* Do we need to construct a recv task?
     Note that since c is a foreign cell, all its density tasks will involve
     only the current rank, and thus we don't have to check them.*/
  if (t_xv == NULL && c->density != NULL) {

    /* Create the tasks. */
    t_xv = scheduler_addtask(s, task_type_recv, task_subtype_none, 3 * c->tag,
                             0, c, NULL, 0);
    t_rho = scheduler_addtask(s, task_type_recv, task_subtype_none,
                              3 * c->tag + 1, 0, c, NULL, 0);
    if (!(e->policy & engine_policy_fixdt))
      t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend,
                               3 * c->tag + 2, 0, c, NULL, 0);
  }
  c->recv_xv = t_xv;
  c->recv_rho = t_rho;
  c->recv_ti = t_ti;

  /* Add dependencies. */
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->force; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_rho, l->t);
    if (t_ti != NULL) scheduler_addunlock(s, l->t, t_ti);
  }

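  /* Any sort task on this foreign cell must wait for the xv data to arrive
   * before the received particles can be sorted. */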
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */
void engine_exchange_cells(struct engine *e) {

#ifdef WITH_MPI

  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
  ticks tic = getticks();

  /* Run through the cells and get the size of the ones that will be sent off.
   */
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
    if (cells[k].sendto)
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
  }

  /* Allocate the pcells. */
  struct pcell *pcells;
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
    error("Failed to allocate pcell buffer.");

  /* Pack the cells. */
  cell_next_tag = 0;
  for (int k = 0; k < nr_cells; k++)
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies. */
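  /* (Two-step handshake, as suggested by the requests used here:
   * proxy_cells_exch1 exchanges the pcell counts, and proxy_cells_exch2
   * below transfers the pcell payload once a proxy's count has arrived.) */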
  for (int k = 0; k < nr_proxies; k++) {
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid = MPI_UNDEFINED;
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cells. */
  for (int k = 0; k < nr_proxies; k++) {
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid = MPI_UNDEFINED;
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
    for (int count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the number of particles we need to import and re-allocate
     the buffer if needed. */
  int count_parts_in = 0, count_gparts_in = 0;
  for (int k = 0; k < nr_proxies; k++)
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      count_parts_in += e->proxies[k].cells_in[j]->count;
      count_gparts_in += e->proxies[k].cells_in[j]->gcount;
    }
  if (count_parts_in > s->size_parts_foreign) {
    if (s->parts_foreign != NULL) free(s->parts_foreign);