/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

/* This object's header. */
#include "engine.h"

/* Local headers. */
#include "atomic.h"
#include "cell.h"
#include "clocks.h"
#include "cycle.h"
#include "debug.h"
#include "error.h"
#include "hydro.h"
#include "minmax.h"
#include "parallel_io.h"
#include "part.h"
#include "partition.h"
#include "proxy.h"
#include "runner.h"
#include "serial_io.h"
#include "single_io.h"
#include "timers.h"
#include "tools.h"
#include "units.h"
#include "version.h"

const char *engine_policy_names[13] = {"none",
                                       "rand",
                                       "steal",
                                       "keep",
                                       "block",
                                       "fix_dt",
                                       "cpu_tight",
                                       "mpi",
                                       "numa_affinity",
                                       "hydro",
                                       "self_gravity",
                                       "external_gravity",
                                       "cosmology_integration"};

/** The rank of the engine as a global variable (for messages). */
int engine_rank;

#ifdef HAVE_SETAFFINITY
/** The initial affinity of the main thread (set by engine_pin()) */
static cpu_set_t entry_affinity;
#endif

/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l A pointer to the #link, will be modified atomically.
 * @param t The #task.
 */
void engine_addlink(struct engine *e, struct link **l, struct task *t) {

  /* Get the next free link. */
  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];

  /* Set it atomically. */
  res->t = t;
  res->next = atomic_swap(l, res);
}
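
/* Typical use: prepend a task to one of a cell's task lists, e.g.
 *   engine_addlink(e, &ci->send_xv, t_xv);
 * as done when attaching the send tasks below. The atomic_swap() makes the
 * prepend safe when several threads build the task lists concurrently. */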

/**
 * @brief Generate the gravity hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param gsuper The gravity super #cell.
 */
void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
                                            struct cell *gsuper) {

  struct scheduler *s = &e->sched;
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (gsuper == NULL && (c->grav != NULL || (!c->split && c->gcount > 0))) {

    /* This is the super cell, i.e. the first with gravity tasks attached. */
    gsuper = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task. */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      /* Add the external gravity task, if requested. */
      if (is_with_external_gravity)
        c->grav_external = scheduler_addtask(
            s, task_type_grav_external, task_subtype_none, 0, 0, c, NULL, 0);
    }
  }

  /* Set the super-cell. */
  c->gsuper = gsuper;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_gravity_hierarchical_tasks(e, c->progeny[k], gsuper);
}
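
/* In both hierarchies the "super" cell is the highest cell in the tree that
 * carries these per-particle tasks; every cell below it simply inherits the
 * pointer, so a runner can find the tasks from any depth in O(1). */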

/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c,
                                          struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->density != NULL || (c->count > 0 && !c->split))) {

    /* This is the super cell, i.e. the first with density tasks attached. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task. */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);

#ifdef EXTRA_HYDRO_LOOP
      /* Generate the extra ghost task. */
      c->extra_ghost = scheduler_addtask(s, task_type_extra_ghost,
                                         task_subtype_none, 0, 0, c, NULL, 0);
#endif
    }
  }

  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_hydro_hierarchical_tasks(e, c->progeny[k], super);
}

/**
 * @brief Redistribute the particles amongst the nodes according
 *        to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 *    node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 *    part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double iwidth[3] = {s->iwidth[0], s->iwidth[1], s->iwidth[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  /* Allocate the destination index arrays. */
  int *dest, *g_dest;
  if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid =
        cell_getid(cdim, parts[k].x[0] * iwidth[0], parts[k].x[1] * iwidth[1],
                   parts[k].x[2] * iwidth[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zu at [%.3e,%.3e,%.3e].", cid, k,
            parts[k].x[0], parts[k].x[1], parts[k].x[2]);
#endif

    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  if (s->nr_parts > 0) {
    int current_dest = dest[0];
    size_t count_this_dest = 0;
    for (size_t k = 0; k < s->nr_parts; ++k) {
      if (s->parts[k].gpart != NULL) {

        /* As the addresses will be invalidated by the communications, we will
         * instead store the absolute index from the start of the sub-array of
         * particles to be sent to a given node.
         * Recall that gparts without partners have a negative id.
         * We will restore the pointers on the receiving node later on. */
        if (dest[k] != current_dest) {
          current_dest = dest[k];
          count_this_dest = 0;
        }

#ifdef SWIFT_DEBUG_CHECKS
        if (s->parts[k].gpart->id_or_neg_offset >= 0)
          error("Trying to link a partnerless gpart !");
#endif

        s->parts[k].gpart->id_or_neg_offset = -count_this_dest;
        count_this_dest++;
      }
    }
  }

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid =
        cell_getid(cdim, gparts[k].x[0] * iwidth[0], gparts[k].x[1] * iwidth[1],
                   gparts[k].x[2] * iwidth[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for gpart %zu at [%.3e,%.3e,%.3e].", cid, k,
            gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]);
#endif

    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(s, g_dest, s->nr_gparts, 0, nr_nodes - 1, e->verbose);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
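
  /* Six requests per peer: slots 6*k to 6*k+2 hold the Isends of parts,
   * xparts and gparts to rank k, and slots 6*k+3 to 6*k+5 the matching
   * Irecvs. The tags 3*ind + {0,1,2} keep the three particle streams of a
   * given pair of ranks apart. */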

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* message("Sending %d part to node %d", counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
                      k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
                      k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k,
                    3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type,
                    k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id_or_neg_offset <= 0) {

        const ptrdiff_t partner_index =
            offset_parts - gparts_new[k].id_or_neg_offset;

        /* Re-link */
        gparts_new[k].id_or_neg_offset = -partner_index;
        parts_new[partner_index].gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }
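
  /* Worked example of the re-link: a gpart that arrives carrying
   * id_or_neg_offset == -5 was tagged as the 5th linked part (0-based) of
   * its sender's block, so its partner now lives at
   * parts_new[offset_parts + 5], and the two pointers are re-joined there. */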

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place. */
  for (size_t k = 0; k < nr_parts; k++) {
    const int cid = cell_getid(cdim, parts_new[k].x[0] * iwidth[0],
                               parts_new[k].x[1] * iwidth[1],
                               parts_new[k].x[2] * iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zu) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  for (size_t k = 0; k < nr_gparts; ++k) {

    if (gparts_new[k].id_or_neg_offset <= 0) {

      struct part *part = &parts_new[-gparts_new[k].id_or_neg_offset];

      if (part->gpart != &gparts_new[k]) error("Linking problem !");

      if (gparts_new[k].x[0] != part->x[0] ||
          gparts_new[k].x[1] != part->x[1] || gparts_new[k].x[2] != part->x[2])
        error("Linked particles are not at the same position !");
    }
  }
  for (size_t k = 0; k < nr_parts; ++k) {

    if (parts_new[k].gpart != NULL &&
        parts_new[k].gpart->id_or_neg_offset != -(ptrdiff_t)k) {
      error("Linking problem !");
    }
  }
#endif

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff. */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zu parts and %zu gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  ticks tic = getticks();

  /* Clear the repartition flag. */
  enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Do the repartitioning. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}

/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
 */
void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {

  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}

/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj Dummy cell containing the nodeID of the receiving node.
 * @param t_xv The send_xv #task, if it has already been created.
 * @param t_rho The send_rho #task, if it has already been created.
 * @param t_gradient The send_gradient #task, if already created.
 * @param t_ti The send_ti #task, if required and has already been created.
 */
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj,
                          struct task *t_xv, struct task *t_rho,
                          struct task *t_gradient, struct task *t_ti) {

#ifdef WITH_MPI
  struct link *l = NULL;
  struct scheduler *s = &e->sched;
  const int nodeID = cj->nodeID;

  /* Check if any of the density tasks are for the target node. */
  for (l = ci->density; l != NULL; l = l->next)
    if (l->t->ci->nodeID == nodeID ||
        (l->t->cj != NULL && l->t->cj->nodeID == nodeID))
      break;

  /* If so, attach send tasks. */
  if (l != NULL) {

    /* Create the tasks and their dependencies? */
    if (t_xv == NULL) {
      t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
                               4 * ci->tag, 0, ci, cj, 0);
      t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
                                4 * ci->tag + 1, 0, ci, cj, 0);
      if (!(e->policy & engine_policy_fixdt))
        t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend,
                                 4 * ci->tag + 2, 0, ci, cj, 0);
#ifdef EXTRA_HYDRO_LOOP
      t_gradient = scheduler_addtask(s, task_type_send, task_subtype_none,
                                     4 * ci->tag + 3, 0, ci, cj, 0);
#endif

#ifdef EXTRA_HYDRO_LOOP

      /* The send_gradient task should unlock the super-cell's kick task. */
      scheduler_addunlock(s, t_gradient, ci->super->kick);

      /* The send_gradient task depends on the cell's extra_ghost task. */
      scheduler_addunlock(s, ci->super->extra_ghost, t_gradient);

      /* The send_rho task should unlock the super-cell's extra_ghost task. */
      scheduler_addunlock(s, t_rho, ci->super->extra_ghost);

      /* The send_rho task depends on the cell's ghost task. */
      scheduler_addunlock(s, ci->super->ghost, t_rho);

      /* The send_xv task should unlock the super-cell's ghost task. */
      scheduler_addunlock(s, t_xv, ci->super->ghost);

#else
      /* The send_rho task should unlock the super-cell's kick task. */
      scheduler_addunlock(s, t_rho, ci->super->kick);

      /* The send_rho task depends on the cell's ghost task. */
      scheduler_addunlock(s, ci->super->ghost, t_rho);

      /* The send_xv task should unlock the super-cell's ghost task. */
      scheduler_addunlock(s, t_xv, ci->super->ghost);
#endif

      /* The super-cell's kick task should unlock the send_ti task. */
      if (t_ti != NULL) scheduler_addunlock(s, ci->super->kick, t_ti);
    }

    /* Add them to the local cell. */
    engine_addlink(e, &ci->send_xv, t_xv);
    engine_addlink(e, &ci->send_rho, t_rho);
#ifdef EXTRA_HYDRO_LOOP
    engine_addlink(e, &ci->send_gradient, t_gradient);
#endif
    if (t_ti != NULL) engine_addlink(e, &ci->send_ti, t_ti);
  }

  /* Recurse? */
  if (ci->split)
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL)
        engine_addtasks_send(e, ci->progeny[k], cj, t_xv, t_rho, t_gradient,
                             t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
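
/* Each cell owns a unique communication tag; the (up to) four messages
 * exchanged for a cell are disambiguated as 4 * tag + {0: xv, 1: rho,
 * 2: tend, 3: gradient}, matching the recv side below. */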

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param c The foreign #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 * @param t_gradient The recv_gradient #task, if it has already been created.
 * @param t_ti The recv_ti #task, if required and has already been created.
 */
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho, struct task *t_gradient,
                          struct task *t_ti) {

#ifdef WITH_MPI
  struct scheduler *s = &e->sched;

  /* Do we need to construct a recv task?
     Note that since c is a foreign cell, all its density tasks will involve
     only the current rank, and thus we don't have to check them. */
  if (t_xv == NULL && c->density != NULL) {

    /* Create the tasks. */
    t_xv = scheduler_addtask(s, task_type_recv, task_subtype_none, 4 * c->tag,
                             0, c, NULL, 0);
    t_rho = scheduler_addtask(s, task_type_recv, task_subtype_none,
                              4 * c->tag + 1, 0, c, NULL, 0);
    if (!(e->policy & engine_policy_fixdt))
      t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend,
                               4 * c->tag + 2, 0, c, NULL, 0);
#ifdef EXTRA_HYDRO_LOOP
    t_gradient = scheduler_addtask(s, task_type_recv, task_subtype_none,
                                   4 * c->tag + 3, 0, c, NULL, 0);
#endif
  }
  c->recv_xv = t_xv;
  c->recv_rho = t_rho;
  c->recv_gradient = t_gradient;
  c->recv_ti = t_ti;

/* Add dependencies. */
#ifdef EXTRA_HYDRO_LOOP
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->gradient; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_rho, l->t);
    scheduler_addunlock(s, l->t, t_gradient);
  }
  for (struct link *l = c->force; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_gradient, l->t);
    if (t_ti != NULL) scheduler_addunlock(s, l->t, t_ti);
  }
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);
#else
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->force; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_rho, l->t);
    if (t_ti != NULL) scheduler_addunlock(s, l->t, t_ti);
  }
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);
#endif

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho, t_gradient, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
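
/* On a foreign cell the resulting chain is
 *   recv_xv -> density -> recv_rho -> force -> recv_ti
 * (with the gradient loop spliced in between when EXTRA_HYDRO_LOOP is set),
 * so each loop's data has arrived before the tasks that consume it run. */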

/**
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */
void engine_exchange_cells(struct engine *e) {

#ifdef WITH_MPI

  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
  ticks tic = getticks();

  /* Run through the cells and get the size of the ones that will be sent off.
   */
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
    if (cells[k].sendto)
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
  }

  /* Allocate the pcells. */
  struct pcell *pcells;
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
    error("Failed to allocate pcell buffer.");

  /* Pack the cells. */
  cell_next_tag = 0;
  for (int k = 0; k < nr_cells; k++)
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies. */
  for (int k = 0; k < nr_proxies; k++) {
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid = MPI_UNDEFINED;