engine.c 266 KB
Newer Older
Pedro Gonnet's avatar
Pedro Gonnet committed
1
/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *                    Angus Lepper (angus.lepper@ed.ac.uk)
 *               2016 John A. Regan (john.a.regan@durham.ac.uk)
 *                    Tom Theuns (tom.theuns@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
32
#include <stdbool.h>
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
37

38
39
/* MPI headers. */
#ifdef WITH_MPI
40
#include <mpi.h>
41
42
#endif

Angus Lepper's avatar
Angus Lepper committed
43
44
#ifdef HAVE_LIBNUMA
#include <numa.h>
45
46
#endif

47
48
49
50
51
/* Load the profiler header, if needed. */
#ifdef WITH_PROFILER
#include <gperftools/profiler.h>
#endif

52
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
53
#include "engine.h"
54
55

/* Local headers. */
56
#include "active.h"
57
#include "atomic.h"
58
#include "cell.h"
59
#include "chemistry.h"
60
#include "clocks.h"
61
#include "cooling.h"
62
#include "cosmology.h"
63
64
#include "cycle.h"
#include "debug.h"
65
#include "equation_of_state.h"
66
#include "error.h"
67
#include "gravity.h"
68
#include "gravity_cache.h"
69
#include "hydro.h"
lhausamm's avatar
lhausamm committed
70
#include "logger.h"
lhausamm's avatar
lhausamm committed
71
#include "logger_io.h"
72
#include "map.h"
73
#include "memswap.h"
74
#include "minmax.h"
75
#include "outputlist.h"
76
#include "parallel_io.h"
77
#include "part.h"
78
#include "partition.h"
James Willis's avatar
James Willis committed
79
#include "profiler.h"
80
#include "proxy.h"
81
#include "restart.h"
82
#include "runner.h"
83
84
#include "serial_io.h"
#include "single_io.h"
85
#include "sort_part.h"
86
#include "sourceterms.h"
Loic Hausammann's avatar
Loic Hausammann committed
87
#include "stars_io.h"
88
#include "statistics.h"
89
#include "timers.h"
90
#include "tools.h"
91
#include "units.h"
92
#include "velociraptor_interface.h"
Matthieu Schaller's avatar
Matthieu Schaller committed
93
#include "version.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
94

95
96
97
/* Particle cache size (number of particles held per cache). */
#define CACHE_SIZE 512

98
99
100
101
102
/* Human-readable names of the engine policies, in the same order as the
 * engine_policy_* flag bits (used when printing the active policies). */
const char *engine_policy_names[] = {"none",
                                     "rand",
                                     "steal",
                                     "keep",
                                     "block",
                                     "cpu tight",
                                     "mpi",
                                     "numa affinity",
                                     "hydro",
                                     "self gravity",
                                     "external gravity",
                                     "cosmological integration",
                                     "drift everything",
                                     "reconstruct multi-poles",
                                     "cooling",
                                     "sourceterms",
                                     "stars",
                                     "structure finding",
                                     "star formation",
                                     "feedback",
                                     "logger"};
Pedro Gonnet's avatar
Pedro Gonnet committed
120

121
122
123
/** The rank of the engine as a global variable (for messages). */
int engine_rank;

124
125
126
127
128
/**
 * @brief Data collected from the cells at the end of a time-step
 */
struct end_of_step_data {

  /* Number of hydro / gravity / star particles updated this step. */
  size_t updated, g_updated, s_updated;

  /* Number of hydro / gravity / star particles inhibited this step. */
  size_t inhibited, g_inhibited, s_inhibited;

  /* Minimum/maximum end-of-step and maximum beginning-of-step integer
   * times over the hydro tasks. */
  integertime_t ti_hydro_end_min, ti_hydro_end_max, ti_hydro_beg_max;

  /* Same quantities for the gravity tasks. */
  integertime_t ti_gravity_end_min, ti_gravity_end_max, ti_gravity_beg_max;

  /* The #engine this data belongs to. */
  struct engine *e;
};

136
137
138
139
/**
 * @brief Link a density/force task to a cell.
 *
 * Pushes a new #link holding task @c t onto the head of the list @c *l.
 * Link storage comes from the engine's pre-allocated pool, so this is
 * safe to call concurrently from several threads.
 *
 * @param e The #engine.
 * @param l A pointer to the #link list head, will be modified atomically.
 * @param t The #task.
 */
void engine_addlink(struct engine *e, struct link **l, struct task *t) {

  /* Get the next free link from the engine-wide pool (atomic counter). */
  const size_t ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];

  /* Set it atomically: fill in the payload first, then swap the node in as
   * the new list head so readers never observe a half-initialised link. */
  res->t = t;
  res->next = atomic_swap(l, res);
}
158

