/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
32
#include <stdbool.h>
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
37

38
39
/* MPI headers. */
#ifdef WITH_MPI
40
#include <mpi.h>
41
42
#endif

Angus Lepper's avatar
Angus Lepper committed
43
44
#ifdef HAVE_LIBNUMA
#include <numa.h>
45
46
#endif

47
48
49
50
51
/* Load the profiler header, if needed. */
#ifdef WITH_PROFILER
#include <gperftools/profiler.h>
#endif

52
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
53
#include "engine.h"
54
55

/* Local headers. */
56
#include "active.h"
57
#include "atomic.h"
58
#include "cell.h"
59
#include "chemistry.h"
60
#include "clocks.h"
61
#include "cooling.h"
62
#include "cosmology.h"
63
64
#include "cycle.h"
#include "debug.h"
65
#include "equation_of_state.h"
66
#include "error.h"
67
#include "gravity.h"
68
#include "gravity_cache.h"
69
#include "hydro.h"
lhausamm's avatar
lhausamm committed
70
#include "logger.h"
lhausamm's avatar
lhausamm committed
71
#include "logger_io.h"
72
#include "map.h"
73
#include "memswap.h"
74
#include "minmax.h"
75
#include "outputlist.h"
76
#include "parallel_io.h"
77
#include "part.h"
78
#include "partition.h"
James Willis's avatar
James Willis committed
79
#include "profiler.h"
80
#include "proxy.h"
81
#include "restart.h"
82
#include "runner.h"
83
84
#include "serial_io.h"
#include "single_io.h"
85
#include "sort_part.h"
86
#include "sourceterms.h"
Loic Hausammann's avatar
Loic Hausammann committed
87
#include "stars_io.h"
88
#include "statistics.h"
89
#include "timers.h"
90
#include "tools.h"
91
#include "units.h"
92
#include "velociraptor_interface.h"
Matthieu Schaller's avatar
Matthieu Schaller committed
93
#include "version.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
94

95
96
97
/* Particle cache size. */
#define CACHE_SIZE 512

98
99
100
101
102
/* Human-readable names for the engine policies, indexed to match the
 * engine_policy_* enumeration declared in engine.h. */
const char *engine_policy_names[] = {"none",
                                     "rand",
                                     "steal",
                                     "keep",
                                     "block",
                                     "cpu tight",
                                     "mpi",
                                     "numa affinity",
                                     "hydro",
                                     "self gravity",
                                     "external gravity",
                                     "cosmological integration",
                                     "drift everything",
                                     "reconstruct multi-poles",
                                     "cooling",
                                     "sourceterms",
                                     "stars",
                                     "structure finding",
                                     "star formation",
                                     "feedback"};
Pedro Gonnet's avatar
Pedro Gonnet committed
118

119
120
121
/** The rank of the engine as a global variable (for messages). */
int engine_rank;

122
123
124
125
126
/**
 * @brief Data collected from the cells at the end of a time-step
 */
struct end_of_step_data {

127
128
  size_t updated, g_updated, s_updated;
  size_t inhibited, g_inhibited, s_inhibited;
129
130
  integertime_t ti_hydro_end_min, ti_hydro_end_max, ti_hydro_beg_max;
  integertime_t ti_gravity_end_min, ti_gravity_end_max, ti_gravity_beg_max;
131
132
133
  struct engine *e;
};

134
135
136
137
/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
138
 * @param l A pointer to the #link, will be modified atomically.
139
140
141
142
 * @param t The #task.
 *
 * @return The new #link pointer.
 */
143
void engine_addlink(struct engine *e, struct link **l, struct task *t) {
144

145
  /* Get the next free link. */
146
  const size_t ind = atomic_inc(&e->nr_links);
147
148
149
150
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];
151

152
  /* Set it atomically. */
153
  res->t = t;
154
  res->next = atomic_swap(l, res);
155
}
156

Loic Hausammann's avatar
Loic Hausammann committed
157
158
159
/**
 * @brief Recursively add non-implicit star ghost tasks to a cell hierarchy.
 */
