diff --git a/examples/test.c b/examples/test.c
index aaa8ac160c731336144a82ddb86bb932ecacef0a..b1f49fde9ef2dcf11f8cbe6236fa4a6eee831a35 100644
--- a/examples/test.c
+++ b/examples/test.c
@@ -730,6 +730,10 @@ int main(int argc, char *argv[]) {
   if (scaling != 1.0)
     for (k = 0; k < N; k++) parts[k].h *= scaling;
 
+  /* Mark particle nodes */
+  for ( k = 0 ; k < N ; k++ )
+    parts[k].lastNodeID = myrank + 1000;
+
   /* Apply shift */
   if (shift[0] != 0 || shift[1] != 0 || shift[2] != 0)
     for (k = 0; k < N; k++) {
@@ -785,7 +789,7 @@ int main(int argc, char *argv[]) {
   tic = getticks();
   message("nr_nodes is %i.", nr_nodes);
   engine_init(&e, &s, dt_max, nr_threads, nr_queues, nr_nodes, myrank ,
-              ENGINE_POLICY | engine_policy_steal | engine_policy_paranoid);
+              ENGINE_POLICY | engine_policy_steal );
   if (myrank == 0)
     message("engine_init took %.3f ms.",
             ((double)(getticks() - tic)) / CPU_TPS * 1000);
@@ -819,12 +823,13 @@ int main(int argc, char *argv[]) {
 #endif
 
   /* Inauguration speech. */
-  if (runs < INT_MAX)
+  if (runs < INT_MAX) {
     message("starting for %i steps with %i threads and %i queues...", runs,
             e.nr_threads, e.sched.nr_queues);
-  else
+  } else {
     message("starting for t=%.3e with %i threads and %i queues...", clock,
             e.nr_threads, e.sched.nr_queues);
+  }
   fflush(stdout);
 
   /* Legend. */
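Note: the lastNodeID markers introduced throughout this patch follow one convention: each code path that touches a particle stamps it with its own rank plus a path-specific offset (+1000 here in the initial conditions, +2000 to +7000 in engine.c, +6000 in proxy.c, +10000 in scheduler.c), so a misrouted particle reports both the rank and the code path that last handled it. A minimal decoder sketch, assuming fewer than 1000 ranks; the helper name is illustrative and not part of the patch:

    #include <stdio.h>

    /* Decode a lastNodeID marker of the form rank + 1000*path (illustrative). */
    static void decode_marker(int lastNodeID) {
      int path = lastNodeID / 1000; /* which code path stamped it */
      int rank = lastNodeID % 1000; /* rank that stamped it, if < 1000 ranks */
      printf("stamped by rank %d on path %d\n", rank, path);
    }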
diff --git a/src/engine.c b/src/engine.c
index 2f1a9ef1e358832b4a9f83d474a485cdf58d33ed..5e4a8b80bad869a908712aa9b207ea2d9c06e890 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -260,12 +260,18 @@ void engine_redistribute(struct engine *e) {
   }
   struct part *parts_new;
   struct xpart *xparts_new, *xparts = s->xparts;
+  int nr_parts_new = nr_parts * 1.2;
   if (posix_memalign((void **)&parts_new, part_align,
-                     sizeof(struct part) * nr_parts * 1.2) != 0 ||
+                     sizeof(struct part) * nr_parts_new) != 0 ||
       posix_memalign((void **)&xparts_new, part_align,
-                     sizeof(struct xpart) * nr_parts * 1.2) != 0)
+                     sizeof(struct xpart) * nr_parts_new) != 0)
     error("Failed to allocate new part data.");
 
+  /* Mark particles with creator node. */
+  for (int k = 0; k < nr_parts_new; k++ ) {
+    parts_new[k].lastNodeID = nodeID + 2000;
+  }
+
   if ( nodeID == 0 ) {
     message("counts = ");
     for (int i = 0; i < nr_nodes; i++) {
@@ -298,6 +304,12 @@ void engine_redistribute(struct engine *e) {
         /* Just copy our own parts */
         if ( counts[ind_send] > 0 ) {
           if ( j == nodeID ) {
+
+            /* Mark particles... */
+            for (int z =0; z < counts[ind_recv]; z++ ) {
+              s->parts[offset_send+z].lastNodeID = nodeID + 3000;
+            }
+
             message("self copy: %ld", counts[ind_recv] * sizeof(struct part));
             memcpy(&parts_new[offset_recv], &s->parts[offset_send],
                    sizeof(struct part) * counts[ind_recv]);
@@ -307,8 +319,26 @@ void engine_redistribute(struct engine *e) {
             offset_recv += counts[ind_recv];
           } else {
-            message("send from %i to %i (tag=%i,counts=%li)", kk, j,
-                    ind_send, counts[ind_send] * sizeof(struct part) );
+
+            /* Mark particles... */
+            for (int z =0; z < counts[ind_send]; z++ ) {
+              s->parts[offset_send+z].lastNodeID = nodeID + 4000;
+            }
+
+            message("send from %i to %i (tag=%i,counts=%i/%li,offset=%i)", kk, j,
+                    ind_send, counts[ind_send],
+                    counts[ind_send] * sizeof(struct part), offset_send );
+
+            /* Check destinations are correct. */
+            for (int a = 0; a < counts[ind_send]; a++ ) {
+              int ia = offset_send + a;
+              int cid = cell_getid( cdim, s->parts[ia].x[0]*ih[0],
+                                    s->parts[ia].x[1]*ih[1],
+                                    s->parts[ia].x[2]*ih[2]);
+              if ( cells[cid].nodeID != j ) {
+                message( "Sending particle to wrong node: %d, %d, %d",
+                         ia, j, cells[cid].nodeID );
+              }
+            }
+
             res = MPI_Send(&s->parts[offset_send],
                            sizeof(struct part) * counts[ind_send],
                            MPI_BYTE, j, ind_send, MPI_COMM_WORLD );
@@ -318,7 +348,7 @@ void engine_redistribute(struct engine *e) {
             }
             res = MPI_Send(&s->xparts[offset_send],
                            sizeof(struct xpart) * counts[ind_send],
-                           MPI_BYTE, j, ind_send, MPI_COMM_WORLD );
+                           MPI_BYTE, j, ind_send + (nr_nodes*nr_nodes), MPI_COMM_WORLD );
             if ( res != MPI_SUCCESS ) {
               mpi_error(res,"Failed to send xparts to node %i from %i.",
                         j, nodeID);
@@ -330,12 +360,13 @@ void engine_redistribute(struct engine *e) {
           }
         } else {
 
-          message("receives %i from %i (tag=%i,count=%li)", nodeID, kk,
-                  ind_recv, counts[ind_recv] * sizeof(struct part));
-
           /* Listen for sends from kk. */
           if (counts[ind_recv] > 0) {
+            message("receives %i from %i (tag=%i,count=%i/%li,offset=%i)",
+                    nodeID, kk, ind_recv, counts[ind_recv],
+                    counts[ind_recv] * sizeof(struct part), offset_recv);
+
             MPI_Status status;
             res = MPI_Recv(&parts_new[offset_recv],
                            sizeof(struct part) * counts[ind_recv], MPI_BYTE, kk,
@@ -345,9 +376,45 @@ void engine_redistribute(struct engine *e) {
                         kk, nodeID);
             }
 
+            /* Check destination is correct. */
+            int bad = 0;
+            int minia = -1;
+            int maxia = -1;
+            for (int a = 0; a < counts[ind_recv]; a++ ) {
+              int ia = offset_recv + a;
+              int cid = cell_getid( cdim, parts_new[ia].x[0]*ih[0],
+                                    parts_new[ia].x[1]*ih[1],
+                                    parts_new[ia].x[2]*ih[2] );
+              if ( cells[cid].nodeID != nodeID ) {
+                bad = 1;
+                if ( minia == -1 ) minia = ia;
+                maxia = ia;
+              }
+            }
+            if ( bad ) {
+              /* Just range to keep output small... */
+              int cid = cell_getid( cdim,
+                                    parts_new[minia].x[0]*ih[0],
+                                    parts_new[minia].x[1]*ih[1],
+                                    parts_new[minia].x[2]*ih[2] );
+              message( "First particle %d arrived in wrong node: on %d, target %d, from %d",
+                       minia, nodeID, cells[cid].nodeID, parts_new[minia].lastNodeID );
+              cid = cell_getid( cdim,
+                                parts_new[maxia].x[0]*ih[0],
+                                parts_new[maxia].x[1]*ih[1],
+                                parts_new[maxia].x[2]*ih[2] );
+              message( "Last particle %d arrived in wrong node: on %d, target %d, from %d",
+                       maxia, nodeID, cells[cid].nodeID, parts_new[maxia].lastNodeID );
+
+              int count = 0;
+              MPI_Get_count(&status, MPI_CHAR, &count);
+              message( "Task %d: Received %d char(s), %ld parts, from task %d with tag %d",
+                       nodeID, count, count/sizeof(struct part),
+                       status.MPI_SOURCE, status.MPI_TAG);
+            }
+
             res = MPI_Recv(&xparts_new[offset_recv],
                            sizeof(struct xpart) * counts[ind_recv], MPI_BYTE, kk,
-                           ind_recv, MPI_COMM_WORLD, &status);
+                           ind_recv + (nr_nodes*nr_nodes), MPI_COMM_WORLD, &status);
             if ( res != MPI_SUCCESS ) {
               mpi_error( res, "Failed to irecv of xparts from node %i to %i.",
                          kk, nodeID);
@@ -423,9 +490,11 @@ void engine_redistribute(struct engine *e) {
   for ( int k = 0 ; k < nr_parts ; k++ ) {
     int cid = cell_getid(cdim, parts_new[k].x[0]*ih[0], parts_new[k].x[1]*ih[1],
                          parts_new[k].x[2]*ih[2]);
-    if ( cells[cid].nodeID != nodeID )
-      error("Received particle (%i) that does not belong here (nodeID=%i).",
-            k, cells[ cid ].nodeID);
+    if ( cells[cid].nodeID != nodeID ) {
+      printParticle_single( &parts_new[k] );
+      error("Received particle (%i/%i) that does not belong here (nodeID=%i).",
+            k, nr_parts, cells[ cid ].nodeID);
+    }
   }
 
   /* Set the new part data, free the old. */
@@ -434,7 +503,7 @@
   s->parts = parts_new;
   s->xparts = xparts_new;
   s->nr_parts = nr_parts;
-  s->size_parts = 1.2 * nr_parts;
+  s->size_parts = nr_parts_new;
 
   /* Be verbose about what just happened. */
   for (int k = 0; k < nr_cells; k++)
@@ -652,6 +721,7 @@ void engine_repartition(struct engine *e) {
     error("Failed to allreduce edge weights.");
 
   /* As of here, only one node needs to compute the partition. */
+  int failed = 0;
   if (nodeID == 0) {
 
     /* Check that the edge weights are fully symmetric. */
@@ -684,10 +754,22 @@ void engine_repartition(struct engine *e) {
     */
 
     /* Make sure there are no zero weights. */
-    for (k = 0; k < 26 * nr_cells; k++)
-      if (weights_e[k] == 0) weights_e[k] = 1;
-    for (k = 0; k < nr_cells; k++)
-      if ((weights_v[k] *= vscale) == 0) weights_v[k] = 1;
+    int zeroweights = 0;
+    for (k = 0; k < 26 * nr_cells; k++) {
+      if (weights_e[k] == 0) {
+        weights_e[k] = 1;
+        zeroweights++;
+      }
+    }
+    for (k = 0; k < nr_cells; k++) {
+      if ((weights_v[k] *= vscale) == 0) {
+        weights_v[k] = 1;
+        zeroweights++;
+      }
+    }
+    if ( zeroweights > 0 ) {
+      message( "Seen %d zero weights", zeroweights );
+    }
 
     /* Allocate and fill the connection array. */
     idx_t *offsets;
@@ -701,7 +783,7 @@ void engine_repartition(struct engine *e) {
     METIS_SetDefaultOptions(options);
     options[METIS_OPTION_OBJTYPE] = METIS_OBJTYPE_CUT;
     options[METIS_OPTION_NUMBERING] = 0;
-    options[METIS_OPTION_CONTIG] = 1;
+    options[METIS_OPTION_CONTIG] = 1; /* Note this option not used by PGR... */
     options[METIS_OPTION_NCUTS] = 10;
     options[METIS_OPTION_NITER] = 20;
     // options[ METIS_OPTION_UFACTOR ] = 1;
@@ -715,7 +797,7 @@ void engine_repartition(struct engine *e) {
     if (METIS_PartGraphRecursive(&idx_nr_cells, &one, offsets, inds, weights_v,
                                  NULL, weights_e, &idx_nr_nodes, NULL, NULL,
                                  options, &objval, nodeIDs) != METIS_OK)
-      error("Call to METIS_PartGraphKway failed.");
+      error("Call to METIS_PartGraphRecursive failed.");
 
     /* Dump the 3d array of cell IDs. */
     message( "nodeIDs = reshape( [" );
@@ -731,16 +813,23 @@ void engine_repartition(struct engine *e) {
     for ( i = 0; i < cdim[0]*cdim[1]*cdim[2] ; i++ ) {
       present[nodeIDs[i]]++;
     }
-    int stop = 0;
     for ( i = 0; i < nr_nodes; i++ ) {
       if ( present[i] == 0 ) {
-        stop = 1;
+        failed = 1;
         message( "Node %d is not present after partition", i );
       }
     }
-    if ( stop ) {
-      error( "METIS repartition has failed" );
-    }
+
+  }
+  if ( failed ) {
+    /* XXX in production skip unnecessary next parts, for now go through the
+     * motions. */
+    /* Reset the initial partition and continue with that. */
+    for (k = 0; k < nr_cells; k++) {
+      nodeIDs[k] = cells[k].nodeID;
+    }
+    message( "METIS repartition has failed, continuing with "
+             "current partition" );
   }
 
   /* Broadcast the result of the partition. */
@@ -992,6 +1081,12 @@ void engine_exchange_cells(struct engine *e) {
     if (posix_memalign((void **)&s->parts_foreign, part_align,
                        sizeof(struct part) * s->size_parts_foreign) != 0)
       error("Failed to allocate foreign part data.");
+
+    /* Mark particles... */
+    for (int z =0; z < s->size_parts_foreign; z++ ) {
+      s->parts_foreign[z].lastNodeID = e->nodeID + 5000;
+    }
+
   }
 
   /* Unpack the cells and link to the particle data. */
@@ -1098,6 +1193,11 @@ int engine_exchange_strays(struct engine *e, int offset, int *ind, int N) {
     free(s->xparts);
     s->parts = parts_new;
     s->xparts = xparts_new;
+
+    /* Mark particles... */
+    for (int z =0; z < s->size_parts; z++ ) {
+      s->parts[z].lastNodeID = e->nodeID + 6000;
+    }
   }
 
   /* Collect the requests for the particle data from the proxies. */
@@ -2265,6 +2365,11 @@ void engine_split(struct engine *e, int *grid) {
     free(s->xparts);
     s->parts = parts_new;
     s->xparts = xparts_new;
+
+    /* Mark particles... */
+    for (int z =0; z < s->size_parts; z++ ) {
+      s->parts[z].lastNodeID = e->nodeID + 7000;
+    }
   }
 
 /**
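Note: the substantive fix in engine_redistribute above appears to be the tag change. The part and xpart messages between the same pair of ranks previously both used the tag ind_send, and since both are sent as MPI_BYTE on MPI_COMM_WORLD, MPI was free to match an xpart send against the part receive. Because ind_send = kk * nr_nodes + j is always below nr_nodes * nr_nodes, shifting the xpart tags by that amount makes the two tag ranges disjoint. A sketch of the scheme, with helper names of my own:

    /* Tags as used after the patch: ind = from * nr_nodes + to < nr_nodes^2,
       so part and xpart messages can never match each other's receives. */
    static int part_tag(int from, int to, int nr_nodes) {
      return from * nr_nodes + to;
    }
    static int xpart_tag(int from, int to, int nr_nodes) {
      return part_tag(from, to, nr_nodes) + nr_nodes * nr_nodes;
    }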
diff --git a/src/error.h b/src/error.h
index 99a8dcfc301b7967df405641c48f5b027b1ecd62..26c2119fb035377e69631fa56fe53574f11238ce 100644
--- a/src/error.h
+++ b/src/error.h
@@ -65,6 +65,16 @@ extern int engine_rank;
     fprintf(stderr, "%s\n\n", buf );                                    \
     MPI_Abort(MPI_COMM_WORLD, -1);                                      \
   }
+
+#define mpi_error_string(res,s, ...)                                    \
+  {                                                                     \
+    fprintf(stderr, "[%03i] %s:%s():%i: " s "\n", engine_rank, __FILE__, \
+            __FUNCTION__, __LINE__, ##__VA_ARGS__);                     \
+    int len = 1024;                                                     \
+    char buf[len];                                                      \
+    MPI_Error_string( res, buf, &len );                                 \
+    fprintf(stderr, "%s\n\n", buf );                                    \
+  }
 #endif
 
 /**
@@ -72,11 +82,17 @@ extern int engine_rank;
  *
  */
 #ifdef WITH_MPI
-extern int engine_rank;
-#define message(s, ...) \
-  printf("[%03i] %s: " s "\n", engine_rank, __FUNCTION__, ##__VA_ARGS__)
+#define message(s, ...)                                                 \
+  {                                                                     \
+    printf("[%03i] %s: " s "\n", engine_rank, __FUNCTION__, ##__VA_ARGS__); \
+    fflush(stdout);                                                     \
+  }
 #else
-#define message(s, ...) printf("%s: " s "\n", __FUNCTION__, ##__VA_ARGS__)
+#define message(s, ...)                                                 \
+  {                                                                     \
+    printf("%s: " s "\n", __FUNCTION__, ##__VA_ARGS__);                 \
+    fflush(stdout);                                                     \
+  }
#endif
 
 #endif /* SWIFT_ERROR_H */
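Note: mpi_error_string is a non-fatal sibling of mpi_error. It prints the rank, source location, and decoded MPI error string but does not call MPI_Abort, so the caller can keep going (the buf declaration is a C99 variable-length array). A hypothetical call site, assuming error.h is on the include path; the function name is illustrative:

    #include <mpi.h>
    #include "error.h"

    /* Report a failed send without taking the whole run down. */
    void report_send_failure(void *data, int n, int dest, int tag) {
      int res = MPI_Send(data, n, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
      if (res != MPI_SUCCESS)
        mpi_error_string(res, "Failed to send %i bytes to node %i.", n, dest);
    }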
diff --git a/src/part.h b/src/part.h
index 380c2dedb2d7847c0d0efe937d0b24feb0a736f0..fa464ded7ffc4ef40f7cd92e42f9381cbf8efdf2 100644
--- a/src/part.h
+++ b/src/part.h
@@ -159,6 +159,9 @@ struct part {
 
   /* Particle ID. */
   unsigned long long id;
 
+  /* ID of last node */
+  int lastNodeID;
+
   /* Associated gravitas. */
   struct gpart *gpart;
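Note: adding lastNodeID grows struct part, and every rank ships particles as raw MPI_BYTE buffers sized with sizeof(struct part), so all ranks must be rebuilt together or the byte streams go out of step. An illustrative startup check, not part of the patch, that would catch a mixed build (check_part_size and its placement are assumptions):

    #include <stdio.h>
    #include <mpi.h>
    #include "part.h"

    /* Verify all ranks agree on the particle layout before exchanging any. */
    void check_part_size(int rank) {
      long size = (long)sizeof(struct part), min, max;
      MPI_Allreduce(&size, &min, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);
      MPI_Allreduce(&size, &max, 1, MPI_LONG, MPI_MAX, MPI_COMM_WORLD);
      if (rank == 0 && min != max)
        fprintf(stderr, "struct part differs across ranks (%ld vs %ld bytes)\n",
                min, max);
    }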
diff --git a/src/proxy.c b/src/proxy.c
index bafa185cdcaf2100992398657b650a954daceb91..5afa4b8490979268ad49df707929ef42f2bd0c9b 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -114,8 +114,8 @@ void proxy_cells_exch2(struct proxy *p) {
                 p->nodeID * proxy_tag_shift + proxy_tag_cells, MPI_COMM_WORLD,
                 &p->req_cells_in) != MPI_SUCCESS)
     error("Failed to irecv part data.");
-// message( "irecv pcells (%i) on node %i from node %i." , p->size_pcells_in ,
-// p->mynodeID , p->nodeID ); fflush(stdout);
+  // message("irecv pcells (%i) on node %i from node %i, tag=%i.", p->size_pcells_in,
+  //         p->mynodeID, p->nodeID, p->nodeID * proxy_tag_shift + proxy_tag_cells ); fflush(stdout);
 
 #else
   error("SWIFT was not compiled with MPI support.");
@@ -199,11 +199,16 @@ void proxy_parts_exch1(struct proxy *p) {
                 p->mynodeID * proxy_tag_shift + proxy_tag_count, MPI_COMM_WORLD,
                 &p->req_parts_count_out) != MPI_SUCCESS)
     error("Failed to isend nr of parts.");
-  // message( "isent particle count (%i) from node %i to node %i." ,
-  // p->nr_parts_out , p->mynodeID , p->nodeID ); fflush(stdout);
+  //message( "isent particle count (%i) from node %i to node %i, tag=%i." ,
+  //         p->nr_parts_out, p->mynodeID, p->nodeID,
+  //         p->mynodeID * proxy_tag_shift + proxy_tag_count ); fflush(stdout);
 
   /* Send the particle buffers. */
   if (p->nr_parts_out > 0) {
+    for ( int k = 0 ; k < p->nr_parts_out ; k++ ) {
+      p->parts_out[k].lastNodeID = p->nodeID + 6000;
+    }
+
     if (MPI_Isend(p->parts_out, sizeof(struct part) * p->nr_parts_out,
                   MPI_BYTE, p->nodeID,
                   p->mynodeID * proxy_tag_shift + proxy_tag_parts,
                   MPI_COMM_WORLD, &p->req_parts_out) != MPI_SUCCESS ||
@@ -212,12 +217,12 @@ void proxy_parts_exch1(struct proxy *p) {
                   p->mynodeID * proxy_tag_shift + proxy_tag_xparts,
                   MPI_COMM_WORLD, &p->req_xparts_out) != MPI_SUCCESS)
       error("Failed to isend part data.");
-    // message( "isent particle data (%i) to node %i." , p->nr_parts_out ,
-    // p->nodeID ); fflush(stdout);
-    for (int k = 0; k < p->nr_parts_out; k++)
-      message("sending particle %lli, x=[%.3e %.3e %.3e], h=%.3e, to node %i.",
-              p->parts_out[k].id, p->parts_out[k].x[0], p->parts_out[k].x[1],
-              p->parts_out[k].x[2], p->parts_out[k].h, p->nodeID);
+    // message("isent particle data (%i) to node %i, tag=%i." , p->nr_parts_out ,
+    // p->nodeID, p->mynodeID * proxy_tag_shift + proxy_tag_parts); fflush(stdout);
+    //for (int k = 0; k < p->nr_parts_out; k++)
+    //  message("sending particle %lli, x=[%.3e %.3e %.3e], h=%.3e, to node %i.",
+    //          p->parts_out[k].id, p->parts_out[k].x[0], p->parts_out[k].x[1],
+    //          p->parts_out[k].x[2], p->parts_out[k].h, p->nodeID);
   }
 
   /* Receive the number of particles. */
@@ -225,8 +230,8 @@ void proxy_parts_exch1(struct proxy *p) {
                 p->nodeID * proxy_tag_shift + proxy_tag_count, MPI_COMM_WORLD,
                 &p->req_parts_count_in) != MPI_SUCCESS)
     error("Failed to irecv nr of parts.");
-// message( "irecv particle count on node %i from node %i." , p->mynodeID ,
-// p->nodeID ); fflush(stdout);
+  // message("irecv particle count on node %i from node %i, tag=%i." , p->mynodeID ,
+  // p->nodeID, p->nodeID * proxy_tag_shift + proxy_tag_count); fflush(stdout);
 
 #else
   error("SWIFT was not compiled with MPI support.");
@@ -260,8 +265,8 @@ void proxy_parts_exch2(struct proxy *p) {
                   p->nodeID, p->nodeID * proxy_tag_shift + proxy_tag_xparts,
                   MPI_COMM_WORLD, &p->req_xparts_in) != MPI_SUCCESS)
       error("Failed to irecv part data.");
-    // message( "irecv particle data (%i) from node %i." , p->nr_parts_in ,
-    // p->nodeID ); fflush(stdout);
+    //message("irecv particle data (%i) from node %i, tag=%i." , p->nr_parts_in ,
+    //        p->nodeID, p->nodeID * proxy_tag_shift + proxy_tag_parts); fflush(stdout);
   }
 
 #else
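Note: the proxy changes are mostly logging (the commented-out messages now also print the tag, which encodes the sending rank) plus the same lastNodeID stamping. One thing to watch: proxy_parts_exch1 stamps with p->nodeID + 6000, where p->nodeID is the peer rank, and engine_exchange_strays also uses the +6000 offset, so the two paths are not distinguishable from the marker alone. The tag layout the messages print, sketched with an assumed helper name (proxy_tag_shift and the proxy_tag_* kinds come from proxy.h):

    #include "proxy.h"

    /* Proxy tags encode the sending rank and the payload kind. */
    static int proxy_tag(int sender_rank, int kind) {
      /* kind: proxy_tag_count, proxy_tag_parts, proxy_tag_xparts,
         or proxy_tag_cells. */
      return sender_rank * proxy_tag_shift + kind;
    }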
diff --git a/src/scheduler.c b/src/scheduler.c
index 8b4c9264548552d00c61c7d3fb66884d1a5338ee..bae8cad5d8ba6535a49a611158024944fd75b2f5 100644
--- a/src/scheduler.c
+++ b/src/scheduler.c
@@ -971,9 +971,9 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
         MPI_Error_string(err, buff, &len);
         error("Failed to emit irecv for particle data (%s).", buff);
       }
-      // message( "recieving %i parts with tag=%i from %i to %i." ,
-      // t->ci->count , t->flags , t->ci->nodeID , s->nodeID );
-      // fflush(stdout);
+      //message("recieving %i parts with tag=%i from %i to %i.",
+      //        t->ci->count, t->flags, t->ci->nodeID, s->nodeID);
+      //fflush(stdout);
       qid = 1 % s->nr_queues;
 #else
       error("SWIFT was not compiled with MPI support.");
@@ -981,6 +981,10 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
       break;
     case task_type_send:
 #ifdef WITH_MPI
+      for ( int k = 0 ; k < t->ci->count ; k++ ) {
+        t->ci->parts[k].lastNodeID = s->nodeID + 10000;
+      }
+
       if ((err = MPI_Isend(t->ci->parts, sizeof(struct part) * t->ci->count,
                            MPI_BYTE, t->cj->nodeID, t->flags, MPI_COMM_WORLD,
                            &t->req)) != MPI_SUCCESS) {
@@ -989,9 +993,9 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
         MPI_Error_string(err, buff, &len);
         error("Failed to emit isend for particle data (%s).", buff);
       }
-      // message( "sending %i parts with tag=%i from %i to %i." ,
-      // t->ci->count , t->flags , s->nodeID , t->cj->nodeID );
-      // fflush(stdout);
+      //message( "sending %i parts with tag=%i from %i to %i.",
+      //         t->ci->count, t->flags, s->nodeID, t->cj->nodeID );
+      //fflush(stdout);
       qid = 0;
 #else
       error("SWIFT was not compiled with MPI support.");
diff --git a/src/space.c b/src/space.c
index 157850080b148233168be26789d5be0110dc7938..2d960472115ecc178c4bcd125a59a52ffbf1a447 100644
--- a/src/space.c
+++ b/src/space.c
@@ -376,9 +376,9 @@ void space_rebuild(struct space *s, double cell_max) {
       ind[k] =
           cell_getid(cdim, p->x[0] * ih[0], p->x[1] * ih[1], p->x[2] * ih[2]);
       cells[ind[k]].count += 1;
-      /* if ( cells[ ind[k] ].nodeID != nodeID )
-         error( "Received part that does not belong to me (nodeID=%i)." , cells[
-         ind[k] ].nodeID ); */
+      if (cells[ind[k]].nodeID != nodeID)
+        error("Received part that does not belong to me (nodeID=%i)." ,
+              cells[ind[k] ].nodeID);
     }
   nr_parts = s->nr_parts;
 #endif
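Note: re-enabling the ownership check in space_rebuild turns a silent mis-delivery into an immediate error: any particle whose position maps to a cell owned by another rank is reported as soon as the space is rebuilt, rather than corrupting a later exchange. The check reduces to the following sketch, assuming struct space exposes its cells array as in src/space.h; cell_getid and the ih[] inverse cell widths are as used throughout the diff:

    #include "space.h"

    /* Rank that owns the cell containing particle p (illustrative wrapper). */
    int owning_rank(struct space *s, struct part *p, int cdim[3], double ih[3]) {
      int cid = cell_getid(cdim, p->x[0] * ih[0], p->x[1] * ih[1],
                           p->x[2] * ih[2]);
      return s->cells[cid].nodeID;
    }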