Loic Hausammann's avatar
Loic Hausammann committed
159
160
161
/**
 * @brief Recursively add non-implicit star ghost tasks to a cell hierarchy.
 */
162
163
164
void engine_add_stars_ghosts(struct engine *e, struct cell *c,
                             struct task *stars_ghost_in,
                             struct task *stars_ghost_out) {
Loic Hausammann's avatar
Loic Hausammann committed
165
166

  /* If we have reached the leaf OR have to few particles to play with*/
167
  if (!c->split || c->stars.count < engine_max_sparts_per_ghost) {
Loic Hausammann's avatar
Loic Hausammann committed
168
169
170

    /* Add the ghost task and its dependencies */
    struct scheduler *s = &e->sched;
171
    c->stars.ghost = scheduler_addtask(s, task_type_stars_ghost,
172
                                       task_subtype_none, 0, 0, c, NULL);
173
174
    scheduler_addunlock(s, stars_ghost_in, c->stars.ghost);
    scheduler_addunlock(s, c->stars.ghost, stars_ghost_out);
Loic Hausammann's avatar
Loic Hausammann committed
175
176
177
178
  } else {
    /* Keep recursing */
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
179
180
        engine_add_stars_ghosts(e, c->progeny[k], stars_ghost_in,
                                stars_ghost_out);
Loic Hausammann's avatar
Loic Hausammann committed
181
182
183
  }
}

184
185
186
187
188
/**
 * @brief Recursively add non-implicit ghost tasks to a cell hierarchy.
 */
void engine_add_ghosts(struct engine *e, struct cell *c, struct task *ghost_in,
                       struct task *ghost_out) {
189
190

  /* If we have reached the leaf OR have to few particles to play with*/
191
  if (!c->split || c->hydro.count < engine_max_parts_per_ghost) {
192
193

    /* Add the ghost task and its dependencies */
194
    struct scheduler *s = &e->sched;
195
    c->hydro.ghost =
196
        scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0, c, NULL);
197
198
    scheduler_addunlock(s, ghost_in, c->hydro.ghost);
    scheduler_addunlock(s, c->hydro.ghost, ghost_out);
199
  } else {
200
    /* Keep recursing */
201
202
203
204
205
206
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_add_ghosts(e, c->progeny[k], ghost_in, ghost_out);
  }
}

207
208
/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks -- timestep version
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * Note that there is no need to recurse below the super-cell. Note also
 * that we only add tasks if the relevant particles are present in the cell.
 *
 * @param e The #engine.
 * @param c The #cell.
 */
void engine_make_hierarchical_tasks_common(struct engine *e, struct cell *c) {

  struct scheduler *s = &e->sched;
  const int is_with_cooling = (e->policy & engine_policy_cooling);
  const int is_with_star_formation = (e->policy & engine_policy_star_formation);

  /* Are we in a super-cell ? */
  if (c->super == c) {

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the two half kicks */
      c->kick1 = scheduler_addtask(s, task_type_kick1, task_subtype_none, 0, 0,
                                   c, NULL);

#if defined(WITH_LOGGER)
      /* Particle logger task, only compiled in when the logger is enabled. */
      c->logger = scheduler_addtask(s, task_type_logger, task_subtype_none, 0,
                                    0, c, NULL);
#endif

      c->kick2 = scheduler_addtask(s, task_type_kick2, task_subtype_none, 0, 0,
                                   c, NULL);

      /* Add the time-step calculation task and its dependency */
      c->timestep = scheduler_addtask(s, task_type_timestep, task_subtype_none,
                                      0, 0, c, NULL);

      /* Add the task finishing the force calculation */
      c->end_force = scheduler_addtask(s, task_type_end_force,
                                       task_subtype_none, 0, 0, c, NULL);

      /* With cooling the chain is end_force -> cooling -> kick2;
       * otherwise end_force unlocks kick2 directly. */
      if (is_with_cooling) {

        c->hydro.cooling = scheduler_addtask(s, task_type_cooling,
                                             task_subtype_none, 0, 0, c, NULL);

        scheduler_addunlock(s, c->end_force, c->hydro.cooling);
        scheduler_addunlock(s, c->hydro.cooling, c->kick2);

      } else {
        scheduler_addunlock(s, c->end_force, c->kick2);
      }

      /* Likewise, star formation (if enabled) sits between kick2 and the
       * time-step calculation. */
      if (is_with_star_formation) {

        c->hydro.star_formation = scheduler_addtask(
            s, task_type_star_formation, task_subtype_none, 0, 0, c, NULL);

        scheduler_addunlock(s, c->kick2, c->hydro.star_formation);
        scheduler_addunlock(s, c->hydro.star_formation, c->timestep);

      } else {
        scheduler_addunlock(s, c->kick2, c->timestep);
      }

      /* The new time-step feeds the first kick of the next step. */
      scheduler_addunlock(s, c->timestep, c->kick1);

#if defined(WITH_LOGGER)
      scheduler_addunlock(s, c->kick1, c->logger);
#endif
    }

  } else { /* We are above the super-cell so need to go deeper */

    /* Recurse. */
    if (c->split)
      for (int k = 0; k < 8; k++)
        if (c->progeny[k] != NULL)
          engine_make_hierarchical_tasks_common(e, c->progeny[k]);
  }
}
291

