Commit 2ec982df authored by Matthieu Schaller's avatar Matthieu Schaller

Move the updated synchronised redistribute function to the new engine_redistribute.c file.

parent b52e6c84
@@ -158,1092 +158,6 @@ void engine_addlink(struct engine *e, struct link **l, struct task *t) {
res->next = atomic_swap(l, res);
}
#ifdef WITH_MPI
/**
* @brief Exchange one type of particle with all the other nodes.
*
* @param label a label for the memory allocations of this particle type.
* @param counts 2D array with the counts of particles to exchange with
* each other node.
* @param parts the particle data to exchange
* @param new_nr_parts the number of particles this node will have after all
* exchanges have completed.
* @param sizeofparts sizeof the particle struct.
* @param alignsize the memory alignment required for this particle type.
* @param mpi_type the MPI_Datatype for these particles.
* @param nr_nodes the number of nodes to exchange with.
* @param nodeID the id of this node.
* @param syncredist whether to use the slower, more memory-friendly
* synchronous exchanges.
*
* @result new particle data constructed from all the exchanges with the
* given alignment.
*/
static void *engine_do_redistribute(const char *label, int *counts, char *parts,
size_t new_nr_parts, size_t sizeofparts,
size_t alignsize, MPI_Datatype mpi_type,
int nr_nodes, int nodeID, int syncredist) {
/* Allocate a new particle array with some extra margin */
char *parts_new = NULL;
if (swift_memalign(
label, (void **)&parts_new, alignsize,
sizeofparts * new_nr_parts * engine_redistribute_alloc_margin) != 0)
error("Failed to allocate new particle data.");
if (syncredist) {
/* Slow synchronous redistribute. */
size_t offset_send = 0, offset_recv = 0;
/* Send and receive at most "chunk" particles per request,
 * capping each message at 2GB. */
const int chunk = INT_MAX / sizeofparts;
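/* As an illustration (hypothetical numbers): for a 256-byte particle struct
 * this gives chunk ~ 8.4 million particles, i.e. ~2GB of payload per
 * message. */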
int res = 0;
for (int k = 0; k < nr_nodes; k++) {
int kk = k;
/* Rank 0 decides the index of sending node */
MPI_Bcast(&kk, 1, MPI_INT, 0, MPI_COMM_WORLD);
int ind_recv = kk * nr_nodes + nodeID;
if (nodeID == kk) {
/* Send out our particles. */
offset_send = 0;
for (int j = 0; j < nr_nodes; j++) {
int ind_send = kk * nr_nodes + j;
if (counts[ind_send] > 0) {
/* Just copy our own parts directly. */
if (j == nodeID) {
memcpy(&parts_new[offset_recv * sizeofparts],
&parts[offset_send * sizeofparts],
sizeofparts * counts[ind_recv]);
offset_send += counts[ind_send];
offset_recv += counts[ind_recv];
} else {
for (int i = 0, n = 0; i < counts[ind_send]; n++) {
/* Count and index, with chunk parts at most. */
size_t sendc = min(chunk, counts[ind_send] - i);
size_t sendo = offset_send + i;
res = MPI_Send(&parts[sendo * sizeofparts], sendc, mpi_type, j,
n, MPI_COMM_WORLD);
if (res != MPI_SUCCESS) {
mpi_error(res, "Failed to send parts to node %i from %i.",
j, nodeID);
}
i += sendc;
}
offset_send += counts[ind_send];
}
}
}
} else {
/* Listen for sends from kk. */
if (counts[ind_recv] > 0) {
for (int i = 0, n = 0; i < counts[ind_recv]; n++) {
/* Count and index, with chunk parts at most. */
size_t recvc = min(chunk, counts[ind_recv] - i);
size_t recvo = offset_recv + i;
MPI_Status status;
res = MPI_Recv(&parts_new[recvo * sizeofparts], recvc, mpi_type, kk,
n, MPI_COMM_WORLD, &status);
if (res != MPI_SUCCESS) {
mpi_error(res, "Failed to recv parts from node %i to %i.",
kk, nodeID);
}
i += recvc;
}
offset_recv += counts[ind_recv];
}
}
}
} else {
/* Asynchronous redistribute, can take a lot of memory. */
/* Prepare MPI requests for the asynchronous communications */
MPI_Request *reqs;
if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 2 * nr_nodes)) ==
NULL)
error("Failed to allocate MPI request list.");
/* Send and receive at most "chunk" particles per request, so we need to
 * loop as many times as necessary here. Using INT_MAX / sizeofparts keeps
 * each message under 2GB. */
const int chunk = INT_MAX / sizeofparts;
int sent = 0;
int recvd = 0;
int activenodes = 1;
while (activenodes) {
for (int k = 0; k < 2 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
/* Emit the sends and recvs for the data. */
size_t offset_send = sent;
size_t offset_recv = recvd;
activenodes = 0;
for (int k = 0; k < nr_nodes; k++) {
/* Indices in the count arrays of the node of interest */
const int ind_send = nodeID * nr_nodes + k;
const int ind_recv = k * nr_nodes + nodeID;
/* Are we sending any data this loop? */
int sending = counts[ind_send] - sent;
if (sending > 0) {
activenodes++;
if (sending > chunk) sending = chunk;
/* If the send and receive is local then just copy. */
if (k == nodeID) {
int receiving = counts[ind_recv] - recvd;
if (receiving > chunk) receiving = chunk;
memcpy(&parts_new[offset_recv * sizeofparts],
&parts[offset_send * sizeofparts], sizeofparts * receiving);
} else {
/* Otherwise send it. */
int res =
MPI_Isend(&parts[offset_send * sizeofparts], sending, mpi_type, k,
ind_send, MPI_COMM_WORLD, &reqs[2 * k + 0]);
if (res != MPI_SUCCESS)
mpi_error(res, "Failed to isend parts to node %i.", k);
}
}
/* If we're sending to this node, then move past it to next. */
if (counts[ind_send] > 0) offset_send += counts[ind_send];
/* Are we receiving any data from this node? Note this was already handled
 * above when the data comes from this node itself. */
if (k != nodeID) {
int receiving = counts[ind_recv] - recvd;
if (receiving > 0) {
activenodes++;
if (receiving > chunk) receiving = chunk;
int res = MPI_Irecv(&parts_new[offset_recv * sizeofparts], receiving,
mpi_type, k, ind_recv, MPI_COMM_WORLD,
&reqs[2 * k + 1]);
if (res != MPI_SUCCESS)
mpi_error(res, "Failed to emit irecv of parts from node %i.", k);
}
}
/* If we're receiving from this node, then move past it to next. */
if (counts[ind_recv] > 0) offset_recv += counts[ind_recv];
}
/* Wait for all the sends and recvs to tumble in. */
MPI_Status stats[2 * nr_nodes];
int res;
if ((res = MPI_Waitall(2 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
for (int k = 0; k < 2 * nr_nodes; k++) {
char buff[MPI_MAX_ERROR_STRING];
MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
message("request from source %i, tag %i has error '%s'.",
stats[k].MPI_SOURCE, stats[k].MPI_TAG, buff);
}
error("Failed during waitall for part data.");
}
/* Move to next chunks. */
sent += chunk;
recvd += chunk;
}
/* Free temps. */
free(reqs);
}
/* And return new memory. */
return parts_new;
}
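/* Illustrative sketch (not compiled here): how this helper might be invoked
 * for the gas particles once the counts matrix and the new particle total
 * have been computed. The names nr_parts_new, part_align, part_mpi_type and
 * e->syncredist are assumptions made for this example.
 *
 *   struct part *parts_new = (struct part *)engine_do_redistribute(
 *       "parts", counts, (char *)s->parts, nr_parts_new, sizeof(struct part),
 *       part_align, part_mpi_type, nr_nodes, nodeID, e->syncredist);
 *   swift_free("parts", s->parts);
 *   s->parts = parts_new;
 */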
#endif
#ifdef WITH_MPI /* redist_mapper_data */
/* Support for engine_redistribute threadpool dest mappers. */
struct redist_mapper_data {
int *counts;
int *dest;
int nodeID;
int nr_nodes;
struct cell *cells;
struct space *s;
void *base;
};
/* Generic function for accumulating counts for TYPE parts. Note
* we use a local counts array to avoid the atomic_add in the parts
* loop. */
#define ENGINE_REDISTRIBUTE_DEST_MAPPER(TYPE) \
engine_redistribute_dest_mapper_##TYPE(void *map_data, int num_elements, \
void *extra_data) { \
struct TYPE *parts = (struct TYPE *)map_data; \
struct redist_mapper_data *mydata = \
(struct redist_mapper_data *)extra_data; \
struct space *s = mydata->s; \
int *dest = \
mydata->dest + (ptrdiff_t)(parts - (struct TYPE *)mydata->base); \
int *lcounts = NULL; \
if ((lcounts = (int *)calloc( \
sizeof(int), mydata->nr_nodes * mydata->nr_nodes)) == NULL) \
error("Failed to allocate counts thread-specific buffer"); \
for (int k = 0; k < num_elements; k++) { \
for (int j = 0; j < 3; j++) { \
if (parts[k].x[j] < 0.0) \
parts[k].x[j] += s->dim[j]; \
else if (parts[k].x[j] >= s->dim[j]) \
parts[k].x[j] -= s->dim[j]; \
} \
const int cid = cell_getid(s->cdim, parts[k].x[0] * s->iwidth[0], \
parts[k].x[1] * s->iwidth[1], \
parts[k].x[2] * s->iwidth[2]); \
dest[k] = s->cells_top[cid].nodeID; \
size_t ind = mydata->nodeID * mydata->nr_nodes + dest[k]; \
lcounts[ind] += 1; \
} \
for (int k = 0; k < (mydata->nr_nodes * mydata->nr_nodes); k++) \
atomic_add(&mydata->counts[k], lcounts[k]); \
free(lcounts); \
}
/**
* @brief Accumulate the counts of particles to send to each node.
*
* Threadpool helper computing the destination node of each particle.
*
* part version.
*/
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(part);
/**
* @brief Accumulate the counts of star particles to send to each node.
*
* Threadpool helper computing the destination node of each particle.
*
* spart version.
*/
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(spart);
/**
* @brief Accumulate the counts of gravity particles to send to each node.
*
* Threadpool helper computing the destination node of each particle.
*
* gpart version.
*/
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(gpart);
/**
* @brief Accumulate the counts of black hole particles to send to each node.
*
* Threadpool helper computing the destination node of each particle.
*
* bpart version.
*/
static void ENGINE_REDISTRIBUTE_DEST_MAPPER(bpart);
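/* Illustrative sketch (argument names assumed): how one of these mappers
 * might be driven over the particle array from the threadpool so that each
 * thread accumulates its own local counts before the atomic merge.
 *
 *   struct redist_mapper_data redist_data;
 *   redist_data.s = s;
 *   redist_data.nodeID = nodeID;
 *   redist_data.nr_nodes = nr_nodes;
 *   redist_data.counts = counts;
 *   redist_data.dest = dest;
 *   redist_data.base = (void *)parts;
 *   threadpool_map(&e->threadpool, engine_redistribute_dest_mapper_part,
 *                  parts, nr_parts, sizeof(struct part), 0, &redist_data);
 */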
#endif /* redist_mapper_data */
#ifdef WITH_MPI /* savelink_mapper_data */
/* Support for saving the linkage between gparts and parts/sparts. */
struct savelink_mapper_data {
int nr_nodes;
int *counts;
void *parts;
int nodeID;
};
/**
* @brief Save the offset of each gravity partner of a part, spart or bpart.
*
* The offset is from the start of the sorted particles to be sent to a node.
* This is possible as parts without gravity partners have a positive id.
* These offsets are used to restore the pointers on the receiving node.
*
* CHECKS should be eliminated as dead code when optimizing.
*/
#define ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(TYPE, CHECKS) \
engine_redistribute_savelink_mapper_##TYPE(void *map_data, int num_elements, \
void *extra_data) { \
int *nodes = (int *)map_data; \
struct savelink_mapper_data *mydata = \
(struct savelink_mapper_data *)extra_data; \
int nodeID = mydata->nodeID; \
int nr_nodes = mydata->nr_nodes; \
int *counts = mydata->counts; \
struct TYPE *parts = (struct TYPE *)mydata->parts; \
\
for (int j = 0; j < num_elements; j++) { \
int node = nodes[j]; \
int count = 0; \
size_t offset = 0; \
for (int i = 0; i < node; i++) offset += counts[nodeID * nr_nodes + i]; \
\
for (int k = 0; k < counts[nodeID * nr_nodes + node]; k++) { \
if (parts[k + offset].gpart != NULL) { \
if (CHECKS) \
if (parts[k + offset].gpart->id_or_neg_offset > 0) \
error("Trying to link a partnerless " #TYPE "!"); \
parts[k + offset].gpart->id_or_neg_offset = -count; \
count++; \
} \
} \
} \
}
/**
* @brief Save the positions of the part-gpart links.
*
* Threadpool helper for recording the gpart offsets before the exchange.
*/
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(part, 0);
#endif
/**
* @brief Save the positions of the spart-gpart links.
*
* Threadpool helper for recording the gpart offsets before the exchange.
*/
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(spart, 0);
#endif
/**
* @brief Save the positions of the bpart-gpart links.
*
* Threadpool helper for recording the gpart offsets before the exchange.
*/
#ifdef SWIFT_DEBUG_CHECKS
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(bpart, 1);
#else
static void ENGINE_REDISTRIBUTE_SAVELINK_MAPPER(bpart, 0);
#endif
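/* Illustrative sketch (names assumed): after sorting the parts by destination
 * node, the link-saving mapper is run over a simple array of node IDs so that
 * each task handles the slice of particles headed for one node.
 *
 *   struct savelink_mapper_data savelink_data;
 *   savelink_data.nr_nodes = nr_nodes;
 *   savelink_data.counts = counts;
 *   savelink_data.parts = (void *)parts;
 *   savelink_data.nodeID = nodeID;
 *   threadpool_map(&e->threadpool, engine_redistribute_savelink_mapper_part,
 *                  nodes, nr_nodes, sizeof(int), 0, &savelink_data);
 */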
#endif /* savelink_mapper_data */
#ifdef WITH_MPI /* relink_mapper_data */
/* Support for relinking parts, gparts, sparts and bparts after moving between
* nodes. */
struct relink_mapper_data {
int nodeID;
int nr_nodes;
int *counts;
int *s_counts;
int *g_counts;
int *b_counts;
struct space *s;
};
/**
* @brief Restore the part/gpart and spart/gpart links for a list of nodes.
*
* @param map_data address of nodes to process.
* @param num_elements the number of nodes to process.
* @param extra_data additional data defining the context (a
* relink_mapper_data).
*/
static void engine_redistribute_relink_mapper(void *map_data, int num_elements,
void *extra_data) {
int *nodes = (int *)map_data;
struct relink_mapper_data *mydata = (struct relink_mapper_data *)extra_data;
int nodeID = mydata->nodeID;
int nr_nodes = mydata->nr_nodes;
int *counts = mydata->counts;
int *g_counts = mydata->g_counts;
int *s_counts = mydata->s_counts;
int *b_counts = mydata->b_counts;
struct space *s = mydata->s;
for (int i = 0; i < num_elements; i++) {
int node = nodes[i];
/* Get offsets to correct parts of the counts arrays for this node. */
size_t offset_parts = 0;
size_t offset_gparts = 0;
size_t offset_sparts = 0;
size_t offset_bparts = 0;
for (int n = 0; n < node; n++) {
int ind_recv = n * nr_nodes + nodeID;
offset_parts += counts[ind_recv];
offset_gparts += g_counts[ind_recv];
offset_sparts += s_counts[ind_recv];
offset_bparts += b_counts[ind_recv];
}
/* Number of gparts sent from this node. */
int ind_recv = node * nr_nodes + nodeID;
const size_t count_gparts = g_counts[ind_recv];
/* Loop over the gparts received from this node */
for (size_t k = offset_gparts; k < offset_gparts + count_gparts; k++) {
/* Does this gpart have a gas partner ? */
if (s->gparts[k].type == swift_type_gas) {
const ptrdiff_t partner_index =
offset_parts - s->gparts[k].id_or_neg_offset;
/* Re-link */
s->gparts[k].id_or_neg_offset = -partner_index;
s->parts[partner_index].gpart = &s->gparts[k];
}
/* Does this gpart have a star partner ? */
else if (s->gparts[k].type == swift_type_stars) {
const ptrdiff_t partner_index =
offset_sparts - s->gparts[k].id_or_neg_offset;
/* Re-link */
s->gparts[k].id_or_neg_offset = -partner_index;
s->sparts[partner_index].gpart = &s->gparts[k];
}
/* Does this gpart have a black hole partner ? */
else if (s->gparts[k].type == swift_type_black_hole) {
const ptrdiff_t partner_index =
offset_bparts - s->gparts[k].id_or_neg_offset;
/* Re-link */
s->gparts[k].id_or_neg_offset = -partner_index;
s->bparts[partner_index].gpart = &s->gparts[k];
}
}
}
}
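/* Worked example of the offset convention (illustrative numbers): a received
 * gas gpart carrying id_or_neg_offset == -3 was linked to the part at local
 * index 3 within the slice sent by its node. If offset_parts == 100 for that
 * node, the partner now sits at index 100 + 3 = 103 in the new parts array,
 * so the gpart is relinked with id_or_neg_offset = -103 and
 * s->parts[103].gpart = &s->gparts[k]. */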
#endif /* relink_mapper_data */
/**
* @brief Redistribute the particles amongst the nodes according
* to their cell's node IDs.
*
* The strategy here is as follows:
* 1) Each node counts the number of particles it has to send to each other
* node.
* 2) The number of particles of each type is then exchanged.
* 3) The particles to send are placed in a temporary buffer in which the
* part-gpart links are preserved.
* 4) Each node allocates enough space for the new particles.
* 5) Asynchronous or synchronous communications are issued to transfer the data.
*
* @param e The #engine.
*/
void engine_redistribute(struct engine *e) {
#ifdef WITH_MPI
const int nr_nodes = e->nr_nodes;
const int nodeID = e->nodeID;
struct space *s = e->s;
struct cell *cells = s->cells_top;
const int nr_cells = s->nr_cells;
struct xpart *xparts = s->xparts;
struct part *parts = s->parts;
struct gpart *gparts = s->gparts;
struct spart *sparts = s->sparts;
struct bpart *bparts = s->bparts;
ticks tic = getticks();
size_t nr_parts = s->nr_parts;
size_t nr_gparts = s->nr_gparts;
size_t nr_sparts = s->nr_sparts;
size_t nr_bparts = s->nr_bparts;
/* Start by moving inhibited particles to the end of the arrays */
for (size_t k = 0; k < nr_parts; /* void */) {
if (parts[k].time_bin == time_bin_inhibited ||
parts[k].time_bin == time_bin_not_created) {
nr_parts -= 1;
/* Swap the particle */
memswap(&parts[k], &parts[nr_parts], sizeof(struct part));
/* Swap the xpart */
memswap(&xparts[k], &xparts[nr_parts], sizeof(struct xpart));
/* Swap the link with the gpart */
if (parts[k].gpart != NULL) {
parts[k].gpart->id_or_neg_offset = -k;
}
if (parts[nr_parts].gpart != NULL) {
parts[nr_parts].gpart->id_or_neg_offset = -nr_parts;
}
} else {
k++;
}
}
/* Now move inhibited star particles to the end of the arrays */
for (size_t k = 0; k < nr_sparts; /* void */) {
if (sparts[k].time_bin == time_bin_inhibited ||
sparts[k].time_bin == time_bin_not_created) {
nr_sparts -= 1;
/* Swap the particle */
memswap(&s->sparts[k], &s->sparts[nr_sparts], sizeof(struct spart));
/* Swap the link with the gpart */
if (s->sparts[k].gpart != NULL) {
s->sparts[k].gpart->id_or_neg_offset = -k;
}
if (s->sparts[nr_sparts].gpart != NULL) {
s->sparts[nr_sparts].gpart->id_or_neg_offset = -nr_sparts;
}
} else {
k++;
}
}
/* Now move inhibited black hole particles to the end of the arrays */
for (size_t k = 0; k < nr_bparts; /* void */) {
if (bparts[k].time_bin == time_bin_inhibited ||
bparts[k].time_bin == time_bin_not_created) {
nr_bparts -= 1;
/* Swap the particle */
memswap(&s->bparts[k], &s->bparts[nr_bparts], sizeof(struct bpart));
/* Swap the link with the gpart */
if (s->bparts[k].gpart != NULL) {
s->bparts[k].gpart->id_or_neg_offset = -k;
}
if (s->bparts[nr_bparts].gpart != NULL) {
s->bparts[nr_bparts].gpart->id_or_neg_offset = -nr_bparts;
}
} else {
k++;
}
}
/* Finally do the same with the gravity particles */
for (size_t k = 0; k < nr_gparts; /* void */) {
if (gparts[k].time_bin == time_bin_inhibited ||
gparts[k].time_bin == time_bin_not_created) {
nr_gparts -= 1;
/* Swap the particle */
memswap(&s->gparts[k], &s->gparts[nr_gparts], sizeof(struct gpart));
/* Swap the link with part/spart */
if (s->gparts[k].type == swift_type_gas) {
s->parts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
} else if (s->gparts[k].type == swift_type_stars) {
s->sparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
} else if (s->gparts[k].type == swift_type_black_hole) {
s->bparts[-s->gparts[k].id_or_neg_offset].gpart = &s->gparts[k];
}
if (s->gparts[nr_gparts].type == swift_type_gas) {
s->parts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
&s->gparts[nr_gparts];
} else if (s->gparts[nr_gparts].type == swift_type_stars) {
s->sparts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
&s->gparts[nr_gparts];
} else if (s->gparts[nr_gparts].type == swift_type_black_hole) {
s->bparts[-s->gparts[nr_gparts].id_or_neg_offset].gpart =
&s->gparts[nr_gparts];
}
} else {
k++;
}
}
/* Now we are ready to deal with real particles and can start the exchange. */
/* Allocate temporary arrays to store the counts of particles to be sent
* and the destination of each particle */
int *counts;
if ((counts = (int *)calloc(sizeof(int), nr_nodes * nr_nodes)) == NULL)
error("Failed to allocate counts temporary buffer.");
int *dest;
if ((dest = (int *)swift_malloc("dest", sizeof(int) * nr_parts)) == NULL)
error("Failed to allocate dest temporary buffer.");
/* Simple index of node IDs, used for mappers over nodes. */
int *nodes = NULL;