/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

/* This object's header. */
#include "engine.h"

/* Local headers. */
#include "atomic.h"
#include "cell.h"
#include "clocks.h"
#include "cycle.h"
#include "debug.h"
#include "error.h"
#include "hydro.h"
#include "minmax.h"
#include "parallel_io.h"
#include "part.h"
#include "partition.h"
#include "proxy.h"
#include "runner.h"
#include "serial_io.h"
#include "single_io.h"
#include "timers.h"
#include "units.h"
#include "version.h"

/** Human-readable names for the bits of #engine_policy (used in messages). */
const char *engine_policy_names[13] = {
    "none",      "rand",         "steal",            "keep",
    "block",     "fix_dt",       "cpu_tight",        "mpi",
    "numa_affinity", "hydro",    "self_gravity",     "external_gravity",
    "cosmology_integration"};
/** The rank of the engine as a global variable (for messages). */
int engine_rank;

#ifdef HAVE_SETAFFINITY
/** The initial affinity of the main thread (set by engine_pin()). */
static cpu_set_t entry_affinity;
#endif
/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l The #link.
 * @param t The #task.
 *
 * @return The new #link pointer.
 */
101
struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) {
102

103
104
105
106
107
  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];
108
109
110
111
  res->next = l;
  res->t = t;
  return res;
}
112
113

/**
114
115
 * @brief Generate the gravity hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
Matthieu Schaller's avatar
Matthieu Schaller committed
116
117
 *
 * Tasks are only created here. The dependencies will be added later on.
118
119
120
121
122
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
123
124
void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c,
                                            struct cell *super) {
125
126

  struct scheduler *s = &e->sched;
127
128
129
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity;
130
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;
131
132

  /* Am I the super-cell? */
133
134
135
  /* TODO(pedro): Add a condition for gravity tasks as well. */
  if (super == NULL &&
      (c->density != NULL || (!c->split && (c->count > 0 || c->gcount > 0)))) {
136

137
    /* This is the super cell, i.e. the first with density tasks attached. */
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the init task. */
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);

      /* Add the drift task. */
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);

      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
      } else {
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
      }

      if (is_with_external_gravity)
        c->grav_external = scheduler_addtask(
            s, task_type_grav_external, task_subtype_none, 0, 0, c, NULL, 0);
    }
  }

  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_gravity_hierarchical_tasks(e, c->progeny[k], super);
}

/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c,
                                          struct cell *super) {

  struct scheduler *s = &e->sched;
  const int is_fixdt = (e->policy & engine_policy_fixdt) == engine_policy_fixdt;

  /* Is this the super-cell? */
  if (super == NULL && (c->density != NULL || (c->count > 0 && !c->split))) {
198

199
    /* This is the super cell, i.e. the first with density tasks attached. */
200
201
202
203
204
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

Matthieu Schaller's avatar
Matthieu Schaller committed
205
      /* Add the init task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
206
207
208
      if (c->init == NULL)
        c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0,
                                    c, NULL, 0);
209

Matthieu Schaller's avatar
Matthieu Schaller committed
210
      /* Add the drift task. */
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
211
212
213
      if (c->drift == NULL)
        c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0,
                                     0, c, NULL, 0);
214

215
216
      /* Add the kick task that matches the policy. */
      if (is_fixdt) {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
217
218
219
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick_fixdt,
                                      task_subtype_none, 0, 0, c, NULL, 0);
220
      } else {
Matthieu Schaller's avatar
Be safe    
Matthieu Schaller committed
221
222
223
        if (c->kick == NULL)
          c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0,
                                      0, c, NULL, 0);
224
      }
Tom Theuns's avatar
Tom Theuns committed
225

226
227
228
      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
229
    }
230
  }
231

232
233
234
235
236
  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
237
    for (int k = 0; k < 8; k++)
238
      if (c->progeny[k] != NULL)
239
        engine_make_hydro_hierarchical_tasks(e, c->progeny[k], super);
