/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
32
#include <stdbool.h>
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
37

38
39
/* MPI headers. */
#ifdef WITH_MPI
40
#include <mpi.h>
41
42
#endif

Angus Lepper's avatar
Angus Lepper committed
43
44
#ifdef HAVE_LIBNUMA
#include <numa.h>
45
46
#endif

47
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
48
#include "engine.h"
49
50

/* Local headers. */
51
#include "atomic.h"
52
#include "cell.h"
53
#include "clocks.h"
54
55
#include "cycle.h"
#include "debug.h"
56
#include "error.h"
57
#include "hydro.h"
58
#include "minmax.h"
59
#include "parallel_io.h"
60
#include "part.h"
61
#include "partition.h"
62
63
#include "proxy.h"
#include "runner.h"
64
65
#include "serial_io.h"
#include "single_io.h"
66
#include "timers.h"
67
#include "tools.h"
68
#include "units.h"
69
#include "version.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
70

Pedro Gonnet's avatar
Pedro Gonnet committed
71
72
73
74
75
76
77
78
79
80
81
82
83
/* Human-readable names for the engine policies, used when reporting which
 * policies are active. NOTE(review): the order presumably mirrors the
 * engine_policy flag enumeration (cf. engine_policy_fixdt,
 * engine_policy_external_gravity used below) — verify against engine.h. */
const char *engine_policy_names[13] = {"none",
                                       "rand",
                                       "steal",
                                       "keep",
                                       "block",
                                       "fix_dt",
                                       "cpu_tight",
                                       "mpi",
                                       "numa_affinity",
                                       "hydro",
                                       "self_gravity",
                                       "external_gravity",
                                       "cosmology_integration"};
Pedro Gonnet's avatar
Pedro Gonnet committed
84

85
86
87
/** The rank of the engine as a global variable (for messages). */
int engine_rank;

#ifdef HAVE_SETAFFINITY
/** The initial affinity of the main thread (set by engine_pin()) */
static cpu_set_t entry_affinity;
#endif

93
94
95
96
97
98
99
100
101
/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l The #link.
 * @param t The #task.
 *
 * @return The new #link pointer.
 */
102
struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) {
103

104
105
106
107
108
  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];
109
110
111
112
  res->next = l;
  res->t = t;
  return res;
}
113
114

/**
115
116
 * @brief Generate the gravity hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
Matthieu Schaller's avatar
Matthieu Schaller committed
117
118
 *
 * Tasks are only created here. The dependencies will be added later on.
119
120
121
122
123
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
124
125
void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
                                            struct cell *super) {
126
127

  struct scheduler *s = &e->sched;
128
129
130
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity;
131
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;
132

133
  /* Is this the super-cell? */
Matthieu Schaller's avatar
Matthieu Schaller committed
134
  if (super == NULL && (c->grav != NULL || (!c->split && c->gcount > 0))) {
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196

    /* This is the super cell, i.e. the first with gravity tasks attached. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task. */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the drift task. */
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      if (is_with_external_gravity)
        c->grav_external = scheduler_addtask(
            s, task_type_grav_external, task_subtype_none, 0, 0, c, NULL, 0);
    }
  }

  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_gravity_hierarchical_tasks(e, c->progeny[k], super);
}

/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c,
                                          struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->density != NULL || (c->count > 0 && !c->split))) {
197

198
    /* This is the super cell, i.e. the first with density tasks attached. */
199
200
201
202
203
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

Matthieu Schaller's avatar
Matthieu Schaller committed
204
      /* Add the init task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
205
206
207
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);
208

Matthieu Schaller's avatar
Matthieu Schaller committed
209
      /* Add the drift task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
210
211
212
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);
213

214
215
      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
216
217
218
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
219
      } else {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
220
221
222
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
223
      }
Tom Theuns's avatar
Tom Theuns committed
224

225
226
227
      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
228
229
230
231
232
233

#ifdef EXTRA_HYDRO_LOOP
      /* Generate the extra ghost task. */
      c->extra_ghost = scheduler_addtask(s, task_type_extra_ghost,
                                         task_subtype_none, 0, 0, c, NULL, 0);
#endif
234
    }
