engine.c 64.3 KB
Newer Older
Pedro Gonnet's avatar
Pedro Gonnet committed
1
/*******************************************************************************
2
 * This file is part of SWIFT.
3
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
Peter W. Draper's avatar
Peter W. Draper committed
4
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
5
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
6
7
8
9
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
10
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
11
12
13
14
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
15
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
16
17
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
 *
Pedro Gonnet's avatar
Pedro Gonnet committed
19
20
21
22
23
24
25
26
27
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
28
29
30
31
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Angus Lepper's avatar
Angus Lepper committed
32
#include <stdbool.h>
Pedro Gonnet's avatar
Pedro Gonnet committed
33

34
35
/* MPI headers. */
#ifdef WITH_MPI
36
#include <mpi.h>
37
38
#endif

Angus Lepper's avatar
Angus Lepper committed
39
40
41
42
#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

43
/* This object's header. */
Pedro Gonnet's avatar
Pedro Gonnet committed
44
#include "engine.h"
45
46

/* Local headers. */
47
#include "atomic.h"
48
#include "cell.h"
49
50
#include "cycle.h"
#include "debug.h"
51
#include "error.h"
52
#include "hydro.h"
53
#include "minmax.h"
54
#include "part.h"
55
#include "partition.h"
56
#include "timers.h"
Pedro Gonnet's avatar
Pedro Gonnet committed
57

58
59
60
61
/* Human-readable names of the engine policies, in the same order as the
   policy flag bits (used when printing the active policy set). */
const char *engine_policy_names[12] = {
    "none",          "rand",   "steal",        "keep",
    "block",         "fix_dt", "cpu_tight",    "mpi",
    "numa_affinity", "hydro",  "self_gravity", "external_gravity"};

/** The rank of the engine as a global variable (for messages). */
int engine_rank;

66
67
68
69
70
71
72
73
74
75
/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l The #link.
 * @param t The #task.
 *
 * @return The new #link pointer.
 */

76
struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) {
77

78
79
  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
80
    error("Link table overflow.");
81
82
  }
  struct link *res = &e->links[ind];
83
84
85
86
  res->next = l;
  res->t = t;
  return res;
}
87
88
89
90
91
92
93
94

/**
 * @brief Generate the ghost and kick tasks for a hierarchy of cells.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112

void engine_mkghosts(struct engine *e, struct cell *c, struct cell *super) {

  int k;
  struct scheduler *s = &e->sched;

  /* Am I the super-cell? */
  if (super == NULL && c->nr_tasks > 0) {

    /* Remember me. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
Matthieu Schaller's avatar
Matthieu Schaller committed
113
114
115
116
      /* Add the drift task. */
      c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0, 0,
                                   c, NULL, 0);
      /* Add the init task. */
Matthieu Schaller's avatar
Matthieu Schaller committed
117
118
      c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0, c,
                                  NULL, 0);
Matthieu Schaller's avatar
Matthieu Schaller committed
119
      /* Add the kick task. */
Matthieu Schaller's avatar
Matthieu Schaller committed
120
121
      c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0, 0, c,
                                  NULL, 0);
Tom Theuns's avatar
Tom Theuns committed
122
123
124
125
126
127
128
129

		/* /\* Add the gravity tasks *\/ */
      /* c->grav_external = scheduler_addtask(s, task_type_grav_external, task_subtype_none, 0, 0, */
      /*                                      c, NULL, 0); */

      /* /\* Enforce gravity calculated before kick  *\/ */
      /* scheduler_addunlock(s, c->grav_external, c->kick); */
 		
130
    }
131
  }
132

133
134
135
136
137
138
139
140
  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (k = 0; k < 8; k++)
      if (c->progeny[k] != NULL) engine_mkghosts(e, c->progeny[k], super);
}
141

142
/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * Each rank counts how many of its particles belong on every other rank,
 * all ranks allreduce the full counts matrix, and the particle (and
 * extra-particle) arrays are exchanged with non-blocking sends/receives.
 *
 * @param e The #engine.
 */

