Commit 95b97ce3 authored by Peter W. Draper

Merge branch 'redistribute_gparts' into 'master'

Redistribute gparts

Initial version of `engine_redistribute()`, which sends around the gparts as well as the parts and preserves their links.

@nnrw56 what do you think?
I'll need to update some bits later to take into account your changes to the MPI_types and the new version of `space_part_sort()`.

See merge request !124
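
For reference, a minimal sketch of the link-preservation trick (the simplified structs and the `encode_links` / `relink_block` helpers below are illustrative assumptions, not the SWIFT API): pointers cannot survive an MPI transfer, so before sending, each linked gpart temporarily stores its partner's index within the block of parts headed to the same node, and the receiver rebuilds the pointers block by block once both arrays have arrived.

```c
#include <stddef.h>

struct gpart;

struct part {
  struct gpart *gpart; /* link to the gravity partner, NULL if none */
};

struct gpart {
  long long id;      /* >= 0: partner's index in the same send block, < 0: no partner */
  struct part *part; /* link to the hydro partner, invalid after the MPI transfer */
};

/* Before the exchange: parts[] has already been sorted by destination node,
 * with dest[k] holding the node part k goes to. */
void encode_links(struct part *parts, const size_t *dest, size_t nr_parts) {
  size_t block_start = 0;
  for (size_t k = 0; k < nr_parts; k++) {
    if (dest[k] != dest[block_start]) block_start = k; /* a new destination block starts */
    if (parts[k].gpart != NULL)
      parts[k].gpart->id = (long long)(k - block_start);
  }
}

/* After the exchange: re-link one block of parts and gparts received from
 * the same node, starting at the given offsets into the new arrays. */
void relink_block(struct part *parts_new, struct gpart *gparts_new,
                  size_t offset_parts, size_t offset_gparts,
                  size_t count_gparts) {
  for (size_t k = offset_gparts; k < offset_gparts + count_gparts; k++) {
    if (gparts_new[k].id >= 0) {
      struct part *p = &parts_new[offset_parts + (size_t)gparts_new[k].id];
      gparts_new[k].part = p;
      p->gpart = &gparts_new[k];
    }
  }
}
```

In the actual patch the index is stashed in the gpart's `id` field (gparts without partners keep a negative id) and the pointers are restored right after the `MPI_Waitall()`.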
parents 137cd5fc 78c2c26e
@@ -139,39 +139,56 @@ void engine_make_ghost_tasks(struct engine *e, struct cell *c,
 * @brief Redistribute the particles amongst the nodes according
 * to their cell's node IDs.
 *
 * The strategy here is as follows:
 * 1) Each node counts the number of particles it has to send to each other
 * node.
 * 2) The number of particles of each type is then exchanged.
 * 3) The particles to send are placed in a temporary buffer in which the
 * part-gpart links are preserved.
 * 4) Each node allocates enough space for the new particles.
 * 5) (Asynchronous) communications are issued to transfer the data.
 *
 * @param e The #engine.
 */
void engine_redistribute(struct engine *e) {

#ifdef WITH_MPI

  const int nr_nodes = e->nr_nodes;
  const int nodeID = e->nodeID;
  struct space *s = e->s;
  struct cell *cells = s->cells;
  const int nr_cells = s->nr_cells;
  const int *cdim = s->cdim;
  const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]};
  const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
  struct part *parts = s->parts;
  struct xpart *xparts = s->xparts;
  struct gpart *gparts = s->gparts;
  ticks tic = getticks();

  /* Allocate temporary arrays to store the counts of particles to be sent
     and the destination of each particle */
  int *counts, *g_counts;
  if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate count temporary buffer.");
  if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
    error("Failed to allocate gcount temporary buffer.");
  bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
  bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);

  // MATTHIEU: Should be int and not size_t once Pedro's changes are merged.
  size_t *dest, *g_dest;
  if ((dest = (size_t *)malloc(sizeof(size_t) * s->nr_parts)) == NULL)
    error("Failed to allocate dest temporary buffer.");
  if ((g_dest = (size_t *)malloc(sizeof(size_t) * s->nr_gparts)) == NULL)
    error("Failed to allocate g_dest temporary buffer.");

  /* Get destination of each particle */
  for (size_t k = 0; k < s->nr_parts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (parts[k].x[j] < 0.0)
        parts[k].x[j] += dim[j];
@@ -184,36 +201,115 @@ void engine_redistribute(struct engine *e) {
       error("Bad cell id %i for part %i at [%.3e,%.3e,%.3e].",
       cid, k, parts[k].x[0], parts[k].x[1], parts[k].x[2]); */
    dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    counts[nodeID * nr_nodes + dest[k]] += 1;
  }

  /* Sort the particles according to their cell index. */
  space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);

  /* We need to re-link the gpart partners of parts. */
  int current_dest = dest[0];
  size_t count_this_dest = 0;
  for (size_t k = 0; k < s->nr_parts; ++k) {
    if (s->parts[k].gpart != NULL) {

      /* As the addresses will be invalidated by the communications, we will */
      /* instead store the absolute index from the start of the sub-array */
      /* of particles to be sent to a given node. */
      /* Recall that gparts without partners have a negative id. */
      /* We will restore the pointers on the receiving node later on. */
      if (dest[k] != current_dest) {
        current_dest = dest[k];
        count_this_dest = 0;
      }

      s->parts[k].gpart->id = count_this_dest;
      count_this_dest++;
    }
  }

  /* Get destination of each g-particle */
  for (size_t k = 0; k < s->nr_gparts; k++) {

    /* Periodic boundary conditions */
    for (int j = 0; j < 3; j++) {
      if (gparts[k].x[j] < 0.0)
        gparts[k].x[j] += dim[j];
      else if (gparts[k].x[j] >= dim[j])
        gparts[k].x[j] -= dim[j];
    }
    const int cid = cell_getid(cdim, gparts[k].x[0] * ih[0],
                               gparts[k].x[1] * ih[1], gparts[k].x[2] * ih[2]);
    /* if (cid < 0 || cid >= s->nr_cells)
       error("Bad cell id %i for part %i at [%.3e,%.3e,%.3e].",
       cid, k, g_parts[k].x[0], g_parts[k].x[1], g_parts[k].x[2]); */
    g_dest[k] = cells[cid].nodeID;

    /* The counts array is indexed as count[from * nr_nodes + to]. */
    g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
  }

  /* Sort the gparticles according to their cell index. */
  space_gparts_sort(gparts, g_dest, s->nr_gparts, 0, nr_nodes - 1);

  /* Get all the counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
                    MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce particle transfer counts.");

  /* Get all the g_counts from all the nodes. */
  if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
                    MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
    error("Failed to allreduce gparticle transfer counts.");

  /* Each node knows how many parts and gparts will be transferred to every
     other node. We can start preparing to receive data */

  /* Get the new number of parts and gparts for this node */
  size_t nr_parts = 0, nr_gparts = 0;
  for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
  for (int k = 0; k < nr_nodes; k++)
    nr_gparts += g_counts[k * nr_nodes + nodeID];

  /* Allocate the new arrays with some extra margin */
  struct part *parts_new = NULL;
  struct xpart *xparts_new = NULL;
  struct gpart *gparts_new = NULL;
  if (posix_memalign((void **)&parts_new, part_align,
                     sizeof(struct part) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new part data.");
  if (posix_memalign((void **)&xparts_new, xpart_align,
                     sizeof(struct xpart) * nr_parts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new xpart data.");
  if (posix_memalign((void **)&gparts_new, gpart_align,
                     sizeof(struct gpart) * nr_gparts *
                         engine_redistribute_alloc_margin) != 0)
    error("Failed to allocate new gpart data.");

  /* Prepare MPI requests for the asynchronous communications */
  MPI_Request *reqs;
  if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
      NULL)
    error("Failed to allocate MPI request list.");
  for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;

  /* Emit the sends and recvs for the particle and gparticle data. */
  size_t offset_send = 0, offset_recv = 0;
  size_t g_offset_send = 0, g_offset_recv = 0;
  for (int k = 0; k < nr_nodes; k++) {

    /* Indices in the count arrays of the node of interest */
    const int ind_send = nodeID * nr_nodes + k;
    const int ind_recv = k * nr_nodes + nodeID;

    /* Are we sending any part/xpart ? */
    if (counts[ind_send] > 0) {

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&parts_new[offset_recv], &s->parts[offset_send],
               sizeof(struct part) * counts[ind_recv]);
@@ -221,36 +317,71 @@ void engine_redistribute(struct engine *e) {
               sizeof(struct xpart) * counts[ind_recv]);
        offset_send += counts[ind_send];
        offset_recv += counts[ind_recv];

      /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->parts[offset_send], counts[ind_send],
                      e->part_mpi_type, k, 3 * ind_send + 0, MPI_COMM_WORLD,
                      &reqs[6 * k]) != MPI_SUCCESS)
          error("Failed to isend parts to node %i.", k);
        if (MPI_Isend(&s->xparts[offset_send], counts[ind_send],
                      e->xpart_mpi_type, k, 3 * ind_send + 1, MPI_COMM_WORLD,
                      &reqs[6 * k + 1]) != MPI_SUCCESS)
          error("Failed to isend xparts to node %i.", k);
        offset_send += counts[ind_send];
      }
    }

    /* Are we sending any gpart ? */
    if (g_counts[ind_send] > 0) {

      /* If the send is to the same node, just copy */
      if (k == nodeID) {
        memcpy(&gparts_new[g_offset_recv], &s->gparts[g_offset_send],
               sizeof(struct gpart) * g_counts[ind_recv]);
        g_offset_send += g_counts[ind_send];
        g_offset_recv += g_counts[ind_recv];

      /* Else, emit some communications */
      } else {
        if (MPI_Isend(&s->gparts[g_offset_send], g_counts[ind_send],
                      e->gpart_mpi_type, k, 3 * ind_send + 2, MPI_COMM_WORLD,
                      &reqs[6 * k + 2]) != MPI_SUCCESS)
          error("Failed to isend gparts to node %i.", k);
        g_offset_send += g_counts[ind_send];
      }
    }

    /* Now emit the corresponding Irecv() */

    /* Are we receiving any part/xpart from this node ? */
    if (k != nodeID && counts[ind_recv] > 0) {
      if (MPI_Irecv(&parts_new[offset_recv], counts[ind_recv], e->part_mpi_type,
                    k, 3 * ind_recv + 0, MPI_COMM_WORLD,
                    &reqs[6 * k + 3]) != MPI_SUCCESS)
        error("Failed to emit irecv of parts from node %i.", k);
      if (MPI_Irecv(&xparts_new[offset_recv], counts[ind_recv],
                    e->xpart_mpi_type, k, 3 * ind_recv + 1, MPI_COMM_WORLD,
                    &reqs[6 * k + 4]) != MPI_SUCCESS)
        error("Failed to emit irecv of xparts from node %i.", k);
      offset_recv += counts[ind_recv];
    }

    /* Are we receiving any gpart from this node ? */
    if (k != nodeID && g_counts[ind_recv] > 0) {
      if (MPI_Irecv(&gparts_new[g_offset_recv], g_counts[ind_recv],
                    e->gpart_mpi_type, k, 3 * ind_recv + 2, MPI_COMM_WORLD,
                    &reqs[6 * k + 5]) != MPI_SUCCESS)
        error("Failed to emit irecv of gparts from node %i.", k);
      g_offset_recv += g_counts[ind_recv];
    }
  }

  /* Wait for all the sends and recvs to tumble in. */
  MPI_Status stats[6 * nr_nodes];
  int res;
  if ((res = MPI_Waitall(6 * nr_nodes, reqs, stats)) != MPI_SUCCESS) {
    for (int k = 0; k < 6 * nr_nodes; k++) {
      char buff[MPI_MAX_ERROR_STRING];
      int res;
      MPI_Error_string(stats[k].MPI_ERROR, buff, &res);
@@ -259,6 +390,32 @@ void engine_redistribute(struct engine *e) {
    error("Failed during waitall for part data.");
  }

  /* We now need to restore the part<->gpart links */
  size_t offset_parts = 0, offset_gparts = 0;
  for (int node = 0; node < nr_nodes; ++node) {

    const int ind_recv = node * nr_nodes + nodeID;
    const size_t count_parts = counts[ind_recv];
    const size_t count_gparts = g_counts[ind_recv];

    /* Loop over the gparts received from that node */
    for (size_t k = offset_gparts; k < offset_gparts + count_gparts; ++k) {

      /* Does this gpart have a partner ? */
      if (gparts_new[k].id >= 0) {

        const size_t partner_index = offset_parts + gparts_new[k].id;

        /* Re-link */
        gparts_new[k].part = &parts_new[partner_index];
        gparts_new[k].part->gpart = &gparts_new[k];
      }
    }

    offset_parts += count_parts;
    offset_gparts += count_gparts;
  }

  /* Verify that all parts are in the right place. */
  /* for ( int k = 0 ; k < nr_parts ; k++ ) {
      int cid = cell_getid( cdim , parts_new[k].x[0]*ih[0], parts_new[k].x[1]*ih[1], parts_new[k].x[2]*ih[2] );
@@ -266,26 +423,45 @@ void engine_redistribute(struct engine *e) {
      error( "Received particle (%i) that does not belong here (nodeID=%i).", k , cells[ cid ].nodeID );
    } */

  /* Verify that the links are correct */
  /* MATTHIEU: To be commented out once we are happy */
  for (size_t k = 0; k < nr_gparts; ++k) {

    if (gparts_new[k].id > 0) {

      if (gparts_new[k].x[0] != gparts_new[k].part->x[0] ||
          gparts_new[k].x[1] != gparts_new[k].part->x[1] ||
          gparts_new[k].x[2] != gparts_new[k].part->x[2])
        message("Linked particles are not at the same position !");
    }
  }

  /* Set the new part data, free the old. */
  free(parts);
  free(xparts);
  free(gparts);
  s->parts = parts_new;
  s->xparts = xparts_new;
  s->gparts = gparts_new;
  s->nr_parts = nr_parts;
  s->nr_gparts = nr_gparts;
  s->size_parts = engine_redistribute_alloc_margin * nr_parts;
  s->size_gparts = engine_redistribute_alloc_margin * nr_gparts;

  /* Clean up the temporary stuff. */
  free(reqs);
  free(counts);
  free(dest);

  /* Be verbose about what just happened. */
  if (e->verbose) {
    int my_cells = 0;
    for (int k = 0; k < nr_cells; k++)
      if (cells[k].nodeID == nodeID) my_cells += 1;
    message("node %i now has %zi parts and %zi gparts in %i cells.", nodeID,
            nr_parts, nr_gparts, my_cells);
  }

  if (e->verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
@@ -2175,6 +2351,7 @@ void engine_init(struct engine *e, struct space *s, float dt, int nr_threads,
#ifdef WITH_MPI
  part_create_mpi_type(&e->part_mpi_type);
  xpart_create_mpi_type(&e->xpart_mpi_type);
  gpart_create_mpi_type(&e->gpart_mpi_type);
#endif

  /* First of all, init the barrier and lock it. */
...
@@ -62,6 +62,7 @@ extern const char *engine_policy_names[];
#define engine_maxtaskspercell 96
#define engine_maxproxies 64
#define engine_tasksreweight 10
#define engine_redistribute_alloc_margin 1.2

/* The rank of the engine as a global variable (for messages). */
extern int engine_rank;
@@ -165,6 +166,7 @@ struct engine {
  /* MPI data type for the particle transfers */
  MPI_Datatype part_mpi_type;
  MPI_Datatype xpart_mpi_type;
  MPI_Datatype gpart_mpi_type;
#endif
};
...
@@ -65,4 +65,22 @@ void xpart_create_mpi_type(MPI_Datatype* xpart_type) {
  MPI_Type_commit(xpart_type);
}

/**
 * @brief Registers and returns an MPI type for the gparticles
 *
 * @param gpart_type The type container
 */
void gpart_create_mpi_type(MPI_Datatype* gpart_type) {

  /* This is not the recommended way of doing this.
     One should define the structure field by field.
     But as long as we don't do serialization via MPI-IO
     we don't really care.
     Also we would have to modify this function every time something
     is added to the gpart structure. */
  MPI_Type_contiguous(sizeof(struct gpart) / sizeof(unsigned char), MPI_BYTE,
                      gpart_type);
  MPI_Type_commit(gpart_type);
}

#endif
@@ -54,6 +54,7 @@
#ifdef WITH_MPI
void part_create_mpi_type(MPI_Datatype* part_type);
void xpart_create_mpi_type(MPI_Datatype* xpart_type);
void gpart_create_mpi_type(MPI_Datatype* gpart_type);
#endif

#endif /* SWIFT_PART_H */