292
293
/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks -- hydro version
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * Note that there is no need to recurse below the super-cell. Note also
 * that we only add tasks if the relevant particles are present in the cell.
 *
 * @param e The #engine.
 * @param c The #cell.
 */
void engine_make_hierarchical_tasks_hydro(struct engine *e, struct cell *c) {

  struct scheduler *s = &e->sched;
  const int is_with_sourceterms = (e->policy & engine_policy_sourceterms);

  /* Are we in a super-cell ? */
  if (c->hydro.super == c) {

    /* Add the sort task. */
    /* NOTE: the sort task is added for foreign cells too, unlike the tasks
     * below which are restricted to local cells. */
    c->hydro.sorts =
        scheduler_addtask(s, task_type_sort, task_subtype_none, 0, 0, c, NULL);

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Add the drift task. */
      c->hydro.drift = scheduler_addtask(s, task_type_drift_part,
                                         task_subtype_none, 0, 0, c, NULL);

      /* Generate the ghost tasks. */
      c->hydro.ghost_in =
          scheduler_addtask(s, task_type_ghost_in, task_subtype_none, 0,
                            /* implicit = */ 1, c, NULL);
      c->hydro.ghost_out =
          scheduler_addtask(s, task_type_ghost_out, task_subtype_none, 0,
                            /* implicit = */ 1, c, NULL);
      engine_add_ghosts(e, c, c->hydro.ghost_in, c->hydro.ghost_out);

#ifdef EXTRA_HYDRO_LOOP
      /* Generate the extra ghost task. */
      c->hydro.extra_ghost = scheduler_addtask(
          s, task_type_extra_ghost, task_subtype_none, 0, 0, c, NULL);
#endif

      /* add source terms */
      if (is_with_sourceterms) {
        c->sourceterms = scheduler_addtask(s, task_type_sourceterms,
                                           task_subtype_none, 0, 0, c, NULL);
      }
    }

  } else { /* We are above the super-cell so need to go deeper */

    /* Recurse. */
    if (c->split)
      for (int k = 0; k < 8; k++)
        if (c->progeny[k] != NULL)
          engine_make_hierarchical_tasks_hydro(e, c->progeny[k]);
  }
}

355
356
357
358
359
360
361
362
363
364
365
366
/**
 * @brief Generate the hydro hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks -- gravity version
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * Note that there is no need to recurse below the super-cell. Note also
 * that we only add tasks if the relevant particles are present in the cell.
 *
 * @param e The #engine.
 * @param c The #cell.
 */