160
161
162
void engine_add_stars_ghosts(struct engine *e, struct cell *c,
                             struct task *stars_ghost_in,
                             struct task *stars_ghost_out) {
Loic Hausammann's avatar
Loic Hausammann committed
163
164

  /* If we have reached the leaf OR have to few particles to play with*/
165
  if (!c->split || c->stars.count < engine_max_sparts_per_ghost) {
Loic Hausammann's avatar
Loic Hausammann committed
166
167
168

    /* Add the ghost task and its dependencies */
    struct scheduler *s = &e->sched;
169
    c->stars.ghost = scheduler_addtask(s, task_type_stars_ghost,
170
                                       task_subtype_none, 0, 0, c, NULL);
171
172
    scheduler_addunlock(s, stars_ghost_in, c->stars.ghost);
    scheduler_addunlock(s, c->stars.ghost, stars_ghost_out);
Loic Hausammann's avatar
Loic Hausammann committed
173
174
175
176
  } else {
    /* Keep recursing */
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
177
178
        engine_add_stars_ghosts(e, c->progeny[k], stars_ghost_in,
                                stars_ghost_out);
Loic Hausammann's avatar
Loic Hausammann committed
179
180
181
  }
}

182
183
184
185
186
/**
 * @brief Recursively add non-implicit ghost tasks to a cell hierarchy.
 */
void engine_add_ghosts(struct engine *e, struct cell *c, struct task *ghost_in,
                       struct task *ghost_out) {
187
188

  /* If we have reached the leaf OR have to few particles to play with*/
189
  if (!c->split || c->hydro.count < engine_max_parts_per_ghost) {
190
191

    /* Add the ghost task and its dependencies */
192
    struct scheduler *s = &e->sched;
193
    c->hydro.ghost =
194
        scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0, c, NULL);
195
196
    scheduler_addunlock(s, ghost_in, c->hydro.ghost);
    scheduler_addunlock(s, c->hydro.ghost, ghost_out);
197
  } else {
198
    /* Keep recursing */
199
200
201
202
203
204
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_add_ghosts(e, c->progeny[k], ghost_in, ghost_out);
  }
}

205
206
/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
207
 * i.e. all the O(Npart) tasks -- timestep version
208
209
210
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
211
212
 * Note that there is no need to recurse below the super-cell. Note also
 * that we only add tasks if the relevant particles are present in the cell.
213
 *
214
215
216
 * @param e The #engine.
 * @param c The #cell.
 */
217
void engine_make_hierarchical_tasks_common(struct engine *e, struct cell *c) {
218
219

  struct scheduler *s = &e->sched;
220
  const int is_with_cooling = (e->policy & engine_policy_cooling);
Matthieu Schaller's avatar
Matthieu Schaller committed
221
  const int is_with_star_formation = (e->policy & engine_policy_star_formation);
222

223
  /* Are we in a super-cell ? */
224
  if (c->super == c) {
225
226
227
228

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

229
230
      /* Add the two half kicks */
      c->kick1 = scheduler_addtask(s, task_type_kick1, task_subtype_none, 0, 0,
231
                                   c, NULL);
232

lhausamm's avatar
lhausamm committed
233
#if defined(WITH_LOGGER)
Loic Hausammann's avatar
Format    
Loic Hausammann committed
234
235
      c->logger = scheduler_addtask(s, task_type_logger, task_subtype_none, 0,
                                    0, c, NULL);
lhausamm's avatar
lhausamm committed
236
#endif
Loikki's avatar
Loikki committed
237

238
      c->kick2 = scheduler_addtask(s, task_type_kick2, task_subtype_none, 0, 0,
239
                                   c, NULL);
Tom Theuns's avatar
Tom Theuns committed
240

241
242
      /* Add the time-step calculation task and its dependency */
      c->timestep = scheduler_addtask(s, task_type_timestep, task_subtype_none,
243
                                      0, 0, c, NULL);
244

245
      /* Add the task finishing the force calculation */
246
247
      c->end_force = scheduler_addtask(s, task_type_end_force,
                                       task_subtype_none, 0, 0, c, NULL);
248

249
250
      if (is_with_cooling) {

251
252
        c->hydro.cooling = scheduler_addtask(s, task_type_cooling,
                                             task_subtype_none, 0, 0, c, NULL);
253

254
        scheduler_addunlock(s, c->end_force, c->hydro.cooling);
255
        scheduler_addunlock(s, c->hydro.cooling, c->kick2);
256
257

      } else {
258
        scheduler_addunlock(s, c->end_force, c->kick2);
259
      }
Matthieu Schaller's avatar
Matthieu Schaller committed
260
261
262
263
264
265
266
267
268
269
270
271

      if (is_with_star_formation) {

        c->hydro.star_formation = scheduler_addtask(
            s, task_type_star_formation, task_subtype_none, 0, 0, c, NULL);

        scheduler_addunlock(s, c->kick2, c->hydro.star_formation);
        scheduler_addunlock(s, c->hydro.star_formation, c->timestep);

      } else {
        scheduler_addunlock(s, c->kick2, c->timestep);
      }
272
      scheduler_addunlock(s, c->timestep, c->kick1);
lhausamm's avatar
lhausamm committed
273
274
275

#if defined(WITH_LOGGER)
      scheduler_addunlock(s, c->kick1, c->logger);
lhausamm's avatar
lhausamm committed
276
#endif
Loic Hausammann's avatar
Format    
Loic Hausammann committed
277
    }
278

279
  } else { /* We are above the super-cell so need to go deeper */
280

281
282
283
284
285
286
287
    /* Recurse. */
    if (c->split)
      for (int k = 0; k < 8; k++)
        if (c->progeny[k] != NULL)
          engine_make_hierarchical_tasks_common(e, c->progeny[k]);
  }
}
288

