diff --git a/src/engine.c b/src/engine.c
index 84157e0545729ca4426284861a8a1be9fd109dd2..7ebc5418cc73bc03625570df0147c07794ff03e0 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -77,6 +77,118 @@ int engine_rank;



+/**
+ * @brief Redistribute the particles amongst the nodes according
+ * to their cells' node IDs.
+ *
+ * @param e The #engine.
+ */
+
+void engine_redistribute ( struct engine *e ) {
+
+#ifdef WITH_MPI
+
+    int i, j, k, cid;
+    int nr_nodes = e->nr_nodes, nodeID = e->nodeID;
+    struct space *s = e->s;
+    int my_cells = 0;
+    int *cdim = s->cdim;
+    struct cell *cells = s->cells;
+
+    /* Start by sorting the particles according to their nodes and
+       getting the counts. */
+    int *counts, *dest;
+    struct part *parts = s->parts;
+    float ih[3];
+    ih[0] = s->ih[0]; ih[1] = s->ih[1]; ih[2] = s->ih[2];
+    if ( ( counts = (int *)malloc( sizeof(int) * nr_nodes * nr_nodes ) ) == NULL ||
+         ( dest = (int *)malloc( sizeof(int) * s->nr_parts ) ) == NULL )
+        error( "Failed to allocate count and dest buffers." );
+    bzero( counts , sizeof(int) * nr_nodes * nr_nodes );
+    for ( k = 0 ; k < s->nr_parts ; k++ ) {
+        cid = cell_getid( cdim , parts[k].x[0]*ih[0] , parts[k].x[1]*ih[1] , parts[k].x[2]*ih[2] );
+        dest[k] = cells[ cid ].nodeID;
+        counts[ nodeID*nr_nodes + dest[k] ] += 1;
+        }
+    parts_sort( s->parts , s->xparts , dest , s->nr_parts , 0 , nr_nodes-1 );
+
+    /* Get all the counts from all the nodes. */
+    if ( MPI_Allreduce( MPI_IN_PLACE , counts , nr_nodes * nr_nodes , MPI_INT , MPI_SUM , MPI_COMM_WORLD ) != MPI_SUCCESS )
+        error( "Failed to allreduce particle transfer counts." );
+
+    /* Get the new number of parts for this node, be generous in allocating. */
+    int nr_parts = 0;
+    for ( k = 0 ; k < nr_nodes ; k++ )
+        nr_parts += counts[ k*nr_nodes + nodeID ];
+    struct part *parts_new;
+    struct xpart *xparts_new, *xparts = s->xparts;
+    if ( posix_memalign( (void **)&parts_new , part_align , sizeof(struct part) * nr_parts * 2 ) != 0 ||
+         posix_memalign( (void **)&xparts_new , part_align , sizeof(struct xpart) * nr_parts * 2 ) != 0 )
+        error( "Failed to allocate new part data." );
+
+    /* Emit the sends and recvs for the particle data. */
+    MPI_Request *reqs;
+    if ( ( reqs = (MPI_Request *)malloc( sizeof(MPI_Request) * 4 * nr_nodes ) ) == NULL )
+        error( "Failed to allocate MPI request list." );
+    for ( k = 0 ; k < 4*nr_nodes ; k++ )
+        reqs[k] = MPI_REQUEST_NULL;
+    for ( i = 0 , j = 0 , k = 0 ; k < nr_nodes ; k++ ) {
+        if ( k == nodeID && counts[ nodeID*nr_nodes + k ] > 0 ) {
+            memcpy( &parts_new[j] , &parts[i] , sizeof(struct part) * counts[ k*nr_nodes + nodeID ] );
+            memcpy( &xparts_new[j] , &xparts[i] , sizeof(struct xpart) * counts[ k*nr_nodes + nodeID ] );
+            i += counts[ nodeID*nr_nodes + k ];
+            j += counts[ k*nr_nodes + nodeID ];
+            }
+        if ( k != nodeID && counts[ nodeID*nr_nodes + k ] > 0 ) {
+            if ( MPI_Isend( &parts[i] , sizeof(struct part) * counts[ nodeID*nr_nodes + k ] , MPI_BYTE , k , 0 , MPI_COMM_WORLD , &reqs[4*k] ) != MPI_SUCCESS )
+                error( "Failed to isend parts to node %i." , k );
+            if ( MPI_Isend( &xparts[i] , sizeof(struct xpart) * counts[ nodeID*nr_nodes + k ] , MPI_BYTE , k , 1 , MPI_COMM_WORLD , &reqs[4*k+1] ) != MPI_SUCCESS )
+                error( "Failed to isend xparts to node %i." , k );
+            i += counts[ nodeID*nr_nodes + k ];
+            }
+        if ( k != nodeID && counts[ k*nr_nodes + nodeID ] > 0 ) {
+            if ( MPI_Irecv( &parts_new[j] , sizeof(struct part) * counts[ k*nr_nodes + nodeID ] , MPI_BYTE , k , 0 , MPI_COMM_WORLD , &reqs[4*k+2] ) != MPI_SUCCESS )
+                error( "Failed to emit irecv of parts from node %i." , k );
+            if ( MPI_Irecv( &xparts_new[j] , sizeof(struct xpart) * counts[ k*nr_nodes + nodeID ] , MPI_BYTE , k , 1 , MPI_COMM_WORLD , &reqs[4*k+3] ) != MPI_SUCCESS )
+                error( "Failed to emit irecv of xparts from node %i." , k );
+            j += counts[ k*nr_nodes + nodeID ];
+            }
+        }
+
+    /* Wait for all the recvs to tumble in. */
+    if ( MPI_Waitall( 4*nr_nodes , reqs , MPI_STATUSES_IGNORE ) != MPI_SUCCESS )
+        error( "Failed during waitall for part data." );
+
+    /* Verify that all parts are in the right place. */
+    /* for ( k = 0 ; k < nr_parts ; k++ ) {
+        cid = cell_getid( cdim , parts_new[k].x[0]*ih[0] , parts_new[k].x[1]*ih[1] , parts_new[k].x[2]*ih[2] );
+        if ( cells[ cid ].nodeID != nodeID )
+            error( "Received particle (%i) that does not belong here (nodeID=%i)." , k , cells[ cid ].nodeID );
+        } */
+
+    /* Set the new part data, free the old. */
+    free( parts );
+    free( xparts );
+    s->parts = parts_new;
+    s->xparts = xparts_new;
+    s->nr_parts = nr_parts;
+    s->size_parts = 2*nr_parts;
+
+    /* Be verbose about what just happened. */
+    message( "node %i now has %i parts in %i cells." , nodeID , nr_parts , my_cells );
+
+    /* Clean up other stuff. */
+    free( reqs );
+    free( counts );
+    free( dest );
+
+#else
+    error( "SWIFT was not compiled with MPI and METIS support." );
+#endif
+
+    }
+
+
 /**
  * @breif Repartition the cells amongst the nodes.
  *
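An aside on the counts buffer used above (an illustrative sketch, not part of the patch): it is an nr_nodes x nr_nodes matrix stored row-major, where counts[from*nr_nodes + to] holds the number of parts that node from sends to node to. Each node only fills in its own row before the MPI_Allreduce; after the reduction every node holds the complete matrix, and the column sum for nodeID gives the new local particle count. The numbers below are made up.

#include <stdio.h>

int main ( void ) {

    /* Hypothetical 3-node transfer matrix after the allreduce:
       row = sending node, column = receiving node. */
    int nr_nodes = 3, nodeID = 1, k, nr_parts = 0;
    int counts[9] = { 40 , 10 ,  5 ,
                       8 , 60 , 12 ,
                       3 ,  7 , 55 };

    /* Column sum: everything destined for nodeID, including what it keeps. */
    for ( k = 0 ; k < nr_nodes ; k++ )
        nr_parts += counts[ k*nr_nodes + nodeID ];

    /* Prints 77, i.e. 10 + 60 + 7. */
    printf( "node %i will hold %i parts.\n" , nodeID , nr_parts );
    return 0;

    }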
@@ -315,92 +427,8 @@ void engine_repartition ( struct engine *e ) {
        the parts array, and emiting the sends and receives. Finally, the
        space, tasks, and proxies need to be rebuilt. */

-    /* Start by sorting the particles according to their nodes and
-       getting the counts. */
-    int *counts, *dest;
-    struct part *parts = s->parts;
-    float ih[3];
-    ih[0] = s->ih[0]; ih[1] = s->ih[1]; ih[2] = s->ih[2];
-    if ( ( counts = (int *)malloc( sizeof(int) * nr_nodes * nr_nodes ) ) == NULL ||
-         ( dest = (int *)malloc( sizeof(int) * s->nr_parts ) ) == NULL )
-        error( "Failed to allocate count and dest buffers." );
-    bzero( counts , sizeof(int) * nr_nodes * nr_nodes );
-    for ( k = 0 ; k < s->nr_parts ; k++ ) {
-        cid = cell_getid( cdim , parts[k].x[0]*ih[0] , parts[k].x[1]*ih[1] , parts[k].x[2]*ih[2] );
-        dest[k] = cells[ cid ].nodeID;
-        counts[ nodeID*nr_nodes + dest[k] ] += 1;
-        }
-    parts_sort( s->parts , s->xparts , dest , s->nr_parts , 0 , nr_nodes-1 );
-
-    /* Get all the counts from all the nodes. */
-    if ( MPI_Allreduce( MPI_IN_PLACE , counts , nr_nodes * nr_nodes , MPI_INT , MPI_SUM , MPI_COMM_WORLD ) != MPI_SUCCESS )
-        error( "Failed to allreduce particle transfer counts." );
-
-    /* Get the new number of parts for this node, be generous in allocating. */
-    int nr_parts = 0;
-    for ( k = 0 ; k < nr_nodes ; k++ )
-        nr_parts += counts[ k*nr_nodes + nodeID ];
-    struct part *parts_new;
-    struct xpart *xparts_new, *xparts = s->xparts;
-    if ( posix_memalign( (void **)&parts_new , part_align , sizeof(struct part) * nr_parts * 2 ) != 0 ||
-         posix_memalign( (void **)&xparts_new , part_align , sizeof(struct xpart) * nr_parts * 2 ) != 0 )
-        error( "Failed to allocate new part data." );
-
-    /* Emit the sends and recvs for the particle data. */
-    MPI_Request *reqs;
-    if ( ( reqs = (MPI_Request *)malloc( sizeof(MPI_Request) * 4 * nr_nodes ) ) == NULL )
-        error( "Failed to allocate MPI request list." );
-    for ( k = 0 ; k < 4*nr_nodes ; k++ )
-        reqs[k] = MPI_REQUEST_NULL;
-    for ( i = 0 , j = 0 , k = 0 ; k < nr_nodes ; k++ ) {
-        if ( k == nodeID && counts[ nodeID*nr_nodes + k ] > 0 ) {
-            memcpy( &parts_new[j] , &parts[i] , sizeof(struct part) * counts[ k*nr_nodes + nodeID ] );
-            memcpy( &xparts_new[j] , &xparts[i] , sizeof(struct xpart) * counts[ k*nr_nodes + nodeID ] );
-            i += counts[ nodeID*nr_nodes + k ];
-            j += counts[ k*nr_nodes + nodeID ];
-            }
-        if ( k != nodeID && counts[ nodeID*nr_nodes + k ] > 0 ) {
-            if ( MPI_Isend( &parts[i] , sizeof(struct part) * counts[ nodeID*nr_nodes + k ] , MPI_BYTE , k , 0 , MPI_COMM_WORLD , &reqs[4*k] ) != MPI_SUCCESS )
-                error( "Failed to isend parts to node %i." , k );
-            if ( MPI_Isend( &xparts[i] , sizeof(struct xpart) * counts[ nodeID*nr_nodes + k ] , MPI_BYTE , k , 1 , MPI_COMM_WORLD , &reqs[4*k+1] ) != MPI_SUCCESS )
-                error( "Failed to isend xparts to node %i." , k );
-            i += counts[ nodeID*nr_nodes + k ];
-            }
-        if ( k != nodeID && counts[ k*nr_nodes + nodeID ] > 0 ) {
-            if ( MPI_Irecv( &parts_new[j] , sizeof(struct part) * counts[ k*nr_nodes + nodeID ] , MPI_BYTE , k , 0 , MPI_COMM_WORLD , &reqs[4*k+2] ) != MPI_SUCCESS )
-                error( "Failed to emit irecv of parts from node %i." , k );
-            if ( MPI_Irecv( &xparts_new[j] , sizeof(struct xpart) * counts[ k*nr_nodes + nodeID ] , MPI_BYTE , k , 1 , MPI_COMM_WORLD , &reqs[4*k+3] ) != MPI_SUCCESS )
-                error( "Failed to emit irecv of parts from node %i." , k );
-            j += counts[ k*nr_nodes + nodeID ];
-            }
-        }
-
-    /* Wait for all the recvs to tumble in. */
-    if ( MPI_Waitall( 4*nr_nodes , reqs , MPI_STATUSES_IGNORE ) != MPI_SUCCESS )
-        error( "Failed during waitall for part data." );
-
-    /* Verify that all parts are in the right place. */
-    /* for ( k = 0 ; k < nr_parts ; k++ ) {
-        cid = cell_getid( cdim , parts_new[k].x[0]*ih[0] , parts_new[k].x[1]*ih[1] , parts_new[k].x[2]*ih[2] );
-        if ( cells[ cid ].nodeID != nodeID )
-            error( "Received particle (%i) that does not belong here (nodeID=%i)." , k , cells[ cid ].nodeID );
-        } */
-
-    /* Set the new part data, free the old. */
-    free( parts );
-    free( xparts );
-    s->parts = parts_new;
-    s->xparts = xparts_new;
-    s->nr_parts = nr_parts;
-    s->size_parts = 2*nr_parts;
-
-    /* Be verbose about what just happened. */
-    message( "node %i now has %i parts in %i cells." , nodeID , nr_parts , my_cells );
-
-    /* Clean up other stuff. */
-    free( reqs );
-    free( counts );
-    free( dest );
+    /* Redistribute the particles between the nodes. */
+    engine_redistribute( e );

     /* Make the proxies. */
     engine_makeproxies( e );
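A second aside (again an illustrative sketch, not part of the patch): the exchange above reserves four request slots per peer (send parts, send xparts, recv parts, recv xparts), leaves unused slots as MPI_REQUEST_NULL, copies the local chunk with memcpy instead of sending it, and completes everything with a single MPI_Waitall. The toy program below mirrors that layout with two slots per peer and one int per message; the buffer and variable names are invented.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main ( int argc , char *argv[] ) {

    int k, rank, size;
    MPI_Init( &argc , &argv );
    MPI_Comm_rank( MPI_COMM_WORLD , &rank );
    MPI_Comm_size( MPI_COMM_WORLD , &size );

    /* One receive slot per peer, two requests per peer (send + recv). */
    int *in = (int *)malloc( sizeof(int) * size );
    MPI_Request *reqs = (MPI_Request *)malloc( sizeof(MPI_Request) * 2 * size );
    for ( k = 0 ; k < 2*size ; k++ )
        reqs[k] = MPI_REQUEST_NULL;   /* Waitall ignores slots left as MPI_REQUEST_NULL. */

    for ( k = 0 ; k < size ; k++ ) {
        if ( k == rank ) {            /* The local contribution is copied, not sent. */
            in[k] = rank;
            continue;
            }
        MPI_Isend( &rank , sizeof(int) , MPI_BYTE , k , 0 , MPI_COMM_WORLD , &reqs[2*k] );
        MPI_Irecv( &in[k] , sizeof(int) , MPI_BYTE , k , 0 , MPI_COMM_WORLD , &reqs[2*k+1] );
        }

    /* A single wait covers the whole exchange. */
    MPI_Waitall( 2*size , reqs , MPI_STATUSES_IGNORE );
    printf( "rank %i heard from %i peers.\n" , rank , size-1 );

    free( in );
    free( reqs );
    MPI_Finalize();
    return 0;

    }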
diff --git a/src/engine.h b/src/engine.h
index 98e0daab1c902e3ed301ad0dc402fc46e77c3f87..08b4876409d58e16f5aab68ff1742c7bc93d042c 100644
--- a/src/engine.h
+++ b/src/engine.h
@@ -112,3 +112,4 @@ int engine_exchange_strays ( struct engine *e , struct part *parts , struct xpar
 void engine_rebuild ( struct engine *e );
 void engine_repartition ( struct engine *e );
 void engine_makeproxies ( struct engine *e );
+void engine_redistribute ( struct engine *e );
diff --git a/src/scheduler.c b/src/scheduler.c
index cf7dc046a707ca01204bbb0d35024d569d3fe1cc..62a119ebb938b655242085eeda888f97acc3a2dc 100644
--- a/src/scheduler.c
+++ b/src/scheduler.c
@@ -667,11 +667,12 @@ void scheduler_reweight ( struct scheduler *s ) {
             case task_type_kick2:
                 t->weight += t->ci->count;
                 break;
-            case task_type_send_xv:
-            case task_type_send_rho:
-                t->weight *= 2;
                 break;
             }
+        if ( t->type == task_type_send_xv || t->type == task_type_send_rho )
+            t->weight *= 10;
+        if ( t->type == task_type_recv_xv || t->type == task_type_recv_rho )
+            t->weight *= 2;
         }

     // message( "weighting tasks took %.3f ms." , (double)( getticks() - tic ) / CPU_TPS * 1000 );
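One last aside on the scheduler change (an illustrative sketch, not part of the patch): the communication tasks are now weighted outside the switch, with sends scaled by 10 and receives by 2, presumably so that MPI traffic is queued ahead of comparably sized local work. The toy program below applies the same rule to hypothetical base weights; the enum, struct and values are stand-ins, not the real scheduler types.

#include <stdio.h>

/* Stand-ins for the scheduler's task types and task struct. */
enum task_types { task_type_self , task_type_send_xv , task_type_send_rho ,
                  task_type_recv_xv , task_type_recv_rho };
struct task { int type; int weight; };

int main ( void ) {

    /* Three tasks with the same base weight: one local, one send, one recv. */
    struct task t[3] = { { task_type_self , 100 } ,
                         { task_type_send_xv , 100 } ,
                         { task_type_recv_xv , 100 } };
    int k;

    for ( k = 0 ; k < 3 ; k++ ) {
        if ( t[k].type == task_type_send_xv || t[k].type == task_type_send_rho )
            t[k].weight *= 10;
        if ( t[k].type == task_type_recv_xv || t[k].type == task_type_recv_rho )
            t[k].weight *= 2;
        }

    /* Prints 100, 1000 and 200. */
    for ( k = 0 ; k < 3 ; k++ )
        printf( "task %i: weight %i\n" , k , t[k].weight );
    return 0;

    }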