/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
* Matthieu Schaller (schaller@strw.leidenuniv.nl)
* 2015 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see .
*
******************************************************************************/
/* Config parameters. */
#include
/* This object's header. */
#include "engine.h"
/* Local headers. */
#include "memswap.h"
#ifdef WITH_MPI
/**
* Do the exchange of one type of particles with all the other nodes.
*
* @param label a label for the memory allocations of this particle type.
* @param counts 2D array with the counts of particles to exchange with
* each other node.
* @param parts the particle data to exchange
* @param new_nr_parts the number of particles this node will have after all
* exchanges have completed.
* @param sizeofparts sizeof the particle struct.
* @param alignsize the memory alignment required for this particle type.
* @param mpi_type the MPI_Datatype for these particles.
* @param nr_nodes the number of nodes to exchange with.
* @param nodeID the id of this node.
* @param syncredist whether to use slower more memory friendly synchronous
* exchanges.
*
* @result new particle data constructed from all the exchanges with the
* given alignment.
*/
static void *engine_do_redistribute(const char *label, int *counts, char *parts,
size_t new_nr_parts, size_t sizeofparts,
size_t alignsize, MPI_Datatype mpi_type,
int nr_nodes, int nodeID, int syncredist) {
/* Allocate a new particle array with some extra margin */
char *parts_new = NULL;
if (swift_memalign(
label, (void **)&parts_new, alignsize,
sizeofparts * new_nr_parts * engine_redistribute_alloc_margin) != 0)
error("Failed to allocate new particle data.");
if (syncredist) {
/* Slow synchronous redistribute,. */
size_t offset_send = 0, offset_recv = 0;
/* Only send and receive only "chunk" particles per request.
* Fixing the message size to 2GB. */
const int chunk = INT_MAX / sizeofparts;
int res = 0;
for (int k = 0; k < nr_nodes; k++) {
int kk = k;
/* Rank 0 decides the index of sending node */
MPI_Bcast(&kk, 1, MPI_INT, 0, MPI_COMM_WORLD);
int ind_recv = kk * nr_nodes + nodeID;
if (nodeID == kk) {
/* Send out our particles. */
offset_send = 0;
for (int j = 0; j < nr_nodes; j++) {
int ind_send = kk * nr_nodes + j;
/* Just copy our own parts */
if (counts[ind_send] > 0) {
if (j == nodeID) {
memcpy(&parts_new[offset_recv * sizeofparts],
&parts[offset_send * sizeofparts],
sizeofparts * counts[ind_recv]);
offset_send += counts[ind_send];
offset_recv += counts[ind_recv];
} else {
for (int i = 0, n = 0; i < counts[ind_send]; n++) {
/* Count and index, with chunk parts at most. */
size_t sendc = min(chunk, counts[ind_send] - i);
size_t sendo = offset_send + i;
res = MPI_Send(&parts[sendo * sizeofparts], sendc, mpi_type, j,
n, MPI_COMM_WORLD);
if (res != MPI_SUCCESS) {
mpi_error(res, "Failed to send parts to node %i from %i.", j,
nodeID);
}
i += sendc;
}
offset_send += counts[ind_send];
}
}
}
} else {
/* Listen for sends from kk. */
if (counts[ind_recv] > 0) {
for (int i = 0, n = 0; i < counts[ind_recv]; n++) {
/* Count and index, with +chunk parts at most. */
size_t recvc = min(chunk, counts[ind_recv] - i);
size_t recvo = offset_recv + i;
MPI_Status status;
res = MPI_Recv(&parts_new[recvo * sizeofparts], recvc, mpi_type, kk,
n, MPI_COMM_WORLD, &status);
if (res != MPI_SUCCESS) {
mpi_error(res, "Failed to recv of parts from node %i to %i.", kk,
nodeID);
}
i += recvc;
}
offset_recv += counts[ind_recv];
}
}
}
} else {
/* Asynchronous redistribute, can take a lot of memory. */
/* Prepare MPI requests for the asynchronous communications */
MPI_Request *reqs;
if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 2 * nr_nodes)) ==
NULL)
error("Failed to allocate MPI request list.");
/* Only send and receive only "chunk" particles per request. So we need to
* loop as many times as necessary here. Make 2Gb/sizeofparts so we only
* send 2Gb packets. */
const int chunk = INT_MAX / sizeofparts;
int sent = 0;
int recvd = 0;
int activenodes = 1;
while (activenodes) {
for (int k = 0; k < 2 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
/* Emit the sends and recvs for the data. */
size_t offset_send = sent;
size_t offset_recv = recvd;
activenodes = 0;
for (int k = 0; k < nr_nodes; k++) {
/* Indices in the count arrays of the node of interest */
const int ind_send = nodeID * nr_nodes + k;
const int ind_recv = k * nr_nodes + nodeID;
/* Are we sending any data this loop? */
int sending = counts[ind_send] - sent;
if (sending > 0) {
activenodes++;
if (sending > chunk) sending = chunk;
/* If the send and receive is local then just copy. */
if (k == nodeID) {
int receiving = counts[ind_recv] - recvd;
if (receiving > chunk) receiving = chunk;
memcpy(&parts_new[offset_recv * sizeofparts],
&parts[offset_send * sizeofparts], sizeofparts * receiving);
} else {
/* Otherwise send it. */
int res =
MPI_Isend(&parts[offset_send * sizeofparts], sending, mpi_type,
k, ind_send, MPI_COMM_WORLD, &reqs[2 * k + 0]);
if (res != MPI_SUCCESS)
mpi_error(res, "Failed to isend parts to node %i.", k);
}
}
/* If we're sending to this node, then move past it to next. */
if (counts[ind_send] > 0) offset_send += counts[ind_send];
/* Are we receiving any data from this node? Note already done if coming
* from this node. */
if (k != nodeID) {
int receiving = counts[ind_recv] - recvd;
if (receiving > 0) {
activenodes++;
if (receiving > chunk) receiving = chunk;
int res = MPI_Irecv(&parts_new[offset_recv * sizeofparts],
receiving, mpi_type, k, ind_recv,
MPI_COMM_WORLD, &reqs[2 * k + 1]);
if (res != MPI_SUCCESS)
mpi_error(res, "Failed to emit irecv of parts from node %i.", k);
}
}
/* If we're receiving from this node, then move past it to next. */
if (counts[ind_recv] > 0) offset_recv += counts[ind_recv];
}
/* Wait for all the sends and recvs to tumble in. */
MPI_Status stats[2 * nr_nodes];
int res;
if ((res = MPI_Waitall(2 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
for (int k = 0; k < 2 * nr_nodes; k++) {
char buff[MPI_MAX_ERROR_STRING];
MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
message("request from source %i, tag %i has error '%s'.",
stats[k].MPI_SOURCE, stats[k].MPI_TAG, buff);
}
error("Failed during waitall for part data.");
}
/* Move to next chunks. */
sent += chunk;
recvd += chunk;
}
/* Free temps. */
free(reqs);
}
/* And return new memory. */
return parts_new;
}
#endif
#ifdef WITH_MPI /* redist_mapper */
/* Support for engine_redistribute threadpool dest mappers. */
struct redist_mapper_data {
int *counts;
int *dest;
int nodeID;
int nr_nodes;
struct cell *cells;
struct space *s;
void *base;
};
/* Generic function for accumulating counts for TYPE parts. Note
* we use a local counts array to avoid the atomic_add in the parts
* loop. */
#define ENGINE_REDISTRIBUTE_DEST_MAPPER(TYPE) \
engine_redistribute_dest_mapper_##TYPE(void *map_data, int num_elements, \
void *extra_data) { \
struct TYPE *parts = (struct TYPE *)map_data; \
struct redist_mapper_data *mydata = \
(struct redist_mapper_data *)extra_data; \
struct space *s = mydata->s; \
int *dest = \
mydata->dest + (ptrdiff_t)(parts - (struct TYPE *)mydata->base); \
int *lcounts = NULL; \
if ((lcounts = (int *)calloc(mydata->nr_nodes * mydata->nr_nodes, \
sizeof(int))) == NULL) \
error("Failed to allocate counts thread-specific buffer"); \
for (int k = 0; k < num_elements; k++) { \
for (int j = 0; j < 3; j++) { \
if (parts[k].x[j] < 0.0) \
parts[k].x[j] += s->dim[j]; \
else if (parts[k].x[j] >= s->dim[j]) \
parts[k].x[j] -= s->dim[j]; \
if (parts[k].x[j] == s->dim[j]) parts[k].x[j] = 0.0; \
} \
const int cid = cell_getid(s->cdim, parts[k].x[0] * s->iwidth[0], \
parts[k].x[1] * s->iwidth[1], \
parts[k].x[2] * s->iwidth[2]); \
dest[k] = s->cells_top[cid].nodeID; \
size_t ind = mydata->nodeID * mydata->nr_nodes + dest[k]; \
lcounts[ind] += 1; \
} \
for (int k = 0; k < (mydata->nr_nodes * mydata->nr_nodes); k++) \
atomic_add(&mydata->counts[k], lcounts[k]); \
free(lcounts); \
}
/**
* @brief Accumulate the counts of particles per cell.
* Threadpool helper for accumulating the counts of particles per cell.
*
* part version.
*/
void ENGINE_REDISTRIBUTE_DEST_MAPPER(part);
/**
* @brief Accumulate the counts of star particles per cell.
* Threadpool helper for accumulating the counts of particles per cell.
*
* spart version.
*/
void ENGINE_REDISTRIBUTE_DEST_MAPPER(spart);
/**
* @brief Accumulate the counts of gravity particles per cell.
* Threadpool helper for accumulating the counts of particles per cell.
*
* gpart version.
*/
void ENGINE_REDISTRIBUTE_DEST_MAPPER(gpart);
/**
* @brief Accumulate the counts of black holes particles per cell.
* Threadpool helper for accumulating the counts of particles per cell.
*
* bpart version.
*/
void ENGINE_REDISTRIBUTE_DEST_MAPPER(bpart);
/**
* @brief Accumulate the counts of sink particles per cell.
* Threadpool helper for accumulating the counts of particles per cell.
*
* sink version.
*/
void ENGINE_REDISTRIBUTE_DEST_MAPPER(sink);
#endif /* redist_mapper_data */
#ifdef WITH_MPI /* savelink_mapper_data */
/* Support for saving the linkage between gparts and parts/sparts. */
struct savelink_mapper_data {
int nr_nodes;
int *counts;
void *parts;
int nodeID;
};
/**
* @brief Save the offset of each gravity partner of a part or spart.
*
* The offset is from the start of the sorted particles to be sent to a node.
* This is possible as parts without gravity partners have a positive id.
* These offsets are used to restore the pointers on the receiving node.
*
* CHECKS should be eliminated as dead code when optimizing.
*/
#define ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(TYPE, CHECKS) \
engine_redistribute_savelink_mapper_##TYPE(void *map_data, int num_elements, \
void *extra_data) { \
int *nodes = (int *)map_data; \
struct savelink_mapper_data *mydata = \
(struct savelink_mapper_data *)extra_data; \
int nodeID = mydata->nodeID; \
int nr_nodes = mydata->nr_nodes; \
int *counts = mydata->counts; \
struct TYPE *parts = (struct TYPE *)mydata->parts; \
\
for (int j = 0; j < num_elements; j++) { \
int node = nodes[j]; \
int count = 0; \
size_t offset = 0; \
for (int i = 0; i < node; i++) offset += counts[nodeID * nr_nodes + i]; \
\
for (int k = 0; k < counts[nodeID * nr_nodes + node]; k++) { \
if (parts[k + offset].gpart != NULL) { \
if (CHECKS) \
if (parts[k + offset].gpart->id_or_neg_offset > 0) \
error("Trying to link a partnerless " #TYPE "!"); \
parts[k + offset].gpart->id_or_neg_offset = -count; \
count++; \
} \
} \
} \
}
/**
* @brief Save position of part-gpart links.
* Threadpool helper for accumulating the counts of particles per cell.
*/
#ifdef SWIFT_DEBUG_CHECKS
void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 1);
#else
void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 0);
#endif
/**
* @brief Save position of spart-gpart links.
* Threadpool helper for accumulating the counts of particles per cell.
*/
#ifdef SWIFT_DEBUG_CHECKS
void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 1);
#else
void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 0);
#endif
/**
* @brief Save position of bpart-gpart links.
* Threadpool helper for accumulating the counts of particles per cell.
*/
#ifdef SWIFT_DEBUG_CHECKS
void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(bpart, 1);
#else
void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(bpart, 0);
#endif
/**
* @brief Save position of sink-gpart links.
* Threadpool helper for accumulating the counts of particles per cell.
*/
#ifdef SWIFT_DEBUG_CHECKS
void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(sink, 1);
#else
void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(sink, 0);
#endif
#endif /* savelink_mapper_data */
#ifdef WITH_MPI /* relink_mapper_data */
/* Support for relinking parts, gparts, sparts, bparts and sinks after moving
* between nodes. */
struct relink_mapper_data {
int nodeID;
int nr_nodes;
int *counts;
int *s_counts;
int *g_counts;
int *b_counts;
int *sink_counts;
struct space *s;
};
/**
* @brief Restore the part/gpart and spart/gpart links for a list of nodes.
*
* @param map_data address of nodes to process.
* @param num_elements the number nodes to process.
* @param extra_data additional data defining the context (a
* relink_mapper_data).
*/
void engine_redistribute_relink_mapper(void *map_data, int num_elements,
void *extra_data) {
int *nodes = (int *)map_data;
struct relink_mapper_data *mydata = (struct relink_mapper_data *)extra_data;
int nodeID = mydata->nodeID;
int nr_nodes = mydata->nr_nodes;
int *counts = mydata->counts;
int *g_counts = mydata->g_counts;
int *s_counts = mydata->s_counts;
int *b_counts = mydata->b_counts;
int *sink_counts = mydata->sink_counts;
struct space *s = mydata->s;
for (int i = 0; i < num_elements; i++) {
int node = nodes[i];
/* Get offsets to correct parts of the counts arrays for this node. */
size_t offset_parts = 0;
size_t offset_gparts = 0;
size_t offset_sparts = 0;
size_t offset_bparts = 0;
size_t offset_sinks = 0;
for (int n = 0; n < node; n++) {
int ind_recv = n * nr_nodes + nodeID;
offset_parts += counts[ind_recv];
offset_gparts += g_counts[ind_recv];
offset_sparts += s_counts[ind_recv];
offset_bparts += b_counts[ind_recv];
offset_sinks += sink_counts[ind_recv];
}
/* Number of gparts sent from this node. */
int ind_recv = node * nr_nodes + nodeID;
const size_t count_gparts = g_counts[ind_recv];
/* Loop over the gparts received from this node */
for (size_t k = offset_gparts; k < offset_gparts + count_gparts; k++) {
/* Does this gpart have a gas partner ? */
if (s->gparts[k].type == swift_type_gas) {
const ptrdiff_t partner_index =
offset_parts - s->gparts[k].id_or_neg_offset;
/* Re-link */
s->gparts[k].id_or_neg_offset = -partner_index;
s->parts[partner_index].gpart = &s->gparts[k];
}
/* Does this gpart have a star partner ? */
else if (s->gparts[k].type == swift_type_stars) {
const ptrdiff_t partner_index =
offset_sparts - s->gparts[k].id_or_neg_offset;
/* Re-link */
s->gparts[k].id_or_neg_offset = -partner_index;
s->sparts[partner_index].gpart = &s->gparts[k];
}
/* Does this gpart have a black hole partner ? */
else if (s->gparts[k].type == swift_type_black_hole) {
const ptrdiff_t partner_index =
offset_bparts - s->gparts[k].id_or_neg_offset;
/* Re-link */
s->gparts[k].id_or_neg_offset = -partner_index;
s->bparts[partner_index].gpart = &s->gparts[k];
}
/* Does this gpart have a sink partner ? */
else if (s->gparts[k].type == swift_type_sink) {
const ptrdiff_t partner_index =
offset_sinks - s->gparts[k].id_or_neg_offset;
/* Re-link */
s->gparts[k].id_or_neg_offset = -partner_index;
s->sinks[partner_index].gpart = &s->gparts[k];
}
}
}
}
#endif /* relink_mapper_data */
/**
* @brief Redistribute the particles amongst the nodes according
* to their cell's node IDs.
*
* The strategy here is as follows:
* 1) Each node counts the number of particles it has to send to each other
* node.
* 2) The number of particles of each type is then exchanged.
* 3) The particles to send are placed in a temporary buffer in which the
* part-gpart links are preserved.
* 4) Each node allocates enough space for the new particles.
* 5) Asynchronous or synchronous communications are issued to transfer the
* data.
*
*
* @param e The #engine.
*/
void engine_redistribute(struct engine *e) {
#ifdef WITH_MPI
const int nr_nodes = e->nr_nodes;
const int nodeID = e->nodeID;
struct space *s = e->s;
struct cell *cells = s->cells_top;
const int nr_cells = s->nr_cells;
struct xpart *xparts = s->xparts;
struct part *parts = s->parts;
struct gpart *gparts = s->gparts;
struct spart *sparts = s->sparts;
struct bpart *bparts = s->bparts;
struct sink *sinks = s->sinks;
ticks tic = getticks();
size_t nr_parts = s->nr_parts;
size_t nr_gparts = s->nr_gparts;
size_t nr_sparts = s->nr_sparts;
size_t nr_bparts = s->nr_bparts;
size_t nr_sinks = s->nr_sinks;
/* Start by moving inhibited particles to the end of the arrays */
for (size_t k = 0; k < nr_parts; /* void */) {
if (parts[k].time_bin == time_bin_inhibited ||
parts[k].time_bin == time_bin_not_created) {
nr_parts -= 1;
/* Swap the particle */
memswap(&parts[k], &parts[nr_parts], sizeof(struct part));
/* Swap the xpart */
memswap(&xparts[k], &xparts[nr_parts], sizeof(struct xpart));
/* Swap the link with the gpart */
if (parts[k].gpart != NULL) {
parts[k].gpart->id_or_neg_offset = -k;
}
if (parts[nr_parts].gpart != NULL) {
parts[nr_parts].gpart->id_or_neg_offset = -nr_parts;
}
} else {
k++;
}
}
/* Now move inhibited star particles to the end of the arrays */
for (size_t k = 0; k < nr_sparts; /* void */) {
if (sparts[k].time_bin == time_bin_inhibited ||
sparts[k].time_bin == time_bin_not_created) {
nr_sparts -= 1;
/* Swap the particle */
memswap(&s->sparts[k], &s->sparts[nr_sparts], sizeof(struct spart));
/* Swap the link with the gpart */
if (s->sparts[k].gpart != NULL) {
s->sparts[k].gpart->id_or_neg_offset = -k;
}
if (s->sparts[nr_sparts].gpart != NULL) {
s->sparts[nr_sparts].gpart->id_or_neg_offset = -nr_sparts;
}
} else {
k++;
}
}
/* Now move inhibited black hole particles to the end of the arrays */
for (size_t k = 0; k < nr_bparts; /* void */) {
if (bparts[k].time_bin == time_bin_inhibited ||
bparts[k].time_bin == time_bin_not_created) {
nr_bparts -= 1;
/* Swap the particle */
memswap(&s->bparts[k], &s->bparts[nr_bparts], sizeof(struct bpart));
/* Swap the link with the gpart */
if (s->bparts[k].gpart != NULL) {
s->bparts[k].gpart->id_or_neg_offset = -k;
}
if (s->bparts[nr_bparts].gpart != NULL) {
s->bparts[nr_bparts].gpart->id_or_neg_offset = -nr_bparts;
}
} else {
k++;
}
}
/* Now move inhibited sink particles to the end of the arrays */
for (size_t k = 0; k < nr_sinks; /* void */) {
if (sinks[k].time_bin == time_bin_inhibited ||
sinks[k].time_bin == time_bin_not_created) {
nr_sinks -= 1;
/* Swap the particle */
memswap(&s->sinks[k], &s->sinks[nr_sinks], sizeof(struct sink));
/* Swap the link with the gpart */
if (s->sinks[k].gpart != NULL) {
s->sinks[k].gpart->id_or_neg_offset = -k;
}
if (s->sinks[nr_sinks].gpart != NULL) {
s->sinks[nr_sinks].gpart->id_or_neg_offset = -nr_sinks;
}
} else {
k++;
}
}
/* Finally do the same with the gravity particles */
for (size_t k = 0; k < nr_gparts; /* void */) {
if (gparts[k].time_bin == time_bin_inhibited ||
gparts[k].time_bin == time_bin_not_created) {
nr_gparts -= 1;
/* Swap the particle */
memswap_unaligned(&s->gparts[k], &s->gparts[nr_gparts],
sizeof(struct gpart));
/* Swap the link with part/spart */
if (s->gparts[k].type == swift_type_gas) {
s->parts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
} else if (s->gparts[k].type == swift_type_stars) {
s->sparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
} else if (s->gparts[k].type == swift_type_black_hole) {
s->bparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
} else if (s->gparts[k].type == swift_type_sink) {
s->sinks[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
}
if (s->gparts[nr_gparts].type == swift_type_gas) {
s->parts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
&s->gparts[nr_gparts];
} else if (s->gparts[nr_gparts].type == swift_type_stars) {
s->sparts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
&s->gparts[nr_gparts];
} else if (s->gparts[nr_gparts].type == swift_type_black_hole) {
s->bparts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
&s->gparts[nr_gparts];
} else if (s->gparts[nr_gparts].type == swift_type_sink) {
s->sinks[-s->gparts[nr_sinks].id_or_neg_offset].gpart =
&s->gparts[nr_gparts];
}
} else {
k++;
}
}
/* Now we are ready to deal with real particles and can start the exchange. */
/* Allocate temporary arrays to store the counts of particles to be sent
* and the destination of each particle */
int *counts;
if ((counts = (int *)calloc(nr_nodes * nr_nodes, sizeof(int))) == NULL)
error("Failed to allocate counts temporary buffer.");
int *dest;
if ((dest = (int *)swift_malloc("dest", sizeof(int) * nr_parts)) == NULL)
error("Failed to allocate dest temporary buffer.");
/* Simple index of node IDs, used for mappers over nodes. */
int *nodes = NULL;
if ((nodes = (int *)malloc(sizeof(int) * nr_nodes)) == NULL)
error("Failed to allocate nodes temporary buffer.");
for (int k = 0; k < nr_nodes; k++) nodes[k] = k;
/* Get destination of each particle */
struct redist_mapper_data redist_data;
redist_data.s = s;
redist_data.nodeID = nodeID;
redist_data.nr_nodes = nr_nodes;
redist_data.counts = counts;
redist_data.dest = dest;
redist_data.base = (void *)parts;
threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_part, parts,
nr_parts, sizeof(struct part), threadpool_auto_chunk_size,
&redist_data);
/* Sort the particles according to their cell index. */
if (nr_parts > 0)
space_parts_sort(s->parts, s->xparts, dest, &counts[nodeID * nr_nodes],
nr_nodes, 0);
#ifdef SWIFT_DEBUG_CHECKS
/* Verify that the part have been sorted correctly. */
for (size_t k = 0; k < nr_parts; k++) {
const struct part *p = &s->parts[k];
if (p->time_bin == time_bin_inhibited)
error("Inhibited particle found after sorting!");
if (p->time_bin == time_bin_not_created)
error("Inhibited particle found after sorting!");
/* New cell index */
const int new_cid =
cell_getid(s->cdim, p->x[0] * s->iwidth[0], p->x[1] * s->iwidth[1],
p->x[2] * s->iwidth[2]);
/* New cell of this part */
const struct cell *c = &s->cells_top[new_cid];
const int new_node = c->nodeID;
if (dest[k] != new_node)
error("part's new node index not matching sorted index.");
if (p->x[0] < c->loc[0] || p->x[0] > c->loc[0] + c->width[0] ||
p->x[1] < c->loc[1] || p->x[1] > c->loc[1] + c->width[1] ||
p->x[2] < c->loc[2] || p->x[2] > c->loc[2] + c->width[2])
error("part not sorted into the right top-level cell!");
}
#endif
/* We will need to re-link the gpart partners of parts, so save their
* relative positions in the sent lists. */
if (nr_parts > 0 && nr_gparts > 0) {
struct savelink_mapper_data savelink_data;
savelink_data.nr_nodes = nr_nodes;
savelink_data.counts = counts;
savelink_data.parts = (void *)parts;
savelink_data.nodeID = nodeID;
threadpool_map(&e->threadpool, engine_redistribute_savelink_mapper_part,
nodes, nr_nodes, sizeof(int), threadpool_auto_chunk_size,
&savelink_data);
}
swift_free("dest", dest);
/* Get destination of each s-particle */
int *s_counts;
if ((s_counts = (int *)calloc(nr_nodes * nr_nodes, sizeof(int))) == NULL)
error("Failed to allocate s_counts temporary buffer.");
int *s_dest;
if ((s_dest = (int *)swift_malloc("s_dest", sizeof(int) * nr_sparts)) == NULL)
error("Failed to allocate s_dest temporary buffer.");
redist_data.counts = s_counts;
redist_data.dest = s_dest;
redist_data.base = (void *)sparts;
threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_spart, sparts,
nr_sparts, sizeof(struct spart), threadpool_auto_chunk_size,
&redist_data);
/* Sort the particles according to their cell index. */
if (nr_sparts > 0)
space_sparts_sort(s->sparts, s_dest, &s_counts[nodeID * nr_nodes], nr_nodes,
0);
#ifdef SWIFT_DEBUG_CHECKS
/* Verify that the spart have been sorted correctly. */
for (size_t k = 0; k < nr_sparts; k++) {
const struct spart *sp = &s->sparts[k];
if (sp->time_bin == time_bin_inhibited)
error("Inhibited particle found after sorting!");
if (sp->time_bin == time_bin_not_created)
error("Inhibited particle found after sorting!");
/* New cell index */
const int new_cid =
cell_getid(s->cdim, sp->x[0] * s->iwidth[0], sp->x[1] * s->iwidth[1],
sp->x[2] * s->iwidth[2]);
/* New cell of this spart */
const struct cell *c = &s->cells_top[new_cid];
const int new_node = c->nodeID;
if (s_dest[k] != new_node)
error("spart's new node index not matching sorted index.");
if (sp->x[0] < c->loc[0] || sp->x[0] > c->loc[0] + c->width[0] ||
sp->x[1] < c->loc[1] || sp->x[1] > c->loc[1] + c->width[1] ||
sp->x[2] < c->loc[2] || sp->x[2] > c->loc[2] + c->width[2])
error("spart not sorted into the right top-level cell!");
}
#endif
/* We need to re-link the gpart partners of sparts. */
if (nr_sparts > 0) {
struct savelink_mapper_data savelink_data;
savelink_data.nr_nodes = nr_nodes;
savelink_data.counts = s_counts;
savelink_data.parts = (void *)sparts;
savelink_data.nodeID = nodeID;
threadpool_map(&e->threadpool, engine_redistribute_savelink_mapper_spart,
nodes, nr_nodes, sizeof(int), threadpool_auto_chunk_size,
&savelink_data);
}
swift_free("s_dest", s_dest);
/* Get destination of each b-particle */
int *b_counts;
if ((b_counts = (int *)calloc(nr_nodes * nr_nodes, sizeof(int))) == NULL)
error("Failed to allocate b_counts temporary buffer.");
int *b_dest;
if ((b_dest = (int *)swift_malloc("b_dest", sizeof(int) * nr_bparts)) == NULL)
error("Failed to allocate b_dest temporary buffer.");
redist_data.counts = b_counts;
redist_data.dest = b_dest;
redist_data.base = (void *)bparts;
threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_bpart, bparts,
nr_bparts, sizeof(struct bpart), threadpool_auto_chunk_size,
&redist_data);
/* Sort the particles according to their cell index. */
if (nr_bparts > 0)
space_bparts_sort(s->bparts, b_dest, &b_counts[nodeID * nr_nodes], nr_nodes,
0);
#ifdef SWIFT_DEBUG_CHECKS
/* Verify that the bpart have been sorted correctly. */
for (size_t k = 0; k < nr_bparts; k++) {
const struct bpart *bp = &s->bparts[k];
if (bp->time_bin == time_bin_inhibited)
error("Inhibited particle found after sorting!");
if (bp->time_bin == time_bin_not_created)
error("Inhibited particle found after sorting!");
/* New cell index */
const int new_cid =
cell_getid(s->cdim, bp->x[0] * s->iwidth[0], bp->x[1] * s->iwidth[1],
bp->x[2] * s->iwidth[2]);
/* New cell of this bpart */
const struct cell *c = &s->cells_top[new_cid];
const int new_node = c->nodeID;
if (b_dest[k] != new_node)
error("bpart's new node index not matching sorted index.");
if (bp->x[0] < c->loc[0] || bp->x[0] > c->loc[0] + c->width[0] ||
bp->x[1] < c->loc[1] || bp->x[1] > c->loc[1] + c->width[1] ||
bp->x[2] < c->loc[2] || bp->x[2] > c->loc[2] + c->width[2])
error("bpart not sorted into the right top-level cell!");
}
#endif
/* We need to re-link the gpart partners of bparts. */
if (nr_bparts > 0) {
struct savelink_mapper_data savelink_data;
savelink_data.nr_nodes = nr_nodes;
savelink_data.counts = b_counts;
savelink_data.parts = (void *)bparts;
savelink_data.nodeID = nodeID;
threadpool_map(&e->threadpool, engine_redistribute_savelink_mapper_bpart,
nodes, nr_nodes, sizeof(int), threadpool_auto_chunk_size,
&savelink_data);
}
swift_free("b_dest", b_dest);
/* Get destination of each sink-particle */
int *sink_counts;
if ((sink_counts = (int *)calloc(nr_nodes * nr_nodes, sizeof(int))) == NULL)
error("Failed to allocate sink_counts temporary buffer.");
int *sink_dest;
if ((sink_dest = (int *)swift_malloc("sink_dest", sizeof(int) * nr_sinks)) ==
NULL)
error("Failed to allocate sink_dest temporary buffer.");
redist_data.counts = sink_counts;
redist_data.dest = sink_dest;
redist_data.base = (void *)sinks;
threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_sink, sinks,
nr_sinks, sizeof(struct sink), threadpool_auto_chunk_size,
&redist_data);
/* Sort the particles according to their cell index. */
if (nr_sinks > 0)
space_sinks_sort(s->sinks, sink_dest, &sink_counts[nodeID * nr_nodes],
nr_nodes, 0);
#ifdef SWIFT_DEBUG_CHECKS
/* Verify that the sink have been sorted correctly. */
for (size_t k = 0; k < nr_sinks; k++) {
const struct sink *sink = &s->sinks[k];
if (sink->time_bin == time_bin_inhibited)
error("Inhibited particle found after sorting!");
if (sink->time_bin == time_bin_not_created)
error("Inhibited particle found after sorting!");
/* New cell index */
const int new_cid =
cell_getid(s->cdim, sink->x[0] * s->iwidth[0],
sink->x[1] * s->iwidth[1], sink->x[2] * s->iwidth[2]);
/* New cell of this sink */
const struct cell *c = &s->cells_top[new_cid];
const int new_node = c->nodeID;
if (sink_dest[k] != new_node)
error("sink's new node index not matching sorted index.");
if (sink->x[0] < c->loc[0] || sink->x[0] > c->loc[0] + c->width[0] ||
sink->x[1] < c->loc[1] || sink->x[1] > c->loc[1] + c->width[1] ||
sink->x[2] < c->loc[2] || sink->x[2] > c->loc[2] + c->width[2])
error("sink not sorted into the right top-level cell!");
}
#endif
/* We need to re-link the gpart partners of sinks. */
if (nr_sinks > 0) {
struct savelink_mapper_data savelink_data;
savelink_data.nr_nodes = nr_nodes;
savelink_data.counts = sink_counts;
savelink_data.parts = (void *)sinks;
savelink_data.nodeID = nodeID;
threadpool_map(&e->threadpool, engine_redistribute_savelink_mapper_sink,
nodes, nr_nodes, sizeof(int), threadpool_auto_chunk_size,
&savelink_data);
}
swift_free("sink_dest", sink_dest);
/* Get destination of each g-particle */
int *g_counts;
if ((g_counts = (int *)calloc(nr_nodes * nr_nodes, sizeof(int))) == NULL)
error("Failed to allocate g_gcount temporary buffer.");
int *g_dest;
if ((g_dest = (int *)swift_malloc("g_dest", sizeof(int) * nr_gparts)) == NULL)
error("Failed to allocate g_dest temporary buffer.");
redist_data.counts = g_counts;
redist_data.dest = g_dest;
redist_data.base = (void *)gparts;
threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_gpart, gparts,
nr_gparts, sizeof(struct gpart), threadpool_auto_chunk_size,
&redist_data);
/* Sort the gparticles according to their cell index. */
if (nr_gparts > 0)
space_gparts_sort(s->gparts, s->parts, s->sinks, s->sparts, s->bparts,
g_dest, &g_counts[nodeID * nr_nodes], nr_nodes);
#ifdef SWIFT_DEBUG_CHECKS
/* Verify that the gpart have been sorted correctly. */
for (size_t k = 0; k < nr_gparts; k++) {
const struct gpart *gp = &s->gparts[k];
if (gp->time_bin == time_bin_inhibited)
error("Inhibited particle found after sorting!");
if (gp->time_bin == time_bin_not_created)
error("Inhibited particle found after sorting!");
/* New cell index */
const int new_cid =
cell_getid(s->cdim, gp->x[0] * s->iwidth[0], gp->x[1] * s->iwidth[1],
gp->x[2] * s->iwidth[2]);
/* New cell of this gpart */
const struct cell *c = &s->cells_top[new_cid];
const int new_node = c->nodeID;
if (g_dest[k] != new_node)
error("gpart's new node index not matching sorted index (%d != %d).",
g_dest[k], new_node);
if (gp->x[0] < c->loc[0] || gp->x[0] > c->loc[0] + c->width[0] ||
gp->x[1] < c->loc[1] || gp->x[1] > c->loc[1] + c->width[1] ||
gp->x[2] < c->loc[2] || gp->x[2] > c->loc[2] + c->width[2])
error("gpart not sorted into the right top-level cell!");
}
#endif
swift_free("g_dest", g_dest);
/* Get all the counts from all the nodes. */
if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to allreduce particle transfer counts.");
/* Get all the g_counts from all the nodes. */
if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to allreduce gparticle transfer counts.");
/* Get all the s_counts from all the nodes. */
if (MPI_Allreduce(MPI_IN_PLACE, s_counts, nr_nodes * nr_nodes, MPI_INT,
MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to allreduce sparticle transfer counts.");
/* Get all the b_counts from all the nodes. */
if (MPI_Allreduce(MPI_IN_PLACE, b_counts, nr_nodes * nr_nodes, MPI_INT,
MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to allreduce bparticle transfer counts.");
/* Get all the sink_counts from all the nodes. */
if (MPI_Allreduce(MPI_IN_PLACE, sink_counts, nr_nodes * nr_nodes, MPI_INT,
MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to allreduce sink particle transfer counts.");
/* Report how many particles will be moved. */
if (e->verbose) {
if (e->nodeID == 0) {
size_t total = 0, g_total = 0, s_total = 0, b_total = 0, sink_total = 0;
size_t unmoved = 0, g_unmoved = 0, s_unmoved = 0, b_unmoved = 0,
sink_unmoved = 0;
for (int p = 0, r = 0; p < nr_nodes; p++) {
for (int n = 0; n < nr_nodes; n++) {
total += counts[r];
g_total += g_counts[r];
s_total += s_counts[r];
b_total += b_counts[r];
sink_total += sink_counts[r];
if (p == n) {
unmoved += counts[r];
g_unmoved += g_counts[r];
s_unmoved += s_counts[r];
b_unmoved += b_counts[r];
sink_unmoved += sink_counts[r];
}
r++;
}
}
if (total > 0)
message("%zu of %zu (%.2f%%) of particles moved", total - unmoved,
total, 100.0 * (double)(total - unmoved) / (double)total);
if (g_total > 0)
message("%zu of %zu (%.2f%%) of g-particles moved", g_total - g_unmoved,
g_total,
100.0 * (double)(g_total - g_unmoved) / (double)g_total);
if (s_total > 0)
message("%zu of %zu (%.2f%%) of s-particles moved", s_total - s_unmoved,
s_total,
100.0 * (double)(s_total - s_unmoved) / (double)s_total);
if (b_total > 0)
message("%ld of %ld (%.2f%%) of b-particles moved", b_total - b_unmoved,
b_total,
100.0 * (double)(b_total - b_unmoved) / (double)b_total);
if (sink_total > 0)
message(
"%ld of %ld (%.2f%%) of sink-particles moved",
sink_total - sink_unmoved, sink_total,
100.0 * (double)(sink_total - sink_unmoved) / (double)sink_total);
}
}
/* Now each node knows how many parts, sparts, bparts, sinks and gparts will
* be transferred to every other node. Get the new numbers of particles for
* this node. */
size_t nr_parts_new = 0, nr_gparts_new = 0, nr_sparts_new = 0,
nr_bparts_new = 0, nr_sinks_new = 0;
for (int k = 0; k < nr_nodes; k++)
nr_parts_new += counts[k * nr_nodes + nodeID];
for (int k = 0; k < nr_nodes; k++)
nr_gparts_new += g_counts[k * nr_nodes + nodeID];
for (int k = 0; k < nr_nodes; k++)
nr_sparts_new += s_counts[k * nr_nodes + nodeID];
for (int k = 0; k < nr_nodes; k++)
nr_bparts_new += b_counts[k * nr_nodes + nodeID];
for (int k = 0; k < nr_nodes; k++)
nr_sinks_new += sink_counts[k * nr_nodes + nodeID];
#ifdef WITH_CSDS
const int initial_redistribute = e->ti_current == 0;
if (!initial_redistribute && e->policy & engine_policy_csds) {
/* Log the particles before sending them out */
size_t part_offset = 0;
size_t spart_offset = 0;
size_t gpart_offset = 0;
size_t bpart_offset = 0;
size_t sink_offset = 0;
for (int i = 0; i < nr_nodes; i++) {
const size_t c_ind = engine_rank * nr_nodes + i;
/* No need to log the local particles. */
if (i == engine_rank) {
part_offset += counts[c_ind];
spart_offset += s_counts[c_ind];
gpart_offset += g_counts[c_ind];
bpart_offset += b_counts[c_ind];
sink_offset += sink_counts[c_ind];
continue;
}
/* Log the hydro parts. */
csds_log_parts(e->csds, &parts[part_offset], &xparts[part_offset],
counts[c_ind], e, /* log_all_fields */ 1,
csds_flag_mpi_exit, i);
/* Log the stellar parts. */
csds_log_sparts(e->csds, &sparts[spart_offset], s_counts[c_ind], e,
/* log_all_fields */ 1, csds_flag_mpi_exit, i);
/* Log the gparts */
csds_log_gparts(e->csds, &gparts[gpart_offset], g_counts[c_ind], e,
/* log_all_fields */ 1, csds_flag_mpi_exit, i);
/* Log the bparts */
if (b_counts[c_ind] > 0) {
error("TODO");
}
/* Log the sinks */
if (sink_counts[c_ind] > 0) {
error("TODO");
}
/* Update the counters */
part_offset += counts[c_ind];
spart_offset += s_counts[c_ind];
gpart_offset += g_counts[c_ind];
bpart_offset += b_counts[c_ind];
sink_offset += sink_counts[c_ind];
}
}
#endif
/* Now exchange the particles, type by type to keep the memory required
* under control. */
/* SPH particles. */
void *new_parts = engine_do_redistribute(
"parts", counts, (char *)s->parts, nr_parts_new, sizeof(struct part),
part_align, part_mpi_type, nr_nodes, nodeID, e->syncredist);
swift_free("parts", s->parts);
s->parts = (struct part *)new_parts;
s->nr_parts = nr_parts_new;
s->size_parts = engine_redistribute_alloc_margin * nr_parts_new;
/* Extra SPH particle properties. */
new_parts = engine_do_redistribute(
"xparts", counts, (char *)s->xparts, nr_parts_new, sizeof(struct xpart),
xpart_align, xpart_mpi_type, nr_nodes, nodeID, e->syncredist);
swift_free("xparts", s->xparts);
s->xparts = (struct xpart *)new_parts;
/* Gravity particles. */
new_parts =
engine_do_redistribute("gparts", g_counts, (char *)s->gparts,
nr_gparts_new, sizeof(struct gpart), gpart_align,
gpart_mpi_type, nr_nodes, nodeID, e->syncredist);
swift_free("gparts", s->gparts);
s->gparts = (struct gpart *)new_parts;
s->nr_gparts = nr_gparts_new;
s->size_gparts = engine_redistribute_alloc_margin * nr_gparts_new;
/* Star particles. */
new_parts =
engine_do_redistribute("sparts", s_counts, (char *)s->sparts,
nr_sparts_new, sizeof(struct spart), spart_align,
spart_mpi_type, nr_nodes, nodeID, e->syncredist);
swift_free("sparts", s->sparts);
s->sparts = (struct spart *)new_parts;
s->nr_sparts = nr_sparts_new;
s->size_sparts = engine_redistribute_alloc_margin * nr_sparts_new;
/* Black holes particles. */
new_parts =
engine_do_redistribute("bparts", b_counts, (char *)s->bparts,
nr_bparts_new, sizeof(struct bpart), bpart_align,
bpart_mpi_type, nr_nodes, nodeID, e->syncredist);
swift_free("bparts", s->bparts);
s->bparts = (struct bpart *)new_parts;
s->nr_bparts = nr_bparts_new;
s->size_bparts = engine_redistribute_alloc_margin * nr_bparts_new;
/* Sink particles. */
new_parts = engine_do_redistribute(
"sinks", sink_counts, (char *)s->sinks, nr_sinks_new, sizeof(struct sink),
sink_align, sink_mpi_type, nr_nodes, nodeID, e->syncredist);
swift_free("sinks", s->sinks);
s->sinks = (struct sink *)new_parts;
s->nr_sinks = nr_sinks_new;
s->size_sinks = engine_redistribute_alloc_margin * nr_sinks_new;
/* All particles have now arrived. Time for some final operations on the
stuff we just received */
#ifdef WITH_CSDS
if (!initial_redistribute && e->policy & engine_policy_csds) {
size_t part_offset = 0;
size_t spart_offset = 0;
size_t gpart_offset = 0;
size_t bpart_offset = 0;
size_t sink_offset = 0;
for (int i = 0; i < nr_nodes; i++) {
const size_t c_ind = i * nr_nodes + engine_rank;
/* No need to log the local particles. */
if (i == engine_rank) {
part_offset += counts[c_ind];
spart_offset += s_counts[c_ind];
gpart_offset += g_counts[c_ind];
bpart_offset += b_counts[c_ind];
sink_offset += sink_counts[c_ind];
continue;
}
/* Log the hydro parts. */
csds_log_parts(e->csds, &s->parts[part_offset], &s->xparts[part_offset],
counts[c_ind], e,
/* log_all_fields */ 1, csds_flag_mpi_enter, i);
/* Log the stellar parts. */
csds_log_sparts(e->csds, &s->sparts[spart_offset], s_counts[c_ind], e,
/* log_all_fields */ 1, csds_flag_mpi_enter, i);
/* Log the gparts */
csds_log_gparts(e->csds, &s->gparts[gpart_offset], g_counts[c_ind], e,
/* log_all_fields */ 1, csds_flag_mpi_enter, i);
/* Log the bparts */
if (b_counts[c_ind] > 0) {
error("TODO");
}
/* Log the sinks */
if (sink_counts[c_ind] > 0) {
error("TODO");
}
/* Update the counters */
part_offset += counts[c_ind];
spart_offset += s_counts[c_ind];
gpart_offset += g_counts[c_ind];
bpart_offset += b_counts[c_ind];
sink_offset += sink_counts[c_ind];
}
}
#endif
/* Restore the part<->gpart and spart<->gpart links.
* Generate indices and counts for threadpool tasks. Note we process a node
* at a time. */
struct relink_mapper_data relink_data;
relink_data.s = s;
relink_data.counts = counts;
relink_data.g_counts = g_counts;
relink_data.s_counts = s_counts;
relink_data.b_counts = b_counts;
relink_data.sink_counts = sink_counts;
relink_data.nodeID = nodeID;
relink_data.nr_nodes = nr_nodes;
threadpool_map(&e->threadpool, engine_redistribute_relink_mapper, nodes,
nr_nodes, sizeof(int), 1, &relink_data);
free(nodes);
/* Clean up the counts now we are done. */
free(counts);
free(g_counts);
free(s_counts);
free(b_counts);
free(sink_counts);
#ifdef SWIFT_DEBUG_CHECKS
/* Verify that all parts are in the right place. */
for (size_t k = 0; k < nr_parts_new; k++) {
const int cid = cell_getid(s->cdim, s->parts[k].x[0] * s->iwidth[0],
s->parts[k].x[1] * s->iwidth[1],
s->parts[k].x[2] * s->iwidth[2]);
if (cells[cid].nodeID != nodeID)
error("Received particle (%zu) that does not belong here (nodeID=%i).", k,
cells[cid].nodeID);
}
for (size_t k = 0; k < nr_gparts_new; k++) {
const int cid = cell_getid(s->cdim, s->gparts[k].x[0] * s->iwidth[0],
s->gparts[k].x[1] * s->iwidth[1],
s->gparts[k].x[2] * s->iwidth[2]);
if (cells[cid].nodeID != nodeID)
error("Received g-particle (%zu) that does not belong here (nodeID=%i).",
k, cells[cid].nodeID);
}
for (size_t k = 0; k < nr_sparts_new; k++) {
const int cid = cell_getid(s->cdim, s->sparts[k].x[0] * s->iwidth[0],
s->sparts[k].x[1] * s->iwidth[1],
s->sparts[k].x[2] * s->iwidth[2]);
if (cells[cid].nodeID != nodeID)
error("Received s-particle (%zu) that does not belong here (nodeID=%i).",
k, cells[cid].nodeID);
}
for (size_t k = 0; k < nr_bparts_new; k++) {
const int cid = cell_getid(s->cdim, s->bparts[k].x[0] * s->iwidth[0],
s->bparts[k].x[1] * s->iwidth[1],
s->bparts[k].x[2] * s->iwidth[2]);
if (cells[cid].nodeID != nodeID)
error("Received b-particle (%zu) that does not belong here (nodeID=%i).",
k, cells[cid].nodeID);
}
for (size_t k = 0; k < nr_sinks_new; k++) {
const int cid = cell_getid(s->cdim, s->sinks[k].x[0] * s->iwidth[0],
s->sinks[k].x[1] * s->iwidth[1],
s->sinks[k].x[2] * s->iwidth[2]);
if (cells[cid].nodeID != nodeID)
error(
"Received sink-particle (%zu) that does not belong here (nodeID=%i).",
k, cells[cid].nodeID);
}
/* Verify that the links are correct */
part_verify_links(s->parts, s->gparts, s->sinks, s->sparts, s->bparts,
nr_parts_new, nr_gparts_new, nr_sinks_new, nr_sparts_new,
nr_bparts_new, e->verbose);
#endif
/* Be verbose about what just happened. */
if (e->verbose) {
int my_cells = 0;
for (int k = 0; k < nr_cells; k++)
if (cells[k].nodeID == nodeID) my_cells += 1;
message(
"node %i now has %zu parts, %zu sparts, %zu bparts, %zu sinks and %zu "
"gparts in "
"%i cells.",
nodeID, nr_parts_new, nr_sparts_new, nr_bparts_new, nr_sinks_new,
nr_gparts_new, my_cells);
}
/* Flag that we do not have any extra particles any more */
s->nr_extra_parts = 0;
s->nr_extra_gparts = 0;
s->nr_extra_sparts = 0;
s->nr_extra_bparts = 0;
s->nr_extra_sinks = 0;
/* Flag that a redistribute has taken place */
e->step_props |= engine_step_prop_redistribute;
if (e->verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
clocks_getunit());
#else
error("SWIFT was not compiled with MPI support.");
#endif
}