Commit 330b2d61 authored by Pedro Gonnet

Merge branch 'master' into engine_exchange_strays

Conflicts:
	src/engine.c
	src/engine.h
	src/part.c
	src/part.h
	src/proxy.h
	src/scheduler.c
parents 2dce7ff8 0402ed82
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk),
@@ -79,7 +80,9 @@ int main(int argc, char *argv[]) {
int nr_nodes = 1, myrank = 0;
FILE *file_thread;
int with_outputs = 1;
int verbose = 0, talking;
int with_gravity = 0;
int engine_policies = 0;
int verbose = 0, talking = 0;
unsigned long long cpufreq = 0;
#ifdef WITH_MPI
@@ -97,12 +100,15 @@ int main(int argc, char *argv[]) {
#endif
#endif
/* Choke on FP-exceptions. */
// feenableexcept( FE_DIVBYZERO | FE_INVALID | FE_OVERFLOW );
/* Initialize CPU frequency, this also starts time. */
clocks_set_cpufreq(cpufreq);
#ifdef WITH_MPI
/* Start by initializing MPI. */
int res, prov;
int res = 0, prov = 0;
if ((res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov)) !=
MPI_SUCCESS)
error("Call to MPI_Init failed with error %i.", res);
@@ -128,9 +134,6 @@ int main(int argc, char *argv[]) {
&initial_partition.grid[1], &initial_partition.grid[0]);
#endif
/* Initialize CPU frequency, this also starts time. */
clocks_set_cpufreq(cpufreq);
/* Greeting message */
if (myrank == 0) greetings();
@@ -156,7 +159,7 @@ int main(int argc, char *argv[]) {
bzero(&s, sizeof(struct space));
/* Parse the options */
while ((c = getopt(argc, argv, "a:c:d:e:f:h:m:oP:q:R:s:t:v:w:y:z:")) != -1)
while ((c = getopt(argc, argv, "a:c:d:e:f:gh:m:oP:q:R:s:t:v:w:y:z:")) != -1)
switch (c) {
case 'a':
if (sscanf(optarg, "%lf", &scaling) != 1)
@@ -185,6 +188,9 @@ int main(int argc, char *argv[]) {
case 'f':
if (!strcpy(ICfileName, optarg)) error("Error parsing IC file name.");
break;
case 'g':
with_gravity = 1;
break;
case 'h':
if (sscanf(optarg, "%llu", &cpufreq) != 1)
error("Error parsing CPU frequency.");
@@ -356,11 +362,11 @@ int main(int argc, char *argv[]) {
if (myrank == 0) clocks_gettime(&tic);
#if defined(WITH_MPI)
#if defined(HAVE_PARALLEL_HDF5)
read_ic_parallel(ICfileName, dim, &parts, &Ngas, &periodic, myrank, nr_nodes,
MPI_COMM_WORLD, MPI_INFO_NULL);
read_ic_parallel(ICfileName, dim, &parts, &gparts, &Ngas, &Ngpart, &periodic,
myrank, nr_nodes, MPI_COMM_WORLD, MPI_INFO_NULL);
#else
read_ic_serial(ICfileName, dim, &parts, &Ngas, &periodic, myrank, nr_nodes,
MPI_COMM_WORLD, MPI_INFO_NULL);
read_ic_serial(ICfileName, dim, &parts, &gparts, &Ngas, &Ngpart, &periodic,
myrank, nr_nodes, MPI_COMM_WORLD, MPI_INFO_NULL);
#endif
#else
read_ic_single(ICfileName, dim, &parts, &gparts, &Ngas, &Ngpart, &periodic);
@@ -376,6 +382,7 @@ int main(int argc, char *argv[]) {
#if defined(WITH_MPI)
long long N_long[2] = {Ngas, Ngpart};
MPI_Reduce(&N_long, &N_total, 2, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
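/* Ngpart counts every gpart read in, including the duplicates that mirror
   the gas particles, so subtracting the gas total leaves the DM-only count. */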
N_total[1] -= N_total[0];
if (myrank == 0)
message("Read %lld gas particles and %lld DM particles from the ICs",
N_total[0], N_total[1]);
@@ -383,9 +390,35 @@ int main(int argc, char *argv[]) {
N_total[0] = Ngas;
N_total[1] = Ngpart - Ngas;
message("Read %lld gas particles and %lld DM particles from the ICs",
N_total[0], N_total[1]);
N_total[0], N_total[1]);
#endif
/* MATTHIEU: Temporary fix to preserve master */
if (!with_gravity) {
free(gparts);
gparts = NULL;
for (size_t k = 0; k < Ngas; ++k) parts[k].gpart = NULL;
Ngpart = 0;
#if defined(WITH_MPI)
N_long[0] = Ngas;
N_long[1] = Ngpart;
MPI_Reduce(&N_long, &N_total, 2, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
if (myrank == 0)
message(
"AFTER FIX: Read %lld gas particles and %lld DM particles from the "
"ICs",
N_total[0], N_total[1]);
#else
N_total[0] = Ngas;
N_total[1] = Ngpart;
message(
"AFTER FIX: Read %lld gas particles and %lld DM particles from the ICs",
N_total[0], N_total[1]);
#endif
}
/* MATTHIEU: End temporary fix */
/* Apply h scaling */
if (scaling != 1.0)
for (size_t k = 0; k < Ngas; k++) parts[k].h *= scaling;
@@ -448,12 +481,15 @@ int main(int argc, char *argv[]) {
message("nr of cells at depth %i is %i.", data[0], data[1]);
}
/* Construct the engine policy */
engine_policies = ENGINE_POLICY | engine_policy_steal | engine_policy_hydro;
if (with_gravity) engine_policies |= engine_policy_external_gravity;
/* Initialize the engine with this space. */
if (myrank == 0) clocks_gettime(&tic);
if (myrank == 0) message("nr_nodes is %i.", nr_nodes);
engine_init(&e, &s, dt_max, nr_threads, nr_queues, nr_nodes, myrank,
ENGINE_POLICY | engine_policy_steal | engine_policy_hydro, 0,
time_end, dt_min, dt_max, talking);
engine_policies, 0, time_end, dt_min, dt_max, talking);
if (myrank == 0 && verbose) {
clocks_gettime(&toc);
message("engine_init took %.3f %s.", clocks_diff(&tic, &toc),
......
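The hunk above is the point of the new -g option: engine policies are
independent bits combined with bitwise OR, so external gravity can be enabled
without touching the hydro setup. A minimal standalone sketch of the pattern,
using illustrative enum values rather than SWIFT's actual engine_policy_*
definitions:

#include <stdio.h>

/* Hypothetical stand-ins for SWIFT's engine_policy_* bits. */
enum policy {
  policy_steal = 1 << 0,
  policy_hydro = 1 << 1,
  policy_external_gravity = 1 << 2
};

int main(void) {
  int policies = policy_steal | policy_hydro; /* baseline, as in main() */
  int with_gravity = 1;                       /* would be set by -g */
  if (with_gravity) policies |= policy_external_gravity;

  /* Consumers test only the bits they care about. */
  if (policies & policy_hydro) printf("hydro tasks enabled\n");
  if (policies & policy_external_gravity) printf("external gravity enabled\n");
  return 0;
}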
@@ -45,6 +45,9 @@
#include "kernel.h"
#include "version.h"
const char* particle_type_names[NUM_PARTICLE_TYPES] = {
"Gas", "DM", "Boundary", "Dummy", "Star", "BH"};
/**
* @brief Converts a C data type to the HDF5 equivalent.
*
@@ -402,52 +405,68 @@ void createXMFfile() {
*snapshot
*
* @param xmfFile The file to write in.
* @param Nparts The number of particles.
* @param hdfFileName The name of the HDF5 file corresponding to this output.
* @param time The current simulation time.
*/
void writeXMFheader(FILE* xmfFile, long long Nparts, char* hdfFileName,
float time) {
void writeXMFoutputheader(FILE* xmfFile, char* hdfFileName, float time) {
/* Write end of file */
fprintf(xmfFile, "<!-- XMF description for file: %s -->\n", hdfFileName);
fprintf(xmfFile,
"<Grid GridType=\"Collection\" CollectionType=\"Spatial\">\n");
fprintf(xmfFile, "<Time Type=\"Single\" Value=\"%f\"/>\n", time);
fprintf(xmfFile, "<Grid Name=\"Gas\" GridType=\"Uniform\">\n");
fprintf(xmfFile,
"<Topology TopologyType=\"Polyvertex\" Dimensions=\"%lld\"/>\n",
Nparts);
fprintf(xmfFile, "<Geometry GeometryType=\"XYZ\">\n");
fprintf(xmfFile,
"<DataItem Dimensions=\"%lld 3\" NumberType=\"Double\" "
"Precision=\"8\" "
"Format=\"HDF\">%s:/PartType0/Coordinates</DataItem>\n",
Nparts, hdfFileName);
fprintf(xmfFile, "</Geometry>");
}
/**
* @brief Writes the end of the XMF file (closes all open markups)
*
* @param xmfFile The file to write in.
* @param output The number of this output.
* @param time The current simulation time.
*/
void writeXMFfooter(FILE* xmfFile) {
void writeXMFoutputfooter(FILE* xmfFile, int output, float time) {
/* Write end of the section of this time step */
fprintf(xmfFile, "\n</Grid>\n");
fprintf(xmfFile, "</Grid>\n");
fprintf(xmfFile, "\n</Grid>\n");
fprintf(xmfFile,
"\n</Grid> <!-- End of meta-data for output=%03i, time=%f -->\n",
output, time);
fprintf(xmfFile, "\n</Grid> <!-- timeSeries -->\n");
fprintf(xmfFile, "</Domain>\n");
fprintf(xmfFile, "</Xdmf>\n");
fclose(xmfFile);
}
void writeXMFgroupheader(FILE* xmfFile, char* hdfFileName, size_t N,
enum PARTICLE_TYPE ptype) {
fprintf(xmfFile, "\n<Grid Name=\"%s\" GridType=\"Uniform\">\n",
particle_type_names[ptype]);
fprintf(xmfFile,
"<Topology TopologyType=\"Polyvertex\" Dimensions=\"%zi\"/>\n", N);
fprintf(xmfFile, "<Geometry GeometryType=\"XYZ\">\n");
fprintf(xmfFile,
"<DataItem Dimensions=\"%zi 3\" NumberType=\"Double\" "
"Precision=\"8\" "
"Format=\"HDF\">%s:/PartType%d/Coordinates</DataItem>\n",
N, hdfFileName, ptype);
fprintf(xmfFile,
"</Geometry>\n <!-- Done geometry for %s, start of particle fields "
"list -->\n",
particle_type_names[ptype]);
}
void writeXMFgroupfooter(FILE* xmfFile, enum PARTICLE_TYPE ptype) {
fprintf(xmfFile, "</Grid> <!-- End of meta-data for parttype=%s -->\n",
particle_type_names[ptype]);
}
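/* Not part of the patch: a hedged sketch of how the new group-based helpers
 * compose for one snapshot. "GAS" stands in for whatever the gas entry of
 * enum PARTICLE_TYPE is called, and the file name is invented; the calls
 * match the signatures declared in the header changes below.
 *
 *   FILE* xmf = prepareXMFfile();
 *   writeXMFoutputheader(xmf, "out_000.hdf5", time);
 *   writeXMFgroupheader(xmf, "out_000.hdf5", Ngas, GAS);
 *   writeXMFline(xmf, "out_000.hdf5", "/PartType0", "Masses", Ngas, 1, FLOAT);
 *   writeXMFgroupfooter(xmf, GAS);
 *   writeXMFoutputfooter(xmf, outputCount, time);  // also closes the file
 */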
/**
* @brief Writes the lines corresponding to an array of the HDF5 output
*
* @param xmfFile The file in which to write
* @param fileName The name of the HDF5 file associated to this XMF descriptor.
* @param partTypeGroupName The name of the group containing the particles in
*the HDF5 file.
* @param name The name of the array in the HDF5 file.
* @param N The number of particles.
* @param dim The dimension of the quantity (1 for scalars, 3 for vectors).
@@ -455,21 +474,21 @@ void writeXMFfooter(FILE* xmfFile) {
*
* @todo Treat the types in a better way.
*/
void writeXMFline(FILE* xmfFile, char* fileName, char* name, long long N,
int dim, enum DATA_TYPE type) {
void writeXMFline(FILE* xmfFile, char* fileName, char* partTypeGroupName,
char* name, size_t N, int dim, enum DATA_TYPE type) {
fprintf(xmfFile,
"<Attribute Name=\"%s\" AttributeType=\"%s\" Center=\"Node\">\n",
name, dim == 1 ? "Scalar" : "Vector");
if (dim == 1)
fprintf(xmfFile,
"<DataItem Dimensions=\"%lld\" NumberType=\"Double\" "
"Precision=\"%d\" Format=\"HDF\">%s:/PartType0/%s</DataItem>\n",
N, type == FLOAT ? 4 : 8, fileName, name);
"<DataItem Dimensions=\"%zi\" NumberType=\"Double\" "
"Precision=\"%d\" Format=\"HDF\">%s:%s/%s</DataItem>\n",
N, type == FLOAT ? 4 : 8, fileName, partTypeGroupName, name);
else
fprintf(xmfFile,
"<DataItem Dimensions=\"%lld %d\" NumberType=\"Double\" "
"Precision=\"%d\" Format=\"HDF\">%s:/PartType0/%s</DataItem>\n",
N, dim, type == FLOAT ? 4 : 8, fileName, name);
"<DataItem Dimensions=\"%zi %d\" NumberType=\"Double\" "
"Precision=\"%d\" Format=\"HDF\">%s:%s/%s</DataItem>\n",
N, dim, type == FLOAT ? 4 : 8, fileName, partTypeGroupName, name);
fprintf(xmfFile, "</Attribute>\n");
}
@@ -483,13 +502,14 @@ void writeXMFline(FILE* xmfFile, char* fileName, char* name, long long N,
* @param gparts The array of #gpart freshly read in.
* @param Ndm The number of DM particles read in.
*/
void prepare_dm_gparts(struct gpart* gparts, size_t Ndm) {
void prepare_dm_gparts(struct gpart* const gparts, size_t Ndm) {
/* Let's give all these gparts a negative id */
for (size_t i = 0; i < Ndm; ++i) {
/* 0 or negative ids are not allowed */
if (gparts[i].id <= 0) error("0 or negative ID for DM particle");
if (gparts[i].id <= 0)
error("0 or negative ID for DM particle %zd: ID=%lld", i, gparts[i].id);
gparts[i].id = -gparts[i].id;
}
@@ -507,8 +527,9 @@ void prepare_dm_gparts(struct gpart* gparts, size_t Ndm) {
* @param Ngas The number of gas particles read in.
* @param Ndm The number of DM particles read in.
*/
void duplicate_hydro_gparts(struct part* parts, struct gpart* gparts,
size_t Ngas, size_t Ndm) {
void duplicate_hydro_gparts(struct part* const parts,
struct gpart* const gparts, size_t Ngas,
size_t Ndm) {
for (size_t i = 0; i < Ngas; ++i) {
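  /* The loop body is elided by the diff. A hedged sketch of what the
   * duplication amounts to, using only fields that appear elsewhere in this
   * patch (gpart.id, gpart.x, gpart.part, part.gpart):
   *
   *   gparts[i + Ndm].id = parts[i].id;      // positive id = gas gpart
   *   gparts[i + Ndm].x[0] = parts[i].x[0];  // likewise x[1], x[2]
   *   gparts[i + Ndm].part = &parts[i];      // gpart -> part link
   *   parts[i].gpart = &gparts[i + Ndm];     // part -> gpart link
   */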
@@ -537,16 +558,19 @@ void duplicate_hydro_gparts(struct part* parts, struct gpart* gparts,
* @param dmparts The array of #gpart containing DM particles to be filled.
* @param Ndm The number of DM particles.
*/
void collect_dm_gparts(struct gpart* gparts, size_t Ntot, struct gpart* dmparts,
size_t Ndm) {
void collect_dm_gparts(const struct gpart* const gparts, size_t Ntot,
struct gpart* const dmparts, size_t Ndm) {
size_t count = 0;
/* Loop over all gparts */
for (size_t i = 0; i < Ntot; ++i) {
/* message("i=%zd count=%zd id=%lld part=%p", i, count, gparts[i].id,
* gparts[i].part); */
/* And collect the DM ones */
if (gparts[i].id < 0) {
if (gparts[i].id < 0LL) {
memcpy(&dmparts[count], &gparts[i], sizeof(struct gpart));
dmparts[count].id = -dmparts[count].id;
count++;
......
@@ -70,14 +70,20 @@ enum PARTICLE_TYPE {
NUM_PARTICLE_TYPES
};
extern const char* particle_type_names[];
#define FILENAME_BUFFER_SIZE 150
#define PARTICLE_GROUP_BUFFER_SIZE 20
hid_t hdf5Type(enum DATA_TYPE type);
size_t sizeOfType(enum DATA_TYPE type);
void collect_dm_gparts(struct gpart* gparts, size_t Ntot, struct gpart* dmparts,
size_t Ndm);
void prepare_dm_gparts(struct gpart* gparts, size_t Ndm);
void duplicate_hydro_gparts(struct part* parts, struct gpart* gparts,
size_t Ngas, size_t Ndm);
void collect_dm_gparts(const struct gpart* const gparts, size_t Ntot,
struct gpart* const dmparts, size_t Ndm);
void prepare_dm_gparts(struct gpart* const gparts, size_t Ndm);
void duplicate_hydro_gparts(struct part* const parts,
struct gpart* const gparts, size_t Ngas,
size_t Ndm);
void readAttribute(hid_t grp, char* name, enum DATA_TYPE type, void* data);
@@ -92,10 +98,13 @@ void writeAttribute_s(hid_t grp, char* name, const char* str);
void createXMFfile();
FILE* prepareXMFfile();
void writeXMFfooter(FILE* xmfFile);
void writeXMFheader(FILE* xmfFile, long long N, char* hdfFileName, float time);
void writeXMFline(FILE* xmfFile, char* fileName, char* name, long long N,
int dim, enum DATA_TYPE type);
void writeXMFoutputheader(FILE* xmfFile, char* hdfFileName, float time);
void writeXMFoutputfooter(FILE* xmfFile, int outputCount, float time);
void writeXMFgroupheader(FILE* xmfFile, char* hdfFileName, size_t N,
enum PARTICLE_TYPE ptype);
void writeXMFgroupfooter(FILE* xmfFile, enum PARTICLE_TYPE ptype);
void writeXMFline(FILE* xmfFile, char* fileName, char* partTypeGroupName,
char* name, size_t N, int dim, enum DATA_TYPE type);
void writeCodeDescription(hid_t h_file);
void writeSPHflavour(hid_t h_file);
......
@@ -139,39 +139,56 @@ void engine_make_ghost_tasks(struct engine *e, struct cell *c,
* @brief Redistribute the particles amongst the nodes according
* to their cell's node IDs.
*
* The strategy here is as follows:
* 1) Each node counts the number of particles it has to send to each other
* node.
* 2) The number of particles of each type is then exchanged.
* 3) The particles to send are placed in a temporary buffer in which the
* part-gpart links are preserved.
* 4) Each node allocates enough space for the new particles.
* 5) (Asynchronous) communications are issued to transfer the data.
*
*
* @param e The #engine.
*/
void engine_redistribute(struct engine *e) {
#ifdef WITH_MPI
int nr_nodes = e->nr_nodes, nodeID = e->nodeID;
const int nr_nodes = e->nr_nodes;
const int nodeID = e->nodeID;
struct space *s = e->s;
int my_cells = 0;
int *cdim = s->cdim;
struct cell *cells = s->cells;
int nr_cells = s->nr_cells;
const int nr_cells = s->nr_cells;
const int *cdim = s->cdim;
const double ih[3] = {s->ih[0], s->ih[1], s->ih[2]};
const double dim[3] = {s->dim[0], s->dim[1], s->dim[2]};
struct part *parts = s->parts;
struct xpart *xparts = s->xparts;
struct gpart *gparts = s->gparts;
ticks tic = getticks();
/* Start by sorting the particles according to their nodes and
getting the counts. The counts array is indexed as
count[from * nr_nodes + to]. */
int *counts;
int *dest;
double ih[3], dim[3];
ih[0] = s->ih[0];
ih[1] = s->ih[1];
ih[2] = s->ih[2];
dim[0] = s->dim[0];
dim[1] = s->dim[1];
dim[2] = s->dim[2];
if ((counts = (int *)malloc(sizeof(int) *nr_nodes *nr_nodes)) == NULL ||
(dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
error("Failed to allocate count and dest buffers.");
/* Allocate temporary arrays to store the counts of particles to be sent
and the destination of each particle */
int *counts, *g_counts;
if ((counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
error("Failed to allocate count temporary buffer.");
if ((g_counts = (int *)malloc(sizeof(int) * nr_nodes * nr_nodes)) == NULL)
error("Failed to allocate gcount temporary buffer.");
bzero(counts, sizeof(int) * nr_nodes * nr_nodes);
struct part *parts = s->parts;
bzero(g_counts, sizeof(int) * nr_nodes * nr_nodes);
/* Allocate the destination index arrays. */
int *dest, *g_dest;
if ((dest = (int *)malloc(sizeof(int) * s->nr_parts)) == NULL)
error("Failed to allocate dest temporary buffer.");
if ((g_dest = (int *)malloc(sizeof(int) * s->nr_gparts)) == NULL)
error("Failed to allocate g_dest temporary buffer.");
/* Get destination of each particle */
for (size_t k = 0; k < s->nr_parts; k++) {
/* Periodic boundary conditions */
for (int j = 0; j < 3; j++) {
if (parts[k].x[j] < 0.0)
parts[k].x[j] += dim[j];
@@ -184,36 +201,121 @@ void engine_redistribute(struct engine *e) {
error("Bad cell id %i for part %i at [%.3e,%.3e,%.3e].",
cid, k, parts[k].x[0], parts[k].x[1], parts[k].x[2]); */
dest[k] = cells[cid].nodeID;
/* The counts array is indexed as count[from * nr_nodes + to]. */
counts[nodeID * nr_nodes + dest[k]] += 1;
}
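  /* Illustration (not part of the patch): with nr_nodes = 3, counts is a
     flattened 3x3 matrix in which row `from` lists what that node sends:

                  to 0  to 1  to 2
         from 0 [ c00,  c01,  c02 ]
         from 1 [ c10,  c11,  c12 ]
         from 2 [ c20,  c21,  c22 ]

     Each rank fills only its own row; the MPI_Allreduce further down sums
     the matrices so every rank sees the full picture, and column `nodeID`
     then gives the counts it will receive. */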
/* Sort the particles according to their cell index. */
space_parts_sort(s, dest, s->nr_parts, 0, nr_nodes - 1, e->verbose);
/* We need to re-link the gpart partners of parts. */
int current_dest = dest[0];
size_t count_this_dest = 0;
for (size_t k = 0; k < s->nr_parts; ++k) {
if (s->parts[k].gpart != NULL) {
/* As the addresses will be invalidated by the communications, we will */
/* instead store the absolute index from the start of the sub-array */
/* of particles to be sent to a given node. */
/* Recall that gparts without partners have a negative id. */
/* We will restore the pointers on the receiving node later on. */
if (dest[k] != current_dest) {
current_dest = dest[k];
count_this_dest = 0;
}
/* Debug */
/* if(s->parts[k].gpart->id < 0) */
/* error("Trying to link a partnerless gpart !"); */
s->parts[k].gpart->id = count_this_dest;
count_this_dest++;
}
}
/* Get destination of each g-particle */
for (size_t k = 0; k < s->nr_gparts; k++) {
/* Periodic boundary conditions */
for (int j = 0; j < 3; j++) {
if (gparts[k].x[j] < 0.0)
gparts[k].x[j] += dim[j];
else if (gparts[k].x[j] >= dim[j])
gparts[k].x[j] -= dim[j];
}
const int cid = cell_getid(cdim, gparts[k].x[0] * ih[0],
gparts[k].x[1] * ih[1], gparts[k].x[2] * ih[2]);
/* if (cid < 0 || cid >= s->nr_cells)
error("Bad cell id %i for part %i at [%.3e,%.3e,%.3e].",
cid, k, g_parts[k].x[0], g_parts[k].x[1], g_parts[k].x[2]); */
g_dest[k] = cells[cid].nodeID;
/* The counts array is indexed as count[from * nr_nodes + to]. */
g_counts[nodeID * nr_nodes + g_dest[k]] += 1;
}
/* Sort the gparticles according to their cell index. */
space_gparts_sort(gparts, g_dest, s->nr_gparts, 0, nr_nodes - 1);
/* Get all the counts from all the nodes. */
if (MPI_Allreduce(MPI_IN_PLACE, counts, nr_nodes * nr_nodes, MPI_INT, MPI_SUM,
MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to allreduce particle transfer counts.");
/* Get the new number of parts for this node, be generous in allocating. */
size_t nr_parts = 0;
/* Get all the g_counts from all the nodes. */
if (MPI_Allreduce(MPI_IN_PLACE, g_counts, nr_nodes * nr_nodes, MPI_INT,
MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to allreduce gparticle transfer counts.");
/* Each node knows how many parts and gparts will be transferred to every
other node. We can start preparing to receive data */
/* Get the new number of parts and gparts for this node */
size_t nr_parts = 0, nr_gparts = 0;
for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
for (int k = 0; k < nr_nodes; k++)
nr_gparts += g_counts[k * nr_nodes + nodeID];
/* Allocate the new arrays with some extra margin */
struct part *parts_new = NULL;
struct xpart *xparts_new = NULL, *xparts = s->xparts;
struct xpart *xparts_new = NULL;
struct gpart *gparts_new = NULL;
if (posix_memalign((void **)&parts_new, part_align,
sizeof(struct part) * nr_parts * 1.2) != 0 ||
posix_memalign((void **)&xparts_new, part_align,
sizeof(struct xpart) * nr_parts * 1.2) != 0)
sizeof(struct part) * nr_parts *
engine_redistribute_alloc_margin) != 0)
error("Failed to allocate new part data.");
/* Emit the sends and recvs for the particle data. */
if (posix_memalign((void **)&xparts_new, xpart_align,
sizeof(struct xpart) * nr_parts *
engine_redistribute_alloc_margin) != 0)
error("Failed to allocate new xpart data.");
if (posix_memalign((void **)&gparts_new, gpart_align,
sizeof(struct gpart) * nr_gparts *
engine_redistribute_alloc_margin) != 0)
error("Failed to allocate new gpart data.");
/* Prepare MPI requests for the asynchronous communications */
MPI_Request *reqs;
if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 4 * nr_nodes)) ==
if ((reqs = (MPI_Request *)malloc(sizeof(MPI_Request) * 6 * nr_nodes)) ==
NULL)
error("Failed to allocate MPI request list.");
for (int k = 0; k < 4 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
for (size_t offset_send = 0, offset_recv = 0, k = 0; k < nr_nodes; k++) {
int ind_send = nodeID * nr_nodes + k;
int ind_recv = k * nr_nodes + nodeID;
for (int k = 0; k < 6 * nr_nodes; k++) reqs[k] = MPI_REQUEST_NULL;
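  /* Layout note (not part of the patch): each peer k owns six request slots,
     reqs[6 * k + 0 .. 6 * k + 5] -- presumably send part/xpart/gpart and
     receive part/xpart/gpart -- so a single MPI_Waitall over all
     6 * nr_nodes entries completes the exchange. Slots left at
     MPI_REQUEST_NULL are simply ignored by MPI_Waitall. */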
/* Emit the sends and recvs for the particle and gparticle data. */
size_t offset_send = 0, offset_recv = 0;
size_t g_offset_send = 0, g_offset_recv = 0;
for (int k = 0; k < nr_nodes; k++) {
/* Indices in the count arrays of the node of interest */
const int ind_send = nodeID * nr_nodes + k;
const int ind_recv = k * nr_nodes + nodeID;
/* Are we sending any part/xpart ? */
if (counts[ind_send] > 0) {
/* message("Sending %d part to node %d", counts[ind_send], k); */
/* If the send is to the same node, just copy */
if (k == nodeID) {
memcpy(&parts_new[offset_recv], &s->parts[offset_send],
sizeof(struct part) * counts[ind_recv]);
@@ -221,36 +323,73 @@ void engine_redistribute(struct engine *e) {
sizeof(struct xpart) * counts[ind_recv]);
offset_send += counts[ind_send];
offset_recv += counts[ind_recv];
/* Else, emit some communications */
} else {
if (MPI_Isend(&s->parts[offset_send], counts[ind_send],
part_mpi_type, k, 2 * ind_send + 0, MPI_COMM_WORLD,
&reqs[4 * k]) != MPI_SUCCESS)
error("Failed to isend parts to node %zi.", k);
if (MPI_Isend(&s->xparts[offset_send], counts[ind_send],
xpart_mpi_type, k, 2 * ind_send + 1, MPI_COMM_WORLD,
&reqs[4 * k + 1]) != MPI_SUCCESS)
error("Failed to isend xparts to node %zi.", k);
if (MPI_Isend(&s->parts[offset_send], counts[ind_send], part_mpi_type,
k, 3 * ind_send + 0, MPI_COMM_WORLD,
&reqs[6 * k]) != MPI_SUCCESS)
error("Failed to isend parts to node %i.", k);
if (MPI_Isend(&s->xparts[offset_send], counts[ind_send], xpart_mpi_type,
k, 3 * ind_send + 1, MPI_COMM_WORLD,
&reqs[6 * k + 1]) != MPI_SUCCESS)
error("Failed to isend xparts to node %i.", k);
offset_send += counts[ind_send];
}
}
/* Are we sending any gpart ? */
if (g_counts[ind_send] > 0) {
/* message("Sending %d gpart to node %d", g_counts[ind_send], k); */
/* If the send is to the same node, just copy */