void engine_make_hierarchical_tasks_gravity(struct engine *e, struct cell *c) {

  struct scheduler *s = &e->sched;
  const int periodic = e->s->periodic;
  const int is_self_gravity = (e->policy & engine_policy_self_gravity);

  /* Are we in a super-cell ? */
  if (c->grav.super == c) {

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      c->grav.drift = scheduler_addtask(s, task_type_drift_gpart,
                                        task_subtype_none, 0, 0, c, NULL);

      if (is_self_gravity) {

        /* Initialisation of the multipoles */
        c->grav.init = scheduler_addtask(s, task_type_init_grav,
                                         task_subtype_none, 0, 0, c, NULL);

        /* Gravity non-neighbouring pm calculations */
        c->grav.long_range = scheduler_addtask(
            s, task_type_grav_long_range, task_subtype_none, 0, 0, c, NULL);

        /* Gravity recursive down-pass */
        c->grav.down = scheduler_addtask(s, task_type_grav_down,
                                         task_subtype_none, 0, 0, c, NULL);

        /* Implicit tasks for the up and down passes */
        c->grav.init_out = scheduler_addtask(s, task_type_init_grav_out,
                                             task_subtype_none, 0, 1, c, NULL);
        c->grav.down_in = scheduler_addtask(s, task_type_grav_down_in,
                                            task_subtype_none, 0, 1, c, NULL);

        /* Gravity mesh force propagation (periodic runs only) */
        if (periodic)
          c->grav.mesh = scheduler_addtask(s, task_type_grav_mesh,
                                           task_subtype_none, 0, 0, c, NULL);

        /* Dependency chain: drift -> mesh -> down (periodic), and
         * init -> long_range -> down -> end_force. */
        if (periodic) scheduler_addunlock(s, c->grav.drift, c->grav.mesh);
        if (periodic) scheduler_addunlock(s, c->grav.mesh, c->grav.down);
        scheduler_addunlock(s, c->grav.init, c->grav.long_range);
        scheduler_addunlock(s, c->grav.long_range, c->grav.down);
        scheduler_addunlock(s, c->grav.down, c->super->end_force);

        /* Link in the implicit tasks */
        scheduler_addunlock(s, c->grav.init, c->grav.init_out);
        scheduler_addunlock(s, c->grav.down_in, c->grav.down);
      }
    }
  }

  /* We are below the super-cell but not below the maximal splitting depth */
  else if (c->grav.super != NULL && c->depth < space_subdepth_grav) {

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      if (is_self_gravity) {

        /* Implicit tasks chaining this level to the parent's up/down
         * passes. */
        c->grav.init_out = scheduler_addtask(s, task_type_init_grav_out,
                                             task_subtype_none, 0, 1, c, NULL);

        c->grav.down_in = scheduler_addtask(s, task_type_grav_down_in,
                                            task_subtype_none, 0, 1, c, NULL);

        scheduler_addunlock(s, c->parent->grav.init_out, c->grav.init_out);
        scheduler_addunlock(s, c->grav.down_in, c->parent->grav.down_in);
      }
    }
  }

  /* Recurse but not below the maximal splitting depth */
  if (c->split && c->depth <= space_subdepth_grav)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_hierarchical_tasks_gravity(e, c->progeny[k]);
}
446

Loic Hausammann's avatar
Loic Hausammann committed
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
/**
 * @brief Generate the stars hierarchical tasks for a hierarchy of cells -
 * i.e. all the O(Npart) tasks -- star version
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * Note that there is no need to recurse below the super-cell. Note also
 * that we only add tasks if the relevant particles are present in the cell.
 *
 * @param e The #engine.
 * @param c The #cell.
 */
void engine_make_hierarchical_tasks_stars(struct engine *e, struct cell *c) {

  struct scheduler *s = &e->sched;

  /* Are we in a super-cell ? */
Loic Hausammann's avatar
Loic Hausammann committed
464
  if (c->super == c) {
Loic Hausammann's avatar
Loic Hausammann committed
465
466
467
468
469

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Generate the ghost tasks. */
470
      c->stars.ghost_in =
Loic Hausammann's avatar
Loic Hausammann committed
471
          scheduler_addtask(s, task_type_stars_ghost_in, task_subtype_none, 0,
Loic Hausammann's avatar
Loic Hausammann committed
472
                            /* implicit = */ 1, c, NULL);
473
      c->stars.ghost_out =
Loic Hausammann's avatar
Loic Hausammann committed
474
          scheduler_addtask(s, task_type_stars_ghost_out, task_subtype_none, 0,
Loic Hausammann's avatar
Loic Hausammann committed
475
                            /* implicit = */ 1, c, NULL);
476
      engine_add_stars_ghosts(e, c, c->stars.ghost_in, c->stars.ghost_out);
477
478
    }
  } else { /* We are above the super-cell so need to go deeper */
Loic Hausammann's avatar
Loic Hausammann committed
479
480
481
482
483
484
485
486
487

    /* Recurse. */
    if (c->split)
      for (int k = 0; k < 8; k++)
        if (c->progeny[k] != NULL)
          engine_make_hierarchical_tasks_stars(e, c->progeny[k]);
  }
}

488
489
490
void engine_make_hierarchical_tasks_mapper(void *map_data, int num_elements,
                                           void *extra_data) {
  struct engine *e = (struct engine *)extra_data;
491
492
  const int is_with_hydro = (e->policy & engine_policy_hydro);
  const int is_with_self_gravity = (e->policy & engine_policy_self_gravity);
493
494
  const int is_with_external_gravity =
      (e->policy & engine_policy_external_gravity);
Loic Hausammann's avatar
Loic Hausammann committed
495
  const int is_with_feedback = (e->policy & engine_policy_feedback);
496
497
498

  for (int ind = 0; ind < num_elements; ind++) {
    struct cell *c = &((struct cell *)map_data)[ind];
499
500
501
    /* Make the common tasks (time integration) */
    engine_make_hierarchical_tasks_common(e, c);
    /* Add the hydro stuff */
502
    if (is_with_hydro) engine_make_hierarchical_tasks_hydro(e, c);
503
    /* And the gravity stuff */
504
    if (is_with_self_gravity || is_with_external_gravity)
505
      engine_make_hierarchical_tasks_gravity(e, c);
Loic Hausammann's avatar
Loic Hausammann committed
506
    if (is_with_feedback) engine_make_hierarchical_tasks_stars(e, c);
507
508
509
  }
}