289
290
/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
291
 * i.e. all the O(Npart) tasks -- hydro version
292
293
294
295
296
297
298
299
300
301
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * Note that there is no need to recurse below the super-cell. Note also
 * that we only add tasks if the relevant particles are present in the cell.
 *
 * @param e The #engine.
 * @param c The #cell.
 */
void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c) {
302

303
304
  struct scheduler *s = &e->sched;
  const int is_with_sourceterms = (e->policy & engine_policy_sourceterms);
305

306
  /* Are we in a super-cell ? */
307
  if (c->hydro.super == c) {
308

309
    /* Add the sort task. */
310
    c->hydro.sorts =
311
        scheduler_addtask(s, task_type_sort, task_subtype_none, 0, 0, c, NULL);
312

313
314
    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {
315

316
      /* Add the drift task. */
317
318
      c->hydro.drift = scheduler_addtask(s, task_type_drift_part,
                                         task_subtype_none, 0, 0, c, NULL);
319

320
      /* Generate the ghost tasks. */
321
      c->hydro.ghost_in =
322
323
          scheduler_addtask(s, task_type_ghost_in, task_subtype_none, 0,
                            /* implicit = */ 1, c, NULL);
324
      c->hydro.ghost_out =
325
326
          scheduler_addtask(s, task_type_ghost_out, task_subtype_none, 0,
                            /* implicit = */ 1, c, NULL);
327
      engine_add_ghosts(e, c, c->hydro.ghost_in, c->hydro.ghost_out);
328
329

#ifdef EXTRA_HYDRO_LOOP
330
      /* Generate the extra ghost task. */
331
332
      c->hydro.extra_ghost = scheduler_addtask(
          s, task_type_extra_ghost, task_subtype_none, 0, 0, c, NULL);
333
#endif
334

335
      /* add source terms */
336
      if (is_with_sourceterms) {
337
        c->sourceterms = scheduler_addtask(s, task_type_sourceterms,
338
                                           task_subtype_none, 0, 0, c, NULL);
339
      }
340
341
    }

342
  } else { /* We are above the super-cell so need to go deeper */
343

344
345
346
347
348
349
350
351
    /* Recurse. */
    if (c->split)
      for (int k = 0; k < 8; k++)
        if (c->progeny[k] != NULL)
          engine_make_hierarchical_tasks_hydro(e, c->progeny[k]);
  }
}

352
353
354
355
356
357
358
359
360
361
362
363
/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks -- gravity version
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * Note that there is no need to recurse below the super-cell. Note also
 * that we only add tasks if the relevant particles are present in the cell.
 *
 * @param e The #engine.
 * @param c The #cell.
 */