240
}
241

242
/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double iwidth[3] = {s->iwidth[0], s->iwidth[1], s->iwidth[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  /* Allocate the destination index arrays. */
  int *dest, *g_dest;
  if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid =
        cell_getid(cdim, parts[k].x[0] * iwidth[0], parts[k].x[1] * iwidth[1],
                   parts[k].x[2] * iwidth[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            parts[k].x[0], parts[k].x[1], parts[k].x[2]);
#endif

    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  if (s->nr_parts > 0) {
    int current_dest = dest[0];
    size_t count_this_dest = 0;
    for (size_t k = 0; k < s->nr_parts; ++k) {
      if (s->parts[k].gpart != NULL) {

        /* As the addresses will be invalidated by the communications, we will
         * instead store the absolute index from the start of the sub-array of
         * particles to be sent to a given node.
         * Recall that gparts without partners have a negative id.
         * We will restore the pointers on the receiving node later on. */
        if (dest[k] != current_dest) {
          current_dest = dest[k];
          count_this_dest = 0;
        }

#ifdef SWIFT_DEBUG_CHECKS
        if (s->parts[k].gpart->id_or_neg_offset >= 0)
          error("Trying to link a partnerless gpart !");
#endif

        s->parts[k].gpart->id_or_neg_offset = -count_this_dest;
        count_this_dest++;
      }
    }
  }

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid =
        cell_getid(cdim, gparts[k].x[0] * iwidth[0], gparts[k].x[1] * iwidth[1],
                   gparts[k].x[2] * iwidth[2]);
#ifdef SWIFT_DEBUG_CHECKS
    if (cid < 0 || cid >= s->nr_cells)
      error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].", cid, k,
            gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]);
#endif

    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(s, g_dest, s->nr_gparts, 0, nr_nodes - 1, e->verbose);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* message("Sending %d part to node %d", counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
                      k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
                      k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* message("Sending %d gpart to node %d", g_counts[ind_send], k); */

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], part_mpi_type, k,
                    3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv], xpart_mpi_type,
                    k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id_or_neg_offset <= 0) {

        const ptrdiff_t partner_index =
            offset_parts - gparts_new[k].id_or_neg_offset;

        /* Re-link */
        gparts_new[k].id_or_neg_offset = -partner_index;
        parts_new[partner_index].gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that all parts are in the right place.
     (Loop index is size_t to match nr_parts and avoid a signed/unsigned
     comparison.) */
  for (size_t k = 0; k < nr_parts; k++) {
    const int cid = cell_getid(cdim, parts_new[k].x[0] * iwidth[0],
                               parts_new[k].x[1] * iwidth[1],
                               parts_new[k].x[2] * iwidth[2]);
    if (cells[cid].nodeID != nodeID)
      error("Received particle (%zi) that does not belong here (nodeID=%i).",
            k, cells[cid].nodeID);
  }

  /* Verify that the links are correct */
  for (size_t k = 0; k < nr_gparts; ++k) {
    if (gparts_new[k].id_or_neg_offset <= 0) {
      struct part *part = &parts_new[-gparts_new[k].id_or_neg_offset];
      if (part->gpart != &gparts_new[k]) error("Linking problem !");
      if (gparts_new[k].x[0] != part->x[0] ||
          gparts_new[k].x[1] != part->x[1] || gparts_new[k].x[2] != part->x[2])
        error("Linked particles are not at the same position !");
    }
  }
  for (size_t k = 0; k < nr_parts; ++k) {
    /* Cast before negating: -k on an unsigned size_t would wrap around. */
    if (parts_new[k].gpart != NULL &&
        parts_new[k].gpart->id_or_neg_offset != -(ptrdiff_t)k) {
      error("Linking problem !");
    }
  }
#endif

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff. Note that g_counts and g_dest were
     previously leaked here. */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
609

610
/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */
void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  ticks tic = getticks();

  /* Grab the requested strategy and clear the repartition flag. */
  enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Do the repartitioning. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}