510
#ifdef WITH_MPI
511
/**
Peter W. Draper's avatar
Peter W. Draper committed
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
 * Do the exchange of one type of particles with all the other nodes.
 *
 * @param counts 2D array with the counts of particles to exchange with
 *               each other node.
 * @param parts the particle data to exchange
 * @param new_nr_parts the number of particles this node will have after all
 *                     exchanges have completed.
 * @param sizeofparts sizeof the particle struct.
 * @param alignsize the memory alignment required for this particle type.
 * @param mpi_type the MPI_Datatype for these particles.
 * @param nr_nodes the number of nodes to exchange with.
 * @param nodeID the id of this node.
 *
 * @result new particle data constructed from all the exchanges with the
 *         given alignment.
527
 */
528
static void *engine_do_redistribute(int *counts, char *parts,
529
530
                                    size_t new_nr_parts, size_t sizeofparts,
                                    size_t alignsize, MPI_Datatype mpi_type,
531
                                    int nr_nodes, int nodeID) {
532
533

  /* Allocate a new particle array with some extra margin */
534
  char *parts_new = NULL;
535
536
  if (posix_memalign(
          (void **)&parts_new, alignsize,
537
          sizeofparts * new_nr_parts * engine_redistribute_alloc_margin) != 0)
538
539
540
541
    error("Failed to allocate new particle data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
542
543
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 2 * nr_nodes)) ==
      NULL)
544
545
    error("Failed to allocate MPI request list.");