void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  int nr_nodes = e->nr_nodes, nodeID = e->nodeID;
  struct space *s = e->s;
  int my_cells = 0;
  int *cdim = s->cdim;
  struct cell *cells = s->cells;
  int nr_cells = s->nr_cells;

  /* Start by sorting the particles according to their nodes and
     getting the counts. The counts array is indexed as
     count[from * nr_nodes + to]. */
  int *counts, *dest;
  double ih[3], dim[3];
  ih[0] = s->ih[0];
  ih[1] = s->ih[1];
  ih[2] = s->ih[2];
  dim[0] = s->dim[0];
  dim[1] = s->dim[1];
  dim[2] = s->dim[2];
  if ((counts = (int *)malloc(sizeof(int) *nr_nodes *nr_nodes)) == NULL ||
      (dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
    error("Failed to allocate count and dest buffers.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  struct part *parts = s->parts;
  for (int k = 0; k < s->nr_parts; k++) {
    /* Box-wrap the particle position before computing its cell. */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, parts[k].x[0] * ih[0],
                               parts[k].x[1] * ih[1], parts[k].x[2] * ih[2]);
    /* if (cid < 0 || cid >= s->nr_cells)
       error("Bad cell id %i for part %i at [%.3e,%.3e,%.3e].",
             cid, k, parts[k].x[0], parts[k].x[1], parts[k].x[2]); */
    dest[k] = cells[cid].nodeID;
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the local particles by destination rank so each send is one
     contiguous slab. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get the new number of parts for this node, be generous in allocating. */
  int nr_parts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL, *xparts = s->xparts;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts * 1.2) != 0 ||
      posix_memalign((void **)&xparts_new, part_align,
                     sizeof(struct xpart) * nr_parts * 1.2) != 0)
    error("Failed to allocate new part data.");

  /* Emit the sends and recvs for the particle data. Four requests per
     peer rank: send parts, send xparts, recv parts, recv xparts. */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 4 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 4 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
  for (int offset_send = 0, offset_recv = 0, k = 0; k < nr_nodes; k++) {
    int ind_send = nodeID * nr_nodes + k;
    int ind_recv = k * nr_nodes + nodeID;
    if (counts[ind_send] > 0) {
      if (k == nodeID) {
        /* Local data: plain copy, no MPI traffic needed. */
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send],
                      e->part_mpi_type, k, 2 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[4 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send],
                      e->xpart_mpi_type, k, 2 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[4 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], e->part_mpi_type,
                    k, 2 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[4 * k + 2]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv],
                    e->xpart_mpi_type, k, 2 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[4 * k + 3]) != MPI_SUCCESS)
        /* Fixed copy-paste message: this irecv is for the xparts. */
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[4 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(4 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 4 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      int len;  /* renamed from 'res' to avoid shadowing the Waitall result */
      MPI_Error_string(stats[k].MPI_ERROR, buff, &len);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* Verify that all parts are in the right place. */
  /* for ( k = 0 ; k < nr_parts ; k++ ) {
      cid = cell_getid( cdim , parts_new[k].x[0]*ih[0] , parts_new[k].x[1]*ih[1]
     , parts_new[k].x[2]*ih[2] );
      if ( cells[ cid ].nodeID != nodeID )
          error( "Received particle (%i) that does not belong here (nodeID=%i)."
     , k , cells[ cid ].nodeID );
      } */

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->nr_parts = nr_parts;
  s->size_parts = 1.2 * nr_parts;

  /* Be verbose about what just happened. */
  for (int k = 0; k < nr_cells; k++)
    if (cells[k].nodeID == nodeID) my_cells += 1;
  message("node %i now has %i parts in %i cells.", nodeID, nr_parts, my_cells);

  /* Clean up other stuff. */
  free(reqs);
  free(counts);
  free(dest);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
295

296

297
/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */

void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  /* Consume the pending repartition request and clear the flag. */
  const enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* A single-node run has nothing to balance. This also avoids a METIS
     bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Compute the new cell-to-node assignment. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* The tricky part: move the particles to their new owners. This
     allreduces a matrix of how many particles go from where to where,
     re-allocates the parts array, and emits the sends and receives.
     Afterwards the space, tasks, and proxies need to be rebuilt. */
  engine_redistribute(e);

  /* Re-create the proxies for the new domain layout. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}
338

339
340
341
342
343
344
345
/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
Matthieu Schaller's avatar
Matthieu Schaller committed
346
 */
347
348
349

void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {
Matthieu Schaller's avatar
Matthieu Schaller committed
350

351
352
353
354
355
356
357
358
359
360
  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}
361

362
363
364
365
/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj The receiving #cell
 */

void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj) {

#ifdef WITH_MPI

  struct scheduler *s = &e->sched;

  /* Scan the density tasks for one that involves the target node. */
  struct link *l = ci->density;
  while (l != NULL) {
    struct task *t = l->t;
    if (t->ci->nodeID == cj->nodeID ||
        (t->cj != NULL && t->cj->nodeID == cj->nodeID))
      break;
    l = l->next;
  }

  if (l != NULL) {

    /* Found one: attach send tasks for both communication phases. */
    struct task *t_xv =
        scheduler_addtask(&e->sched, task_type_send, task_subtype_none,
                          2 * ci->tag, 0, ci, cj, 0);
    struct task *t_rho =
        scheduler_addtask(&e->sched, task_type_send, task_subtype_none,
                          2 * ci->tag + 1, 0, ci, cj, 0);

    /* The send_rho task depends on the cell's ghost task. */
    scheduler_addunlock(s, ci->super->ghost, t_rho);

    /* The send_rho task should unlock the super-cell's kick task. */
    scheduler_addunlock(s, t_rho, ci->super->kick);

    /* The send_xv task should unlock the super-cell's ghost task. */
    scheduler_addunlock(s, t_xv, ci->super->ghost);

  } else if (ci->split) {

    /* No relevant task at this level: descend into the progeny. */
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL) engine_addtasks_send(e, ci->progeny[k], cj);
  }

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
414
415
416
417
418
419
420
421
422
423

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 */

void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho) {

#ifdef WITH_MPI

  int k;
  struct scheduler *s = &e->sched;

  /* Do we need to construct a recv task? Only one pair of recv tasks is
     created per sub-tree, at the highest cell with density tasks. */
  if (t_xv == NULL && c->nr_density > 0) {

    /* Create the tasks. The tag pairing (2*tag, 2*tag+1) matches the
       send tasks created in engine_addtasks_send(). */
    t_xv = c->recv_xv =
        scheduler_addtask(&e->sched, task_type_recv, task_subtype_none,
                          2 * c->tag, 0, c, NULL, 0);
    t_rho = c->recv_rho =
        scheduler_addtask(&e->sched, task_type_recv, task_subtype_none,
                          2 * c->tag + 1, 0, c, NULL, 0);
  }

  /* Add dependencies: xv must arrive before any density task runs, and
     all density tasks must finish before rho arrives; forces wait on rho.
     NOTE(review): if t_xv is still NULL here (no density tasks anywhere in
     this branch), the calls below would pass a NULL task — presumably
     c->density/c->force are empty and c->sorts is NULL in that case, so
     nothing is wired up; worth confirming against scheduler_addunlock. */
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->force; l != NULL; l = l->next)
    scheduler_addunlock(s, t_rho, l->t);
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);

  /* Recurse, handing the (possibly just-created) tasks down the tree. */
  if (c->split)
    for (k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
462
463
464
465
466
467

/**
 * @brief Exchange cell structures with other nodes.
 *
 * Packs every cell flagged for sending into a flat pcell buffer, exchanges
 * sizes then data with each proxy, unpacks the incoming cells, and links
 * them to a (re-)allocated foreign-particle buffer.
 *
 * @param e The #engine.
 */

void engine_exchange_cells(struct engine *e) {

#ifdef WITH_MPI

  int j, k, pid, count = 0;
  struct pcell *pcells;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  int nr_cells = s->nr_cells;
  int nr_proxies = e->nr_proxies;
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;

  /* Run through the cells and get the size of the ones that will be sent off.
     offset[k] records where cell k's packed data starts in the pcell buffer.
   */
  for (k = 0; k < nr_cells; k++) {
    offset[k] = count;
    if (cells[k].sendto)
      count += (cells[k].pcell_size = cell_getsize(&cells[k]));
  }

  /* Allocate the pcells. */
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count)) == NULL)
    error("Failed to allocate pcell buffer.");

  /* Pack the cells. */
  cell_next_tag = 0;
  for (k = 0; k < nr_cells; k++)
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies (first round: exchange cell counts). */
  for (k = 0; k < nr_proxies; k++) {
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
  for (k = 0; k < nr_proxies; k++) {
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cells (second round: the pcell payloads). */
  for (k = 0; k < nr_proxies; k++) {
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies. */
  for (k = 0; k < nr_proxies; k++) {
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
    for (count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the number of particles we need to import and re-allocate
     the buffer if needed (with 10% head-room). */
  for (count = 0, k = 0; k < nr_proxies; k++)
    for (j = 0; j < e->proxies[k].nr_cells_in; j++)
      count += e->proxies[k].cells_in[j]->count;
  if (count > s->size_parts_foreign) {
    if (s->parts_foreign != NULL) free(s->parts_foreign);
    s->size_parts_foreign = 1.1 * count;
    if (posix_memalign((void **)&s->parts_foreign, part_align,
                       sizeof(struct part) * s->size_parts_foreign) != 0)
      error("Failed to allocate foreign part data.");
  }

  /* Unpack the cells and link to the particle data. */
  struct part *parts = s->parts_foreign;
  for (k = 0; k < nr_proxies; k++) {
    for (count = 0, j = 0; j < e->proxies[k].nr_cells_in; j++) {
      count += cell_link(e->proxies[k].cells_in[j], parts);
      parts = &parts[e->proxies[k].cells_in[j]->count];
    }
  }
  s->nr_parts_foreign = parts - s->parts_foreign;

  /* Is the parts buffer large enough? */
  if (s->nr_parts_foreign > s->size_parts_foreign)
    error("Foreign parts buffer too small.");

  /* Free the pcell buffer. */
  free(pcells);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}
579
580
581
582
583

/**
 * @brief Exchange straying parts with other nodes.
 *
 * @param e The #engine.
 * @param offset The index in the parts array as of which the foreign parts
 *reside.
 * @param ind The ID of the foreign #cell.
 * @param N The number of stray parts.
 *
 * @return The number of arrived parts copied to parts and xparts.
 */

int engine_exchange_strays(struct engine *e, int offset, int *ind, int N) {

#ifdef WITH_MPI

  int k, pid, count = 0, nr_in = 0, nr_out = 0;
  /* Two request slots per proxy: [2*k] for parts, [2*k+1] for xparts. */
  MPI_Request reqs_in[2 * engine_maxproxies];
  MPI_Request reqs_out[2 * engine_maxproxies];
  MPI_Status status;
  struct proxy *p;
  struct space *s = e->s;

  /* Re-set the proxies. */
  for (k = 0; k < e->nr_proxies; k++) e->proxies[k].nr_parts_out = 0;

  /* Put the parts into the corresponding proxies. */
  for (k = 0; k < N; k++) {
    int node_id = e->s->cells[ind[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    pid = e->proxy_ind[node_id];
    if (pid < 0)
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%llu, x=[%e,%e,%e].",
          node_id, s->parts[offset + k].id, s->parts[offset + k].x[0],
          s->parts[offset + k].x[1], s->parts[offset + k].x[2]);
    proxy_parts_load(&e->proxies[pid], &s->parts[offset + k],
                     &s->xparts[offset + k], 1);
  }

  /* Launch the proxies (first round: exchange particle counts). */
  for (k = 0; k < e->nr_proxies; k++) {
    proxy_parts_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_parts_count_in;
    reqs_out[k] = e->proxies[k].req_parts_count_out;
  }

  /* Wait for each count to come in and start the recv. */
  for (k = 0; k < e->nr_proxies; k++) {
    if (MPI_Waitany(e->nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_parts_exch2(&e->proxies[pid]);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the total number of incoming particles and make sure we have
     enough space to accommodate them. */
  int count_in = 0;
  for (k = 0; k < e->nr_proxies; k++) count_in += e->proxies[k].nr_parts_in;
  message("sent out %i particles, got %i back.", N, count_in);
  if (offset + count_in > s->size_parts) {
    /* Grow with 5% head-room, preserving the first `offset` entries. */
    s->size_parts = (offset + count_in) * 1.05;
    struct part *parts_new = NULL;
    struct xpart *xparts_new = NULL;
    if (posix_memalign((void **)&parts_new, part_align,
                       sizeof(struct part) * s->size_parts) != 0 ||
        posix_memalign((void **)&xparts_new, part_align,
                       sizeof(struct xpart) * s->size_parts) != 0)
      error("Failed to allocate new part data.");
    memcpy(parts_new, s->parts, sizeof(struct part) * offset);
    memcpy(xparts_new, s->xparts, sizeof(struct xpart) * offset);
    free(s->parts);
    free(s->xparts);
    s->parts = parts_new;
    s->xparts = xparts_new;
  }

  /* Collect the requests for the particle data from the proxies. */
  for (k = 0; k < e->nr_proxies; k++) {
    if (e->proxies[k].nr_parts_in > 0) {
      reqs_in[2 * k] = e->proxies[k].req_parts_in;
      reqs_in[2 * k + 1] = e->proxies[k].req_xparts_in;
      nr_in += 1;
    } else
      reqs_in[2 * k] = reqs_in[2 * k + 1] = MPI_REQUEST_NULL;
    if (e->proxies[k].nr_parts_out > 0) {
      reqs_out[2 * k] = e->proxies[k].req_parts_out;
      reqs_out[2 * k + 1] = e->proxies[k].req_xparts_out;
      nr_out += 1;
    } else
      reqs_out[2 * k] = reqs_out[2 * k + 1] = MPI_REQUEST_NULL;
  }

  /* Wait for each part array to come in and collect the new
     parts from the proxies. A proxy's data is copied only once BOTH of
     its requests (parts and xparts) have completed: MPI_Waitany sets a
     completed request to MPI_REQUEST_NULL, so the pid & ~1 / pid | 1
     pair checks the two slots belonging to the same proxy (pid >> 1).
     NOTE(review): the loop bound 2 * (nr_in + nr_out) exceeds the number
     of active reqs_in entries (2 * nr_in); the MPI_UNDEFINED break below
     presumably terminates the extra iterations — confirm. */
  for (k = 0; k < 2 * (nr_in + nr_out); k++) {
    int err;
    if ((err = MPI_Waitany(2 * e->nr_proxies, reqs_in, &pid, &status)) !=
        MPI_SUCCESS) {
      char buff[MPI_MAX_ERROR_STRING];
      int res;
      MPI_Error_string(err, buff, &res);
      error("MPI_Waitany failed (%s).", buff);
    }
    if (pid == MPI_UNDEFINED) break;
    // message( "request from proxy %i has arrived." , pid );
    if (reqs_in[pid & ~1] == MPI_REQUEST_NULL &&
        reqs_in[pid | 1] == MPI_REQUEST_NULL) {
      p = &e->proxies[pid >> 1];
      memcpy(&s->parts[offset + count], p->parts_in,
             sizeof(struct part) * p->nr_parts_in);
      memcpy(&s->xparts[offset + count], p->xparts_in,
             sizeof(struct xpart) * p->nr_parts_in);
      /* for (int k = offset; k < offset + count; k++)
         message(
            "received particle %lli, x=[%.3e %.3e %.3e], h=%.3e, from node %i.",
            s->parts[k].id, s->parts[k].x[0], s->parts[k].x[1],
            s->parts[k].x[2], s->parts[k].h, p->nodeID); */
      count += p->nr_parts_in;
    }
  }

  /* Wait for all the sends to have finished too. */
  if (nr_out > 0)
    if (MPI_Waitall(2 * e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) !=
        MPI_SUCCESS)
      error("MPI_Waitall on sends failed.");

  /* Return the number of harvested parts. */
  return count;

#else
  error("SWIFT was not compiled with MPI support.");
  return 0;
#endif
}
723

724
/**
725
 * @brief Fill the #space's task list.
726
 *
727
 * @param e The #engine we are working with.
728
 */
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744

void engine_maketasks(struct engine *e) {

  struct space *s = e->s;
  struct scheduler *sched = &e->sched;
  struct cell *cells = s->cells;
  int nr_cells = s->nr_cells;
  int nodeID = e->nodeID;
  int i, j, k, ii, jj, kk, iii, jjj, kkk, cid, cjd, sid;
  int *cdim = s->cdim;
  struct task *t, *t2;
  struct cell *ci, *cj;

  /* Re-set the scheduler. */
  scheduler_reset(sched, s->tot_cells * engine_maxtaskspercell);

745
746
747
  /* Add the space sorting tasks. */
  for (i = 0; i < e->nr_threads; i++)
    scheduler_addtask(sched, task_type_psort, task_subtype_none, i, 0, NULL,
748
749
                      NULL, 0);

750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
  /* Run through the highest level of cells and add pairs. */
  for (i = 0; i < cdim[0]; i++)
    for (j = 0; j < cdim[1]; j++)
      for (k = 0; k < cdim[2]; k++) {
        cid = cell_getid(cdim, i, j, k);
        if (cells[cid].count == 0) continue;
        ci = &cells[cid];
        if (ci->count == 0) continue;
        if (ci->nodeID == nodeID)
          scheduler_addtask(sched, task_type_self, task_subtype_density, 0, 0,
                            ci, NULL, 0);
        for (ii = -1; ii < 2; ii++) {
          iii = i + ii;
          if (!s->periodic && (iii < 0 || iii >= cdim[0])) continue;
          iii = (iii + cdim[0]) % cdim[0];
          for (jj = -1; jj < 2; jj++) {
            jjj = j + jj;
            if (!s->periodic && (jjj < 0 || jjj >= cdim[1])) continue;
            jjj = (jjj + cdim[1]) % cdim[1];
            for (kk = -1; kk < 2; kk++) {
              kkk = k + kk;
              if (!s->periodic && (kkk < 0 || kkk >= cdim[2])) continue;
              kkk = (kkk + cdim[2]) % cdim[2];
              cjd = cell_getid(cdim, iii, jjj, kkk);
              cj = &cells[cjd];
              if (cid >= cjd || cj->count == 0 ||
                  (ci->nodeID != nodeID && cj->nodeID != nodeID))
                continue;
              sid = sortlistID[(kk + 1) + 3 * ((jj + 1) + 3 * (ii + 1))];
              scheduler_addtask(sched, task_type_pair, task_subtype_density,
                                sid, 0, ci, cj, 1);
781
            }
782
          }
783
        }
784
785
      }

Tom Theuns's avatar
Tom Theuns committed
786
#ifdef GRAVITY
787
#ifdef DEFAULT_GRAVITY
788
789
790
791
792
793
794
795
796
797
  /* Add the gravity mm tasks. */
  for (i = 0; i < nr_cells; i++)
    if (cells[i].gcount > 0) {
      scheduler_addtask(sched, task_type_grav_mm, task_subtype_none, -1, 0,
                        &cells[i], NULL, 0);
      for (j = i + 1; j < nr_cells; j++)
        if (cells[j].gcount > 0)
          scheduler_addtask(sched, task_type_grav_mm, task_subtype_none, -1, 0,
                            &cells[i], &cells[j], 0);
    }
Tom Theuns's avatar
Tom Theuns committed
798
799
800
801
802
803
804
805
806
807
808
809
810
811
#endif
/* #ifdef EXTERNAL_POTENTIAL */
/*   /\* Add the external potential gravity *\/ */
/*   for (i = 0; i < nr_cells; i++) */
/*     if (cells[i].gcount > 0) { */
/*       scheduler_addtask(sched, task_type_grav_mm, task_subtype_none, -1, 0, */
/*                         &cells[i], NULL, 0); */
/*       for (j = i + 1; j < nr_cells; j++) */
/*         if (cells[j].gcount > 0) */
/*           scheduler_addtask(sched, task_type_grav_mm, task_subtype_none, -1, 0, */
/*                             &cells[i], &cells[j], 0); */
/*     } */
  
/* #endif */
812
813
#endif
  scheduler_print_tasks(sched, "sched.txt");
814

815
816
817
818
819
820
821
  /* Split the tasks. */
  scheduler_splittasks(sched);

  /* Allocate the list of cell-task links. The maximum number of links
     is the number of cells (s->tot_cells) times the number of neighbours (27)
     times the number of interaction types (2, density and force). */
  if (e->links != NULL) free(e->links);
822
823
  e->size_links = s->tot_cells * 27 * 2;
  if ((e->links = malloc(sizeof(struct link) * e->size_links)) == NULL)
824
825
    error("Failed to allocate cell-task links.");
  e->nr_links = 0;
826
#ifdef GRAVITY
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
  /* Add the gravity up/down tasks at the top-level cells and push them down. */
  for (k = 0; k < nr_cells; k++)
    if (cells[k].nodeID == nodeID && cells[k].gcount > 0) {

      /* Create tasks at top level. */
      struct task *up =
          scheduler_addtask(sched, task_type_grav_up, task_subtype_none, 0, 0,
                            &cells[k], NULL, 0);
      struct task *down =
          scheduler_addtask(sched, task_type_grav_down, task_subtype_none, 0, 0,
                            &cells[k], NULL, 0);

      /* Push tasks down the cell hierarchy. */
      engine_addtasks_grav(e, &cells[k], up, down);
    }
842
#endif
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
  /* Count the number of tasks associated with each cell and
     store the density tasks in each cell, and make each sort
     depend on the sorts of its progeny. */
  for (k = 0; k < sched->nr_tasks; k++) {

    /* Get the current task. */
    t = &sched->tasks[k];
    if (t->skip) continue;

    /* Link sort tasks together. */
    if (t->type == task_type_sort && t->ci->split)
      for (j = 0; j < 8; j++)
        if (t->ci->progeny[j] != NULL && t->ci->progeny[j]->sorts != NULL) {
          t->ci->progeny[j]->sorts->skip = 0;
          scheduler_addunlock(sched, t->ci->progeny[j]->sorts, t);
858
        }
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887

    /* Link density tasks to cells. */
    if (t->type == task_type_self) {
      atomic_inc(&t->ci->nr_tasks);
      if (t->subtype == task_subtype_density) {
        t->ci->density = engine_addlink(e, t->ci->density, t);
        atomic_inc(&t->ci->nr_density);
      }
    } else if (t->type == task_type_pair) {
      atomic_inc(&t->ci->nr_tasks);
      atomic_inc(&t->cj->nr_tasks);
      if (t->subtype == task_subtype_density) {
        t->ci->density = engine_addlink(e, t->ci->density, t);
        atomic_inc(&t->ci->nr_density);
        t->cj->density = engine_addlink(e, t->cj->density, t);
        atomic_inc(&t->cj->nr_density);
      }
    } else if (t->type == task_type_sub) {
      atomic_inc(&t->ci->nr_tasks);
      if (t->cj != NULL) atomic_inc(&t->cj->nr_tasks);
      if (t->subtype == task_subtype_density) {
        t->ci->density = engine_addlink(e, t->ci->density, t);
        atomic_inc(&t->ci->nr_density);
        if (t->cj != NULL) {
          t->cj->density = engine_addlink(e, t->cj->density, t);
          atomic_inc(&t->cj->nr_density);
        }
      }
    }
888
#ifdef DEFAULT_GRAVITY
889
890
891
892
893
894
895
896
897
898
899
    /* Link gravity multipole tasks to the up/down tasks. */
    if (t->type == task_type_grav_mm ||
        (t->type == task_type_sub && t->subtype == task_subtype_grav)) {
      atomic_inc(&t->ci->nr_tasks);
      scheduler_addunlock(sched, t->ci->grav_up, t);
      scheduler_addunlock(sched, t, t->ci->grav_down);
      if (t->cj != NULL && t->ci->grav_up != t->cj->grav_up) {
        scheduler_addunlock(sched, t->cj->grav_up, t);
        scheduler_addunlock(sched, t, t->cj->grav_down);
      }
    }
900
#endif
901
  }
Matthieu Schaller's avatar
Matthieu Schaller committed
902
  /* Append a ghost task to each cell, and add kick tasks to the
903
904
905
906
     super cells. */
  for (k = 0; k < nr_cells; k++) engine_mkghosts(e, &cells[k], NULL);

  /* Run through the tasks and make force tasks for each density task.
Matthieu Schaller's avatar
Matthieu Schaller committed
907
     Each force task depends on the cell ghosts and unlocks the kick task
908
909
910
911
912
913
914
915
916
917
918
919
     of its super-cell. */
  kk = sched->nr_tasks;
  for (k = 0; k < kk; k++) {

    /* Get a pointer to the task. */
    t = &sched->tasks[k];

    /* Skip? */
    if (t->skip) continue;

    /* Self-interaction? */
    if (t->type == task_type_self && t->subtype == task_subtype_density) {
920
      scheduler_addunlock(sched, t->ci->super->init, t);
921
922
923
924
      scheduler_addunlock(sched, t, t->ci->super->ghost);
      t2 = scheduler_addtask(sched, task_type_self, task_subtype_force, 0, 0,
                             t->ci, NULL, 0);
      scheduler_addunlock(sched, t->ci->super->ghost, t2);
Matthieu Schaller's avatar
Matthieu Schaller committed
925
      scheduler_addunlock(sched, t2, t->ci->super->kick);
926
927
      t->ci->force = engine_addlink(e, t->ci->force, t2);
      atomic_inc(&t->ci->nr_force);
928
    }
929
930
931
932
933
934

    /* Otherwise, pair interaction? */
    else if (t->type == task_type_pair && t->subtype == task_subtype_density) {
      t2 = scheduler_addtask(sched, task_type_pair, task_subtype_force, 0, 0,
                             t->ci, t->cj, 0);
      if (t->ci->nodeID == nodeID) {
935
        scheduler_addunlock(sched, t->ci->super->init, t);
936
937
        scheduler_addunlock(sched, t, t->ci->super->ghost);
        scheduler_addunlock(sched, t->ci->super->ghost, t2);
Matthieu Schaller's avatar
Matthieu Schaller committed
938
        scheduler_addunlock(sched, t2, t->ci->super->kick);
939
940
      }
      if (t->cj->nodeID == nodeID && t->ci->super != t->cj->super) {
941
        scheduler_addunlock(sched, t->cj->super->init, t);
942
943
        scheduler_addunlock(sched, t, t->cj->super->ghost);
        scheduler_addunlock(sched, t->cj->super->ghost, t2);
Matthieu Schaller's avatar
Matthieu Schaller committed
944
        scheduler_addunlock(sched, t2, t->cj->super->kick);
945
946
947
948
949
950
951
952
953
954
955
956
957
958
      }
      t->ci->force = engine_addlink(e, t->ci->force, t2);
      atomic_inc(&t->ci->nr_force);
      t->cj->force = engine_addlink(e, t->cj->force, t2);
      atomic_inc(&t->cj->nr_force);
    }

    /* Otherwise, sub interaction? */
    else if (t->type == task_type_sub && t->subtype == task_subtype_density) {
      t2 = scheduler_addtask(sched, task_type_sub, task_subtype_force, t->flags,
                             0, t->ci, t->cj, 0);
      if (t->ci->nodeID == nodeID) {
        scheduler_addunlock(sched, t, t->ci->super->ghost);
        scheduler_addunlock(sched, t->ci->super->ghost, t2);
Matthieu Schaller's avatar
Matthieu Schaller committed
959
        scheduler_addunlock(sched, t2, t->ci->super->kick);
960
961
962
963
964
      }
      if (t->cj != NULL && t->cj->nodeID == nodeID &&
          t->ci->super != t->cj->super) {
        scheduler_addunlock(sched, t, t->cj->super->ghost);
        scheduler_addunlock(sched, t->cj->super->ghost, t2);
Matthieu Schaller's avatar
Matthieu Schaller committed
965
        scheduler_addunlock(sched, t2, t->cj->super->kick);
966
967
968
969
970
971
972
973
974
      }
      t->ci->force = engine_addlink(e, t->ci->force, t2);
      atomic_inc(&t->ci->nr_force);
      if (t->cj != NULL) {
        t->cj->force = engine_addlink(e, t->cj->force, t2);
        atomic_inc(&t->cj->nr_force);
      }
    }

Matthieu Schaller's avatar
Matthieu Schaller committed
975
976
977
    /* /\* Kick tasks should rely on the grav_down tasks of their cell. *\/ */
    /* else if (t->type == task_type_kick && t->ci->grav_down != NULL) */
    /*   scheduler_addunlock(sched, t->ci->grav_down, t); */
978
979
980
981
982
983
984
985
986
987
988
  }

/* Add the communication tasks if MPI is being used. */
#ifdef WITH_MPI

  /* Loop over the proxies. */
  for (int pid = 0; pid < e->nr_proxies; pid++) {

    /* Get a handle on the proxy. */
    struct proxy *p = &e->proxies[pid];

989
    /* Loop through the proxy's incoming cells and add the
990
991
992
993
994
995
996
997
998
999
1000
       recv tasks. */
    for (k = 0; k < p->nr_cells_in; k++)
      engine_addtasks_recv(e, p->cells_in[k], NULL, NULL);

    /* Loop through the proxy's outgoing cells and add the
       send tasks. */
    for (k = 0; k < p->nr_cells_out; k++)
      engine_addtasks_send(e, p->cells_out[k], p->cells_in[0]);
  }

#endif
For faster browsing, not all history is shown. View entire blame