/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *               2015 Peter W. Draper (p.w.draper@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdbool.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif

/* This object's header. */
#include "engine.h"

/* Local headers. */
#include "atomic.h"
#include "cell.h"
#include "clocks.h"
#include "cycle.h"
#include "debug.h"
#include "error.h"
#include "hydro.h"
#include "minmax.h"
#include "part.h"
#include "partition.h"
#include "timers.h"

#define engine_redistribute_alloc_margin 1.2

const char *engine_policy_names[12] = {
    "none",          "rand",   "steal",        "keep",
    "block",         "fix_dt", "cpu_tight",    "mpi",
    "numa_affinity", "hydro",  "self_gravity", "external_gravity"};

/** The rank of the engine as a global variable (for messages). */
int engine_rank;

/**
 * @brief Link a density/force task to a cell.
 *
 * @param e The #engine.
 * @param l The #link.
 * @param t The #task.
 *
 * @return The new #link pointer.
 */

struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) {

  const int ind = atomic_inc(&e->nr_links);
  if (ind >= e->size_links) {
    error("Link table overflow.");
  }
  struct link *res = &e->links[ind];
  res->next = l;
  res->t = t;
  return res;
}
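
/* A minimal usage sketch (mirroring engine_count_and_link_tasks() below):
   the returned link becomes the new head of the cell's list, i.e. the list
   is prepended to, never appended:

     t->ci->density = engine_addlink(e, t->ci->density, t);
     atomic_inc(&t->ci->nr_density);
*/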

/**
 * @brief Generate the ghost and all the other O(Npart) tasks for a
 * hierarchy of cells.
 *
 * Tasks are only created here. The dependencies will be added later on.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param super The super #cell.
 */

void engine_make_ghost_tasks(struct engine *e, struct cell *c,
                             struct cell *super) {

  struct scheduler *s = &e->sched;

  /* Am I the super-cell? */
  if (super == NULL && c->nr_tasks > 0) {

    /* Remember me. */
    super = c;

    /* Local tasks only... */
    if (c->nodeID == e->nodeID) {

      /* Generate the ghost task. */
      c->ghost = scheduler_addtask(s, task_type_ghost, task_subtype_none, 0, 0,
                                   c, NULL, 0);
      /* Add the drift task. */
      c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0, 0,
                                   c, NULL, 0);
      /* Add the init task. */
      c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0, c,
                                  NULL, 0);
      /* Add the kick task. */
      c->kick = scheduler_addtask(s, task_type_kick, task_subtype_none, 0, 0, c,
                                  NULL, 0);
    }
  }

  /* Set the super-cell. */
  c->super = super;

  /* Recurse. */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_make_ghost_tasks(e, c->progeny[k], super);
}

/**
 * @brief Redistribute the particles amongst the nodes according
 *      to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  // MATTHIEU: Should be int and not size_t once Pedro's changes are merged.
  size_t *dest, *g_dest;
  if ((dest = (size_t *)malloc(sizeof(size_t) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (size_t *)malloc(sizeof(size_t) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* The counts array is indexed as count[from * nr_nodes + to]. */
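  /* Worked example: with nr_nodes = 3 the matrix is laid out row-major as
     counts = { 0->0, 0->1, 0->2, 1->0, 1->1, 1->2, 2->0, 2->1, 2->2 }, so a
     particle leaving node 1 for node 2 increments counts[1 * 3 + 2]. After
     the allreduce below, the number of particles this node receives is the
     column sum over counts[k * nr_nodes + nodeID]. */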
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
      else if (parts[k].x[j] >= dim[j])
        parts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, parts[k].x[0] * ih[0],
                               parts[k].x[1] * ih[1], parts[k].x[2] * ih[2]);
    /* if (cid < 0 || cid >= s->nr_cells)
       error("Bad cell id %i for part %zi at [%.3e,%.3e,%.3e].",
             cid, k, parts[k].x[0], parts[k].x[1], parts[k].x[2]); */
    dest[k] = cells[cid].nodeID;
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* Now, do the same for the gparts */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, gparts[k].x[0] * ih[0],
                               gparts[k].x[1] * ih[1], gparts[k].x[2] * ih[2]);
    /* if (cid < 0 || cid >= s->nr_cells)
       error("Bad cell id %i for gpart %zi at [%.3e,%.3e,%.3e].",
             cid, k, gparts[k].x[0], gparts[k].x[1], gparts[k].x[2]); */
    g_dest[k] = cells[cid].nodeID;
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(gparts, g_dest, s->nr_gparts, 0, nr_nodes - 1);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
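
  /* Six request slots are reserved per node: reqs[6 * k + 0..2] hold the
     part, xpart and gpart sends to node k, and reqs[6 * k + 3..5] the
     matching receives. Slots left as MPI_REQUEST_NULL are ignored by
     MPI_Waitall. */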

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
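
  /* Message tags are 3 * ind + {0, 1, 2} for parts, xparts and gparts
     respectively, so each (sender, receiver, type) triple is matched by a
     unique Isend/Irecv pair. */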
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
        memcpy(&xparts_new[offset_recv], &s->xparts[offset_send],
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send],
                      e->part_mpi_type, k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send],
                      e->xpart_mpi_type, k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

        /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      e->gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], e->part_mpi_type,
                    k, 3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv],
                    e->xpart_mpi_type, k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    e->gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      int res;
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
      message("request %i has error '%s'.", k, buff);
    }
    error("Failed during waitall for part data.");
  }

  /* Verify that all parts are in the right place. */
  /* for ( k = 0 ; k < nr_parts ; k++ ) {
      cid = cell_getid( cdim , parts_new[k].x[0]*ih[0] , parts_new[k].x[1]*ih[1]
     , parts_new[k].x[2]*ih[2] );
      if ( cells[ cid ].nodeID != nodeID )
          error( "Received particle (%i) that does not belong here (nodeID=%i)."
     , k , cells[ cid ].nodeID );
      } */

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff. */
  free(reqs);
  free(counts);
  free(g_counts);
  free(dest);
  free(g_dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts in %i cells.", nodeID, nr_parts,
            my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Repartition the cells amongst the nodes.
 *
 * @param e The #engine.
 */

void engine_repartition(struct engine *e) {

#if defined(WITH_MPI) && defined(HAVE_METIS)

  ticks tic = getticks();

  /* Clear the repartition flag. */
  enum repartition_type reparttype = e->forcerepart;
  e->forcerepart = REPART_NONE;

  /* Nothing to do if only using a single node. Also avoids METIS
   * bug that doesn't handle this case well. */
  if (e->nr_nodes == 1) return;

  /* Do the repartitioning. */
  partition_repartition(reparttype, e->nodeID, e->nr_nodes, e->s,
                        e->sched.tasks, e->sched.nr_tasks);

  /* Now comes the tricky part: Exchange particles between all nodes.
     This is done in two steps, first allreducing a matrix of
     how many particles go from where to where, then re-allocating
     the parts array, and emitting the sends and receives.
     Finally, the space, tasks, and proxies need to be rebuilt. */

  /* Redistribute the particles between the nodes. */
  engine_redistribute(e);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Tell the engine it should re-build whenever possible */
  e->forcerebuild = 1;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI and METIS support.");
#endif
}

/**
 * @brief Add up/down gravity tasks to a cell hierarchy.
 *
 * @param e The #engine.
 * @param c The #cell
 * @param up The upward gravity #task.
 * @param down The downward gravity #task.
 */

void engine_addtasks_grav(struct engine *e, struct cell *c, struct task *up,
                          struct task *down) {

  /* Link the tasks to this cell. */
  c->grav_up = up;
  c->grav_down = down;

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_grav(e, c->progeny[k], up, down);
}

/**
 * @brief Add send tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param ci The sending #cell.
 * @param cj The receiving #cell
 */

void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj) {

#ifdef WITH_MPI
  struct link *l = NULL;
  struct scheduler *s = &e->sched;

  /* Check if any of the density tasks are for the target node. */
  for (l = ci->density; l != NULL; l = l->next)
    if (l->t->ci->nodeID == cj->nodeID ||
        (l->t->cj != NULL && l->t->cj->nodeID == cj->nodeID))
      break;

  /* If so, attach send tasks. */
  if (l != NULL) {

    /* Create the tasks. */
    struct task *t_xv = scheduler_addtask(s, task_type_send, task_subtype_none,
                                          2 * ci->tag, 0, ci, cj, 0);
    struct task *t_rho = scheduler_addtask(s, task_type_send, task_subtype_none,
                                           2 * ci->tag + 1, 0, ci, cj, 0);

    /* The send_rho task depends on the cell's ghost task. */
    scheduler_addunlock(s, ci->super->ghost, t_rho);

    /* The send_rho task should unlock the super-cell's kick task. */
    scheduler_addunlock(s, t_rho, ci->super->kick);

    /* The send_xv task should unlock the super-cell's ghost task. */
    scheduler_addunlock(s, t_xv, ci->super->ghost);
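
    /* In summary, the dependency chain around the communications is:
       send_xv --> ci->super->ghost --> send_rho --> ci->super->kick */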

  }

  /* Recurse? */
  else if (ci->split)
    for (int k = 0; k < 8; k++)
      if (ci->progeny[k] != NULL) engine_addtasks_send(e, ci->progeny[k], cj);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Add recv tasks to a hierarchy of cells.
 *
 * @param e The #engine.
 * @param c The #cell.
 * @param t_xv The recv_xv #task, if it has already been created.
 * @param t_rho The recv_rho #task, if it has already been created.
 */

void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv,
                          struct task *t_rho) {

#ifdef WITH_MPI
  struct scheduler *s = &e->sched;

  /* Do we need to construct a recv task? */
  if (t_xv == NULL && c->nr_density > 0) {

    /* Create the tasks. */
    t_xv = c->recv_xv = scheduler_addtask(s, task_type_recv, task_subtype_none,
                                          2 * c->tag, 0, c, NULL, 0);
    t_rho = c->recv_rho = scheduler_addtask(
        s, task_type_recv, task_subtype_none, 2 * c->tag + 1, 0, c, NULL, 0);
  }

  /* Add dependencies. */
  for (struct link *l = c->density; l != NULL; l = l->next) {
    scheduler_addunlock(s, t_xv, l->t);
    scheduler_addunlock(s, l->t, t_rho);
  }
  for (struct link *l = c->force; l != NULL; l = l->next)
    scheduler_addunlock(s, t_rho, l->t);
  if (c->sorts != NULL) scheduler_addunlock(s, t_xv, c->sorts);
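
  /* In summary: t_xv unlocks the sorts and every density task, each density
     task unlocks t_rho, and t_rho unlocks every force task:
       t_xv --> sorts / density --> t_rho --> force */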

  /* Recurse? */
  if (c->split)
    for (int k = 0; k < 8; k++)
      if (c->progeny[k] != NULL)
        engine_addtasks_recv(e, c->progeny[k], t_xv, t_rho);

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Exchange cell structures with other nodes.
 *
 * @param e The #engine.
 */

void engine_exchange_cells(struct engine *e) {

#ifdef WITH_MPI

  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int nr_proxies = e->nr_proxies;
  int offset[nr_cells];
  MPI_Request reqs_in[engine_maxproxies];
  MPI_Request reqs_out[engine_maxproxies];
  MPI_Status status;
  ticks tic = getticks();

  /* Run through the cells and get the size of the ones that will be sent off.
   */
  int count_out = 0;
  for (int k = 0; k < nr_cells; k++) {
    offset[k] = count_out;
    if (cells[k].sendto)
      count_out += (cells[k].pcell_size = cell_getsize(&cells[k]));
  }

  /* Allocate the pcells. */
  struct pcell *pcells;
  if ((pcells = (struct pcell *)malloc(sizeof(struct pcell) * count_out)) ==
      NULL)
    error("Failed to allocate pcell buffer.");

  /* Pack the cells. */
  cell_next_tag = 0;
  for (int k = 0; k < nr_cells; k++)
    if (cells[k].sendto) {
      cell_pack(&cells[k], &pcells[offset[k]]);
      cells[k].pcell = &pcells[offset[k]];
    }

  /* Launch the proxies. */
  for (int k = 0; k < nr_proxies; k++) {
    proxy_cells_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_cells_count_in;
    reqs_out[k] = e->proxies[k].req_cells_count_out;
  }

  /* Wait for each count to come in and start the recv. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid;
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_cells_exch2(&e->proxies[pid]);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Set the requests for the cells. */
  for (int k = 0; k < nr_proxies; k++) {
    reqs_in[k] = e->proxies[k].req_cells_in;
    reqs_out[k] = e->proxies[k].req_cells_out;
  }

  /* Wait for each pcell array to come in from the proxies. */
  for (int k = 0; k < nr_proxies; k++) {
    int pid;
    if (MPI_Waitany(nr_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "cell data from proxy %i has arrived." , pid );
    for (int count = 0, j = 0; j < e->proxies[pid].nr_cells_in; j++)
      count += cell_unpack(&e->proxies[pid].pcells_in[count],
                           e->proxies[pid].cells_in[j], e->s);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the number of particles we need to import and re-allocate
     the buffer if needed. */
  int count_in = 0;
  for (int k = 0; k < nr_proxies; k++)
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++)
      count_in += e->proxies[k].cells_in[j]->count;
  if (count_in > s->size_parts_foreign) {
    if (s->parts_foreign != NULL) free(s->parts_foreign);
    s->size_parts_foreign = 1.1 * count_in;
    if (posix_memalign((void **)&s->parts_foreign, part_align,
                       sizeof(struct part) * s->size_parts_foreign) != 0)
      error("Failed to allocate foreign part data.");
  }

  /* Unpack the cells and link to the particle data. */
  struct part *parts = s->parts_foreign;
  for (int k = 0; k < nr_proxies; k++) {
    for (int j = 0; j < e->proxies[k].nr_cells_in; j++) {
      cell_link(e->proxies[k].cells_in[j], parts);
      parts = &parts[e->proxies[k].cells_in[j]->count];
    }
  }
  s->nr_parts_foreign = parts - s->parts_foreign;

  /* Is the parts buffer large enough? */
  if (s->nr_parts_foreign > s->size_parts_foreign)
    error("Foreign parts buffer too small.");

  /* Free the pcell buffer. */
  free(pcells);

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Exchange straying parts with other nodes.
 *
 * @param e The #engine.
 * @param offset The index in the parts array as of which the foreign parts
 *        reside.
 * @param ind The ID of the foreign #cell.
 * @param N The number of stray parts.
 *
 * @return The number of arrived parts copied to parts and xparts.
 */

int engine_exchange_strays(struct engine *e, int offset, size_t *ind,
                           size_t N) {

#ifdef WITH_MPI

  struct space *s = e->s;
  ticks tic = getticks();

  /* Re-set the proxies. */
  for (int k = 0; k < e->nr_proxies; k++) e->proxies[k].nr_parts_out = 0;

  /* Put the parts into the corresponding proxies. */
  for (size_t k = 0; k < N; k++) {
    const int node_id = e->s->cells[ind[k]].nodeID;
    if (node_id < 0 || node_id >= e->nr_nodes)
      error("Bad node ID %i.", node_id);
    const int pid = e->proxy_ind[node_id];
    if (pid < 0)
      error(
          "Do not have a proxy for the requested nodeID %i for part with "
          "id=%llu, x=[%e,%e,%e].",
          node_id, s->parts[offset + k].id, s->parts[offset + k].x[0],
          s->parts[offset + k].x[1], s->parts[offset + k].x[2]);
    proxy_parts_load(&e->proxies[pid], &s->parts[offset + k],
                     &s->xparts[offset + k], 1);
  }

  /* Launch the proxies. */
  MPI_Request reqs_in[2 * engine_maxproxies];
  MPI_Request reqs_out[2 * engine_maxproxies];
  for (int k = 0; k < e->nr_proxies; k++) {
    proxy_parts_exch1(&e->proxies[k]);
    reqs_in[k] = e->proxies[k].req_parts_count_in;
    reqs_out[k] = e->proxies[k].req_parts_count_out;
  }

  /* Wait for each count to come in and start the recv. */
  for (int k = 0; k < e->nr_proxies; k++) {
    int pid;
    if (MPI_Waitany(e->nr_proxies, reqs_in, &pid, MPI_STATUS_IGNORE) !=
            MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      error("MPI_Waitany failed.");
    // message( "request from proxy %i has arrived." , pid );
    proxy_parts_exch2(&e->proxies[pid]);
  }

  /* Wait for all the sends to have finished too. */
  if (MPI_Waitall(e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    error("MPI_Waitall on sends failed.");

  /* Count the total number of incoming particles and make sure we have
     enough space to accommodate them. */
  size_t count_in = 0;
  for (int k = 0; k < e->nr_proxies; k++) count_in += e->proxies[k].nr_parts_in;
  if (e->verbose) message("sent out %zi particles, got %zi back.", N, count_in);
  if (offset + count_in > s->size_parts) {
    s->size_parts = (offset + count_in) * 1.05;
    struct part *parts_new = NULL;
    struct xpart *xparts_new = NULL;
    if (posix_memalign((void **)&parts_new, part_align,
                       sizeof(struct part) * s->size_parts) != 0 ||
        posix_memalign((void **)&xparts_new, part_align,
                       sizeof(struct xpart) * s->size_parts) != 0)
      error("Failed to allocate new part data.");
    memcpy(parts_new, s->parts, sizeof(struct part) * offset);
    memcpy(xparts_new, s->xparts, sizeof(struct xpart) * offset);
    free(s->parts);
    free(s->xparts);
    s->parts = parts_new;
    s->xparts = xparts_new;
  }

  /* Collect the requests for the particle data from the proxies. */
  int nr_in = 0, nr_out = 0;
  for (int k = 0; k < e->nr_proxies; k++) {
    if (e->proxies[k].nr_parts_in > 0) {
      reqs_in[2 * k] = e->proxies[k].req_parts_in;
      reqs_in[2 * k + 1] = e->proxies[k].req_xparts_in;
      nr_in += 1;
    } else
      reqs_in[2 * k] = reqs_in[2 * k + 1] = MPI_REQUEST_NULL;
    if (e->proxies[k].nr_parts_out > 0) {
      reqs_out[2 * k] = e->proxies[k].req_parts_out;
      reqs_out[2 * k + 1] = e->proxies[k].req_xparts_out;
      nr_out += 1;
    } else
      reqs_out[2 * k] = reqs_out[2 * k + 1] = MPI_REQUEST_NULL;
  }

  /* Wait for each part array to come in and collect the new
     parts from the proxies. */
  size_t count = 0;
  for (int k = 0; k < 2 * (nr_in + nr_out); k++) {
    int err, pid;
    if ((err = MPI_Waitany(2 * e->nr_proxies, reqs_in, &pid,
                           MPI_STATUS_IGNORE)) != MPI_SUCCESS) {
      char buff[MPI_MAX_ERROR_STRING];
      int res;
      MPI_Error_string(err, buff, &res);
      error("MPI_Waitany failed (%s).", buff);
    }
    if (pid == MPI_UNDEFINED) break;
    // message( "request from proxy %i has arrived." , pid );
    if (reqs_in[pid & ~1] == MPI_REQUEST_NULL &&
        reqs_in[pid | 1] == MPI_REQUEST_NULL) {
      struct proxy *p = &e->proxies[pid >> 1];
      memcpy(&s->parts[offset + count], p->parts_in,
             sizeof(struct part) * p->nr_parts_in);
      memcpy(&s->xparts[offset + count], p->xparts_in,
             sizeof(struct xpart) * p->nr_parts_in);
      /* for (int k = offset; k < offset + count; k++)
         message(
            "received particle %lli, x=[%.3e %.3e %.3e], h=%.3e, from node %i.",
            s->parts[k].id, s->parts[k].x[0], s->parts[k].x[1],
            s->parts[k].x[2], s->parts[k].h, p->nodeID); */
      count += p->nr_parts_in;
    }
  }

  /* Wait for all the sends to have finished too. */
  if (nr_out > 0)
    if (MPI_Waitall(2 * e->nr_proxies, reqs_out, MPI_STATUSES_IGNORE) !=
        MPI_SUCCESS)
      error("MPI_Waitall on sends failed.");

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

  /* Return the number of harvested parts. */
  return count;

#else
  error("SWIFT was not compiled with MPI support.");
  return 0;
#endif
}

/**
 * @brief Constructs the top-level pair tasks for the first hydro loop over
 * neighbours.
 *
 * Here we construct all the tasks for all possible neighbouring non-empty
 * local cells in the hierarchy. No dependencies are added at this stage.
 * Additional loops over neighbours can later be added by simply duplicating
 * all the tasks created by this function.
 *
 * @param e The #engine.
 */
void engine_make_hydroloop_tasks(struct engine *e) {

  struct space *s = e->s;
  struct scheduler *sched = &e->sched;
  const int nodeID = e->nodeID;
  const int *cdim = s->cdim;
  struct cell *cells = s->cells;

  /* Run through the highest level of cells and add pairs. */
  for (int i = 0; i < cdim[0]; i++) {
    for (int j = 0; j < cdim[1]; j++) {
      for (int k = 0; k < cdim[2]; k++) {

        /* Get the cell */
        const int cid = cell_getid(cdim, i, j, k);
        struct cell *ci = &cells[cid];

        /* Skip cells without hydro particles */
        if (ci->count == 0) continue;

        /* If the cell is local, build a self-interaction */
        if (ci->nodeID == nodeID)
          scheduler_addtask(sched, task_type_self, task_subtype_density, 0, 0,
                            ci, NULL, 0);

        /* Now loop over all the neighbours of this cell */
        for (int ii = -1; ii < 2; ii++) {
          int iii = i + ii;
          if (!s->periodic && (iii < 0 || iii >= cdim[0])) continue;
          iii = (iii + cdim[0]) % cdim[0];
          for (int jj = -1; jj < 2; jj++) {
            int jjj = j + jj;
            if (!s->periodic && (jjj < 0 || jjj >= cdim[1])) continue;
            jjj = (jjj + cdim[1]) % cdim[1];
            for (int kk = -1; kk < 2; kk++) {
              int kkk = k + kk;
              if (!s->periodic && (kkk < 0 || kkk >= cdim[2])) continue;
              kkk = (kkk + cdim[2]) % cdim[2];

              /* Get the neighbouring cell */
              const int cjd = cell_getid(cdim, iii, jjj, kkk);
              struct cell *cj = &cells[cjd];

              /* Is that neighbour local and does it have particles ? */
              if (cid >= cjd || cj->count == 0 ||
                  (ci->nodeID != nodeID && cj->nodeID != nodeID))
                continue;

              /* Construct the pair task */
              const int sid =
                  sortlistID[(kk + 1) + 3 * ((jj + 1) + 3 * (ii + 1))];
              scheduler_addtask(sched, task_type_pair, task_subtype_density,
                                sid, 0, ci, cj, 1);
            }
          }
        }
      }
    }
  }
}

/**
 * @brief Counts the tasks associated with one cell and constructs the links
 *
 * For each hydrodynamic task, construct the links with the corresponding cell.
 * Similarly, construct the dependencies for all the sorting tasks.
 *
 * @param e The #engine.
 */
void engine_count_and_link_tasks(struct engine *e) {

  struct scheduler *sched = &e->sched;
  const int nr_tasks = sched->nr_tasks;

  for (int k = 0; k < nr_tasks; k++) {

    /* Get the current task. */
    struct task *t = &sched->tasks[k];
    if (t->skip) continue;

    /* Link sort tasks together. */
    if (t->type == task_type_sort && t->ci->split)
      for (int j = 0; j < 8; j++)
        if (t->ci->progeny[j] != NULL && t->ci->progeny[j]->sorts != NULL) {
          t->ci->progeny[j]->sorts->skip = 0;
          scheduler_addunlock(sched, t->ci->progeny[j]->sorts, t);
        }

    /* Link density tasks to cells. */
    if (t->type == task_type_self) {
      atomic_inc(&t->ci->nr_tasks);
      if (t->subtype == task_subtype_density) {
        t->ci->density = engine_addlink(e, t->ci->density, t);
        atomic_inc(&t->ci->nr_density);
      }
    } else if (t->type == task_type_pair) {
      atomic_inc(&t->ci->nr_tasks);
      atomic_inc(&t->cj->nr_tasks);
      if (t->subtype == task_subtype_density) {
        t->ci->density = engine_addlink(e, t->ci->density, t);
        atomic_inc(&t->ci->nr_density);
        t->cj->density = engine_addlink(e, t->cj->density, t);
        atomic_inc(&t->cj->nr_density);
      }
    } else if (t->type == task_type_sub) {
      atomic_inc(&t->ci->nr_tasks);
      if (t->cj != NULL) atomic_inc(&t->cj->nr_tasks);
      if (t->subtype == task_subtype_density) {
        t->ci->density = engine_addlink(e, t->ci->density, t);
        atomic_inc(&t->ci->nr_density);
        if (t->cj != NULL) {
          t->cj->density = engine_addlink(e, t->cj->density, t);
          atomic_inc(&t->cj->nr_density);
        }
      }
    }

    /* /\* Link gravity multipole tasks to the up/down tasks. *\/ */
    /* if (t->type == task_type_grav_mm || */
    /*     (t->type == task_type_sub && t->subtype == task_subtype_grav)) { */
    /*   atomic_inc(&t->ci->nr_tasks); */
    /*   scheduler_addunlock(sched, t->ci->grav_up, t); */
    /*   scheduler_addunlock(sched, t, t->ci->grav_down); */
    /*   if (t->cj != NULL && t->ci->grav_up != t->cj->grav_up) { */
    /*     scheduler_addunlock(sched, t->cj->grav_up, t); */
    /*     scheduler_addunlock(sched, t, t->cj->grav_down); */
    /*   } */
    /* } */
  }
}

/**
 * @brief Duplicates the first hydro loop and constructs all the
 * dependencies for the hydro part.
 *
 * This is done by looping over all the previously constructed tasks
 * and adding another task involving the same cells but this time
 * corresponding to the second hydro loop over neighbours.
 * With all the relevant tasks for a given cell available, we construct
 * all the dependencies for that cell.
 *
 * @param e The #engine.
 */
void engine_make_extra_hydroloop_tasks(struct engine *e) {

  struct scheduler *sched = &e->sched;
  const int nodeID = e->nodeID;
  const int nr_tasks = sched->nr_tasks;

  for (int k = 0; k < nr_tasks; k++) {

    /* Get a pointer to the task. */
    struct task *t = &sched->tasks[k];

    /* Skip? */
    if (t->skip) continue;

    /* Self-interaction? */
    if (t->type == task_type_self && t->subtype == task_subtype_density) {

      /* Start by constructing the task for the second hydro loop */
      struct task *t2 = scheduler_addtask(
          sched, task_type_self, task_subtype_force, 0, 0, t->ci, NULL, 0);

      /* Add the link between the new loop and the cell */
      t->ci->force = engine_addlink(e, t->ci->force, t2);
      atomic_inc(&t->ci->nr_force);

      /* Now, build all the dependencies for the hydro */
      /* init --> t (density loop) --> ghost --> t2 (force loop) --> kick */
      scheduler_addunlock(sched, t->ci->super->init, t);
      scheduler_addunlock(sched, t, t->ci->super->ghost);
      scheduler_addunlock(sched, t->ci->super->ghost, t2);
      scheduler_addunlock(sched, t2, t->ci->super->kick);
    }

    /* Otherwise, pair interaction? */
    else if (t->type == task_type_pair && t->subtype == task_subtype_density) {

      /* Start by constructing the task for the second hydro loop */
      struct task *t2 = scheduler_addtask(
          sched, task_type_pair, task_subtype_force, 0, 0, t->ci, t->cj, 0);

      /* Add the link between the new loop and both cells */
      t->ci->force = engine_addlink(e, t->ci->force, t2);
      atomic_inc(&t->ci->nr_force);
      t->cj->force = engine_addlink(e, t->cj->force, t2);
      atomic_inc(&t->cj->nr_force);

      /* Now, build all the dependencies for the hydro for the cells */
      /* that are local and are not descendants of the same super-cell */
      /* init --> t (density loop) --> ghost --> t2 (force loop) --> kick */
      if (t->ci->nodeID == nodeID) {
        scheduler_addunlock(sched, t->ci->super->init, t);
        scheduler_addunlock(sched, t, t->ci->super->ghost);
        scheduler_addunlock(sched, t->ci->super->ghost, t2);
        scheduler_addunlock(sched, t2, t->ci->super->kick);
      }
      if (t->cj->nodeID == nodeID && t->ci->super != t->cj->super) {
        scheduler_addunlock(sched, t->cj->super->init, t);
        scheduler_addunlock(sched, t, t->cj->super->ghost);
        scheduler_addunlock(sched, t->cj->super->ghost, t2);
        scheduler_addunlock(sched, t2, t->cj->super->kick);
      }
    }

    /* Otherwise, sub interaction? */
    else if (t->type == task_type_sub && t->subtype == task_subtype_density) {

      /* Start by constructing the task for the second hydro loop */
      struct task *t2 =
          scheduler_addtask(sched, task_type_sub, task_subtype_force, t->flags,
                            0, t->ci, t->cj, 0);

      /* Add the link between the new loop and both cells */
      t->ci->force = engine_addlink(e, t->ci->force, t2);
      atomic_inc(&t->ci->nr_force);
      if (t->cj != NULL) {
        t->cj->force = engine_addlink(e, t->cj->force, t2);
        atomic_inc(&t->cj->nr_force);
      }

      /* Now, build all the dependencies for the hydro for the cells */
      /* that are local and are not descendants of the same super-cell */
      /* init --> t (density loop) --> ghost --> t2 (force loop) --> kick */
      if (t->ci->nodeID == nodeID) {
        scheduler_addunlock(sched, t, t->ci->super->ghost);
        scheduler_addunlock(sched, t->ci->super->ghost, t2);
        scheduler_addunlock(sched, t2, t->ci->super->kick);
      }
      if (t->cj != NULL && t->cj->nodeID == nodeID &&
          t->ci->super != t->cj->super) {
        scheduler_addunlock(sched, t, t->cj->super->ghost);
        scheduler_addunlock(sched, t->cj->super->ghost, t2);
        scheduler_addunlock(sched, t2, t->cj->super->kick);
      }
    }

    /* /\* Kick tasks should rely on the grav_down tasks of their cell. *\/ */
    /* else if (t->type == task_type_kick && t->ci->grav_down != NULL) */
    /*   scheduler_addunlock(sched, t->ci->grav_down, t); */
  }
}

/**
 * @brief Constructs the top-level pair tasks for the gravity M-M interactions
 *
 * Correct implementation is still lacking here.
 *
 * @param e The #engine.
 */
void engine_make_gravityinteraction_tasks(struct engine *e) {

  struct space *s = e->s;
  struct scheduler *sched = &e->sched;
  const int nr_cells = s->nr_cells;
  struct cell *cells = s->cells;

  /* Loop over all cells. */
  for (int i = 0; i < nr_cells; i++) {

    /* If it has gravity particles, add a self-task */
    if (cells[i].gcount > 0) {
      scheduler_addtask(sched, task_type_grav_mm, task_subtype_none, -1, 0,
                        &cells[i], NULL, 0);

      /* Loop over all remaining cells */
      for (int j = i + 1; j < nr_cells; j++) {

        /* If that other cell has gravity parts, add a pair interaction */
        if (cells[j].gcount > 0) {
          scheduler_addtask(sched, task_type_grav_mm, task_subtype_none, -1, 0,
                            &cells[i], &cells[j], 0);
        }
      }
    }
  }
}

/**
 * @brief Constructs the gravity tasks building the multipoles and
 * propagating them to the children.
 *
 * Correct implementation is still lacking here.
 *
 * @param e The #engine.
 */
void engine_make_gravityrecursive_tasks(struct engine *e) {

  struct space *s = e->s;
  struct scheduler *sched = &e->sched;
  const int nodeID = e->nodeID;
  const int nr_cells = s->nr_cells;
  struct cell *cells = s->cells;

  for (int k = 0; k < nr_cells; k++) {

    /* Only do this for local cells containing gravity particles */
    if (cells[k].nodeID == nodeID && cells[k].gcount > 0) {

      /* Create tasks at top level. */
      struct task *up =
          scheduler_addtask(sched, task_type_grav_up, task_subtype_none, 0, 0,
                            &cells[k], NULL, 0);
      struct task *down =
          scheduler_addtask(sched, task_type_grav_down, task_subtype_none, 0, 0,
                            &cells[k], NULL, 0);

      /* Push tasks down the cell hierarchy. */
      engine_addtasks_grav(e, &cells[k], up, down);
    }
  }
}

/**
 * @brief Fill the #space's task list.
 *
 * @param e The #engine we are working with.
 */
void engine_maketasks(struct engine *e) {

  struct space *s = e->s;
  struct scheduler *sched = &e->sched;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const ticks tic = getticks();

  /* Re-set the scheduler. */
  scheduler_reset(sched, s->tot_cells * engine_maxtaskspercell);

  /* Add the space sorting tasks. */
  for (int i = 0; i < e->nr_threads; i++)
    scheduler_addtask(sched, task_type_psort, task_subtype_none, i, 0, NULL,
                      NULL, 0);

  /* Construct the first hydro loop over neighbours */
  engine_make_hydroloop_tasks(e);

  /* Add the gravity mm tasks. */
  if ((e->policy & engine_policy_self_gravity) == engine_policy_self_gravity)
    engine_make_gravityinteraction_tasks(e);

  /* Split the tasks. */
  scheduler_splittasks(sched);

  /* Allocate the list of cell-task links. The maximum number of links
     is the number of cells (s->tot_cells) times the number of neighbours (27)
     times the number of interaction types (2, density and force). */
  if (e->links != NULL) free(e->links);
  e->size_links = s->tot_cells * 27 * 2;
  if ((e->links = malloc(sizeof(struct link) * e->size_links)) == NULL)
    error("Failed to allocate cell-task links.");
  e->nr_links = 0;

  /* Add the gravity up/down tasks at the top-level cells and push them down. */
  if ((e->policy & engine_policy_self_gravity) == engine_policy_self_gravity)
    engine_make_gravityrecursive_tasks(e);

  /* Count the number of tasks associated with each cell and
     store the density tasks in each cell, and make each sort
     depend on the sorts of its progeny. */
  engine_count_and_link_tasks(e);

  /* Append a ghost task to each cell, and add kick tasks to the
     super cells. */
  for (int k = 0; k < nr_cells; k++)
    engine_make_ghost_tasks(e, &cells[k], NULL);

  /* Run through the tasks and make force tasks for each density task.
     Each force task depends on the cell ghosts and unlocks the kick task
     of its super-cell. */
  engine_make_extra_hydroloop_tasks(e);

  /* Add the communication tasks if MPI is being used. */
  if ((e->policy & engine_policy_mpi) == engine_policy_mpi) {

    /* Loop over the proxies. */
    for (int pid = 0; pid < e->nr_proxies; pid++) {

      /* Get a handle on the proxy. */
      struct proxy *p = &e->proxies[pid];

      /* Loop through the proxy's incoming cells and add the
         recv tasks. */
      for (int k = 0; k < p->nr_cells_in; k++)
        engine_addtasks_recv(e, p->cells_in[k], NULL, NULL);

      /* Loop through the proxy's outgoing cells and add the
         send tasks. */
      for (int k = 0; k < p->nr_cells_out; k++)
        engine_addtasks_send(e, p->cells_out[k], p->cells_in[0]);
    }
  }
  /* Set the unlocks per task. */
  scheduler_set_unlocks(sched);

  /* Rank the tasks. */
  scheduler_ranktasks(sched);

  /* Weight the tasks. */
  scheduler_reweight(sched);

  /* Set the tasks age. */
  e->tasks_age = 0;

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
}

/**
 * @brief Mark tasks to be skipped and set the sort flags accordingly.
 *
 * @return 1 if the space has to be rebuilt, 0 otherwise.
 */

int engine_marktasks(struct engine *e) {

  struct scheduler *s = &e->sched;
  const int ti_end = e->ti_current;
  const int nr_tasks = s->nr_tasks;
  const int *const ind = s->tasks_ind;
  struct task *tasks = s->tasks;
  const ticks tic = getticks();

  /* Much less to do here if we're on a fixed time-step. */
  if ((e->policy & engine_policy_fixdt) == engine_policy_fixdt) {

    /* Run through the tasks and mark as skip or not. */
    for (int k = 0; k < nr_tasks; k++) {

      /* Get a handle on the kth task. */
      struct task *t = &tasks[ind[k]];

      /* Pair? */
      if (t->type == task_type_pair ||
          (t->type == task_type_sub && t->cj != NULL)) {

        /* Local pointers. */
        const struct cell *ci = t->ci;
        const struct cell *cj = t->cj;

        /* Too much particle movement? */
        if (t->tight &&
            (fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin ||
             ci->dx_max > space_maxreldx * ci->h_max ||
             cj->dx_max > space_maxreldx * cj->h_max))
          return 1;

      }

      /* Sort? */
      else if (t->type == task_type_sort) {

        /* If all the sorts have been done, make this task implicit. */
        if (!(t->flags & (t->flags ^ t->ci->sorted))) t->implicit = 1;
      }
    }

    /* Multiple-timestep case */
  } else {

    /* Run through the tasks and mark as skip or not. */
    for (int k = 0; k < nr_tasks; k++) {

      /* Get a handle on the kth task. */
      struct task *t = &tasks[ind[k]];

      /* Sort-task? Note that due to the task ranking, the sorts
         will all come before the pairs. */
      if (t->type == task_type_sort) {

        /* Re-set the flags. */
        t->flags = 0;
        t->skip = 1;

      }

      /* Single-cell task? */
      else if (t->type == task_type_self || t->type == task_type_ghost ||
               (t->type == task_type_sub && t->cj == NULL)) {

        /* Set this task's skip. */
        t->skip = (t->ci->ti_end_min > ti_end);
      }

      /* Pair? */
      else if (t->type == task_type_pair ||
               (t->type == task_type_sub && t->cj != NULL)) {

        /* Local pointers. */
        const struct cell *ci = t->ci;
        const struct cell *cj = t->cj;

        /* Set this task's skip. */
        t->skip = (ci->ti_end_min > ti_end && cj->ti_end_min > ti_end);

        /* Too much particle movement? */
        if (t->tight &&
            (fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin ||
             ci->dx_max > space_maxreldx * ci->h_max ||
             cj->dx_max > space_maxreldx * cj->h_max))
          return 1;

        /* Set the sort flags. */
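        /* Each bit of a cell's sorted field records a sort direction that is
           already available; t->flags holds the direction (sid) this pair
           needs. If it is missing, request it from the sort task and make
           sure that task is not skipped. */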
        if (!t->skip && t->type == task_type_pair) {
          if (!(ci->sorted & (1 << t->flags))) {
            ci->sorts->flags |= (1 << t->flags);
            ci->sorts->skip = 0;
          }
          if (!(cj->sorted & (1 << t->flags))) {
            cj->sorts->flags |= (1 << t->flags);
            cj->sorts->skip = 0;
          }
        }

      }

      /* Kick? */
      else if (t->type == task_type_kick) {
        t->skip = (t->ci->ti_end_min > ti_end);
        t->ci->updated = 0;
      }

      /* Drift? */
      else if (t->type == task_type_drift)
        t->skip = 0;

      /* Init? */
      else if (t->type == task_type_init) {
        /* Set this task's skip. */
        t->skip = (t->ci->ti_end_min > ti_end);
      }

      /* None? */
      else if (t->type == task_type_none)
        t->skip = 1;
    }
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

  /* All is well... */
  return 0;
}

/**
 * @brief Prints the number of tasks in the engine
 *
 * @param e The #engine.
 */

void engine_print_task_counts(struct engine *e) {

  struct scheduler *sched = &e->sched;

  /* Count and print the number of each task type. */
  int counts[task_type_count + 1];
  for (int k = 0; k <= task_type_count; k++) counts[k] = 0;
  for (int k = 0; k < sched->nr_tasks; k++)
    if (!sched->tasks[k].skip)
      counts[(int)sched->tasks[k].type] += 1;
    else
      counts[task_type_count] += 1;
#ifdef WITH_MPI
  printf("[%04i] %s engine_print_task_counts: task counts are [ %s=%i",
         e->nodeID, clocks_get_timesincestart(), taskID_names[0], counts[0]);
#else
  printf("%s engine_print_task_counts: task counts are [ %s=%i",
         clocks_get_timesincestart(), taskID_names[0], counts[0]);
#endif
  for (int k = 1; k < task_type_count; k++)
    printf(" %s=%i", taskID_names[k], counts[k]);
  printf(" skipped=%i ]\n", counts[task_type_count]);
  fflush(stdout);
  message("nr_parts = %zi.", e->s->nr_parts);
}

/**
 * @brief Rebuild the space and tasks.
 *
 * @param e The #engine.
 */

void engine_rebuild(struct engine *e) {

  const ticks tic = getticks();

  /* Clear the forcerebuild flag, whatever it was. */
  e->forcerebuild = 0;

  /* Re-build the space. */
  space_rebuild(e->s, 0.0, e->verbose);

/* If in parallel, exchange the cell structure. */
#ifdef WITH_MPI
  engine_exchange_cells(e);
#endif

  /* Re-build the tasks. */
  engine_maketasks(e);

  /* Run through the tasks and mark as skip or not. */
  if (engine_marktasks(e))
    error("engine_marktasks failed after space_rebuild.");

  /* Print the status of the system */
  engine_print_task_counts(e);

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
}

/**
 * @brief Prepare the #engine by re-building the cells and tasks.
 *
 * @param e The #engine to prepare.
 */

void engine_prepare(struct engine *e) {

  TIMER_TIC;

  /* Run through the tasks and mark as skip or not. */
  int rebuild = (e->forcerebuild || engine_marktasks(e));

/* Collect the values of rebuild from all nodes. */
#ifdef WITH_MPI
  int buff;
  if (MPI_Allreduce(&rebuild, &buff, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD) !=
      MPI_SUCCESS)
    error("Failed to aggregate the rebuild flag across nodes.");
  rebuild = buff;
#endif
  e->tic_step = getticks();

  /* And rebuild if necessary. */
  if (rebuild) {
    engine_rebuild(e);
  }

  /* Re-weight the tasks every now and then. */
  if (e->tasks_age % engine_tasksreweight == 1) {
    scheduler_reweight(&e->sched);
  }
  e->tasks_age += 1;

  TIMER_TOC(timer_prepare);

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
}

/**
 * @brief Implements a barrier for the #runner threads.
 *
 * @param e The #engine.
 * @param tid The thread ID
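 *
 * A sketch of the handshake as implemented below: each runner checks in by
 * decrementing barrier_running and the last one in wakes the main thread;
 * the runners then sleep on barrier_cond until engine_launch() opens the
 * barrier by setting barrier_launch and barrier_launchcount.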
 */

void engine_barrier(struct engine *e, int tid) {

  /* First, get the barrier mutex. */
  if (pthread_mutex_lock(&e->barrier_mutex) != 0)
    error("Failed to get barrier mutex.");

  /* This thread is no longer running. */
  e->barrier_running -= 1;
  /* If all threads are in, send a signal... */
  if (e->barrier_running == 0)
    if (pthread_cond_broadcast(&e->barrier_cond) != 0)
      error("Failed to broadcast barrier full condition.");

  /* Wait for the barrier to open. */
  while (e->barrier_launch == 0 || tid >= e->barrier_launchcount)
    if (pthread_cond_wait(&e->barrier_cond, &e->barrier_mutex) != 0)
      error("Error waiting for barrier to close.");

  /* This thread has been launched. */
  e->barrier_running += 1;
  e->barrier_launch -= 1;

  /* If I'm the last one out, signal the condition again. */
  if (e->barrier_launch == 0)
    if (pthread_cond_broadcast(&e->barrier_cond) != 0)
      error("Failed to broadcast empty barrier condition.");

  /* Last but not least, release the mutex. */
  if (pthread_mutex_unlock(&e->barrier_mutex) != 0)
    error("Failed to get unlock the barrier mutex.");
}

/**
 * @brief Mapping function to collect the data from the kick.
 */

void engine_collect_kick(struct cell *c) {

  /* Skip super-cells (Their values are already set) */
  if (c->kick != NULL) return;

  /* Counters for the different quantities. */
  int updated = 0;
  double e_kin = 0.0, e_int = 0.0, e_pot = 0.0;
  float mom[3] = {0.0f, 0.0f, 0.0f}, ang[3] = {0.0f, 0.0f, 0.0f};
  int ti_end_min = max_nr_timesteps, ti_end_max = 0;

  /* Only do something if the cell is non-empty */
  if (c->count != 0) {

    /* If this cell is not split, I'm in trouble. */
    if (!c->split) error("Cell has no super-cell.");

    /* Collect the values from the progeny. */
    for (int k = 0; k < 8; k++) {
      struct cell *cp = c->progeny[k];
      if (cp != NULL) {

        /* Recurse */
        engine_collect_kick(cp);

        /* And update */
        ti_end_min = min(ti_end_min, cp->ti_end_min);
        ti_end_max = max(ti_end_max, cp->ti_end_max);
        updated += cp->updated;
        e_kin += cp->e_kin;
        e_int += cp->e_int;
        e_pot += cp->e_pot;
        mom[0] += cp->mom[0];
        mom[1] += cp->mom[1];
        mom[2] += cp->mom[2];
        ang[0] += cp->ang[0];
        ang[1] += cp->ang[1];
        ang[2] += cp->ang[2];
      }
    }
  }
  /* Store the collected values in the cell. */
  c->ti_end_min = ti_end_min;
  c->ti_end_max = ti_end_max;
  c->updated = updated;
  c->e_kin = e_kin;
  c->e_int = e_int;
  c->e_pot = e_pot;
  c->mom[0] = mom[0];
  c->mom[1] = mom[1];
  c->mom[2] = mom[2];
  c->ang[0] = ang[0];
  c->ang[1] = ang[1];
  c->ang[2] = ang[2];
}

/**
 * @brief Launch the runners.
 *
 * @param e The #engine.
 * @param nr_runners The number of #runner to let loose.
 * @param mask The task mask to launch.
 * @param submask The sub-task mask to launch.
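 *
 * For example, drifting all particles launches only the drift tasks with an
 * empty sub-task mask, as done in engine_step() below:
 *
 *   engine_launch(e, e->nr_threads, 1 << task_type_drift, 0);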
 */

void engine_launch(struct engine *e, int nr_runners, unsigned int mask,
                   unsigned int submask) {

  /* Prepare the scheduler. */
  atomic_inc(&e->sched.waiting);

  /* Cry havoc and let loose the dogs of war. */
  e->barrier_launch = nr_runners;
  e->barrier_launchcount = nr_runners;
  if (pthread_cond_broadcast(&e->barrier_cond) != 0)
    error("Failed to broadcast barrier open condition.");

  /* Load the tasks. */
  pthread_mutex_unlock(&e->barrier_mutex);
  scheduler_start(&e->sched, mask, submask);
  pthread_mutex_lock(&e->barrier_mutex);

  /* Remove the safeguard. */
  pthread_mutex_lock(&e->sched.sleep_mutex);
  atomic_dec(&e->sched.waiting);
  pthread_cond_broadcast(&e->sched.sleep_cond);
  pthread_mutex_unlock(&e->sched.sleep_mutex);

  /* Sit back and wait for the runners to come home. */
  while (e->barrier_launch || e->barrier_running)
    if (pthread_cond_wait(&e->barrier_cond, &e->barrier_mutex) != 0)
      error("Error while waiting for barrier.");
}

/**
 * @brief Initialises the particles and sets them in a state ready to move
 * forward in time.
 *
 * @param e The #engine
 */
void engine_init_particles(struct engine *e) {

  struct space *s = e->s;

  if (e->nodeID == 0) message("Initialising particles");

  /* Make sure all particles are ready to go */
  /* i.e. clean up any stupid state in the ICs */
  space_map_cells_pre(s, 1, cell_init_parts, NULL);

  engine_prepare(e);

  engine_marktasks(e);

  // printParticle(e->s->parts, 1000, e->s->nr_parts);
  // printParticle(e->s->parts, 515050, e->s->nr_parts);

  // message("\n0th DENSITY CALC\n");

  /* Build the masks corresponding to the policy */
  unsigned int mask = 0;
  unsigned int submask = 0;

  /* We always have sort tasks */
  mask |= 1 << task_type_sort;

  /* Add the tasks corresponding to hydro operations to the masks */
  if ((e->policy & engine_policy_hydro) == engine_policy_hydro) {

    mask |= 1 << task_type_init;
    mask |= 1 << task_type_self;
    mask |= 1 << task_type_pair;
    mask |= 1 << task_type_sub;
    mask |= 1 << task_type_ghost;

    submask |= 1 << task_subtype_density;
  }

  /* Add the tasks corresponding to self-gravity to the masks */
  if ((e->policy & engine_policy_self_gravity) == engine_policy_self_gravity) {

    /* Nothing here for now */
  }

  /* Add the tasks corresponding to external gravity to the masks */
  if ((e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity) {

    /* Nothing here for now */
  }

  /* Add MPI tasks if need be */
  if ((e->policy & engine_policy_mpi) == engine_policy_mpi) {

    mask |= 1 << task_type_send;
    mask |= 1 << task_type_recv;
  }

  /* Now, launch the calculation */
  TIMER_TIC;
  engine_launch(e, e->nr_threads, mask, submask);
  TIMER_TOC(timer_runners);

  // message("\n0th ENTROPY CONVERSION\n")

  /* Apply some conversions (e.g. internal energy -> entropy) */
  space_map_cells_pre(s, 1, cell_convert_hydro, NULL);

  // printParticle(e->s->parts, e->s->xparts,1000, e->s->nr_parts);
  // printParticle(e->s->parts, e->s->xparts,515050, e->s->nr_parts);

  /* Ready to go */
  e->step = -1;
}

/**
 * @brief Let the #engine loose to compute the forces.
 *
 * @param e The #engine.
 */
void engine_step(struct engine *e) {

  int updates = 0;
  int ti_end_min = max_nr_timesteps, ti_end_max = 0;
  double e_pot = 0.0, e_int = 0.0, e_kin = 0.0;
  float mom[3] = {0.0, 0.0, 0.0};
  float ang[3] = {0.0, 0.0, 0.0};
  struct space *s = e->s;

  TIMER_TIC2;

  struct clocks_time time1, time2;
  clocks_gettime(&time1);

  /* Collect the cell data. */
  for (int k = 0; k < s->nr_cells; k++)
    if (s->cells[k].nodeID == e->nodeID) {
      struct cell *c = &s->cells[k];

      /* Recurse */
      engine_collect_kick(c);

      /* And aggregate */
      ti_end_min = min(ti_end_min, c->ti_end_min);
      ti_end_max = max(ti_end_max, c->ti_end_max);
      e_kin += c->e_kin;
      e_int += c->e_int;
      e_pot += c->e_pot;
      updates += c->updated;
      mom[0] += c->mom[0];
      mom[1] += c->mom[1];
      mom[2] += c->mom[2];
      ang[0] += c->ang[0];
      ang[1] += c->ang[1];
      ang[2] += c->ang[2];
    }
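
  /* ti_end_min now holds the earliest point on the integer time-line at
     which any local particle needs its next kick. */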

/* Aggregate the data from the different nodes. */
#ifdef WITH_MPI
  {
    int in_i[4], out_i[4];
    out_i[0] = ti_end_min;
    if (MPI_Allreduce(out_i, in_i, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD) !=
        MPI_SUCCESS)
      error("Failed to aggregate t_end_min.");
    ti_end_min = in_i[0];
    out_i[0] = ti_end_max;
    if (MPI_Allreduce(out_i, in_i, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD) !=
        MPI_SUCCESS)
      error("Failed to aggregate t_end_max.");
    ti_end_max = in_i[0];
  }
  {
    double in_d[4], out_d[4];
    out_d[0] = updates;
    out_d[1] = e_kin;
    out_d[2] = e_int;
    out_d[3] = e_pot;
    if (MPI_Allreduce(out_d, in_d, 4, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD) !=
        MPI_SUCCESS)
      error("Failed to aggregate energies.");
    updates = (int)in_d[0];
    e_kin = in_d[1];
    e_int = in_d[2];
    e_pot = in_d[3];
  }
#endif
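
  /* Note that the momenta are not reduced across the nodes, so under MPI
     the values written to the statistics file below are rank-local. */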

  // message("\nDRIFT\n");

  /* Move forward in time */
  e->ti_old = e->ti_current;
  e->ti_current = ti_end_min;
  e->step += 1;
  e->time = e->ti_current * e->timeBase + e->timeBegin;
  e->timeOld = e->ti_old * e->timeBase + e->timeBegin;
  e->timeStep = (e->ti_current - e->ti_old) * e->timeBase;

  /* Drift everybody */
  engine_launch(e, e->nr_threads, 1 << task_type_drift, 0);

  if (e->nodeID == 0) {

    /* Print some information to the screen */
    printf("%d %e %e %d %.3f\n", e->step, e->time, e->timeStep, updates,
           e->wallclock_time);
    fflush(stdout);

    /* Write some energy statistics */
    fprintf(e->file_stats, "%d %f %f %f %f %f %f %f %f %f %f %f\n", e->step,
            e->time, e_kin, e_int, e_pot, e_kin + e_int + e_pot, mom[0], mom[1],
            mom[2], ang[0], ang[1], ang[2]);
    fflush(e->file_stats);
  }

  // message("\nACCELERATION AND KICK\n");

  /* Re-distribute the particles amongst the nodes? */
  if (e->forcerepart != REPART_NONE) engine_repartition(e);

  /* Prepare the space. */
  engine_prepare(e);

  /* Build the masks corresponding to the policy */
  unsigned int mask = 0, submask = 0;
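
  /* Unlike the density-only launch in engine_init_particles(), this one
     also activates the kick tasks and the force sub-tasks. */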

  /* We always have sort tasks and kick tasks */
  mask |= 1 << task_type_sort;
  mask |= 1 << task_type_kick;

  /* Add the tasks corresponding to hydro operations to the masks */
  if ((e->policy & engine_policy_hydro) == engine_policy_hydro) {

    mask |= 1 << task_type_init;
    mask |= 1 << task_type_self;
    mask |= 1 << task_type_pair;
    mask |= 1 << task_type_sub;
    mask |= 1 << task_type_ghost;

    submask |= 1 << task_subtype_density;
    submask |= 1 << task_subtype_force;
  }

  /* Add the tasks corresponding to self-gravity to the masks */
  if ((e->policy & engine_policy_self_gravity) == engine_policy_self_gravity) {

    /* Nothing here for now */
  }

  /* Add the tasks corresponding to external gravity to the masks */
  if ((e->policy & engine_policy_external_gravity) ==
      engine_policy_external_gravity) {

    /* Nothing here for now */
  }

  /* Add MPI tasks if need be */
  if ((e->policy & engine_policy_mpi) == engine_policy_mpi) {

    mask |= 1 << task_type_send;
    mask |= 1 << task_type_recv;
  }

  /* Send off the runners. */
  TIMER_TIC;
  engine_launch(e, e->nr_threads, mask, submask);
  TIMER_TOC(timer_runners);

  TIMER_TOC2(timer_step);

  clocks_gettime(&time2);

  e->wallclock_time = (float)clocks_diff(&time1, &time2);
}

/**
 * @brief Returns 1 if the simulation has reached its end point, 0 otherwise.
 *
 * @param e The #engine.
 */
int engine_is_done(struct engine *e) {
  return e->ti_current >= max_nr_timesteps;
}

/**
 * @brief Create and fill the proxies.
 *
 * A proxy pairs this node with one of its neighbours and records which
 * local cells have to be sent to it and which foreign cells will be
 * received from it.
 *
 * @param e The #engine.
 */

void engine_makeproxies(struct engine *e) {

#ifdef WITH_MPI
  const int *cdim = e->s->cdim;
  const struct space *s = e->s;
  struct cell *cells = s->cells;
  struct proxy *proxies = e->proxies;
  ticks tic = getticks();

  /* Prepare the proxies and the proxy index. */
  if (e->proxy_ind == NULL)
    if ((e->proxy_ind = (int *)malloc(sizeof(int) * e->nr_nodes)) == NULL)
      error("Failed to allocate proxy index.");
  for (int k = 0; k < e->nr_nodes; k++) e->proxy_ind[k] = -1;
  e->nr_proxies = 0;

  /* The following loop is super-clunky, but it's necessary
     to ensure that the order of the send and recv cells in
     the proxies is identical for all nodes! */

  /* Loop over each cell in the space. */
  int ind[3];
  for (ind[0] = 0; ind[0] < cdim[0]; ind[0]++)
    for (ind[1] = 0; ind[1] < cdim[1]; ind[1]++)
      for (ind[2] = 0; ind[2] < cdim[2]; ind[2]++) {

        /* Get the cell ID. */
        const int cid = cell_getid(cdim, ind[0], ind[1], ind[2]);

        /* Loop over all its neighbours (periodic). */
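        /* The ii/jj/kk indices below are wrapped around the box edges, so
           cells on opposite faces of the periodic volume count as
           neighbours. */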
        for (int i = -1; i <= 1; i++) {
          int ii = ind[0] + i;
          if (ii >= cdim[0])
            ii -= cdim[0];
          else if (ii < 0)
            ii += cdim[0];
          for (int j = -1; j <= 1; j++) {
            int jj = ind[1] + j;
            if (jj >= cdim[1])
              jj -= cdim[1];
            else if (jj < 0)
              jj += cdim[1];
            for (int k = -1; k <= 1; k++) {
              int kk = ind[2] + k;
              if (kk >= cdim[2])
                kk -= cdim[2];
              else if (kk < 0)
                kk += cdim[2];

              /* Get the cell ID. */
              const int cjd = cell_getid(cdim, ii, jj, kk);

              /* Add to proxies? */
              if (cells[cid].nodeID == e->nodeID &&
                  cells[cjd].nodeID != e->nodeID) {
                int pid = e->proxy_ind[cells[cjd].nodeID];
                if (pid < 0) {
                  if (e->nr_proxies == engine_maxproxies)
                    error("Maximum number of proxies exceeded.");
                  proxy_init(&proxies[e->nr_proxies], e->nodeID,
                             cells[cjd].nodeID);
                  e->proxy_ind[cells[cjd].nodeID] = e->nr_proxies;
                  pid = e->nr_proxies;
                  e->nr_proxies += 1;
                }
                proxy_addcell_in(&proxies[pid], &cells[cjd]);
                proxy_addcell_out(&proxies[pid], &cells[cid]);
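                /* Flag the local cell as to-be-sent through this proxy;
                   sendto is a bitmask over proxy indices. */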
                cells[cid].sendto |= (1ULL << pid);
              }

              if (cells[cjd].nodeID == e->nodeID &&
                  cells[cid].nodeID != e->nodeID) {
                int pid = e->proxy_ind[cells[cid].nodeID];
                if (pid < 0) {
                  if (e->nr_proxies == engine_maxproxies)
                    error("Maximum number of proxies exceeded.");
                  proxy_init(&proxies[e->nr_proxies], e->nodeID,
                             cells[cid].nodeID);
                  e->proxy_ind[cells[cid].nodeID] = e->nr_proxies;
                  pid = e->nr_proxies;
                  e->nr_proxies += 1;
                }
                proxy_addcell_in(&proxies[pid], &cells[cid]);
                proxy_addcell_out(&proxies[pid], &cells[cjd]);
                cells[cjd].sendto |= (1ULL << pid);
              }
            }
          }
        }
      }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

/**
 * @brief Split the underlying space into regions and assign to separate nodes.
 *
 * @param e The #engine.
 * @param initial_partition Structure defining the cell partition technique.
 */
void engine_split(struct engine *e, struct partition *initial_partition) {

#ifdef WITH_MPI
  struct space *s = e->s;

  /* Do the initial partition of the cells. */
  partition_initial_partition(initial_partition, e->nodeID, e->nr_nodes, s);

  /* Make the proxies. */
  engine_makeproxies(e);

  /* Re-allocate the local parts, leaving some head-room for particles
     that will later be imported from other nodes. */
  if (e->nodeID == 0)
    message("Re-allocating parts array from %zu to %zu.", s->size_parts,
            (size_t)(s->nr_parts * engine_redistribute_alloc_margin));
  s->size_parts = s->nr_parts * engine_redistribute_alloc_margin;
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * s->size_parts) != 0 ||
      posix_memalign((void **)&xparts_new, part_align,
                     sizeof(struct xpart) * s->size_parts) != 0)
    error("Failed to allocate new part data.");
  memcpy(parts_new, s->parts, sizeof(struct part) * s->nr_parts);
  memcpy(xparts_new, s->xparts, sizeof(struct xpart) * s->nr_parts);
  free(s->parts);
  free(s->xparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
#else
  error("SWIFT was not compiled with MPI support.");
#endif
}

#if defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE)
static bool hyperthreads_present(void) {
#ifdef __linux__
  FILE *f =
      fopen("/sys/devices/system/cpu/cpu0/topology/thread_siblings_list", "r");
  if (f == NULL) return true;  // can't tell, so guess they are

  /* The sibling list names every hardware thread sharing cpu0's core; a
     ',' or a '-' range separator before EOF means there is more than
     one, i.e. SMT is enabled. */
  int c;
  while ((c = fgetc(f)) != EOF && c != ',' && c != '-')
    ;
  fclose(f);

  return c != EOF;
#else
  return true;  // just guess
#endif
}
#endif

/**
 * @brief Init an engine with the given number of threads, queues, and
 *      the given policy.
 *
 * @param e The #engine.
 * @param s The #space in which this #runner will run.
 * @param dt The initial time step to use.
 * @param nr_threads The number of threads to spawn.
 * @param nr_queues The number of task queues to create.
 * @param nr_nodes The number of MPI ranks.
 * @param nodeID The MPI rank of this node.
 * @param policy The queuing policy to use.
 * @param timeBegin Time at the beginning of the simulation.
 * @param timeEnd Time at the end of the simulation.
 * @param dt_min Minimal allowed timestep (unused with fixdt policy).
 * @param dt_max Maximal allowed timestep.
 * @param verbose Is this #engine talkative?
 */

void engine_init(struct engine *e, struct space *s, float dt, int nr_threads,
                 int nr_queues, int nr_nodes, int nodeID, int policy,
                 float timeBegin, float timeEnd, float dt_min, float dt_max,
                 int verbose) {

  /* Store the values. */
  e->s = s;
  e->nr_threads = nr_threads;
  e->policy = policy;
  e->step = 0;
  e->nullstep = 0;
  e->nr_nodes = nr_nodes;
  e->nodeID = nodeID;
  e->proxy_ind = NULL;
  e->nr_proxies = 0;
  e->forcerebuild = 1;
  e->forcerepart = REPART_NONE;
  e->links = NULL;
  e->nr_links = 0;
  e->timeBegin = timeBegin;
  e->timeEnd = timeEnd;
  e->timeOld = timeBegin;
  e->time = timeBegin;
  e->ti_old = 0;
  e->ti_current = 0;
  e->timeStep = 0.;
  e->dt_min = dt_min;
  e->dt_max = dt_max;
  e->file_stats = NULL;
  e->verbose = verbose;
  e->wallclock_time = 0.f;
  engine_rank = nodeID;

  /* Make the space link back to the engine. */
  s->e = e;

#if defined(HAVE_SETAFFINITY)
  const int nr_cores = sysconf(_SC_NPROCESSORS_ONLN);
  int cpuid[nr_cores];
  cpu_set_t cpuset;
  if ((policy & engine_policy_cputight) == engine_policy_cputight) {
    for (int k = 0; k < nr_cores; k++) cpuid[k] = k;
  } else {
    /*  Get next highest power of 2. */
    int maxint = 1;
    while (maxint < nr_cores) maxint *= 2;

    cpuid[0] = 0;
    int k = 1;
    for (int i = 1; i < maxint; i *= 2)
      for (int j = maxint / i / 2; j < maxint; j += maxint / i)
        if (j < nr_cores && j != 0) cpuid[k++] = j;
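
    /* For example, with 8 cores this yields the order
       {0, 4, 2, 6, 1, 3, 5, 7}: threads are first spread across the
       machine at power-of-two strides before neighbouring cores are
       filled in. */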

#if defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE)
    /* Ascending NUMA distance. Bubblesort(!) for stable equidistant CPUs. */
    if (numa_available() >= 0) {
      if (nodeID == 0) message("prefer NUMA-local CPUs");

      const int home = numa_node_of_cpu(sched_getcpu());
      const int half = nr_cores / 2;
      const bool swap_hyperthreads = hyperthreads_present();
      bool done = false;
      if (swap_hyperthreads && nodeID == 0)
        message("prefer physical cores to hyperthreads");

      while (!done) {
        done = true;
        for (int i = 1; i < nr_cores; i++) {
          const int node_a = numa_node_of_cpu(cpuid[i - 1]);
          const int node_b = numa_node_of_cpu(cpuid[i]);

          /* Avoid using local hyperthreads over unused remote physical cores.
           * Assume two hyperthreads, and that cpuid >= half partitions them.
           */
          const int thread_a = swap_hyperthreads && cpuid[i - 1] >= half;
          const int thread_b = swap_hyperthreads && cpuid[i] >= half;

          bool swap = thread_a > thread_b;
          if (thread_a == thread_b)
            swap = numa_distance(home, node_a) > numa_distance(home, node_b);

          if (swap) {
            const int t = cpuid[i - 1];
            cpuid[i - 1] = cpuid[i];
            cpuid[i] = t;
            done = false;
          }
        }
      }
    }
#endif

    if (nodeID == 0) {
#ifdef WITH_MPI
      printf("[%04i] %s engine_init: cpu map is [ ", nodeID,
             clocks_get_timesincestart());
#else
      printf("%s engine_init: cpu map is [ ", clocks_get_timesincestart());
#endif
      for (int i = 0; i < nr_cores; i++) printf("%i ", cpuid[i]);
      printf("].\n");
    }
  }
#endif

  /* Are we doing stuff in parallel? */
  if (nr_nodes > 1) {
#ifndef WITH_MPI
    error("SWIFT was not compiled with MPI support.");
#else
    e->policy |= engine_policy_mpi;
    if ((e->proxies = (struct proxy *)malloc(sizeof(struct proxy) *
                                             engine_maxproxies)) == NULL)
      error("Failed to allocate memory for proxies.");
    bzero(e->proxies, sizeof(struct proxy) * engine_maxproxies);
    e->nr_proxies = 0;
#endif
  }

  /* Open some files */
  if (e->nodeID == 0) {
    e->file_stats = fopen("energy.txt", "w");
    fprintf(e->file_stats,
            "# Step Time E_kin E_int E_pot E_tot "
            "p_x p_y p_z ang_x ang_y ang_z\n");
  }

  /* Print policy */
  engine_print_policy(e);

  /* Print information about the hydro scheme */
  if (e->nodeID == 0) message("Hydrodynamic scheme: %s", SPH_IMPLEMENTATION);

  /* Check we have sensible time bounds */
  if (timeBegin >= timeEnd)
    error(
        "Final simulation time (t_end = %e) must be larger than the start time "
        "(t_beg = %e)",
        timeEnd, timeBegin);

  /* Check we have sensible time step bounds */
  if (e->dt_min > e->dt_max)
    error(
        "Minimal time step size must be smaller than maximal time step size ");

  /* Deal with timestep */
  e->timeBase = (timeEnd - timeBegin) / max_nr_timesteps;
  e->ti_current = 0;

  /* Fixed time-step case */
  if ((e->policy & engine_policy_fixdt) == engine_policy_fixdt) {
    e->dt_min = e->dt_max;

    /* Find timestep on the timeline */
    int dti_timeline = max_nr_timesteps;
    while (e->dt_min < dti_timeline * e->timeBase) dti_timeline /= 2;
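
    /* dti_timeline is now the largest number of base time-steps,
       obtained by halving max_nr_timesteps, that still fits inside the
       requested step; i.e. the fixed time-step snapped down onto the
       binary time-line. */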

    e->dt_min = e->dt_max = dti_timeline * e->timeBase;

    if (e->nodeID == 0) message("Timestep set to %e", e->dt_max);
  } else {

    if (e->nodeID == 0) {
      message("Absolute minimal timestep size: %e", e->timeBase);

      float dt_min = timeEnd - timeBegin;
      while (dt_min > e->dt_min) dt_min /= 2.f;

      message("Minimal timestep size (on time-line): %e", dt_min);

      float dt_max = timeEnd - timeBegin;
      while (dt_max > e->dt_max) dt_max /= 2.f;

      message("Maximal timestep size (on time-line): %e", dt_max);
    }
  }

  if (e->dt_min < e->timeBase && e->nodeID == 0)
    error(
        "Minimal time-step size smaller than the absolute possible minimum "
        "dt=%e",
        e->timeBase);

  if (e->dt_max > (e->timeEnd - e->timeBegin) && e->nodeID == 0)
    error("Maximal time-step size larger than the simulation run time t=%e",
          e->timeEnd - e->timeBegin);

/* Construct types for MPI communications */
#ifdef WITH_MPI
  part_create_mpi_type(&e->part_mpi_type);
  xpart_create_mpi_type(&e->xpart_mpi_type);
  gpart_create_mpi_type(&e->gpart_mpi_type);
#endif

  /* First of all, init the barrier and lock it. */
  if (pthread_mutex_init(&e->barrier_mutex, NULL) != 0)
    error("Failed to initialize barrier mutex.");
  if (pthread_cond_init(&e->barrier_cond, NULL) != 0)
    error("Failed to initialize barrier condition variable.");
  if (pthread_mutex_lock(&e->barrier_mutex) != 0)
    error("Failed to lock barrier mutex.");
  e->barrier_running = 0;
  e->barrier_launch = 0;
  e->barrier_launchcount = 0;

  /* Init the scheduler with enough tasks for the initial sorting tasks. */
  int nr_tasks = 2 * s->tot_cells + e->nr_threads;
  scheduler_init(&e->sched, e->s, nr_tasks, nr_queues, scheduler_flag_steal,
                 e->nodeID);
  s->nr_queues = nr_queues;

  /* Create the sorting tasks. */
  for (int i = 0; i < e->nr_threads; i++)
    scheduler_addtask(&e->sched, task_type_psort, task_subtype_none, i, 0, NULL,
                      NULL, 0);

  scheduler_ranktasks(&e->sched);

  /* Allocate and init the threads. */
  if ((e->runners =
           (struct runner *)malloc(sizeof(struct runner) * nr_threads)) == NULL)
    error("Failed to allocate threads array.");
  for (int k = 0; k < nr_threads; k++) {
    e->runners[k].id = k;
    e->runners[k].e = e;
    e->barrier_running += 1;
    if (pthread_create(&e->runners[k].thread, NULL, &runner_main,
                       &e->runners[k]) != 0)
      error("Failed to create runner thread.");
    if ((e->policy & engine_policy_setaffinity) == engine_policy_setaffinity) {
#if defined(HAVE_SETAFFINITY)

      /* Set a reasonable queue ID: with fewer queues than threads, map the
         runner to the queue covering its CPU's share of the cores. */
      e->runners[k].cpuid = cpuid[k % nr_cores];
      if (nr_queues < nr_threads)
        e->runners[k].qid = cpuid[k % nr_cores] * nr_queues / nr_cores;
      else
        e->runners[k].qid = k;

      /* Build an affinity mask containing only this runner's CPU. */
      CPU_ZERO(&cpuset);
      CPU_SET(cpuid[k % nr_cores], &cpuset);

      /* Apply this mask to the runner's pthread. */
      if (pthread_setaffinity_np(e->runners[k].thread, sizeof(cpu_set_t),
                                 &cpuset) != 0)
        error("Failed to set thread affinity.");

#else
      error("SWIFT was not compiled with affinity enabled.");
#endif
    } else {
      e->runners[k].cpuid = k;
      e->runners[k].qid = k * nr_queues / nr_threads;
    }
    // message( "runner %i on cpuid=%i with qid=%i." , e->runners[k].id ,
    // e->runners[k].cpuid , e->runners[k].qid );
  }

  /* Wait for the runner threads to be in place. */
  while (e->barrier_running || e->barrier_launch)
    if (pthread_cond_wait(&e->barrier_cond, &e->barrier_mutex) != 0)
      error("Error while waiting for runner threads to get in place.");
}

/**
 * @brief Prints the current policy of an engine
 *
 * @param e The engine to print information about
 */
void engine_print_policy(struct engine *e) {

#ifdef WITH_MPI
  if (e->nodeID != 0) return;
  printf("[0000] %s engine_policy: engine policies are [ ",
         clocks_get_timesincestart());
#else
  printf("%s engine_policy: engine policies are [ ",
         clocks_get_timesincestart());
#endif

  /* Bit k of the policy maps to engine_policy_names[k + 1], since index 0
     holds "none"; only the 11 defined policy bits are scanned, to avoid
     reading past the end of the names array. */
  for (int k = 0; k < 11; k++)
    if (e->policy & (1 << k)) printf(" %s ", engine_policy_names[k + 1]);
  printf(" ]\n");
  fflush(stdout);
}