546
  /* Only send and receive only "chunk" particles per request. So we need to
547
548
549
   * loop as many times as necessary here. Make 2Gb/sizeofparts so we only
   * send 2Gb packets. */
  const int chunk = INT_MAX / sizeofparts;
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
  int sent = 0;
  int recvd = 0;

  int activenodes = 1;
  while (activenodes) {

    for (int k = 0; k < 2 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

    /* Emit the sends and recvs for the data. */
    size_t offset_send = sent;
    size_t offset_recv = recvd;
    activenodes = 0;

    for (int k = 0; k < nr_nodes; k++) {

      /* Indices in the count arrays of the node of interest */
      const int ind_send = nodeID * nr_nodes + k;
      const int ind_recv = k * nr_nodes + nodeID;

      /* Are we sending any data this loop? */
      int sending = counts[ind_send] - sent;
      if (sending > 0) {
        activenodes++;
573
        if (sending > chunk) sending = chunk;
574
575
576
577

        /* If the send and receive is local then just copy. */
        if (k == nodeID) {
          int receiving = counts[ind_recv] - recvd;
578
          if (receiving > chunk) receiving = chunk;
579
          memcpy(&parts_new[offset_recv * sizeofparts],
580
                 &parts[offset_send * sizeofparts], sizeofparts * receiving);
581
582
        } else {
          /* Otherwise send it. */
583
584
585
          int res =
              MPI_Isend(&parts[offset_send * sizeofparts], sending, mpi_type, k,
                        ind_send, MPI_COMM_WORLD, &reqs[2 * k + 0]);
586
587
588
589
          if (res != MPI_SUCCESS)
            mpi_error(res, "Failed to isend parts to node %i.", k);
        }
      }
590

591
      /* If we're sending to this node, then move past it to next. */
592
      if (counts[ind_send] > 0) offset_send += counts[ind_send];
593

594
595
596
597
598
599
      /* Are we receiving any data from this node? Note already done if coming
       * from this node. */
      if (k != nodeID) {
        int receiving = counts[ind_recv] - recvd;
        if (receiving > 0) {
          activenodes++;
600
          if (receiving > chunk) receiving = chunk;
601
602
603
604
605
606
          int res = MPI_Irecv(&parts_new[offset_recv * sizeofparts], receiving,
                              mpi_type, k, ind_recv, MPI_COMM_WORLD,
                              &reqs[2 * k + 1]);
          if (res != MPI_SUCCESS)
            mpi_error(res, "Failed to emit irecv of parts from node %i.", k);
        }
607
608
      }

609
      /* If we're receiving from this node, then move past it to next. */
610
      if (counts[ind_recv] > 0) offset_recv += counts[ind_recv];
611
612
    }

613
614
615
616
617
618
619
    /* Wait for all the sends and recvs to tumble in. */
    MPI_Status stats[2 * nr_nodes];
    int res;
    if ((res = MPI_Waitall(2 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
      for (int k = 0; k < 2 * nr_nodes; k++) {
        char buff[MPI_MAX_ERROR_STRING];
        MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
620
621
        message("request from source %i, tag %i has error '%s'.",
                stats[k].MPI_SOURCE, stats[k].MPI_TAG, buff);
622
623
      }
      error("Failed during waitall for part data.");
624
    }
625
626
627
628

    /* Move to next chunks. */
    sent += chunk;
    recvd += chunk;
629
630
631
632
633
634
635
636
637
638
  }

  /* Free temps. */
  free(reqs);

  /* And return new memory. */
  return parts_new;
}
#endif

639
#ifdef WITH_MPI /* redist_mapper */

/* Support for engine_redistribute threadpool dest mappers. */
struct redist_mapper_data {
  int *counts;         /* nr_nodes x nr_nodes matrix of particle counts. */
  int *dest;           /* Destination node index of each particle. */
  int nodeID;          /* This node's MPI rank. */
  int nr_nodes;        /* Total number of MPI ranks. */
  struct cell *cells;  /* Top-level cells. */
  struct space *s;     /* The space the particles live in. */
  void *base;          /* Base address of the particle array being mapped. */
};

652
653
654
/* Generic function for accumulating counts for TYPE parts. Note
 * we use a local counts array to avoid the atomic_add in the parts
 * loop.
 *
 * The macro expands to a function *definition body* only: the storage
 * class and return type ("static void") are supplied at the expansion
 * site. For each particle the position is first box-wrapped, then the
 * owning top-level cell (and hence destination node) is found, and a
 * thread-local count matrix is accumulated before being folded into the
 * shared one with atomic adds. */
#define ENGINE_REDISTRIBUTE_DEST_MAPPER(TYPE)                              \
  engine_redistribute_dest_mapper_##TYPE(void *map_data, int num_elements, \
                                         void *extra_data) {               \
    struct TYPE *parts = (struct TYPE *)map_data;                          \
    struct redist_mapper_data *mydata =                                    \
        (struct redist_mapper_data *)extra_data;                           \
    struct space *s = mydata->s;                                           \
    int *dest =                                                            \
        mydata->dest + (ptrdiff_t)(parts - (struct TYPE *)mydata->base);   \
    int *lcounts = NULL;                                                   \
    if ((lcounts = (int *)calloc(                                          \
             sizeof(int), mydata->nr_nodes * mydata->nr_nodes)) == NULL)   \
      error("Failed to allocate counts thread-specific buffer");           \
    for (int k = 0; k < num_elements; k++) {                               \
      for (int j = 0; j < 3; j++) {                                        \
        if (parts[k].x[j] < 0.0)                                           \
          parts[k].x[j] += s->dim[j];                                      \
        else if (parts[k].x[j] >= s->dim[j])                               \
          parts[k].x[j] -= s->dim[j];                                      \
      }                                                                    \
      const int cid = cell_getid(s->cdim, parts[k].x[0] * s->iwidth[0],    \
                                 parts[k].x[1] * s->iwidth[1],             \
                                 parts[k].x[2] * s->iwidth[2]);            \
      dest[k] = s->cells_top[cid].nodeID;                                  \
      size_t ind = mydata->nodeID * mydata->nr_nodes + dest[k];            \
      lcounts[ind] += 1;                                                   \
    }                                                                      \
    for (int k = 0; k < (mydata->nr_nodes * mydata->nr_nodes); k++)        \
      atomic_add(&mydata->counts[k], lcounts[k]);                          \
    free(lcounts);                                                         \
  }
686

687
688
/**
 * @brief Accumulate the counts of particles per cell.
 * Threadpool helper for accumulating the counts of particles per cell.
 *
 * part version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(part);

/**
 * @brief Accumulate the counts of star particles per cell.
 * Threadpool helper for accumulating the counts of particles per cell.
 *
 * spart version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(spart);

/**
 * @brief Accumulate the counts of gravity particles per cell.
 * Threadpool helper for accumulating the counts of particles per cell.
 *
 * gpart version.
 */
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(gpart);

#endif /* redist_mapper_data */
712

713
#ifdef WITH_MPI /* savelink_mapper_data */

/* Support for saving the linkage between gparts and parts/sparts. */
struct savelink_mapper_data {
  int nr_nodes;  /* Total number of MPI ranks. */
  int *counts;   /* nr_nodes x nr_nodes matrix of particle counts. */
  void *parts;   /* The (typed) particle array whose links are saved. */
  int nodeID;    /* This node's MPI rank. */
};

/**
 * @brief Save the offset of each gravity partner of a part or spart.
 *
 * The offset is from the start of the sorted particles to be sent to a node.
 * This is possible as parts without gravity partners have a positive id.
 * These offsets are used to restore the pointers on the receiving node.
 *
 * CHECKS should be eliminated as dead code when optimizing.
 */
#define ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(TYPE, CHECKS)                      \
  engine_redistribute_savelink_mapper_##TYPE(void *map_data, int num_elements, \
                                             void *extra_data) {               \
    int *nodes = (int *)map_data;                                              \
    struct savelink_mapper_data *mydata =                                      \
        (struct savelink_mapper_data *)extra_data;                             \
    int nodeID = mydata->nodeID;                                               \
    int nr_nodes = mydata->nr_nodes;                                           \
    int *counts = mydata->counts;                                              \
    struct TYPE *parts = (struct TYPE *)mydata->parts;                         \
                                                                               \
    for (int j = 0; j < num_elements; j++) {                                   \
      int node = nodes[j];                                                     \
      int count = 0;                                                           \
      /* Start of the slice of the send buffer destined for this node. */      \
      size_t offset = 0;                                                       \
      for (int i = 0; i < node; i++) offset += counts[nodeID * nr_nodes + i];  \
                                                                               \
      /* Overwrite each gpart back-link with minus the running index of        \
       * linked particles within the slice; restored on the receiver. */       \
      for (int k = 0; k < counts[nodeID * nr_nodes + node]; k++) {             \
        if (parts[k + offset].gpart != NULL) {                                 \
          if (CHECKS)                                                          \
            if (parts[k + offset].gpart->id_or_neg_offset > 0)                 \
              error("Trying to link a partnerless " #TYPE "!");                \
          parts[k + offset].gpart->id_or_neg_offset = -count;                  \
          count++;                                                             \
        }                                                                      \
      }                                                                        \
    }                                                                          \
  }

/**
 * @brief Save position of part-gpart links.
 *
 * Threadpool helper recording, for each part to be exchanged, the offset of
 * its gravity partner within the send slice (see
 * ENGINE_REDISTRIBUTE_SAVELINK_MAPPER above).  Debug builds enable the
 * partnerless-particle check (CHECKS == 1).
 */
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 0);
#endif