364
365
366
367
368
369
370
void engine_make_hierarchical_tasks_gravity(struct engine *e, struct cell *c) {

  struct scheduler *s = &e->sched;
  const int periodic = e->s->periodic;
  const int is_self_gravity = (e->policy & engine_policy_self_gravity);

  /* Are we in a super-cell ? */
371
  if (c->grav.super == c) {
372
373
374
375

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

376
377
      c->grav.drift = scheduler_addtask(s, task_type_drift_gpart,
                                        task_subtype_none, 0, 0, c, NULL);
378
379

      if (is_self_gravity) {
380

381
        /* Initialisation of the multipoles */
382
        c->grav.init = scheduler_addtask(s, task_type_init_grav,
383
384
385
                                         task_subtype_none, 0, 0, c, NULL);

        /* Gravity non-neighbouring pm calculations */
386
        c->grav.long_range = scheduler_addtask(
387
388
389
            s, task_type_grav_long_range, task_subtype_none, 0, 0, c, NULL);

        /* Gravity recursive down-pass */
390
        c->grav.down = scheduler_addtask(s, task_type_grav_down,
391
392
                                         task_subtype_none, 0, 0, c, NULL);

393
        /* Implicit tasks for the up and down passes */
394
        c->grav.init_out = scheduler_addtask(s, task_type_init_grav_out,
395
                                             task_subtype_none, 0, 1, c, NULL);
396
        c->grav.down_in = scheduler_addtask(s, task_type_grav_down_in,
397
398
                                            task_subtype_none, 0, 1, c, NULL);

399
400
        /* Gravity mesh force propagation */
        if (periodic)
401
          c->grav.mesh = scheduler_addtask(s, task_type_grav_mesh,
402
403
                                           task_subtype_none, 0, 0, c, NULL);

404
        if (periodic) scheduler_addunlock(s, c->grav.drift, c->grav.mesh);
405
406
407
        if (periodic) scheduler_addunlock(s, c->grav.mesh, c->grav.down);
        scheduler_addunlock(s, c->grav.init, c->grav.long_range);
        scheduler_addunlock(s, c->grav.long_range, c->grav.down);
408
        scheduler_addunlock(s, c->grav.down, c->super->end_force);
409

Matthieu Schaller's avatar
Matthieu Schaller committed
410
        /* Link in the implicit tasks */
411
412
        scheduler_addunlock(s, c->grav.init, c->grav.init_out);
        scheduler_addunlock(s, c->grav.down_in, c->grav.down);
413
414
      }
    }
415
  }
416

417
  /* We are below the super-cell but not below the maximal splitting depth */
418
  else if (c->grav.super != NULL && c->depth < space_subdepth_grav) {
419
420
421
422
423
424

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      if (is_self_gravity) {

425
        c->grav.init_out = scheduler_addtask(s, task_type_init_grav_out,
426
427
                                             task_subtype_none, 0, 1, c, NULL);

428
        c->grav.down_in = scheduler_addtask(s, task_type_grav_down_in,
429
430
                                            task_subtype_none, 0, 1, c, NULL);

431
432
        scheduler_addunlock(s, c->parent->grav.init_out, c->grav.init_out);
        scheduler_addunlock(s, c->grav.down_in, c->parent->grav.down_in);
433
434
      }
    }
435
  }
436

437
438
  /* Recurse but not below the maximal splitting depth */
  if (c->split && c->depth <= space_subdepth_grav)
439
440
441
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_hierarchical_tasks_gravity(e, c->progeny[k]);
442
}
443

Loic Hausammann's avatar
Loic Hausammann committed
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
/**
 * @brief Generate the stars hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks -- star version
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * Note that there is no need to recurse below the super-cell. Note also
 * that we only add tasks if the relevant particles are present in the cell.
 *
 * @param e The #engine.
 * @param c The #cell.
 */
void engine_make_hierarchical_tasks_stars(struct engine *e, struct cell *c) {

  struct scheduler *s = &e->sched;

  /* Are we in a super-cell ? */
Loic Hausammann's avatar
Loic Hausammann committed
461
  if (c->super == c) {
Loic Hausammann's avatar
Loic Hausammann committed
462
463
464
465
466

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Generate the ghost tasks. */
467
      c->stars.ghost_in =
Loic Hausammann's avatar
Loic Hausammann committed
468
          scheduler_addtask(s, task_type_stars_ghost_in, task_subtype_none, 0,
Loic Hausammann's avatar
Loic Hausammann committed
469
                            /* implicit = */ 1, c, NULL);
470
      c->stars.ghost_out =
Loic Hausammann's avatar
Loic Hausammann committed
471
          scheduler_addtask(s, task_type_stars_ghost_out, task_subtype_none, 0,
Loic Hausammann's avatar
Loic Hausammann committed
472
                            /* implicit = */ 1, c, NULL);
473
      engine_add_stars_ghosts(e, c, c->stars.ghost_in, c->stars.ghost_out);
474
475
    }
  } else { /* We are above the super-cell so need to go deeper */
Loic Hausammann's avatar
Loic Hausammann committed
476
477
478
479
480
481
482
483
484

    /* Recurse. */
    if (c->split)
      for (int k = 0; k < 8; k++)
        if (c->progeny[k] != NULL)
          engine_make_hierarchical_tasks_stars(e, c->progeny[k]);
  }
}

