/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
32
#include <stdbool.h>
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
37

38
39
/* MPI headers. */
#ifdef WITH_MPI
40
#include <mpi.h>
41
42
#endif

Angus Lepper's avatar
Angus Lepper committed
43
44
#ifdef HAVE_LIBNUMA
#include <numa.h>
45
46
#endif

47
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
48
#include "engine.h"
49
50

/* Local headers. */
51
#include "atomic.h"
52
#include "cell.h"
53
#include "clocks.h"
54
55
#include "cycle.h"
#include "debug.h"
56
#include "error.h"
57
#include "hydro.h"
58
#include "minmax.h"
59
#include "parallel_io.h"
60
#include "part.h"
61
#include "partition.h"
62
63
#include "proxy.h"
#include "runner.h"
64
65
#include "serial_io.h"
#include "single_io.h"
66
#include "timers.h"
67
#include "tools.h"
68
#include "units.h"
69
#include "version.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
70

Pedro Gonnet's avatar
Pedro Gonnet committed
71
72
73
74
75
76
77
78
79
80
81
82
83
/** Human-readable names of the engine policies, indexed by policy bit. */
const char *engine_policy_names[13] = {
    "none",      "rand",          "steal",            "keep",
    "block",     "fix_dt",        "cpu_tight",        "mpi",
    "numa_affinity", "hydro",     "self_gravity",     "external_gravity",
    "cosmology_integration"};
Pedro Gonnet's avatar
Pedro Gonnet committed
84

85
86
87
/** The rank of the engine as a global variable (for messages). */
int engine_rank;

88
89
90
91
92
#ifdef HAVE_SETAFFINITY
/** The initial affinity of the main thread (set by engine_pin()). */
static cpu_set_t entry_affinity;
#endif

93
94
95
96
97
98
99
100
101
/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l The #link.
 * @param t The #task.
 *
 * @return The new #link pointer.
 */
102
struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) {
103

104
105
106
107
108
  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];
109
110
111
112
  res->next = l;
  res->t = t;
  return res;
}
113
114

/**
115
116
 * @brief Generate the gravity hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
Matthieu Schaller's avatar
Matthieu Schaller committed
117
118
 *
 * Tasks are only created here. The dependencies will be added later on.
119
120
121
122
123
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
124
125
void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
                                            struct cell *super) {
126
127

  struct scheduler *s = &e->sched;
128
129
130
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity;
131
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;
132

133
  /* Is this the super-cell? */
Matthieu Schaller's avatar
Matthieu Schaller committed
134
  if (super == NULL && (c->grav != NULL || (!c->split && c->gcount > 0))) {
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196

    /* This is the super cell, i.e. the first with gravity tasks attached. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task. */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the drift task. */
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      if (is_with_external_gravity)
        c->grav_external = scheduler_addtask(
            s, task_type_grav_external, task_subtype_none, 0, 0, c, NULL, 0);
    }
  }

  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_gravity_hierarchical_tasks(e, c->progeny[k], super);
}

/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c,
                                          struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->density != NULL || (c->count > 0 && !c->split))) {
197

198
    /* This is the super cell, i.e. the first with density tasks attached. */
199
200
201
202
203
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

Matthieu Schaller's avatar
Matthieu Schaller committed
204
      /* Add the init task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
205
206
207
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);
208

Matthieu Schaller's avatar
Matthieu Schaller committed
209
      /* Add the drift task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
210
211
212
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);
213

214
215
      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
216
217
218
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
219
      } else {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
220
221
222
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
223
      }
Tom Theuns's avatar
Tom Theuns committed
224

225
226
227
      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
228
229
230
231
232
233

#ifdef EXTRA_HYDRO_LOOP
      /* Generate the extra ghost task. */
      c->extra_ghost = scheduler_addtask(s, task_type_extra_ghost,
                                         task_subtype_none, 0, 0, c, NULL, 0);
#endif
234
    }
235
  }
236

237
238
239
240
241
  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
242
    for (int k = 0; k < 8; k++)
243
      if (c->progeny[k] != NULL)