/**
 * @brief Save position of spart-gpart links.
 *
 * Threadpool helper recording, for each spart to be exchanged, the offset of
 * its gravity partner within the send slice (see
 * ENGINE_REDISTRIBUTE_SAVELINK_MAPPER above).  Debug builds enable the
 * partnerless-particle check (CHECKS == 1).
 */
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 0);
#endif

#endif /* savelink_mapper_data */

#ifdef WITH_MPI /* relink_mapper_data */

/* Support for relinking parts, gparts and sparts after moving between nodes. */
struct relink_mapper_data {

  /* Rank of this node. */
  int nodeID;

  /* Number of MPI ranks taking part in the redistribution. */
  int nr_nodes;

  /* Flattened (nr_nodes * nr_nodes) exchange-count matrices for parts,
   * sparts and gparts respectively, indexed [sender * nr_nodes + receiver]. */
  int *counts;
  int *s_counts;
  int *g_counts;

  /* The #space holding the particle arrays to relink. */
  struct space *s;
};

/**
 * @brief Restore the part/gpart and spart/gpart links for a list of nodes.
 *
 * @param map_data address of nodes to process.
 * @param num_elements the number nodes to process.
800
801
 * @param extra_data additional data defining the context (a
 * relink_mapper_data).
802
803
804
805
806
 */
static void engine_redistribute_relink_mapper(void *map_data, int num_elements,
                                              void *extra_data) {

  int *nodes = (int *)map_data;
807
  struct relink_mapper_data *mydata = (struct relink_mapper_data *)extra_data;
808
809
810
811

  int nodeID = mydata->nodeID;
  int nr_nodes = mydata->nr_nodes;
  int *counts = mydata->counts;
812
  int *g_counts = mydata->g_counts;
813
814
  int *s_counts = mydata->s_counts;
  struct space *s = mydata->s;
815
816
817
818

  for (int i = 0; i < num_elements; i++) {

    int node = nodes[i];
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833

    /* Get offsets to correct parts of the counts arrays for this node. */
    size_t offset_parts = 0;
    size_t offset_gparts = 0;
    size_t offset_sparts = 0;
    for (int n = 0; n < node; n++) {
      int ind_recv = n * nr_nodes + nodeID;
      offset_parts += counts[ind_recv];
      offset_gparts += g_counts[ind_recv];
      offset_sparts += s_counts[ind_recv];
    }

    /* Number of gparts sent from this node. */
    int ind_recv = node * nr_nodes + nodeID;
    const size_t count_gparts = g_counts[ind_recv];
834
835

    /* Loop over the gparts received from this node */
836
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; k++) {
837
838
839
840
841

      /* Does this gpart have a gas partner ? */
      if (s->gparts[k].type == swift_type_gas) {

        const ptrdiff_t partner_index =
842
            offset_parts - s->gparts[k].id_or_neg_offset;
843
844
845
846
847
848
849

        /* Re-link */
        s->gparts[k].id_or_neg_offset = -partner_index;
        s->parts[partner_index].gpart = &s->gparts[k];
      }

      /* Does this gpart have a star partner ? */
850
      else if (s->gparts[k].type == swift_type_stars) {
851
852

        const ptrdiff_t partner_index =
853
            offset_sparts - s->gparts[k].id_or_neg_offset;
854
855
856
857
858
859
860
861
862

        /* Re-link */
        s->gparts[k].id_or_neg_offset = -partner_index;
        s->sparts[partner_index].gpart = &s->gparts[k];
      }
    }
  }
}