485
486
487
void engine_make_hierarchical_tasks_mapper(void *map_data, int num_elements,
                                           void *extra_data) {
  struct engine *e = (struct engine *)extra_data;
488
489
  const int is_with_hydro = (e->policy & engine_policy_hydro);
  const int is_with_self_gravity = (e->policy & engine_policy_self_gravity);
490
491
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity);
Loic Hausammann's avatar
Loic Hausammann committed
492
  const int is_with_feedback = (e->policy & engine_policy_feedback);
493
494
495

  for (int ind = 0; ind < num_elements; ind++) {
    struct cell *c = &((struct cell *)map_data)[ind];
496
497
498
    /* Make the common tasks (time integration) */
    engine_make_hierarchical_tasks_common(e, c);
    /* Add the hydro stuff */
499
    if (is_with_hydro) engine_make_hierarchical_tasks_hydro(e, c);
500
    /* And the gravity stuff */
501
    if (is_with_self_gravity || is_with_external_gravity)
502
      engine_make_hierarchical_tasks_gravity(e, c);
Loic Hausammann's avatar
Loic Hausammann committed
503
    if (is_with_feedback) engine_make_hierarchical_tasks_stars(e, c);
504
505
506
  }
}

507
#ifdef WITH_MPI
/**
 * @brief Exchange one type of particles with all the other nodes.
 *
 * Do the exchange of one type of particles with all the other nodes.
 *
 * @param counts 2D array with the counts of particles to exchange with
 *               each other node.
 * @param parts the particle data to exchange
 * @param new_nr_parts the number of particles this node will have after all
 *                     exchanges have completed.
 * @param sizeofparts sizeof the particle struct.
 * @param alignsize the memory alignment required for this particle type.
 * @param mpi_type the MPI_Datatype for these particles.
 * @param nr_nodes the number of nodes to exchange with.
 * @param nodeID the id of this node.
 *
 * @result new particle data constructed from all the exchanges with the
 *         given alignment.
 */
