Commit 16dad332 authored by Pedro Gonnet

Split off the particle redistribution, as we could use this elsewhere.


Former-commit-id: 694344fc97764270c49352a9785e3e5fddd76a74
parent 52d397ab
@@ -77,6 +77,118 @@
int engine_rank;
/**
* @brief Redistribute the particles amongst the nodes according
* to their cells' node IDs.
*
* @param e The #engine.
*/
void engine_redistribute ( struct engine *e ) {
#ifdef WITH_MPI
int i, j, k, cid;
int nr_nodes = e->nr_nodes, nodeID = e->nodeID;
struct space *s = e->s;
int my_cells = 0;
int *cdim = s->cdim;
struct cell *cells = s->cells;
/* Start by sorting the particles according to their nodes and
getting the counts. */
int *counts, *dest;
struct part *parts = s->parts;
float ih[3];
ih[0] = s->ih[0]; ih[1] = s->ih[1]; ih[2] = s->ih[2];
if ( ( counts = (int *)malloc( sizeof(int) * nr_nodes * nr_nodes ) ) == NULL ||
( dest = (int *)malloc( sizeof(int) * s->nr_parts ) ) == NULL )
error( "Failed to allocate count and dest buffers." );
bzero( counts , sizeof(int) * nr_nodes * nr_nodes );
for ( k = 0 ; k < s->nr_parts ; k++ ) {
cid = cell_getid( cdim , parts[k].x[0]*ih[0] , parts[k].x[1]*ih[1] , parts[k].x[2]*ih[2] );
dest[k] = cells[ cid ].nodeID;
counts[ nodeID*nr_nodes + dest[k] ] += 1;
}
parts_sort( s->parts , s->xparts , dest , s->nr_parts , 0 , nr_nodes-1 );
/* Get all the counts from all the nodes. */
if ( MPI_Allreduce( MPI_IN_PLACE , counts , nr_nodes * nr_nodes , MPI_INT , MPI_SUM , MPI_COMM_WORLD ) != MPI_SUCCESS )
error( "Failed to allreduce particle transfer counts." );
/* Get the new number of parts for this node; be generous in allocating. */
int nr_parts = 0;
for ( k = 0 ; k < nr_nodes ; k++ )
nr_parts += counts[ k*nr_nodes + nodeID ];
struct part *parts_new;
struct xpart *xparts_new, *xparts = s->xparts;
if ( posix_memalign( (void **)&parts_new , part_align , sizeof(struct part) * nr_parts * 2 ) != 0 ||
posix_memalign( (void **)&xparts_new , part_align , sizeof(struct xpart) * nr_parts * 2 ) != 0 )
error( "Failed to allocate new part data." );
/* Emit the sends and recvs for the particle data. */
MPI_Request *reqs;
if ( ( reqs = (MPI_Request *)malloc( sizeof(MPI_Request) * 4 * nr_nodes ) ) == NULL )
error( "Failed to allocate MPI request list." );
for ( k = 0 ; k < 4*nr_nodes ; k++ )
reqs[k] = MPI_REQUEST_NULL;
for ( i = 0 , j = 0 , k = 0 ; k < nr_nodes ; k++ ) {
if ( k == nodeID && counts[ nodeID*nr_nodes + k ] > 0 ) {
memcpy( &parts_new[j] , &parts[i] , sizeof(struct part) * counts[ k*nr_nodes + nodeID ] );
memcpy( &xparts_new[j] , &xparts[i] , sizeof(struct xpart) * counts[ k*nr_nodes + nodeID ] );
i += counts[ nodeID*nr_nodes + k ];
j += counts[ k*nr_nodes + nodeID ];
}
if ( k != nodeID && counts[ nodeID*nr_nodes + k ] > 0 ) {
if ( MPI_Isend( &parts[i] , sizeof(struct part) * counts[ nodeID*nr_nodes + k ] , MPI_BYTE , k , 0 , MPI_COMM_WORLD , &reqs[4*k] ) != MPI_SUCCESS )
error( "Failed to isend parts to node %i." , k );
if ( MPI_Isend( &xparts[i] , sizeof(struct xpart) * counts[ nodeID*nr_nodes + k ] , MPI_BYTE , k , 1 , MPI_COMM_WORLD , &reqs[4*k+1] ) != MPI_SUCCESS )
error( "Failed to isend xparts to node %i." , k );
i += counts[ nodeID*nr_nodes + k ];
}
if ( k != nodeID && counts[ k*nr_nodes + nodeID ] > 0 ) {
if ( MPI_Irecv( &parts_new[j] , sizeof(struct part) * counts[ k*nr_nodes + nodeID ] , MPI_BYTE , k , 0 , MPI_COMM_WORLD , &reqs[4*k+2] ) != MPI_SUCCESS )
error( "Failed to emit irecv of parts from node %i." , k );
if ( MPI_Irecv( &xparts_new[j] , sizeof(struct xpart) * counts[ k*nr_nodes + nodeID ] , MPI_BYTE , k , 1 , MPI_COMM_WORLD , &reqs[4*k+3] ) != MPI_SUCCESS )
error( "Failed to emit irecv of parts from node %i." , k );
j += counts[ k*nr_nodes + nodeID ];
}
}
/* Wait for all the recvs to tumble in. */
if ( MPI_Waitall( 4*nr_nodes , reqs , MPI_STATUSES_IGNORE ) != MPI_SUCCESS )
error( "Failed during waitall for part data." );
/* Verify that all parts are in the right place. */
/* for ( k = 0 ; k < nr_parts ; k++ ) {
cid = cell_getid( cdim , parts_new[k].x[0]*ih[0] , parts_new[k].x[1]*ih[1] , parts_new[k].x[2]*ih[2] );
if ( cells[ cid ].nodeID != nodeID )
error( "Received particle (%i) that does not belong here (nodeID=%i)." , k , cells[ cid ].nodeID );
} */
/* Set the new part data, free the old. */
free( parts );
free( xparts );
s->parts = parts_new;
s->xparts = xparts_new;
s->nr_parts = nr_parts;
s->size_parts = 2*nr_parts;
/* Be verbose about what just happened. Count the locally-owned cells so the
message below reports a meaningful number. */
for ( k = 0 ; k < s->nr_cells ; k++ )
if ( cells[k].nodeID == nodeID )
my_cells += 1;
message( "node %i now has %i parts in %i cells." , nodeID , nr_parts , my_cells );
/* Clean up other stuff. */
free( reqs );
free( counts );
free( dest );
#else
error( "SWIFT was not compiled with MPI and METIS support." );
#endif
}
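For orientation, counts is an nr_nodes x nr_nodes matrix stored row-major: counts[ from*nr_nodes + to ] holds the number of particles node from sends to node to, and after the MPI_Allreduce every node sees the full matrix. The standalone sketch below (made-up sizes and values, not SWIFT code) shows how the send total and the new local particle count fall out of that layout, mirroring the loops above:

/* Sketch: row-major transfer-count matrix as used by engine_redistribute().
   Sizes and values are invented for illustration. */
#include <stdio.h>

int main ( void ) {
    const int nr_nodes = 3, nodeID = 1;
    /* counts[ from*nr_nodes + to ] = particles moving from node 'from' to node 'to'. */
    int counts[9] = { 4 , 2 , 1 ,
                      0 , 5 , 3 ,
                      2 , 2 , 6 };
    int nr_send = 0 , nr_parts = 0;
    for ( int k = 0 ; k < nr_nodes ; k++ ) {
        if ( k != nodeID )
            nr_send += counts[ nodeID*nr_nodes + k ];  /* my row = what I ship out. */
        nr_parts += counts[ k*nr_nodes + nodeID ];     /* my column = what I end up with. */
        }
    printf( "node %i ships out %i parts and ends up with %i parts.\n" , nodeID , nr_send , nr_parts );
    return 0;
    }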
/**
* @brief Repartition the cells amongst the nodes.
*
@@ -315,92 +427,8 @@ void engine_repartition ( struct engine *e ) {
the parts array, and emitting the sends and receives.
Finally, the space, tasks, and proxies need to be rebuilt. */
/* Start by sorting the particles according to their nodes and
getting the counts. */
int *counts, *dest;
struct part *parts = s->parts;
float ih[3];
ih[0] = s->ih[0]; ih[1] = s->ih[1]; ih[2] = s->ih[2];
if ( ( counts = (int *)malloc( sizeof(int) * nr_nodes * nr_nodes ) ) == NULL ||
( dest = (int *)malloc( sizeof(int) * s->nr_parts ) ) == NULL )
error( "Failed to allocate count and dest buffers." );
bzero( counts , sizeof(int) * nr_nodes * nr_nodes );
for ( k = 0 ; k < s->nr_parts ; k++ ) {
cid = cell_getid( cdim , parts[k].x[0]*ih[0] , parts[k].x[1]*ih[1] , parts[k].x[2]*ih[2] );
dest[k] = cells[ cid ].nodeID;
counts[ nodeID*nr_nodes + dest[k] ] += 1;
}
parts_sort( s->parts , s->xparts , dest , s->nr_parts , 0 , nr_nodes-1 );
/* Get all the counts from all the nodes. */
if ( MPI_Allreduce( MPI_IN_PLACE , counts , nr_nodes * nr_nodes , MPI_INT , MPI_SUM , MPI_COMM_WORLD ) != MPI_SUCCESS )
error( "Failed to allreduce particle transfer counts." );
/* Get the new number of parts for this node, be generous in allocating. */
int nr_parts = 0;
for ( k = 0 ; k < nr_nodes ; k++ )
nr_parts += counts[ k*nr_nodes + nodeID ];
struct part *parts_new;
struct xpart *xparts_new, *xparts = s->xparts;
if ( posix_memalign( (void **)&parts_new , part_align , sizeof(struct part) * nr_parts * 2 ) != 0 ||
posix_memalign( (void **)&xparts_new , part_align , sizeof(struct xpart) * nr_parts * 2 ) != 0 )
error( "Failed to allocate new part data." );
/* Emit the sends and recvs for the particle data. */
MPI_Request *reqs;
if ( ( reqs = (MPI_Request *)malloc( sizeof(MPI_Request) * 4 * nr_nodes ) ) == NULL )
error( "Failed to allocate MPI request list." );
for ( k = 0 ; k < 4*nr_nodes ; k++ )
reqs[k] = MPI_REQUEST_NULL;
for ( i = 0 , j = 0 , k = 0 ; k < nr_nodes ; k++ ) {
if ( k == nodeID && counts[ nodeID*nr_nodes + k ] > 0 ) {
memcpy( &parts_new[j] , &parts[i] , sizeof(struct part) * counts[ k*nr_nodes + nodeID ] );
memcpy( &xparts_new[j] , &xparts[i] , sizeof(struct xpart) * counts[ k*nr_nodes + nodeID ] );
i += counts[ nodeID*nr_nodes + k ];
j += counts[ k*nr_nodes + nodeID ];
}
if ( k != nodeID && counts[ nodeID*nr_nodes + k ] > 0 ) {
if ( MPI_Isend( &parts[i] , sizeof(struct part) * counts[ nodeID*nr_nodes + k ] , MPI_BYTE , k , 0 , MPI_COMM_WORLD , &reqs[4*k] ) != MPI_SUCCESS )
error( "Failed to isend parts to node %i." , k );
if ( MPI_Isend( &xparts[i] , sizeof(struct xpart) * counts[ nodeID*nr_nodes + k ] , MPI_BYTE , k , 1 , MPI_COMM_WORLD , &reqs[4*k+1] ) != MPI_SUCCESS )
error( "Failed to isend xparts to node %i." , k );
i += counts[ nodeID*nr_nodes + k ];
}
if ( k != nodeID && counts[ k*nr_nodes + nodeID ] > 0 ) {
if ( MPI_Irecv( &parts_new[j] , sizeof(struct part) * counts[ k*nr_nodes + nodeID ] , MPI_BYTE , k , 0 , MPI_COMM_WORLD , &reqs[4*k+2] ) != MPI_SUCCESS )
error( "Failed to emit irecv of parts from node %i." , k );
if ( MPI_Irecv( &xparts_new[j] , sizeof(struct xpart) * counts[ k*nr_nodes + nodeID ] , MPI_BYTE , k , 1 , MPI_COMM_WORLD , &reqs[4*k+3] ) != MPI_SUCCESS )
error( "Failed to emit irecv of parts from node %i." , k );
j += counts[ k*nr_nodes + nodeID ];
}
}
/* Wait for all the recvs to tumble in. */
if ( MPI_Waitall( 4*nr_nodes , reqs , MPI_STATUSES_IGNORE ) != MPI_SUCCESS )
error( "Failed during waitall for part data." );
/* Verify that all parts are in the right place. */
/* for ( k = 0 ; k < nr_parts ; k++ ) {
cid = cell_getid( cdim , parts_new[k].x[0]*ih[0] , parts_new[k].x[1]*ih[1] , parts_new[k].x[2]*ih[2] );
if ( cells[ cid ].nodeID != nodeID )
error( "Received particle (%i) that does not belong here (nodeID=%i)." , k , cells[ cid ].nodeID );
} */
/* Set the new part data, free the old. */
free( parts );
free( xparts );
s->parts = parts_new;
s->xparts = xparts_new;
s->nr_parts = nr_parts;
s->size_parts = 2*nr_parts;
/* Be verbose about what just happened. */
message( "node %i now has %i parts in %i cells." , nodeID , nr_parts , my_cells );
/* Clean up other stuff. */
free( reqs );
free( counts );
free( dest );
/* Redistribute the particles between the nodes. */
engine_redistribute( e );
/* Make the proxies. */
engine_makeproxies( e );
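With the redistribution split out, the tail of engine_repartition() reduces to the two calls above, followed by the rebuild of the space, tasks, and proxies described in the comment at the top of this hunk. A condensed sketch of that call sequence (assuming SWIFT's engine.h; the METIS partitioning step is omitted):

/* Sketch: tail of engine_repartition() after this commit (partitioning step omitted). */
#include "engine.h"

static void repartition_tail_sketch ( struct engine *e ) {
    /* Ship every particle to the node that now owns its cell. */
    engine_redistribute( e );
    /* Re-create the proxies so neighbouring nodes can exchange their boundary cells. */
    engine_makeproxies( e );
    /* The space and tasks are rebuilt afterwards. */
    }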
@@ -112,3 +112,4 @@ int engine_exchange_strays ( struct engine *e , struct part *parts , struct xpar
void engine_rebuild ( struct engine *e );
void engine_repartition ( struct engine *e );
void engine_makeproxies ( struct engine *e );
void engine_redistribute ( struct engine *e );
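The new declaration makes the redistribution callable outside of engine_repartition(), which is the point of the commit message ("we could use this elsewhere"). A hypothetical call site, guarded the same way as the implementation; the function name rebalance_after_drift is invented for illustration:

/* Sketch: hypothetical stand-alone use of the new API (assumes engine.h). */
#include "engine.h"

void rebalance_after_drift ( struct engine *e ) {
#ifdef WITH_MPI
    /* Particles that drifted into cells owned by another node get shipped off. */
    engine_redistribute( e );
#else
    /* Single-node build: nothing to move. */
    (void)e;
#endif
    }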
@@ -667,11 +667,12 @@ void scheduler_reweight ( struct scheduler *s ) {
case task_type_kick2:
t->weight += t->ci->count;
break;
case task_type_send_xv:
case task_type_send_rho:
t->weight *= 2;
break;
}
if ( t->type == task_type_send_xv || t->type == task_type_send_rho )
t->weight *= 10;
if ( t->type == task_type_recv_xv || t->type == task_type_recv_rho )
t->weight *= 2;
}
// message( "weighting tasks took %.3f ms." , (double)( getticks() - tic ) / CPU_TPS * 1000 );