244
        engine_make_hydro_hierarchical_tasks(e, c->progeny[k], super);
245
}
246

247
/**
248
 * @brief Redistribute the particles amongst the nodes according
249
250
 *      to their cell's node IDs.
 *
251
252
253
254
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
255
256
257
258
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
259
260
 *
 *
261
262
 * @param e The #engine.
 */
263
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double iwidth[3] = {s->iwidth[0], s->iwidth[1], s->iwidth[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  /* Allocate the destination index arrays. */
  int *dest, *g_dest;
  if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid =
        cell_getid(cdim, parts[k].x[0] * iwidth[0], parts[k].x[1] * iwidth[1],
                   parts[k].x[2] * iwidth[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            parts[k].x[0], parts[k].x[1], parts[k].x[2]);
#endif

    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  if (s->nr_parts > 0) {
    int current_dest = dest[0];
    size_t count_this_dest = 0;
    for (size_t k = 0; k < s->nr_parts; ++k) {
      if (s->parts[k].gpart != NULL) {

        /* As the addresses will be invalidated by the communications, we will
         * instead store the absolute index from the start of the sub-array of
         * particles to be sent to a given node.
         * Recall that gparts without partners have a negative id.
         * We will restore the pointers on the receiving node later on. */
        if (dest[k] != current_dest) {
          current_dest = dest[k];
          count_this_dest = 0;
        }

#ifdef SWIFT_DEBUG_CHECKS
        if (s->parts[k].gpart->id_or_neg_offset >= 0)
          error("Trying to link a partnerless gpart !");
#endif

        s->parts[k].gpart->id_or_neg_offset = -count_this_dest;
        count_this_dest++;
      }
    }
  }

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid =
        cell_getid(cdim, gparts[k].x[0] * iwidth[0], gparts[k].x[1] * iwidth[1],
                   gparts[k].x[2] * iwidth[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]);
#endif

    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(s, g_dest, s->nr_gparts, 0, nr_nodes - 1, e->verbose);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* message("Sending %d part to node %d", counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
                      k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
                      k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k,
                    3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type,
                    k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id_or_neg_offset <= 0) {

        const ptrdiff_t partner_index =
            offset_parts - gparts_new[k].id_or_neg_offset;

        /* Re-link */
        gparts_new[k].id_or_neg_offset = -partner_index;
        parts_new[partner_index].gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place.
     (size_t loop index avoids a signed/unsigned comparison with nr_parts) */
  for (size_t k = 0; k < nr_parts; k++) {
    const int cid = cell_getid(cdim, parts_new[k].x[0] * iwidth[0],
                               parts_new[k].x[1] * iwidth[1],
                               parts_new[k].x[2] * iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zi) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  for (size_t k = 0; k < nr_gparts; ++k) {

    if (gparts_new[k].id_or_neg_offset <= 0) {

      struct part *part = &parts_new[-gparts_new[k].id_or_neg_offset];

      if (part->gpart != &gparts_new[k]) error("Linking problem !");

      if (gparts_new[k].x[0] != part->x[0] ||
          gparts_new[k].x[1] != part->x[1] || gparts_new[k].x[2] != part->x[2])
        error("Linked particles are not at the same position !");
    }
  }
  for (size_t k = 0; k < nr_parts; ++k) {

    if (parts_new[k].gpart != NULL &&
        parts_new[k].gpart->id_or_neg_offset != -(ptrdiff_t)k) {
      error("Linking problem !");
    }
  }
#endif

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff. Note that g_counts and g_dest were
     previously leaked here -- every malloc'd buffer is now released. */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
614

615
/**
616
 * @brief Repartition the cells amongst the nodes.
617
618
619
 *
 * @param e The #engine.
 */
620
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  const ticks tic = getticks();

  /* Consume (and reset) the repartitioning request. */
  const enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* A single-node run has nothing to balance. Bailing out here also
     side-steps a METIS bug for that degenerate case. */
  if (e->nr_nodes == 1) return;

  /* Compute the new domain decomposition. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Ship the particles to their new homes... */
  engine_redistribute(e);

  /* ...rebuild the proxies for the new neighbours... */
  engine_makeproxies(e);

  /* ...and request a full rebuild at the next opportunity. */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}
660

661
662
663
664
665
666
667
668
/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
 */
669
670
void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {

  /* Attach the pair of gravity tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* And hand the same pair down to the progeny. */
  if (c->split) {
    for (int k = 0; k < 8; k++) {
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
    }
  }
}
682

683
684
685
686
/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
687
 * @param ci The sending #cell.
688
689
690
 * @param cj Dummy cell containing the nodeID of the receiving node.
 * @param t_xv The send_xv #task, if it has already been created.
 * @param t_rho The send_rho #task, if it has already been created.
Peter W. Draper's avatar
Peter W. Draper committed
691
 * @param t_ti The send_ti #task, if required and has already been created.
692
 */
693
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj,
                          struct task *t_xv, struct task *t_rho,
                          struct task *t_ti) {

#ifdef WITH_MPI
  struct scheduler *sched = &e->sched;
  const int target = cj->nodeID;

  /* Scan the density tasks for one involving the target node. */
  struct link *link = ci->density;
  while (link != NULL) {
    const struct task *t = link->t;
    if (t->ci->nodeID == target ||
        (t->cj != NULL && t->cj->nodeID == target))
      break;
    link = link->next;
  }

  /* Found one? Then this cell's data must be shipped over. */
  if (link != NULL) {

    /* First cell on this branch to need the sends: create the tasks and
       their dependencies. */
    if (t_xv == NULL) {
      t_xv = scheduler_addtask(sched, task_type_send, task_subtype_none,
                               3 * ci->tag, 0, ci, cj, 0);
      t_rho = scheduler_addtask(sched, task_type_send, task_subtype_none,
                                3 * ci->tag + 1, 0, ci, cj, 0);
      if (!(e->policy & engine_policy_fixdt))
        t_ti = scheduler_addtask(sched, task_type_send, task_subtype_tend,
                                 3 * ci->tag + 2, 0, ci, cj, 0);

      /* The send_rho task depends on the cell's ghost task. */
      scheduler_addunlock(sched, ci->super->ghost, t_rho);

      /* The send_rho task should unlock the super-cell's kick task. */
      scheduler_addunlock(sched, t_rho, ci->super->kick);

      /* The send_xv task should unlock the super-cell's ghost task. */
      scheduler_addunlock(sched, t_xv, ci->super->ghost);

      /* The super-cell's kick task should unlock the send_ti task. */
      if (t_ti != NULL) scheduler_addunlock(sched, ci->super->kick, t_ti);
    }

    /* Register the tasks on this local cell. */
    ci->send_xv = engine_addlink(e, ci->send_xv, t_xv);
    ci->send_rho = engine_addlink(e, ci->send_rho, t_rho);
    if (t_ti != NULL) ci->send_ti = engine_addlink(e, ci->send_ti, t_ti);
  }

  /* Walk down the cell hierarchy. */
  if (ci->split)
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL)
        engine_addtasks_send(e, ci->progeny[k], cj, t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
750
751
752
753
754

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * @param e The #engine.
755
 * @param c The foreign #cell.
756
757
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
Peter W. Draper's avatar
Peter W. Draper committed
758
 * @param t_ti The recv_ti #task, if required and has already been created.
759
 */
760
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho, struct task *t_ti) {

#ifdef WITH_MPI
  struct scheduler *sched = &e->sched;

  /* Do we need to construct the recv tasks?
     Note that since c is a foreign cell, all its density tasks will involve
     only the current rank, and thus we don't have to check them. */
  if (t_xv == NULL && c->density != NULL) {

    t_xv = scheduler_addtask(sched, task_type_recv, task_subtype_none,
                             3 * c->tag, 0, c, NULL, 0);
    t_rho = scheduler_addtask(sched, task_type_recv, task_subtype_none,
                              3 * c->tag + 1, 0, c, NULL, 0);
    if (!(e->policy & engine_policy_fixdt))
      t_ti = scheduler_addtask(sched, task_type_recv, task_subtype_tend,
                               3 * c->tag + 2, 0, c, NULL, 0);
  }

  /* Record the (possibly inherited) recv tasks on this cell. */
  c->recv_xv = t_xv;
  c->recv_rho = t_rho;
  c->recv_ti = t_ti;

  /* Densities can only be computed once positions have arrived, and the
     density data must be complete before it is shipped back. */
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(sched, t_xv, l->t);
    scheduler_addunlock(sched, l->t, t_rho);
  }

  /* Force tasks wait on the densities; time-step info goes out last. */
  for (struct link *l = c->force; l != NULL; l = l->next) {
    scheduler_addunlock(sched, t_rho, l->t);
    if (t_ti != NULL) scheduler_addunlock(sched, l->t, t_ti);
  }

  /* Sorting needs the received positions as well. */
  if (c->sorts != NULL) scheduler_addunlock(sched, t_xv, c->sorts);

  /* Walk down the cell hierarchy. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
805
806
807
808
809
810

/**
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */
811
void engine_exchange_cells(struct engine *e) {
812
813
814

#ifdef WITH_MPI

815
816
  struct space *s = e->s;
  struct cell *cells = s->cells;
817
818
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
819
820
821
822
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
823
  ticks tic = getticks();
824
825
826

  /* Run through the cells and get the size of the ones that will be sent off.
   */
827
828
829
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
830
    if (cells[k].sendto)
831
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
832
  }
833

834
  /* Allocate the pcells. */
835
  struct pcell *pcells;
836
837
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
838
839
840
841
    error("Failed to allocate pcell buffer.");

  /* Pack the cells. */
  cell_next_tag = 0;
842
  for (int k = 0; k < nr_cells; k++)
843
844
845
846
847
848
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies. */
849
  for (int k = 0; k < nr_proxies; k++) {
850
851
852
853
854
855
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
856
  for (int k = 0; k < nr_proxies; k++) {
857
    int pid = MPI_UNDEFINED;
858
859
860
861
862
863
864
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }

865
  /* Wait for all the sends to have finished too. */
866
867
868
869
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cells. */
870
  for (int k = 0; k < nr_proxies; k++) {
871
872
873
874
875
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies. */
876
  for (int k = 0; k < nr_proxies; k++) {
877
    int pid = MPI_UNDEFINED;
878
879
880
881
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
882
    for (int count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
883
884
885
886
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }

887
  /* Wait for all the sends to have finished too. */
888
889
890
891
892
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the number of particles we need to import and re-allocate
     the buffer if needed. */
893
  int count_parts_in = 0, count_gparts_in = 0;
894
  for (int k = 0; k < nr_proxies; k++)
895
896
897
898
899
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      count_parts_in += e->proxies[k].cells_in[j]->count;
      count_gparts_in += e->proxies[k].cells_in[j]->gcount;
    }
  if (count_parts_in > s->size_parts_foreign) {
900
    if (s->parts_foreign != NULL) free(s->parts_foreign);
901
    s->size_parts_foreign = 1.1 * count_parts_in;
902
903
904
905
    if (posix_memalign((void **)&s->parts_foreign, part_align,
                       sizeof(struct part) * s->size_parts_foreign) != 0)
      error("Failed to allocate foreign part data.");
  }
906
907
908
909
910
911
912
  if (count_gparts_in > s->size_gparts_foreign) {
    if (s->gparts_foreign != NULL) free(s->gparts_foreign);
    s->size_gparts_foreign = 1.1 * count_gparts_in;
    if (posix_memalign((void **)&s->gparts_foreign, gpart_align,
                       sizeof(struct gpart) * s->size_gparts_foreign) != 0)
      error("Failed to allocate foreign gpart data.");
  }
913

914
  /* Unpack the cells and link to the particle data. */
915
  struct part *parts = s->parts_foreign;
916
  struct gpart *gparts = s->gparts_foreign;
917
918
  for (int k = 0; k < nr_proxies; k++) {
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
919
      cell_link_parts(e->proxies[k].cells_in[j], parts);