655

656
657
658
659
660
661
662
663
/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
 */
664
665
void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {
666

667
668
669
670
671
672
673
674
675
676
  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}
677

678
679
680
681
/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj Dummy cell containing the nodeID of the receiving node.
 * @param t_xv The send_xv #task, if it has already been created.
 * @param t_rho The send_rho #task, if it has already been created.
 * @param t_ti The send_ti #task, if required and has already been created.
 */
void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj,
                          struct task *t_xv, struct task *t_rho,
                          struct task *t_ti) {

#ifdef WITH_MPI
  struct scheduler *sched = &e->sched;
  const int nodeID = cj->nodeID;

  /* Scan the density tasks: does any of them involve the target node? */
  struct link *dl = ci->density;
  while (dl != NULL) {
    if (dl->t->ci->nodeID == nodeID ||
        (dl->t->cj != NULL && dl->t->cj->nodeID == nodeID))
      break;
    dl = dl->next;
  }

  /* If so, attach send tasks. */
  if (dl != NULL) {

    /* First visit on this branch: create the tasks and their dependencies. */
    if (t_xv == NULL) {
      t_xv = scheduler_addtask(sched, task_type_send, task_subtype_none,
                               3 * ci->tag, 0, ci, cj, 0);
      t_rho = scheduler_addtask(sched, task_type_send, task_subtype_none,
                                3 * ci->tag + 1, 0, ci, cj, 0);
      if (!(e->policy & engine_policy_fixdt))
        t_ti = scheduler_addtask(sched, task_type_send, task_subtype_tend,
                                 3 * ci->tag + 2, 0, ci, cj, 0);

      /* The send_rho task depends on the cell's ghost task. */
      scheduler_addunlock(sched, ci->super->ghost, t_rho);

      /* The send_rho task should unlock the super-cell's kick task. */
      scheduler_addunlock(sched, t_rho, ci->super->kick);

      /* The send_xv task should unlock the super-cell's ghost task. */
      scheduler_addunlock(sched, t_xv, ci->super->ghost);

      /* The super-cell's kick task should unlock the send_ti task. */
      if (t_ti != NULL) scheduler_addunlock(sched, ci->super->kick, t_ti);
    }

    /* Add them to the local cell. */
    ci->send_xv = engine_addlink(e, ci->send_xv, t_xv);
    ci->send_rho = engine_addlink(e, ci->send_rho, t_rho);
    if (t_ti != NULL) ci->send_ti = engine_addlink(e, ci->send_ti, t_ti);
  }

  /* Recurse into the progeny. */
  if (ci->split)
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL)
        engine_addtasks_send(e, ci->progeny[k], cj, t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
745
746
747
748
749

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param c The foreign #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 * @param t_ti The recv_ti #task, if required and has already been created.
 */
void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho, struct task *t_ti) {

#ifdef WITH_MPI
  struct scheduler *sched = &e->sched;

  /* Do we need to construct a recv task?
     Note that since c is a foreign cell, all its density tasks will involve
     only the current rank, and thus we don't have to check them.*/
  if (t_xv == NULL && c->density != NULL) {

    /* Create the tasks. */
    t_xv = scheduler_addtask(sched, task_type_recv, task_subtype_none,
                             3 * c->tag, 0, c, NULL, 0);
    t_rho = scheduler_addtask(sched, task_type_recv, task_subtype_none,
                              3 * c->tag + 1, 0, c, NULL, 0);
    if (!(e->policy & engine_policy_fixdt))
      t_ti = scheduler_addtask(sched, task_type_recv, task_subtype_tend,
                               3 * c->tag + 2, 0, c, NULL, 0);
  }

  /* Record the tasks on the cell. */
  c->recv_xv = t_xv;
  c->recv_rho = t_rho;
  c->recv_ti = t_ti;

  /* Add dependencies: xv before density, density before rho. */
  for (struct link *dl = c->density; dl != NULL; dl = dl->next) {
    scheduler_addunlock(sched, t_xv, dl->t);
    scheduler_addunlock(sched, dl->t, t_rho);
  }

  /* Rho before force, force before ti (when time info is exchanged). */
  for (struct link *fl = c->force; fl != NULL; fl = fl->next) {
    scheduler_addunlock(sched, t_rho, fl->t);
    if (t_ti != NULL) scheduler_addunlock(sched, fl->t, t_ti);
  }

  /* Sorting has to wait for the received positions. */
  if (c->sorts != NULL) scheduler_addunlock(sched, t_xv, c->sorts);

  /* Recurse into the progeny. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho, t_ti);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
/**
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */
void engine_exchange_cells(struct engine *e) {
807
808
809

#ifdef WITH_MPI

810
811
  struct space *s = e->s;
  struct cell *cells = s->cells;
812
813
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
814
815
816
817
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
818
  ticks tic = getticks();
819
820
821

  /* Run through the cells and get the size of the ones that will be sent off.
   */
822
823
824
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
825
    if (cells[k].sendto)
826
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
827
  }
828

829
  /* Allocate the pcells. */
830
  struct pcell *pcells;
831
832
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
833
834
835
836
    error("Failed to allocate pcell buffer.");

  /* Pack the cells. */
  cell_next_tag = 0;
837
  for (int k = 0; k < nr_cells; k++)
838
839
840
841
842
843
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies. */
844
  for (int k = 0; k < nr_proxies; k++) {
845
846
847
848
849
850
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
851
  for (int k = 0; k < nr_proxies; k++) {
852
    int pid = MPI_UNDEFINED;
853
854
855
856
857
858
859
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }

860
  /* Wait for all the sends to have finished too. */
861
862
863
864
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cells. */
865
  for (int k = 0; k < nr_proxies; k++) {
866
867
868
869
870
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies. */
871
  for (int k = 0; k < nr_proxies; k++) {
872
    int pid = MPI_UNDEFINED;
873
874
875
876
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
877
    for (int count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
878
879
880
881
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }

882
  /* Wait for all the sends to have finished too. */
883
884
885
886
887
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the number of particles we need to import and re-allocate
     the buffer if needed. */
888
  int count_parts_in = 0, count_gparts_in = 0;
889
  for (int k = 0; k < nr_proxies; k++)
890
891
892
893
894
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      count_parts_in += e->proxies[k].cells_in[j]->count;
      count_gparts_in += e->proxies[k].cells_in[j]->gcount;
    }
  if (count_parts_in > s->size_parts_foreign) {
895
    if (s->parts_foreign != NULL) free(s->parts_foreign);
896
    s->size_parts_foreign = 1.1 * count_parts_in;
897
898
899
900
    if (posix_memalign((void **)&s->parts_foreign, part_align,
                       sizeof(struct part) * s->size_parts_foreign) != 0)
      error("Failed to allocate foreign part data.");
  }
901
902
903
904
905
906
907
  if (count_gparts_in > s->size_gparts_foreign) {
    if (s->gparts_foreign != NULL) free(s->gparts_foreign);
    s->size_gparts_foreign = 1.1 * count_gparts_in;
    if (posix_memalign((void **)&s->gparts_foreign, gpart_align,
                       sizeof(struct gpart) * s->size_gparts_foreign) != 0)
      error("Failed to allocate foreign gpart data.");
  }
908

909
  /* Unpack the cells and link to the particle data. */
910
  struct part *parts = s->parts_foreign;
911
  struct gpart *gparts = s->gparts_foreign;
912
913
  for (int k = 0; k < nr_proxies; k++) {
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
914
      cell_link_parts(e->proxies[k].cells_in[j], parts);
915
      cell_link_gparts(e->proxies[k].cells_in[j], gparts);
916
      parts = &parts[e->proxies[k].cells_in[j]->count];