#endif /* relink_mapper_data */

/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
881
void engine_redistribute(struct engine *e) {
882

883
#ifdef WITH_MPI
884

885
886
  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
887
  struct space *s = e->s;
888
  struct cell *cells = s->cells_top;
889
  const int nr_cells = s->nr_cells;
890
  struct xpart *xparts = s->xparts;
891
892
  struct part *parts = s->parts;
  struct gpart *gparts = s->gparts;
893
  struct spart *sparts = s->sparts;
894
  ticks tic = getticks();
  size_t nr_parts = s->nr_parts;
  size_t nr_gparts = s->nr_gparts;
  size_t nr_sparts = s->nr_sparts;

  /* Start by moving inhibited particles to the end of the arrays */
  for (size_t k = 0; k < nr_parts; /* void */) {
    if (parts[k].time_bin == time_bin_inhibited) {
      nr_parts -= 1;

      /* Swap the particle */
      memswap(&parts[k], &parts[nr_parts], sizeof(struct part));

      /* Swap the xpart */
      memswap(&xparts[k], &xparts[nr_parts], sizeof(struct xpart));

      /* Swap the link with the gpart */
      if (parts[k].gpart != NULL) {
        parts[k].gpart->id_or_neg_offset = -k;
      }
      if (parts[nr_parts].gpart != NULL) {
        parts[nr_parts].gpart->id_or_neg_offset = -nr_parts;
      }
    } else {
      k++;
    }
  }

  /* Now move inhibited star particles to the end of the arrays */
  for (size_t k = 0; k < nr_sparts; /* void */) {
    if (sparts[k].time_bin == time_bin_inhibited) {
      nr_sparts -= 1;

      /* Swap the particle */
      memswap(&s->sparts[k], &s->sparts[nr_sparts], sizeof(struct spart));

      /* Swap the link with the gpart */
      if (s->sparts[k].gpart != NULL) {
        s->sparts[k].gpart->id_or_neg_offset = -k;
      }
      if (s->sparts[nr_sparts].gpart != NULL) {
        s->sparts[nr_sparts].gpart->id_or_neg_offset = -nr_sparts;
      }
    } else {
      k++;
    }
  }

  /* Finally do the same with the gravity particles */
  for (size_t k = 0; k < nr_gparts; /* void */) {
    if (gparts[k].time_bin == time_bin_inhibited) {
      nr_gparts -= 1;

      /* Swap the particle */
      memswap(&s->gparts[k], &s->gparts[nr_gparts], sizeof(struct gpart));

      /* Swap the link with part/spart */
      if (s->gparts[k].type == swift_type_gas) {
        s->parts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      } else if (s->gparts[k].type == swift_type_stars) {
        s->sparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
      }
      if (s->gparts[nr_gparts].type == swift_type_gas) {
        s->parts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
            &s->gparts[nr_gparts];
      } else if (s->gparts[nr_gparts].type == swift_type_stars) {
        s->sparts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
            &s->gparts[nr_gparts];
      }
    } else {
      k++;
    }
  }

  /* Now we are ready to deal with real particles and can start the exchange. */

971
  /* Allocate temporary arrays to store the counts of particles to be sent
972
973
   * and the destination of each particle */
  int *counts;
974
  if ((counts = (int *)calloc(sizeof(int), nr_nodes * nr_nodes)) == NULL)
975
    error("Failed to allocate counts temporary buffer.");
976

977
  int *dest;
978
  if ((dest = (int *)malloc(sizeof(int) * nr_parts)) == NULL)
979
980
    error("Failed to allocate dest temporary buffer.");

981
982
983
984
985
  /* Simple index of node IDs, used for mappers over nodes. */
  int *nodes = NULL;
  if ((nodes = (int *)malloc(sizeof(int) * nr_nodes)) == NULL)
    error("Failed to allocate nodes temporary buffer.");
  for (int k = 0; k < nr_nodes; k++) nodes[k] = k;
986

Matthieu Schaller's avatar