235
  }
236

237
238
239
240
241
  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
242
    for (int k = 0; k < 8; k++)
243
      if (c->progeny[k] != NULL)
244
        engine_make_hydro_hierarchical_tasks(e, c->progeny[k], super);
245
}
246

247
/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double iwidth[3] = {s->iwidth[0], s->iwidth[1], s->iwidth[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  /* Allocate the destination index arrays. */
  int *dest, *g_dest;
  if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid =
        cell_getid(cdim, parts[k].x[0] * iwidth[0], parts[k].x[1] * iwidth[1],
                   parts[k].x[2] * iwidth[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            parts[k].x[0], parts[k].x[1], parts[k].x[2]);
#endif

    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  if (s->nr_parts > 0) {
    int current_dest = dest[0];
    size_t count_this_dest = 0;
    for (size_t k = 0; k < s->nr_parts; ++k) {
      if (s->parts[k].gpart != NULL) {

        /* As the addresses will be invalidated by the communications, we will
         * instead store the absolute index from the start of the sub-array of
         * particles to be sent to a given node.
         * Recall that gparts without partners have a negative id.
         * We will restore the pointers on the receiving node later on. */
        if (dest[k] != current_dest) {
          current_dest = dest[k];
          count_this_dest = 0;
        }

#ifdef SWIFT_DEBUG_CHECKS
        if (s->parts[k].gpart->id_or_neg_offset >= 0)
          error("Trying to link a partnerless gpart !");
#endif

        s->parts[k].gpart->id_or_neg_offset = -count_this_dest;
        count_this_dest++;
      }
    }
  }

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid =
        cell_getid(cdim, gparts[k].x[0] * iwidth[0], gparts[k].x[1] * iwidth[1],
                   gparts[k].x[2] * iwidth[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]);
#endif

    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(s, g_dest, s->nr_gparts, 0, nr_nodes - 1, e->verbose);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* message("Sending %d part to node %d", counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
                      k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
                      k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k,
                    3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type,
                    k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id_or_neg_offset <= 0) {

        const ptrdiff_t partner_index =
            offset_parts - gparts_new[k].id_or_neg_offset;

        /* Re-link */
        gparts_new[k].id_or_neg_offset = -partner_index;
        parts_new[partner_index].gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place.
     (fix: loop index is size_t, matching nr_parts, and printed with %zi) */
  for (size_t k = 0; k < nr_parts; k++) {
    const int cid = cell_getid(cdim, parts_new[k].x[0] * iwidth[0],
                               parts_new[k].x[1] * iwidth[1],
                               parts_new[k].x[2] * iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zi) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  for (size_t k = 0; k < nr_gparts; ++k) {

    if (gparts_new[k].id_or_neg_offset <= 0) {

      struct part *part = &parts_new[-gparts_new[k].id_or_neg_offset];

      if (part->gpart != &gparts_new[k]) error("Linking problem !");

      if (gparts_new[k].x[0] != part->x[0] ||
          gparts_new[k].x[1] != part->x[1] || gparts_new[k].x[2] != part->x[2])
        error("Linked particles are not at the same position !");
    }
  }
  for (size_t k = 0; k < nr_parts; ++k) {

    /* fix: negate through ptrdiff_t; -k on a size_t wraps to a huge
       unsigned value and the comparison would never match. */
    if (parts_new[k].gpart != NULL &&
        parts_new[k].gpart->id_or_neg_offset != -(ptrdiff_t)k) {
      error("Linking problem !");
    }
  }
#endif

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff.
     (fix: g_counts and g_dest were previously leaked here) */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
614

615
/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  const ticks tic = getticks();

  /* Remember why we were asked to repartition, then clear the flag. */
  const enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Let the partitioner pick new owners for the cells. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Move the particles over to their new owners. */
  engine_redistribute(e);

  /* Re-create the proxies for the new decomposition. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}
660

661
662
663
664
665
666
667
668
/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
 */
669
670
void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {
671

672
673
674
675
676
677
678
679
680
681
  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}
682

683
684
685
686
/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj Dummy cell containing the nodeID of the receiving node.
 * @param t_xv The send_xv #task, if it has already been created.
 * @param t_rho The send_rho #task, if it has already been created.
 * @param t_gradient The send_gradient #task, if already created.
 * @param t_ti The send_ti #task, if required and has already been created.
 */
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj,
                          struct task *t_xv, struct task *t_rho,
                          struct task *t_gradient, struct task *t_ti) {

#ifdef WITH_MPI
  struct link *l = NULL;
  struct scheduler *s = &e->sched;
  const int nodeID = cj->nodeID;

  /* Check if any of the density tasks are for the target node. */
  for (l = ci->density; l != NULL; l = l->next)
    if (l->t->ci->nodeID == nodeID ||
        (l->t->cj != NULL && l->t->cj->nodeID == nodeID))
      break;

  /* If so, attach send tasks. */
  if (l != NULL) {

    /* Create the tasks and their dependencies?
       (only once per cell: t_xv == NULL on the first visit, and the created
       tasks are passed down the recursion below) */
    if (t_xv == NULL) {
      /* Tags are 4 * ci->tag + {0,1,2,3} for xv, rho, ti and gradient. */
      t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
                               4 * ci->tag, 0, ci, cj, 0);
      t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
                                4 * ci->tag + 1, 0, ci, cj, 0);
      /* No end-of-timestep send is needed with a fixed time-step. */
      if (!(e->policy & engine_policy_fixdt))
        t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend,
                                 4 * ci->tag + 2, 0, ci, cj, 0);
#ifdef EXTRA_HYDRO_LOOP
      t_gradient = scheduler_addtask(s, task_type_send, task_subtype_none,
                                     4 * ci->tag + 3, 0, ci, cj, 0);
#endif

#ifdef EXTRA_HYDRO_LOOP

      /* The send_gradient task should unlock the super-cell's kick task. */
      scheduler_addunlock(s, t_gradient, ci->super->kick);

      /* The send_gradient task depends on the super-cell's extra_ghost. */
      scheduler_addunlock(s, ci->super->extra_ghost, t_gradient);

      /* The send_rho task should unlock the super-cell's extra_ghost task. */
      scheduler_addunlock(s, t_rho, ci->super->extra_ghost);

      /* The send_rho task depends on the cell's ghost task. */
      scheduler_addunlock(s, ci->super->ghost, t_rho);

      /* The send_xv task should unlock the super-cell's ghost task. */
      scheduler_addunlock(s, t_xv, ci->super->ghost);

#else
      /* The send_rho task should unlock the super-cell's kick task. */
      scheduler_addunlock(s, t_rho, ci->super->kick);

      /* The send_rho task depends on the cell's ghost task. */
      scheduler_addunlock(s, ci->super->ghost, t_rho);

      /* The send_xv task should unlock the super-cell's ghost task. */
      scheduler_addunlock(s, t_xv, ci->super->ghost);
#endif

      /* The super-cell's kick task should unlock the send_ti task. */
      if (t_ti != NULL) scheduler_addunlock(s, ci->super->kick, t_ti);
    }

    /* Add them to the local cell. */
    ci->send_xv = engine_addlink(e, ci->send_xv, t_xv);
    ci->send_rho = engine_addlink(e, ci->send_rho, t_rho);
#ifdef EXTRA_HYDRO_LOOP
    ci->send_gradient = engine_addlink(e, ci->send_gradient, t_gradient);
#endif
    if (t_ti != NULL) ci->send_ti = engine_addlink(e, ci->send_ti, t_ti);
  }

  /* Recurse? */
  if (ci->split)
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL)
        engine_addtasks_send(e, ci->progeny[k], cj, t_xv, t_rho, t_gradient,
                             t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
776
777
778
779
780

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * Mirrors engine_addtasks_send() on the receiving rank: creates the recv
 * tasks for a foreign cell (if not already inherited from a parent), wires
 * them into the cell's hydro task dependencies, and recurses into the
 * progeny so that sub-cells share the same recv tasks.
 *
 * @param e The #engine.
 * @param c The foreign #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 * @param t_gradient The recv_gradient #task, if it has already been created.
 * @param t_ti The recv_ti #task, if required and has already been created.
 */
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho, struct task *t_gradient,
                          struct task *t_ti) {

#ifdef WITH_MPI
  struct scheduler *s = &e->sched;

  /* Do we need to construct a recv task?
     Note that since c is a foreign cell, all its density tasks will involve
     only the current rank, and thus we don't have to check them. */
  if (t_xv == NULL && c->density != NULL) {

    /* Create the tasks. The tag is multiplied by 4 so that the xv, rho,
       tend and gradient messages of the same cell get distinct MPI tags. */
    t_xv = scheduler_addtask(s, task_type_recv, task_subtype_none, 4 * c->tag,
                             0, c, NULL, 0);
    t_rho = scheduler_addtask(s, task_type_recv, task_subtype_none,
                              4 * c->tag + 1, 0, c, NULL, 0);

    /* The time-step recv is only needed when not running with a fixed dt. */
    if (!(e->policy & engine_policy_fixdt))
      t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend,
                               4 * c->tag + 2, 0, c, NULL, 0);
#ifdef EXTRA_HYDRO_LOOP
    t_gradient = scheduler_addtask(s, task_type_recv, task_subtype_none,
                                   4 * c->tag + 3, 0, c, NULL, 0);
#endif
  }

  /* Store the (possibly inherited) recv tasks on this cell. */
  c->recv_xv = t_xv;
  c->recv_rho = t_rho;
  c->recv_gradient = t_gradient;
  c->recv_ti = t_ti;

/* Add dependencies: each hydro loop over this cell must wait for the data
   it reads to have arrived, and (where applicable) unlock the next recv. */
#ifdef EXTRA_HYDRO_LOOP
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->gradient; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_rho, l->t);
    scheduler_addunlock(s, l->t, t_gradient);
  }
  for (struct link *l = c->force; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_gradient, l->t);
    if (t_ti != NULL) scheduler_addunlock(s, l->t, t_ti);
  }
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);
#else
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->force; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_rho, l->t);
    if (t_ti != NULL) scheduler_addunlock(s, l->t, t_ti);
  }
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);
#endif

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho, t_gradient, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
854
855
856
857
858
859