static void *engine_do_redistribute(int *counts, char *parts,
                                    size_t new_nr_parts, size_t sizeofparts,
                                    size_t alignsize, MPI_Datatype mpi_type,
                                    int nr_nodes, int nodeID) {

  /* Allocate a new particle array with some extra margin */
  char *parts_new = NULL;
  if (posix_memalign(
          (void **)&parts_new, alignsize,
          sizeofparts * new_nr_parts * engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new particle data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 2 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");

  /* Only send and receive only "chunk" particles per request. So we need to
   * loop as many times as necessary here. Make 2Gb/sizeofparts so we only
   * send 2Gb packets. */
  const int chunk = INT_MAX / sizeofparts;
  int sent = 0;
  int recvd = 0;

  int activenodes = 1;
  while (activenodes) {

    for (int k = 0; k < 2 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

    /* Emit the sends and recvs for the data. */
    size_t offset_send = sent;
    size_t offset_recv = recvd;
    activenodes = 0;

    for (int k = 0; k < nr_nodes; k++) {

      /* Indices in the count arrays of the node of interest */
      const int ind_send = nodeID * nr_nodes + k;
      const int ind_recv = k * nr_nodes + nodeID;

      /* Are we sending any data this loop? */
      int sending = counts[ind_send] - sent;
      if (sending > 0) {
        activenodes++;
        if (sending > chunk) sending = chunk;

        /* If the send and receive is local then just copy. */
        if (k == nodeID) {
          int receiving = counts[ind_recv] - recvd;
          if (receiving > chunk) receiving = chunk;
          memcpy(&parts_new[offset_recv * sizeofparts],
                 &parts[offset_send * sizeofparts], sizeofparts * receiving);
        } else {
          /* Otherwise send it. */
          int res =
              MPI_Isend(&parts[offset_send * sizeofparts], sending, mpi_type, k,
                        ind_send, MPI_COMM_WORLD, &reqs[2 * k + 0]);
          if (res != MPI_SUCCESS)
            mpi_error(res, "Failed to isend parts to node %i.", k);
        }
      }

      /* If we're sending to this node, then move past it to next. */
      if (counts[ind_send] > 0) offset_send += counts[ind_send];

      /* Are we receiving any data from this node? Note already done if coming
       * from this node. */
      if (k != nodeID) {
        int receiving = counts[ind_recv] - recvd;
        if (receiving > 0) {
          activenodes++;
          if (receiving > chunk) receiving = chunk;
          int res = MPI_Irecv(&parts_new[offset_recv * sizeofparts], receiving,
                              mpi_type, k, ind_recv, MPI_COMM_WORLD,
                              &reqs[2 * k + 1]);
          if (res != MPI_SUCCESS)
            mpi_error(res, "Failed to emit irecv of parts from node %i.", k);
        }
      }

      /* If we're receiving from this node, then move past it to next. */
      if (counts[ind_recv] > 0) offset_recv += counts[ind_recv];
    }

    /* Wait for all the sends and recvs to tumble in. */
    MPI_Status stats[2 * nr_nodes];
    int res;
    if ((res = MPI_Waitall(2 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
      for (int k = 0; k < 2 * nr_nodes; k++) {
        char buff[MPI_MAX_ERROR_STRING];
        MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
        message("request from source %i, tag %i has error '%s'.",
                stats[k].MPI_SOURCE, stats[k].MPI_TAG, buff);
      }
      error("Failed during waitall for part data.");
    }

    /* Move to next chunks. */
    sent += chunk;
    recvd += chunk;
  }

  /* Free temps. */
  free(reqs);

  /* And return new memory. */
  return parts_new;
}
#endif

636
#ifdef WITH_MPI /* redist_mapper */

/* Support for engine_redistribute threadpool dest mappers. */
struct redist_mapper_data {
  int *counts;   /* Global (nr_nodes x nr_nodes) counts matrix to fill. */
  int *dest;     /* Per-particle destination node ids to fill. */
  int nodeID;    /* This node's id. */
  int nr_nodes;  /* Total number of nodes. */
  struct cell *cells;
  struct space *s;
  void *base;    /* Start of the full particle array being mapped over. */
};

/* Generic function for accumulating counts for TYPE parts. Note
 * we use a local counts array to avoid the atomic_add in the parts
 * loop. */
#define ENGINE_REDISTRIBUTE_DEST_MAPPER(TYPE)                              \
  engine_redistribute_dest_mapper_##TYPE(void *map_data, int num_elements, \
                                         void *extra_data) {               \
    struct TYPE *parts = (struct TYPE *)map_data;                          \
    struct redist_mapper_data *mydata =                                    \
        (struct redist_mapper_data *)extra_data;                           \
    struct space *s = mydata->s;                                           \
    int *dest =                                                            \
        mydata->dest + (ptrdiff_t)(parts - (struct TYPE *)mydata->base);   \
    int *lcounts = NULL;                                                   \
    if ((lcounts = (int *)calloc(                                          \
             mydata->nr_nodes * mydata->nr_nodes, sizeof(int))) == NULL)   \
      error("Failed to allocate counts thread-specific buffer");           \
    for (int k = 0; k < num_elements; k++) {                               \
      /* Box-wrap the particle position. */                                \
      for (int j = 0; j < 3; j++) {                                        \
        if (parts[k].x[j] < 0.0)                                           \
          parts[k].x[j] += s->dim[j];                                      \
        else if (parts[k].x[j] >= s->dim[j])                               \
          parts[k].x[j] -= s->dim[j];                                      \
      }                                                                    \
      /* The destination node owns the top-level cell holding it. */       \
      const int cid = cell_getid(s->cdim, parts[k].x[0] * s->iwidth[0],    \
                                 parts[k].x[1] * s->iwidth[1],             \
                                 parts[k].x[2] * s->iwidth[2]);            \
      dest[k] = s->cells_top[cid].nodeID;                                  \
      size_t ind = mydata->nodeID * mydata->nr_nodes + dest[k];            \
      lcounts[ind] += 1;                                                   \
    }                                                                      \
    for (int k = 0; k < (mydata->nr_nodes * mydata->nr_nodes); k++)        \
      atomic_add(&mydata->counts[k], lcounts[k]);                          \
    free(lcounts);                                                         \
  }

/**
 * @brief Accumulate the counts of particles per cell.
 * Threadpool helper for accumulating the counts of particles per cell.
 *
 * part version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(part);

/**
 * @brief Accumulate the counts of star particles per cell.
 * Threadpool helper for accumulating the counts of particles per cell.
 *
 * spart version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(spart);

/**
 * @brief Accumulate the counts of gravity particles per cell.
 * Threadpool helper for accumulating the counts of particles per cell.
 *
 * gpart version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(gpart);

#endif /* redist_mapper_data */
709

710
#ifdef WITH_MPI /* savelink_mapper_data */
711
712

/* Support for saving the linkage between gparts and parts/sparts. */
struct savelink_mapper_data {
  int nr_nodes; /* Total number of nodes. */
  int *counts;  /* (nr_nodes x nr_nodes) counts of particles per node pair. */
  void *parts;  /* The particle array whose gpart links are being saved. */
  int nodeID;   /* This node's id. */
};

/**
 * @brief Save the offset of each gravity partner of a part or spart.
 *
 * The offset is from the start of the sorted particles to be sent to a node.
 * This is possible as parts without gravity partners have a positive id.
 * These offsets are used to restore the pointers on the receiving node.
 *
 * TYPE is the particle type the mapper operates on (part or spart).
 * CHECKS enables debug validation and should be eliminated as dead code
 * when optimizing.
 */
#define ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(TYPE, CHECKS)                      \
  engine_redistribute_savelink_mapper_##TYPE(void *map_data, int num_elements, \
                                             void *extra_data) {               \
    int *nodes = (int *)map_data;                                              \
    struct savelink_mapper_data *mydata =                                      \
        (struct savelink_mapper_data *)extra_data;                             \
    int nodeID = mydata->nodeID;                                               \
    int nr_nodes = mydata->nr_nodes;                                           \
    int *counts = mydata->counts;                                              \
    struct TYPE *parts = (struct TYPE *)mydata->parts;                         \
                                                                               \
    for (int j = 0; j < num_elements; j++) {                                   \
      int node = nodes[j];                                                     \
      int count = 0;                                                           \
      size_t offset = 0;                                                       \
      for (int i = 0; i < node; i++) offset += counts[nodeID * nr_nodes + i];  \
                                                                               \
      for (int k = 0; k < counts[nodeID * nr_nodes + node]; k++) {             \
        if (parts[k + offset].gpart != NULL) {                                 \
          if (CHECKS)                                                          \
            if (parts[k + offset].gpart->id_or_neg_offset > 0)                 \
              error("Trying to link a partnerless " #TYPE "!");                \
          parts[k + offset].gpart->id_or_neg_offset = -count;                  \
          count++;                                                             \
        }                                                                      \
      }                                                                        \
    }                                                                          \
  }

/**
 * @brief Save position of part-gpart links.
 *
 * Threadpool helper for saving the offsets of the gravity partners of the
 * parts to be sent.
 */
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 0);
#endif

/**
 * @brief Save position of spart-gpart links.
 *
 * Threadpool helper for saving the offsets of the gravity partners of the
 * sparts to be sent.
 */
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 0);
#endif

#endif /* savelink_mapper_data */
#ifdef WITH_MPI /* relink_mapper_data */
/* Support for relinking parts, gparts and sparts after moving between nodes. */
struct relink_mapper_data {
  int nodeID;      /* This node's rank. */
  int nr_nodes;    /* Number of MPI ranks. */
  int *counts;     /* Gas particle exchange counts, counts[sender * nr_nodes + dest]. */
  int *s_counts;   /* Star particle exchange counts, same indexing. */
  int *g_counts;   /* Gravity particle exchange counts, same indexing. */
  struct space *s; /* The space holding the received particle arrays. */
};

/**
 * @brief Restore the part/gpart and spart/gpart links for a list of nodes.
 *
 * @param map_data address of nodes to process.
 * @param num_elements the number nodes to process.
797
798
 * @param extra_data additional data defining the context (a
 * relink_mapper_data).
799
800
801
802
803
 */
static void engine_redistribute_relink_mapper(void *map_data, int num_elements,
                                              void *extra_data) {

  int *nodes = (int *)map_data;
804
  struct relink_mapper_data *mydata = (struct relink_mapper_data *)extra_data;
805
806
807
808

  int nodeID = mydata->nodeID;
  int nr_nodes = mydata->nr_nodes;
  int *counts = mydata->counts;
809
  int *g_counts = mydata->g_counts;
810
811
  int *s_counts = mydata->s_counts;
  struct space *s = mydata->s;
812
813
814
815

  for (int i = 0; i < num_elements; i++) {

    int node = nodes[i];
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830

    /* Get offsets to correct parts of the counts arrays for this node. */
    size_t offset_parts = 0;
    size_t offset_gparts = 0;
    size_t offset_sparts = 0;
    for (int n = 0; n < node; n++) {
      int ind_recv = n * nr_nodes + nodeID;
      offset_parts += counts[ind_recv];
      offset_gparts += g_counts[ind_recv];
      offset_sparts += s_counts[ind_recv];
    }

    /* Number of gparts sent from this node. */
    int ind_recv = node * nr_nodes + nodeID;
    const size_t count_gparts = g_counts[ind_recv];
831
832

    /* Loop over the gparts received from this node */
833
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; k++) {
834
835
836
837
838

      /* Does this gpart have a gas partner ? */
      if (s->gparts[k].type == swift_type_gas) {

        const ptrdiff_t partner_index =
839
            offset_parts - s->gparts[k].id_or_neg_offset;
840
841
842
843
844
845
846

        /* Re-link */
        s->gparts[k].id_or_neg_offset = -partner_index;
        s->parts[partner_index].gpart = &s->gparts[k];
      }

      /* Does this gpart have a star partner ? */
847
      else if (s->gparts[k].type == swift_type_stars) {
848
849

        const ptrdiff_t partner_index =
850
            offset_sparts - s->gparts[k].id_or_neg_offset;
851
852
853
854
855
856
857
858
859

        /* Re-link */
        s->gparts[k].id_or_neg_offset = -partner_index;
        s->sparts[partner_index].gpart = &s->gparts[k];
      }
    }
  }
}

#endif /* relink_mapper_data */
/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells_top;
  const int nr_cells = s->nr_cells;
  struct xpart *xparts = s->xparts;
  struct part *parts = s->parts;
  struct gpart *gparts = s->gparts;
  struct spart *sparts = s->sparts;
  ticks tic = getticks();

  size_t nr_parts = s->nr_parts;
  size_t nr_gparts = s->nr_gparts;
  size_t nr_sparts = s->nr_sparts;

  /* Start by moving inhibited particles to the end of the arrays */
  for (size_t k = 0; k < nr_parts; /* void */) {
    if (parts[k].time_bin == time_bin_inhibited) {
      nr_parts -= 1;

      /* Swap the particle */
      memswap(&parts[k], &parts[nr_parts], sizeof(struct part));

      /* Swap the xpart */
      memswap(&xparts[k], &xparts[nr_parts], sizeof(struct xpart));

      /* Swap the link with the gpart */
      if (parts[k].gpart != NULL) {
        parts[k].gpart->id_or_neg_offset = -k;
      }
      if (parts[nr_parts].gpart != NULL) {
        parts[nr_parts].gpart->id_or_neg_offset = -nr_parts;
      }
    } else {
      k++;
    }
  }

  /* Now move inhibited star particles to the end of the arrays */
  for (size_t k = 0; k < nr_sparts; /* void */) {
    if (sparts[k].time_bin == time_bin_inhibited) {
      nr_sparts -= 1;

      /* Swap the particle */
      memswap(&s->sparts[k], &s->sparts[nr_sparts], sizeof(struct spart));

      /* Swap the link with the gpart */
      if (s->sparts[k].gpart != NULL) {
        s->sparts[k].gpart->id_or_neg_offset = -k;
      }
      if (s->sparts[nr_sparts].gpart != NULL) {
        s->sparts[nr_sparts].gpart->id_or_neg_offset = -nr_sparts;
      }
    } else {
      k++;
    }
  }

  /* Finally do the same with the gravity particles */
  for (size_t k = 0; k < nr_gparts; /* void */) {
    if (gparts[k].time_bin == time_bin_inhibited) {
      nr_gparts -= 1;

      /* Swap the particle */
      memswap(&s->gparts[k], &s->gparts[nr_gparts], sizeof(struct gpart));

      /* Swap the link with part/spart */
      if (s->gparts[k].type == swift_type_gas) {
        s->parts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      } else if (s->gparts[k].type == swift_type_stars) {
        s->sparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      }
      if (s->gparts[nr_gparts].type == swift_type_gas) {
        s->parts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
            &s->gparts[nr_gparts];
      } else if (s->gparts[nr_gparts].type == swift_type_stars) {
        s->sparts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
            &s->gparts[nr_gparts];
      }
    } else {
      k++;
    }
  }

  /* Now we are ready to deal with real particles and can start the exchange. */

  /* Allocate temporary arrays to store the counts of particles to be sent
   * and the destination of each particle */
  int *counts;
  if ((counts = (int *)calloc(sizeof(int), nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate counts temporary buffer.");

  int *dest;
  if ((dest = (int *)malloc(sizeof(int) * nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");

  /* Simple index of node IDs, used for mappers over nodes. */
  int *nodes = NULL;
  if ((nodes = (int *)malloc(sizeof(int) * nr_nodes)) == NULL)
    error("Failed to allocate nodes temporary buffer.");
  for (int k = 0; k < nr_nodes; k++) nodes[k] = k;