/**
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */
860
void engine_exchange_cells(struct engine *e) {
861
862
863

#ifdef WITH_MPI

864
865
  struct space *s = e->s;
  struct cell *cells = s->cells;
866
867
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
868
869
870
871
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
872
  ticks tic = getticks();
873
874
875

  /* Run through the cells and get the size of the ones that will be sent off.
   */
876
877
878
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
879
    if (cells[k].sendto)
880
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
881
  }
882

883
  /* Allocate the pcells. */
884
  struct pcell *pcells;
885
886
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
887
888
889
890
    error("Failed to allocate pcell buffer.");

  /* Pack the cells. */
  cell_next_tag = 0;
891
  for (int k = 0; k < nr_cells; k++)
892
893
894
895
896
897
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies. */
898
  for (int k = 0; k < nr_proxies; k++) {
899
900
901
902
903
904
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
905
  for (int k = 0; k < nr_proxies; k++) {
906
    int pid = MPI_UNDEFINED;
907
908
909
910
911
912
913
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }

914
  /* Wait for all the sends to have finished too. */
915
916
917
918
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cells. */
919
  for (int k = 0; k < nr_proxies; k++) {
920
921
922
923
924
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies. */
925
  for (int k = 0; k < nr_proxies; k++) {
926
    int pid = MPI_UNDEFINED;
927
928
929
930
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
931
    for (int count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
932
933
934
935
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }