diff --git a/src/Makefile.am b/src/Makefile.am index a1683de16484b121fe2d1497aa585bf19b58a795..2fa352cf478bf5c9c77ae9ed34ba9e509099553a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -20,8 +20,8 @@ AUTOMAKE_OPTIONS=gnu # Add the debug flag to the whole thing -AM_CFLAGS = -g -O3 -Wall -Werror -ffast-math -fstrict-aliasing -ftree-vectorize \ - -funroll-loops $(SIMD_FLAGS) $(OPENMP_CFLAGS) \ +AM_CFLAGS = -g -O3 -std=gnu99 -Wall -Werror -ffast-math -fstrict-aliasing \ + -ftree-vectorize -funroll-loops $(SIMD_FLAGS) $(OPENMP_CFLAGS) \ -DTIMER -DCOUNTER -DCPU_TPS=2.30e9 # AM_CFLAGS = -Wall -Werror $(OPENMP_CFLAGS) \ # -DTIMER -DCOUNTER -DCPU_TPS=2.67e9 @@ -31,10 +31,24 @@ AM_LDFLAGS = $(LAPACK_LIBS) $(BLAS_LIBS) $(HDF5_LDFLAGS) -version-info 0:0:0 # Build the libswiftsim library lib_LTLIBRARIES = libswiftsim.la -libswiftsim_la_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \ - io.c timers.c debug.c scheduler.c +# Build a MPI-enabled version too? +if HAVEMPI +lib_LTLIBRARIES += libswiftsim_mpi.la +endif # List required headers include_HEADERS = space.h runner.h queue.h task.h lock.h cell.h part.h const.h \ - engine.h swift.h io.h timers.h debug.h scheduler.h + engine.h swift.h io.h timers.h debug.h scheduler.h proxy.h + +# Common source files +AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \ + io.c timers.c debug.c scheduler.c proxy.c + +# Sources and flags for regular library +libswiftsim_la_SOURCES = $(AM_SOURCES) + +# Sources and flags for MPI library +libswiftsim_mpi_la_SOURCES = $(AM_SOURCES) +libswiftsim_mpi_la_CFLAGS = $(AM_CFLAGS) -DWITH_MPI +libswiftsim_mpi_la_SHORTNAME = mpi diff --git a/src/cell.c b/src/cell.c index e11edbda77d1560bfab8d1398410e342214181f0..23895acbbf7b914a1057c2c1928344698668c632 100644 --- a/src/cell.c +++ b/src/cell.c @@ -29,22 +29,144 @@ #include <limits.h> #include <math.h> +/* MPI headers. */ +#ifdef WITH_MPI + #include <mpi.h> +#endif + /* Switch off timers. */ #ifdef TIMER #undef TIMER #endif /* Local headers. */ +#include "const.h" #include "cycle.h" #include "lock.h" #include "task.h" #include "timers.h" #include "part.h" +#include "space.h" #include "cell.h" #include "error.h" #include "inline.h" +/** + * @brief Get the size of the cell subtree. + * + * @param c The #cell. + */ + +int cell_getsize ( struct cell *c ) { + + int k, count = 1; + + /* Sum up the progeny if split. */ + if ( c->split ) + for ( k = 0 ; k < 8 ; k++ ) + if ( c->progeny[k] != NULL ) + count += cell_getsize( c->progeny[k] ); + + /* Return the final count. */ + return count; + + } + + +/** + * @brief Unpack the data of a given cell and its sub-cells. + * + * @param pc An array of packed #pcell. + * @param c The #cell in which to unpack the #pcell. + * @param s The #space in which the cells are created. + * @param parts The #part array holding the particle data. + * + * @return The number of cells created. + */ + +int cell_unpack ( struct pcell *pc , struct cell *c , struct space *s , struct part *parts ) { + + int k, count = 1; + struct cell *temp; + + /* Unpack the current pcell. */ + c->h_max = pc->h_max; + c->dt_min = pc->dt_min; + c->dt_max = pc->dt_max; + c->count = pc->count; + c->parts = parts; + + /* Fill the progeny recursively, depth-first. 
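Each entry of pc->progeny holds the relative index of the packed child (or -1 if absent), mirroring the layout written by cell_pack below; the parts pointer is advanced past each child's particles as they are unpacked.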
*/ + for ( k = 0 ; k < 8 ; k++ ) + if ( pc->progeny[k] >= 0 ) { + temp = space_getcell( s ); + temp->count = 0; + temp->loc[0] = c->loc[0]; + temp->loc[1] = c->loc[1]; + temp->loc[2] = c->loc[2]; + temp->h[0] = c->h[0]/2; + temp->h[1] = c->h[1]/2; + temp->h[2] = c->h[2]/2; + temp->dmin = c->dmin/2; + if ( k & 4 ) + temp->loc[0] += temp->h[0]; + if ( k & 2 ) + temp->loc[1] += temp->h[1]; + if ( k & 1 ) + temp->loc[2] += temp->h[2]; + temp->depth = c->depth + 1; + temp->split = 0; + temp->dx_max = 0.0; + temp->nodeID = c->nodeID; + temp->parent = c; + c->progeny[k] = temp; + c->split = 1; + count += cell_unpack( &pc[ pc->progeny[k] ] , temp , s , parts ); + parts = &parts[ temp->count ]; + } + + /* Return the total number of unpacked cells. */ + return count; + + } + + +/** + * @brief Pack the data of the given cell and all it's sub-cells. + * + * @param c The #cell. + * @param pc Pointer to an array of packed cells in which the + * cells will be packed. + * + * @return The number of packed cells. + */ + +int cell_pack ( struct cell *c , struct pcell *pc ) { + + int k, count = 1; + + /* Start by packing the data of the current cell. */ + pc->h_max = c->h_max; + pc->dt_min = c->dt_min; + pc->dt_max = c->dt_max; + pc->count = c->count; + + /* Fill in the progeny, depth-first recursion. */ + for ( k = 0 ; k < 8 ; k++ ) + if ( c->progeny[k] != NULL ) { + pc->progeny[k] = count; + count += cell_pack( c->progeny[k] , &pc[count] ); + } + else + pc->progeny[k] = -1; + + /* Return the number of packed cells used. */ + return count; + + } + + /** * @brief Lock a cell and hold its parents. * @@ -195,7 +317,7 @@ void cell_split ( struct cell *c ) { } /* for ( int kk = left[k] ; kk <= j ; kk++ ) if ( parts[kk].x[1] > pivot[1] ) { - printf( "cell_split: ival=[%i,%i], i=%i, j=%i.\n" , left[k] , right[k] , i , j ); + message( "ival=[%i,%i], i=%i, j=%i." , left[k] , right[k] , i , j ); error( "sorting failed (left)." ); } for ( int kk = i ; kk <= right[k] ; kk++ ) @@ -220,12 +342,12 @@ void cell_split ( struct cell *c ) { } /* for ( int kk = left[k] ; kk <= j ; kk++ ) if ( parts[kk].x[2] > pivot[2] ) { - printf( "cell_split: ival=[%i,%i], i=%i, j=%i.\n" , left[k] , right[k] , i , j ); + message( "ival=[%i,%i], i=%i, j=%i." , left[k] , right[k] , i , j ); error( "sorting failed (left)." ); } for ( int kk = i ; kk <= right[k] ; kk++ ) if ( parts[kk].x[2] < pivot[2] ) { - printf( "cell_split: ival=[%i,%i], i=%i, j=%i.\n" , left[k] , right[k] , i , j ); + message( "ival=[%i,%i], i=%i, j=%i." , left[k] , right[k] , i , j ); error( "sorting failed (right)." ); } */ left[2*k+1] = i; right[2*k+1] = right[k]; diff --git a/src/cell.h b/src/cell.h index b282ecf17970414b809895ad029cf153ddf3e507..793dc9b3572b7c4ebf86c9b2c008f0b2ca3b94b4 100644 --- a/src/cell.h +++ b/src/cell.h @@ -21,6 +21,21 @@ #define cell_sid_dt 13 +/* Packed cell. */ +struct pcell { + + /* Stats on this cell's particles. */ + double h_max, dt_min, dt_max; + + /* Number of particles in this cell. */ + int count; + + /* Relative indices of the cell's progeny. */ + int progeny[8]; + + }; + + /* Structure to store the data of a single cell. */ struct cell { @@ -78,12 +93,15 @@ struct cell { int sortsize; /* The tasks computing this cell's density. */ - struct task *density[27]; - int nr_density; + struct task *density[27], *force[27]; + int nr_density, nr_force; /* The ghost task to link density to interactions. */ struct task *ghost, *kick1, *kick2; + /* Task receiving data. 
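recv_xv pulls in the foreign particles' positions and velocities ahead of the density loops, recv_rho their updated densities ahead of the force loops.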
*/ + struct task *recv_xv, *recv_rho; + /* Number of tasks that are associated with this cell. */ int nr_tasks; @@ -111,6 +129,16 @@ struct cell { /* Linking pointer for "memory management". */ struct cell *next; + /* ID of the node this cell lives on. */ + int nodeID; + + /* Bit mask of the proxies this cell is registered with. */ + unsigned int sendto; + + /* Pointer to this cell's packed representation. */ + struct pcell *pcell; + int pcell_size; + } __attribute__((aligned (64))); @@ -118,3 +146,6 @@ struct cell { void cell_split ( struct cell *c ); int cell_locktree( struct cell *c ); void cell_unlocktree( struct cell *c ); +int cell_pack ( struct cell *c , struct pcell *pc ); +int cell_unpack ( struct pcell *pc , struct cell *c , struct space *s , struct part *parts ); +int cell_getsize ( struct cell *c ); diff --git a/src/engine.c b/src/engine.c index c8e264282c182def6de25dbe236bec0ffc6d5361..7db4d657167c03738abb2007de5f94485ff20e57 100644 --- a/src/engine.c +++ b/src/engine.c @@ -32,7 +32,13 @@ #include <omp.h> #include <sched.h> +/* MPI headers. */ +#ifdef WITH_MPI + #include <mpi.h> +#endif + /* Local headers. */ +#include "const.h" #include "cycle.h" #include "atomic.h" #include "timers.h" @@ -42,12 +48,13 @@ #include "task.h" #include "part.h" #include "debug.h" -#include "cell.h" #include "space.h" +#include "cell.h" #include "queue.h" #include "scheduler.h" #include "engine.h" #include "runner.h" +#include "proxy.h" #include "error.h" #ifdef LEGACY_GADGET2_SPH @@ -61,6 +68,285 @@ #define cell_getid( cdim , i , j , k ) ( (int)(k) + (cdim)[2]*( (int)(j) + (cdim)[1]*(int)(i) ) ) +/** The rank of the engine as a global variable (for messages). */ +int engine_rank; + + +/** + * @brief Add send tasks to a hierarchy of cells. + * + * @param e The #engine. + * @param c The #cell. + * @param t_xv The send_xv #task, if it has already been created. + * @param t_rho The send_rho #task, if it has already been created. + */ + +void engine_addtasks_send ( struct engine *e , struct cell *ci , struct cell *cj ) { + + int k, tag; + + /* Check if any of the density tasks are for the target node. */ + for ( k = 0 ; k < ci->nr_density ; k++ ) + if ( ci->density[k]->ci->nodeID == cj->nodeID || + ( ci->density[k]->cj != NULL && ci->density[k]->cj->nodeID == cj->nodeID ) ) + break; + + /* If so, attach send tasks. */ + if ( k < ci->nr_density ) { + + /* Compute the cell's tag. */ + tag = (int)( ci->loc[0] / e->s->dim[0] * 1024 ) + + ((int)( ci->loc[1] / e->s->dim[1] * 1024 ) << 10 ) + + ((int)( ci->loc[2] / e->s->dim[2] * 1024 ) << 20 ); + tag = tag*2; + + /* Create the tasks. */ + struct task *t_xv = scheduler_addtask( &e->sched , task_type_send_xv , task_subtype_none , tag , 0 , ci , cj , 0 ); + struct task *t_rho = scheduler_addtask( &e->sched , task_type_send_rho , task_subtype_none , tag + 1 , 0 , ci , cj , 0 ); + + /* The send_rho task depends on the cell's ghost task. */ + task_addunlock( ci->ghost , t_rho ); + + /* The send_rho task should unlock the super-cell's kick2 task. */ + task_addunlock( t_rho , ci->super->kick2 ); + + /* The send_xv task should unlock the super-cell's ghost task. */ + task_addunlock( t_xv , ci->super->ghost ); + + } + + /* Recurse? */ + else if ( ci->split ) + for ( k = 0 ; k < 8 ; k++ ) + if ( ci->progeny[k] != NULL ) + engine_addtasks_send( e , ci->progeny[k] , cj ); + + } + + +/** + * @brief Add recv tasks to a hierarchy of cells. + * + * @param e The #engine. + * @param c The #cell. + * @param t_xv The recv_xv #task, if it has already been created. 
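If non-NULL, the recv tasks created here are marked implicit and made to depend on the parent cell's recv tasks.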
+ * @param t_rho The recv_rho #task, if it has already been created. + */ + +void engine_addtasks_recv ( struct engine *e , struct cell *c , struct task *t_xv , struct task *t_rho ) { + + int k, tag; + + /* Do we need to construct a recv task? */ + if ( t_xv != NULL || c->nr_density > 0 ) { + + /* Compute the cell's tag. */ + tag = (int)( c->loc[0] / e->s->dim[0] * 1024 ) + + ((int)( c->loc[1] / e->s->dim[1] * 1024 ) << 10 ) + + ((int)( c->loc[2] / e->s->dim[2] * 1024 ) << 20 ); + tag = tag*2; + + /* Create the tasks. */ + c->recv_xv = scheduler_addtask( &e->sched , task_type_recv_xv , task_subtype_none , tag , 0 , c , NULL , 0 ); + c->recv_rho = scheduler_addtask( &e->sched , task_type_recv_rho , task_subtype_none , tag + 1 , 0 , c , NULL , 0 ); + + /* If there has been a higher-up recv task, then these tasks + are implicit and depend on the higher-up task. */ + if ( t_xv != NULL ) { + task_addunlock( c->parent->recv_xv , c->recv_xv ); + task_addunlock( c->parent->recv_rho , c->recv_rho ); + c->recv_xv->implicit = 1; + c->recv_rho->implicit = 1; + } + else { + t_xv = c->recv_xv; + t_rho = c->recv_rho; + } + + /* Add dependencies if there are density/force tasks. */ + for ( k = 0 ; k < c->nr_density ; k++ ) { + task_addunlock( c->recv_xv , c->density[k] ); + task_addunlock( c->density[k] , t_rho ); + } + for ( k = 0 ; k < c->nr_force ; k++ ) + task_addunlock( c->recv_rho , c->force[k] ); + if ( c->sorts != NULL ) + task_addunlock( c->recv_xv , c->sorts ); + + } + + /* Recurse? */ + if ( c->split ) + for ( k = 0 ; k < 8 ; k++ ) + if ( c->progeny[k] != NULL ) + engine_addtasks_recv( e , c->progeny[k] , t_xv , t_rho ); + + } + + +/** + * @brief Exchange cell structures with other nodes. + * + * @param e The #engine. + */ + +void engine_exchange_cells ( struct engine *e ) { + +#ifdef WITH_MPI + + int j, k, pid, count = 0; + struct pcell *pcells; + struct cell *cells = e->s->cells; + int nr_cells = e->s->nr_cells; + int offset[ nr_cells ]; + MPI_Request reqs[27]; + MPI_Status status; + struct part *parts = &e->s->parts[ e->s->nr_parts ]; + + /* Run through the cells and get the size of the ones that will be sent off. */ + for ( k = 0 ; k < nr_cells ; k++ ) { + offset[k] = count; + if ( cells[k].sendto ) + count += ( cells[k].pcell_size = cell_getsize( &cells[k] ) ); + } + + /* Allocate the pcells. */ + if ( ( pcells = (struct pcell *)malloc( sizeof(struct pcell) * count ) ) == NULL ) + error( "Failed to allocate pcell buffer." ); + + /* Pack the cells. */ + for ( k = 0 ; k < nr_cells ; k++ ) + if ( cells[k].sendto ) { + cell_pack( &cells[k] , &pcells[ offset[k] ] ); + cells[k].pcell = &pcells[ offset[k] ]; + } + + /* Launch the proxies. */ + for ( k = 0 ; k < e->nr_proxies ; k++ ) { + proxy_cells_exch1( &e->proxies[k] ); + reqs[k] = e->proxies[k].req_cells_count_in; + } + + /* Wait for each count to come in and start the recv. */ + for ( k = 0 ; k < e->nr_proxies ; k++ ) { + if ( MPI_Waitany( e->nr_proxies , reqs , &pid , &status ) != MPI_SUCCESS || + pid == MPI_UNDEFINED ) + error( "MPI_Waitany failed." ); + // message( "request from proxy %i has arrived." , pid ); + reqs[pid] = MPI_REQUEST_NULL; + proxy_cells_exch2( &e->proxies[pid] ); + } + + /* Set the requests for the cells. */ + for ( k = 0 ; k < e->nr_proxies ; k++ ) + reqs[k] = e->proxies[k].req_cells_in; + + /* Wait for each pcell array to come in from the proxies. 
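As each buffer arrives it is unpacked into that proxy's registered foreign cells, with the incoming particle data laid out past the end of the local parts array.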
*/ + for ( k = 0 ; k < e->nr_proxies ; k++ ) { + if ( MPI_Waitany( e->nr_proxies , reqs , &pid , &status ) != MPI_SUCCESS || + pid == MPI_UNDEFINED ) + error( "MPI_Waitany failed." ); + // message( "request from proxy %i has arrived." , pid ); + reqs[pid] = MPI_REQUEST_NULL; + for ( count = 0 , j = 0 ; j < e->proxies[pid].nr_cells_in ; j++ ) { + count += cell_unpack( &e->proxies[pid].pcells_in[count] , e->proxies[pid].cells_in[j] , e->s , parts ); + parts = &parts[ e->proxies[pid].cells_in[j]->count ]; + } + } + + /* Free the pcell buffer. */ + free( pcells ); + +#else + error( "SWIFT was not compiled with MPI support." ); +#endif + + } + + +/** + * @brief Exchange straying parts with other nodes. + * + * @param e The #engine. + * @param parts An array of straying parts. + * @param xparts The corresponding xparts. + * @param ind The ID of the foreign #cell. + * @param N The number of stray parts. + * + * @return The number of arrived parts copied to parts and xparts. + */ + +int engine_exchange_strays ( struct engine *e , struct part *parts , struct xpart *xparts , int *ind , int N ) { + +#ifdef WITH_MPI + + int k, pid, count = 0; + MPI_Request reqs[27]; + MPI_Status status; + struct proxy *p; + + /* Re-set the proxies. */ + for ( k = 0 ; k < e->nr_proxies ; k++ ) + e->proxies[k].nr_parts_out = 0; + + /* Put the parts into the corresponding proxies. */ + for ( k = 0 ; k < N ; k++ ) { + pid = e->proxy_ind[ e->s->cells[ ind[k] ].nodeID ]; + if ( pid < 0 ) + error( "Do not have a proxy for the requested nodeID." ); + proxy_parts_load( &e->proxies[pid] , &parts[k] , &xparts[k] , 1 ); + } + + /* Launch the proxies. */ + for ( k = 0 ; k < e->nr_proxies ; k++ ) { + proxy_parts_exch1( &e->proxies[k] ); + reqs[k] = e->proxies[k].req_parts_count_in; + } + + /* Wait for each count to come in and start the recv. */ + for ( k = 0 ; k < e->nr_proxies ; k++ ) { + if ( MPI_Waitany( e->nr_proxies , reqs , &pid , &status ) != MPI_SUCCESS || + pid == MPI_UNDEFINED ) + error( "MPI_Waitany failed." ); + // message( "request from proxy %i has arrived." , pid ); + reqs[pid] = MPI_REQUEST_NULL; + proxy_parts_exch2( &e->proxies[pid] ); + } + + /* Set the requests for the particle data. */ + for ( k = 0 ; k < e->nr_proxies ; k++ ) + reqs[k] = e->proxies[k].req_xparts_in; + + /* Wait for each part array to come in and collect the new + parts from the proxies. */ + for ( k = 0 ; k < e->nr_proxies ; k++ ) { + if ( MPI_Waitany( e->nr_proxies , reqs , &pid , &status ) != MPI_SUCCESS || + pid == MPI_UNDEFINED ) + error( "MPI_Waitany failed." ); + // message( "request from proxy %i has arrived." , pid ); + p = &e->proxies[pid]; + reqs[pid] = MPI_REQUEST_NULL; + MPI_Request_free( &p->req_parts_in ); + memcpy( &parts[count] , p->parts_in , sizeof(struct part) * p->nr_parts_in ); + memcpy( &xparts[count] , p->xparts_in , sizeof(struct xpart) * p->nr_parts_in ); + count += p->nr_parts_in; + /* for ( int k = 0 ; k < p->nr_parts_in ; k++ ) + message( "received particle %lli, x=[%.3e %.3e %.3e], h=%.3e, from node %i." , + p->parts_in[k].id , p->parts_in[k].x[0] , p->parts_in[k].x[1] , p->parts_in[k].x[2] , + p->parts_in[k].h , p->nodeID ); */ + } + + /* Return the number of harvested parts. */ + return count; + +#else + error( "SWIFT was not compiled with MPI support." ); + return 0; +#endif + + } + + /** * @brief Fill the #space's task list. 
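Pair tasks are only generated when at least one of the two cells is local to this node, and send/recv tasks are appended for the cells shared with each proxy.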
* @@ -89,7 +375,8 @@ void engine_maketasks ( struct engine *e ) { ci = &s->cells[cid]; if ( ci->count == 0 ) continue; - scheduler_addtask( sched , task_type_self , task_subtype_density , 0 , 0 , ci , NULL , 0 ); + if ( ci->nodeID == e->nodeID ) + scheduler_addtask( sched , task_type_self , task_subtype_density , 0 , 0 , ci , NULL , 0 ); for ( ii = -1 ; ii < 2 ; ii++ ) { iii = i + ii; if ( !s->periodic && ( iii < 0 || iii >= cdim[0] ) ) @@ -107,7 +394,8 @@ void engine_maketasks ( struct engine *e ) { kkk = ( kkk + cdim[2] ) % cdim[2]; cjd = cell_getid( cdim , iii , jjj , kkk ); cj = &s->cells[cjd]; - if ( cid >= cjd || cj->count == 0 ) + if ( cid >= cjd || cj->count == 0 || + ( ci->nodeID != e->nodeID && cj->nodeID != e->nodeID ) ) continue; sid = sortlistID[ (kk+1) + 3*( (jj+1) + 3*(ii+1) ) ]; t = scheduler_addtask( sched , task_type_pair , task_subtype_density , sid , 0 , ci , cj , 1 ); @@ -185,37 +473,73 @@ void engine_maketasks ( struct engine *e ) { t2 = scheduler_addtask( sched , task_type_self , task_subtype_force , 0 , 0 , t->ci , NULL , 0 ); task_addunlock( t->ci->ghost , t2 ); task_addunlock( t2 , t->ci->super->kick2 ); + t->ci->force[ atomic_inc( &t->ci->nr_force ) ] = t2; } /* Otherwise, pair interaction? */ else if ( t->type == task_type_pair && t->subtype == task_subtype_density ) { - task_addunlock( t , t->ci->super->ghost ); - if ( t->ci->super != t->cj->super ) - task_addunlock( t , t->cj->super->ghost ); t2 = scheduler_addtask( sched , task_type_pair , task_subtype_force , 0 , 0 , t->ci , t->cj , 0 ); - task_addunlock( t->ci->ghost , t2 ); - task_addunlock( t->cj->ghost , t2 ); - task_addunlock( t2 , t->ci->super->kick2 ); - if ( t->ci->super != t->cj->super ) - task_addunlock( t2 , t->cj->super->kick2 ); + if ( t->ci->nodeID == e->nodeID ) { + task_addunlock( t->ci->ghost , t2 ); + task_addunlock( t , t->ci->super->ghost ); + task_addunlock( t2 , t->ci->super->kick2 ); + } + if ( t->cj->nodeID == e->nodeID ) { + task_addunlock( t->cj->ghost , t2 ); + if ( t->ci->super != t->cj->super ) { + task_addunlock( t , t->cj->super->ghost ); + task_addunlock( t2 , t->cj->super->kick2 ); + } + } + t->ci->force[ atomic_inc( &t->ci->nr_force ) ] = t2; + t->cj->force[ atomic_inc( &t->cj->nr_force ) ] = t2; } /* Otherwise, sub interaction? */ else if ( t->type == task_type_sub && t->subtype == task_subtype_density ) { - task_addunlock( t , t->ci->super->ghost ); - if ( t->cj != NULL && t->ci->super != t->cj->super ) - task_addunlock( t , t->cj->super->ghost ); t2 = scheduler_addtask( sched , task_type_sub , task_subtype_force , t->flags , 0 , t->ci , t->cj , 0 ); - task_addunlock( t->ci->ghost , t2 ); - if ( t->cj != NULL ) + if ( t->ci->nodeID == e->nodeID ) { + task_addunlock( t , t->ci->super->ghost ); + task_addunlock( t->ci->ghost , t2 ); + task_addunlock( t2 , t->ci->super->kick2 ); + } + if ( t->cj != NULL && t->cj->nodeID == e->nodeID ) { task_addunlock( t->cj->ghost , t2 ); - task_addunlock( t2 , t->ci->super->kick2 ); - if ( t->cj != NULL && t->ci->super != t->cj->super ) - task_addunlock( t2 , t->cj->super->kick2 ); + if ( t->ci->super != t->cj->super ) { + task_addunlock( t , t->cj->super->ghost ); + task_addunlock( t2 , t->cj->super->kick2 ); + } + } + t->ci->force[ atomic_inc( &t->ci->nr_force ) ] = t2; + if ( t->cj != NULL ) + t->cj->force[ atomic_inc( &t->cj->nr_force ) ] = t2; } } + /* Add the communication tasks if MPI is being used. */ + #ifdef WITH_MPI + + /* Loop over the proxies. 
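For each one, recv tasks are attached to the cells expected from that node and send tasks to the local cells it needs, using the position-derived tags computed in engine_addtasks_send and engine_addtasks_recv.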
*/ + for ( int pid = 0 ; pid < e->nr_proxies ; pid++ ) { + + /* Get a handle on the proxy. */ + struct proxy *p = &e->proxies[pid]; + + /* Loop through the proxy's incomming cells and add the + recv tasks. */ + for ( k = 0 ; k < p->nr_cells_in ; k++ ) + engine_addtasks_recv( e , p->cells_in[k] , NULL , NULL ); + + /* Loop through the proxy's outgoing cells and add the + send tasks. */ + for ( k = 0 ; k < p->nr_cells_in ; k++ ) + engine_addtasks_send( e , p->cells_out[k] , p->cells_in[0] ); + + } + + #endif + /* Rank the tasks. */ scheduler_ranktasks( sched ); @@ -348,7 +672,7 @@ int engine_marktasks ( struct engine *e ) { } - // printf( "engine_marktasks: took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 ); + // message( "took %.3f ms." , (double)(getticks() - tic)/CPU_TPS*1000 ); /* All is well... */ return 0; @@ -371,27 +695,42 @@ void engine_prepare ( struct engine *e ) { /* Run through the tasks and mark as skip or not. */ // tic = getticks(); - rebuild = ( e->step == 0 || engine_marktasks( e ) ); - // printf( "engine_prepare: space_marktasks took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 ); + rebuild = ( e->step % 3 == 0 || engine_marktasks( e ) ); + // message( "space_marktasks took %.3f ms." , (double)(getticks() - tic)/CPU_TPS*1000 ); + /* Collect the values of rebuild from all nodes. */ + #ifdef WITH_MPI + int buff; + if ( MPI_Allreduce( &rebuild , &buff , 1 , MPI_INT , MPI_MAX , MPI_COMM_WORLD ) != MPI_SUCCESS ) + error( "Failed to aggreggate the rebuild flag accross nodes." ); + rebuild = buff; + #endif + /* Did this not go through? */ if ( rebuild ) { /* Re-build the space. */ // tic = getticks(); space_rebuild( e->s , 0.0 ); - // printf( "engine_prepare: space_rebuild took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 ); + // message( "space_rebuild took %.3f ms." , (double)(getticks() - tic)/CPU_TPS*1000 ); + + /* If in parallel, exchange the cell structure. */ + #ifdef WITH_MPI + // tic = getticks(); + engine_exchange_cells( e ); + // message( "engine_exchange_cells took %.3f ms." , (double)(getticks() - tic)/CPU_TPS*1000 ); + #endif /* Re-build the tasks. */ // tic = getticks(); engine_maketasks( e ); - // printf( "engine_prepare: engine_maketasks took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 ); + // message( "engine_maketasks took %.3f ms." , (double)(getticks() - tic)/CPU_TPS*1000 ); /* Run through the tasks and mark as skip or not. */ // tic = getticks(); if ( engine_marktasks( e ) ) error( "engine_marktasks failed after space_rebuild." ); - // printf( "engine_prepare: engine_marktasks took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 ); + // message( "engine_marktasks took %.3f ms." , (double)(getticks() - tic)/CPU_TPS*1000 ); /* Count the number of each task type. */ int counts[ task_type_count+1 ]; @@ -402,10 +741,15 @@ void engine_prepare ( struct engine *e ) { counts[ (int)sched->tasks[k].type ] += 1; else counts[ task_type_count ] += 1; - printf( "engine_prepare: task counts are [ %s=%i" , taskID_names[0] , counts[0] ); + #ifdef WITH_MPI + printf( "engine_prepare[%03i]: task counts are [ %s=%i" , e->nodeID , taskID_names[0] , counts[0] ); + #else + printf( "engine_prepare: task counts are [ %s=%i" , taskID_names[0] , counts[0] ); + #endif for ( k = 1 ; k < task_type_count ; k++ ) printf( " %s=%i" , taskID_names[k] , counts[k] ); - printf( " skipped=%i ]\n" , counts[ task_type_count ] ); fflush(stdout); + printf( " skipped=%i ]\n" , counts[ task_type_count ] ); fflush(stdout); + message( "nr_parts = %i." 
, e->s->nr_parts ); } @@ -416,8 +760,12 @@ void engine_prepare ( struct engine *e ) { (1 << task_type_pair) | (1 << task_type_sub) | (1 << task_type_ghost) | - (1 << task_type_kick2) ); - // printf( "engine_prepare: scheduler_start took %.3f ms.\n" , (double)(getticks() - tic2)/CPU_TPS*1000 ); + (1 << task_type_kick2) | + (1 << task_type_send_xv) | + (1 << task_type_recv_xv) | + (1 << task_type_send_rho) | + (1 << task_type_recv_rho) ); + // message( "scheduler_start took %.3f ms." , (double)(getticks() - tic2)/CPU_TPS*1000 ); TIMER_TOC( timer_prepare ); @@ -514,105 +862,105 @@ void engine_collect_kick2 ( struct cell *c ) { * @brief Compute the force on a single particle brute-force. */ -void engine_single_density ( double *dim , long long int pid , struct part *__restrict__ parts , int N , int periodic ) { - - int i, k; - double r2, dx[3]; - float fdx[3], ih; - struct part p; - - /* Find "our" part. */ - for ( k = 0 ; k < N && parts[k].id != pid ; k++ ); - if ( k == N ) - error( "Part not found." ); - p = parts[k]; - - /* Clear accumulators. */ - ih = 1.0f / p.h; - p.rho = 0.0f; p.rho_dh = 0.0f; - p.density.wcount = 0.0f; p.density.wcount_dh = 0.0f; - p.density.div_v = 0.0; - for ( k=0 ; k < 3 ; k++) - p.density.curl_v[k] = 0.0; - - /* Loop over all particle pairs (force). */ - for ( k = 0 ; k < N ; k++ ) { - if ( parts[k].id == p.id ) - continue; - for ( i = 0 ; i < 3 ; i++ ) { - dx[i] = p.x[i] - parts[k].x[i]; - if ( periodic ) { - if ( dx[i] < -dim[i]/2 ) - dx[i] += dim[i]; - else if ( dx[i] > dim[i]/2 ) - dx[i] -= dim[i]; - } - fdx[i] = dx[i]; - } - r2 = fdx[0]*fdx[0] + fdx[1]*fdx[1] + fdx[2]*fdx[2]; - if ( r2 < p.h*p.h*kernel_gamma2 ) { - runner_iact_nonsym_density( r2 , fdx , p.h , parts[k].h , &p , &parts[k] ); - } - } - - /* Dump the result. */ - p.rho = ih * ih * ih * ( p.rho + p.mass*kernel_root ); - p.rho_dh = p.rho_dh * ih * ih * ih * ih; - p.density.wcount = ( p.density.wcount + kernel_root ) * ( 4.0f / 3.0 * M_PI * kernel_gamma3 ); - printf( "pairs_single_density: part %lli (h=%e) has wcount=%e, rho=%e, rho_dh=%e.\n" , p.id , p.h , p.density.wcount , p.rho , p.rho_dh ); - fflush(stdout); - - } - +// void engine_single_density ( double *dim , long long int pid , struct part *__restrict__ parts , int N , int periodic ) { +// +// int i, k; +// double r2, dx[3]; +// float fdx[3], ih; +// struct part p; +// +// /* Find "our" part. */ +// for ( k = 0 ; k < N && parts[k].id != pid ; k++ ); +// if ( k == N ) +// error( "Part not found." ); +// p = parts[k]; +// +// /* Clear accumulators. */ +// ih = 1.0f / p.h; +// p.rho = 0.0f; p.rho_dh = 0.0f; +// p.density.wcount = 0.0f; p.density.wcount_dh = 0.0f; +// p.density.div_v = 0.0; +// for ( k=0 ; k < 3 ; k++) +// p.density.curl_v[k] = 0.0; +// +// /* Loop over all particle pairs (force). */ +// for ( k = 0 ; k < N ; k++ ) { +// if ( parts[k].id == p.id ) +// continue; +// for ( i = 0 ; i < 3 ; i++ ) { +// dx[i] = p.x[i] - parts[k].x[i]; +// if ( periodic ) { +// if ( dx[i] < -dim[i]/2 ) +// dx[i] += dim[i]; +// else if ( dx[i] > dim[i]/2 ) +// dx[i] -= dim[i]; +// } +// fdx[i] = dx[i]; +// } +// r2 = fdx[0]*fdx[0] + fdx[1]*fdx[1] + fdx[2]*fdx[2]; +// if ( r2 < p.h*p.h*kernel_gamma2 ) { +// runner_iact_nonsym_density( r2 , fdx , p.h , parts[k].h , &p , &parts[k] ); +// } +// } +// +// /* Dump the result. 
*/ +// p.rho = ih * ih * ih * ( p.rho + p.mass*kernel_root ); +// p.rho_dh = p.rho_dh * ih * ih * ih * ih; +// p.density.wcount = ( p.density.wcount + kernel_root ) * ( 4.0f / 3.0 * M_PI * kernel_gamma3 ); +// message( "part %lli (h=%e) has wcount=%e, rho=%e, rho_dh=%e." , p.id , p.h , p.density.wcount , p.rho , p.rho_dh ); +// fflush(stdout); +// +// } -void engine_single_force ( double *dim , long long int pid , struct part *__restrict__ parts , int N , int periodic ) { - int i, k; - double r2, dx[3]; - float fdx[3]; - struct part p; - - /* Find "our" part. */ - for ( k = 0 ; k < N && parts[k].id != pid ; k++ ); - if ( k == N ) - error( "Part not found." ); - p = parts[k]; - - /* Clear accumulators. */ - p.a[0] = 0.0f; p.a[1] = 0.0f; p.a[2] = 0.0f; - p.force.u_dt = 0.0f; p.force.h_dt = 0.0f; p.force.v_sig = 0.0f; - - /* Loop over all particle pairs (force). */ - for ( k = 0 ; k < N ; k++ ) { - // for ( k = N-1 ; k >= 0 ; k-- ) { - if ( parts[k].id == p.id ) - continue; - for ( i = 0 ; i < 3 ; i++ ) { - dx[i] = p.x[i] - parts[k].x[i]; - if ( periodic ) { - if ( dx[i] < -dim[i]/2 ) - dx[i] += dim[i]; - else if ( dx[i] > dim[i]/2 ) - dx[i] -= dim[i]; - } - fdx[i] = dx[i]; - } - r2 = fdx[0]*fdx[0] + fdx[1]*fdx[1] + fdx[2]*fdx[2]; - if ( r2 < p.h*p.h*kernel_gamma2 || r2 < parts[k].h*parts[k].h*kernel_gamma2 ) { - p.a[0] = 0.0f; p.a[1] = 0.0f; p.a[2] = 0.0f; - p.force.u_dt = 0.0f; p.force.h_dt = 0.0f; p.force.v_sig = 0.0f; - runner_iact_nonsym_force( r2 , fdx , p.h , parts[k].h , &p , &parts[k] ); - double dvdr = ( (p.v[0]-parts[k].v[0])*fdx[0] + (p.v[1]-parts[k].v[1])*fdx[1] + (p.v[2]-parts[k].v[2])*fdx[2] ) / sqrt(r2); - printf( "pairs_single_force: part %lli and %lli interact (r=%.3e,dvdr=%.3e) with a=[%.3e,%.3e,%.3e], dudt=%.3e.\n" , - p.id , parts[k].id , sqrt(r2) , dvdr , p.a[0] , p.a[1], p.a[2] , p.force.u_dt ); - } - } - - /* Dump the result. */ - // printf( "pairs_single_force: part %lli (h=%e) has a=[%.3e,%.3e,%.3e], udt=%e.\n" , p.id , p.h , p.a[0] , p.a[1] , p.a[2] , p.force.u_dt ); - fflush(stdout); - - } +// void engine_single_force ( double *dim , long long int pid , struct part *__restrict__ parts , int N , int periodic ) { +// +// int i, k; +// double r2, dx[3]; +// float fdx[3]; +// struct part p; +// +// /* Find "our" part. */ +// for ( k = 0 ; k < N && parts[k].id != pid ; k++ ); +// if ( k == N ) +// error( "Part not found." ); +// p = parts[k]; +// +// /* Clear accumulators. */ +// p.a[0] = 0.0f; p.a[1] = 0.0f; p.a[2] = 0.0f; +// p.force.u_dt = 0.0f; p.force.h_dt = 0.0f; p.force.v_sig = 0.0f; +// +// /* Loop over all particle pairs (force). */ +// for ( k = 0 ; k < N ; k++ ) { +// // for ( k = N-1 ; k >= 0 ; k-- ) { +// if ( parts[k].id == p.id ) +// continue; +// for ( i = 0 ; i < 3 ; i++ ) { +// dx[i] = p.x[i] - parts[k].x[i]; +// if ( periodic ) { +// if ( dx[i] < -dim[i]/2 ) +// dx[i] += dim[i]; +// else if ( dx[i] > dim[i]/2 ) +// dx[i] -= dim[i]; +// } +// fdx[i] = dx[i]; +// } +// r2 = fdx[0]*fdx[0] + fdx[1]*fdx[1] + fdx[2]*fdx[2]; +// if ( r2 < p.h*p.h*kernel_gamma2 || r2 < parts[k].h*parts[k].h*kernel_gamma2 ) { +// p.a[0] = 0.0f; p.a[1] = 0.0f; p.a[2] = 0.0f; +// p.force.u_dt = 0.0f; p.force.h_dt = 0.0f; p.force.v_sig = 0.0f; +// runner_iact_nonsym_force( r2 , fdx , p.h , parts[k].h , &p , &parts[k] ); +// double dvdr = ( (p.v[0]-parts[k].v[0])*fdx[0] + (p.v[1]-parts[k].v[1])*fdx[1] + (p.v[2]-parts[k].v[2])*fdx[2] ) / sqrt(r2); +// message( "part %lli and %lli interact (r=%.3e,dvdr=%.3e) with a=[%.3e,%.3e,%.3e], dudt=%.3e." 
, +// p.id , parts[k].id , sqrt(r2) , dvdr , p.a[0] , p.a[1], p.a[2] , p.force.u_dt ); +// } +// } +// +// /* Dump the result. */ +// // message( "part %lli (h=%e) has a=[%.3e,%.3e,%.3e], udt=%e." , p.id , p.h , p.a[0] , p.a[1] , p.a[2] , p.force.u_dt ); +// fflush(stdout); +// +// } /** @@ -670,7 +1018,7 @@ void engine_step ( struct engine *e ) { /* Set the maximum dt. */ e->dt_step = dt_step; e->s->dt_step = dt_step; - // printf( "engine_step: dt_step set to %.3e (dt=%.3e).\n" , dt_step , e->dt ); fflush(stdout); + // message( "dt_step set to %.3e (dt=%.3e)." , dt_step , e->dt ); fflush(stdout); // printParticle( parts , 432626 ); @@ -711,17 +1059,40 @@ void engine_step ( struct engine *e ) { // printParticle( e->s->parts , 8328423931905 , e->s->nr_parts ); /* Collect the cell data from the second kick. */ - for ( k = 0 ; k < s->nr_cells ; k++ ) { - c = &s->cells[k]; - engine_collect_kick2( c ); - dt_min = fminf( dt_min , c->dt_min ); - dt_max = fmaxf( dt_max , c->dt_max ); - ekin += c->ekin; - epot += c->epot; - count += c->updated; - mom[0] += c->mom[0]; mom[1] += c->mom[1]; mom[2] += c->mom[2]; - ang[0] += c->ang[0]; ang[1] += c->ang[1]; ang[2] += c->ang[2]; - } + for ( k = 0 ; k < s->nr_cells ; k++ ) + if ( s->cells[k].nodeID == e->nodeID ) { + c = &s->cells[k]; + engine_collect_kick2( c ); + dt_min = fminf( dt_min , c->dt_min ); + dt_max = fmaxf( dt_max , c->dt_max ); + ekin += c->ekin; + epot += c->epot; + count += c->updated; + mom[0] += c->mom[0]; mom[1] += c->mom[1]; mom[2] += c->mom[2]; + ang[0] += c->ang[0]; ang[1] += c->ang[1]; ang[2] += c->ang[2]; + } + + /* Aggregate the data from the different nodes. */ + #ifdef WITH_MPI + double in[3], out[3]; + out[0] = dt_min; + if ( MPI_Allreduce( out , in , 1 , MPI_DOUBLE , MPI_MIN , MPI_COMM_WORLD ) != MPI_SUCCESS ) + error( "Failed to aggregate dt_min." ); + dt_min = in[0]; + out[0] = dt_max; + if ( MPI_Allreduce( out , in , 1 , MPI_DOUBLE , MPI_MAX , MPI_COMM_WORLD ) != MPI_SUCCESS ) + error( "Failed to aggregate dt_max." ); + dt_max = in[0]; + out[0] = count; out[1] = ekin; out[2] = epot; + if ( MPI_Allreduce( out , in , 3 , MPI_DOUBLE , MPI_SUM , MPI_COMM_WORLD ) != MPI_SUCCESS ) + error( "Failed to aggregate energies." ); + count = in[0]; ekin = in[1]; epot = in[2]; + /* int nr_parts; + if ( MPI_Allreduce( &s->nr_parts , &nr_parts , 1 , MPI_INT , MPI_SUM , MPI_COMM_WORLD ) != MPI_SUCCESS ) + error( "Failed to aggregate particle count." ); + if ( e->nodeID == 0 ) + message( "nr_parts=%i." , nr_parts ); */ + #endif e->dt_min = dt_min; e->dt_max = dt_max; @@ -729,11 +1100,11 @@ void engine_step ( struct engine *e ) { e->ekin = ekin; e->epot = epot; // printParticle( e->s->parts , 382557 , e->s->nr_parts ); - // printf( "engine_step: dt_min/dt_max is %e/%e.\n" , dt_min , dt_max ); fflush(stdout); - // printf( "engine_step: etot is %e (ekin=%e, epot=%e).\n" , ekin+epot , ekin , epot ); fflush(stdout); - // printf( "engine_step: total momentum is [ %e , %e , %e ].\n" , mom[0] , mom[1] , mom[2] ); fflush(stdout); - // printf( "engine_step: total angular momentum is [ %e , %e , %e ].\n" , ang[0] , ang[1] , ang[2] ); fflush(stdout); - // printf( "engine_step: updated %i parts (dt_step=%.3e).\n" , count , dt_step ); fflush(stdout); + // message( "dt_min/dt_max is %e/%e." , dt_min , dt_max ); fflush(stdout); + // message( "etot is %e (ekin=%e, epot=%e)." , ekin+epot , ekin , epot ); fflush(stdout); + // message( "total momentum is [ %e , %e , %e ]." 
, mom[0] , mom[1] , mom[2] ); fflush(stdout); + // message( "total angular momentum is [ %e , %e , %e ]." , ang[0] , ang[1] , ang[2] ); fflush(stdout); + // message( "updated %i parts (dt_step=%.3e)." , count , dt_step ); fflush(stdout); /* Increase the step. */ e->step += 1; @@ -764,20 +1135,20 @@ void engine_step ( struct engine *e ) { s->parts[k].dt = dt; s->xparts[k].dt_curr = dt; } - // printf( "engine_step: dt_min=%.3e, adjusting time step to dt=%e.\n" , dt_min , e->dt ); + // message( "dt_min=%.3e, adjusting time step to dt=%e." , dt_min , e->dt ); } else { while ( dt_min < dt ) { dt *= 0.5; e->step *= 2; e->nullstep *= 2; - // printf( "engine_step: dt_min dropped below time step, adjusting to dt=%e.\n" , e->dt ); + // message( "dt_min dropped below time step, adjusting to dt=%e." , e->dt ); } while ( dt_min > 2*dt && (e->step & 1) == 0 ) { dt *= 2.0; e->step /= 2; e->nullstep /= 2; - // printf( "engine_step: dt_min is larger than twice the time step, adjusting to dt=%e.\n" , e->dt ); + // message( "dt_min is larger than twice the time step, adjusting to dt=%e." , e->dt ); } } } @@ -791,6 +1162,138 @@ void engine_step ( struct engine *e ) { } +/** + * @brief Split the underlying space according to the given grid. + * + * @param e The #engine. + * @param grid The grid. + */ + +void engine_split ( struct engine *e , int *grid ) { + + int i, j, k, ii, jj, kk, jd; + float scale[3]; + int cid, cjd, pid, ind[3], jnd[3], *cdim = e->s->cdim; + struct space *s = e->s; + struct cell *c; + struct part *p; + + /* If we've got the wrong number of nodes, fail. */ + if ( e->nr_nodes != grid[0]*grid[1]*grid[2] ) + error( "Grid size does not match number of nodes." ); + + /* Prepare the proxies and the proxy index. */ + if ( e->proxy_ind != NULL ) + free( e->proxy_ind ); + if ( ( e->proxy_ind = (int *)malloc( sizeof(int) * e->nr_nodes ) ) == NULL ) + error( "Failed to allocate proxy index." ); + for ( k = 0 ; k < e->nr_nodes ; k++ ) + e->proxy_ind[k] = -1; + e->nr_proxies = 0; + + /* Get the scale. */ + for ( j = 0 ; j < 3 ; j++ ) + scale[j] = ((float)grid[j]) / s->cdim[j]; + + /* Run through the cells and set their nodeID. */ + for ( k = 0 ; k < s->nr_cells ; k++ ) { + c = &s->cells[k]; + for ( j = 0 ; j < 3 ; j++ ) + ind[j] = c->loc[j] * s->ih[j] * scale[j]; + c->nodeID = ind[0] + grid[0]*( ind[1] + grid[1]*ind[2] ); + } + + /* Identify the neighbours of this proxy. */ + ind[0] = e->nodeID % grid[0]; + ind[1] = ( e->nodeID / grid[0] ) % grid[1]; + ind[2] = e->nodeID / ( grid[0]*grid[1] ); + message( "node %i is [ %i %i %i ] on grid [ %i %i %i ]." , + e->nodeID , ind[0] , ind[1] , ind[2] , grid[0] , grid[1] , grid[2] ); + for ( i = -1 ; i <= 1 ; i++ ) { + jnd[0] = ind[0] + i; + if ( jnd[0] < 0 ) jnd[0] += grid[0]; + if ( jnd[0] >= grid[0] ) jnd[0] -= grid[0]; + for ( j = -1 ; j <= 1 ; j++ ) { + jnd[1] = ind[1] + j; + if ( jnd[1] < 0 ) jnd[1] += grid[1]; + if ( jnd[1] >= grid[1] ) jnd[1] -= grid[1]; + for ( k = -1 ; k <= 1 ; k++ ) { + jnd[2] = ind[2] + k; + if ( jnd[2] < 0 ) jnd[2] += grid[2]; + if ( jnd[2] >= grid[2] ) jnd[2] -= grid[2]; + + /* Are ind and jnd the same node? */ + jd = jnd[0] + grid[0]*( jnd[1] + grid[1]*jnd[2] ); + if ( jd == e->nodeID ) + continue; + + /* Add jnd? */ + if ( e->proxy_ind[jd] < 0 ) { + proxy_init( &e->proxies[ e->nr_proxies ] , e->nodeID , jd ); + e->proxy_ind[jd] = e->nr_proxies; + e->nr_proxies += 1; + } + + } + } + } + + /* Identify the neighbouring highest-level cells and add them to + the respective proxies. 
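A neighbouring cell that lives on another node is added to that proxy's incoming list, the bordering local cell to its outgoing list, and the local cell's sendto mask records the proxies it has to be shipped to.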
*/ + for ( ind[0] = 0 ; ind[0] < cdim[0] ; ind[0]++ ) + for ( ind[1] = 0 ; ind[1] < cdim[1] ; ind[1]++ ) + for ( ind[2] = 0 ; ind[2] < cdim[2] ; ind[2]++ ) { + cid = cell_getid( cdim , ind[0] , ind[1] , ind[2] ); + for ( i = -1 ; i <= 1 ; i++ ) { + ii = ind[0] + i; + if ( ii >= cdim[0] ) + ii -= cdim[0]; + else if ( ii < 0 ) + ii += cdim[0]; + for ( j = -1 ; j <= 1 ; j++ ) { + jj = ind[1] + j; + if ( jj >= cdim[1] ) + jj -= cdim[1]; + else if ( jj < 0 ) + jj += cdim[1]; + for ( k = -1 ; k <= 1 ; k++ ) { + kk = ind[2] + k; + if ( kk >= cdim[2] ) + kk -= cdim[2]; + else if ( kk < 0 ) + kk += cdim[2]; + cjd = cell_getid( cdim , ii , jj , kk ); + if ( s->cells[cid].nodeID == e->nodeID && s->cells[cjd].nodeID != e->nodeID ) { + pid = e->proxy_ind[ s->cells[cjd].nodeID ]; + proxy_addcell_in( &e->proxies[pid] , &s->cells[cjd] ); + proxy_addcell_out( &e->proxies[pid] , &s->cells[cid] ); + s->cells[cid].sendto |= ( 1 << pid ); + } + if ( s->cells[cjd].nodeID == e->nodeID && s->cells[cid].nodeID != e->nodeID ) { + pid = e->proxy_ind[ s->cells[cid].nodeID ]; + proxy_addcell_in( &e->proxies[pid] , &s->cells[cid] ); + proxy_addcell_out( &e->proxies[pid] , &s->cells[cjd] ); + s->cells[cjd].sendto |= ( 1 << pid ); + } + } + } + } + } + + /* For now, just kill any particle outside of our grid. */ + for ( k = 0 ; k < s->nr_parts ; k++ ) { + p = &s->parts[k]; + if ( s->cells[ cell_getid( s->cdim , p->x[0]*s->ih[0] , p->x[1]*s->ih[1] , p->x[2]*s->ih[2] ) ].nodeID != e->nodeID ) { + s->nr_parts -= 1; + s->parts[k] = s->parts[ s->nr_parts ]; + s->xparts[k] = s->xparts[ s->nr_parts ]; + k -= 1; + } + } + + } + + /** * @brief init an engine with the given number of threads, queues, and * the given policy. @@ -803,7 +1306,7 @@ void engine_step ( struct engine *e ) { * @param policy The queueing policy to use. */ -void engine_init ( struct engine *e , struct space *s , float dt , int nr_threads , int nr_queues , int policy ) { +void engine_init ( struct engine *e , struct space *s , float dt , int nr_threads , int nr_queues , int nr_nodes , int nodeID , int policy ) { int k; float dt_min = dt; @@ -821,7 +1324,11 @@ void engine_init ( struct engine *e , struct space *s , float dt , int nr_thread for ( i = 1 ; i < nr_cores ; i *= 2 ) for ( j = nr_cores / i / 2 ; j < nr_cores ; j += nr_cores / i ) cpuid[k++] = j; - printf( "engine_init: cpu map is [ " ); + #ifdef WITHMPI + printf( "engine_init: cpu map is [ " ); + #else + printf( "engine_init[%03i]: cpu map is [ " , nodeID ); + #endif for ( i = 0 ; i < nr_cores ; i++ ) printf( "%i " , cpuid[i] ); printf( "].\n" ); @@ -835,6 +1342,26 @@ void engine_init ( struct engine *e , struct space *s , float dt , int nr_thread e->step = 0; e->nullstep = 0; e->time = 0.0; + e->nr_nodes = nr_nodes; + e->nodeID = nodeID; + e->proxy_ind = NULL; + e->nr_proxies = 0; + engine_rank = nodeID; + + /* Make the space link back to the engine. */ + s->e = e; + + /* Are we doing stuff in parallel? */ + if ( nr_nodes > 1 ) { + #if !defined(HAVE_MPI) || !defined(WITH_MPI) + error( "SWIFT was not compiled with MPI support." ); + #endif + e->policy |= engine_policy_mpi; + if ( ( e->proxies = (struct proxy *)malloc( sizeof(struct proxy) * 26 ) ) == NULL ) + error( "Failed to allocate memory for proxies." ); + bzero( e->proxies , sizeof(struct proxy) * 26 ); + e->nr_proxies = 0; + } /* First of all, init the barrier and lock it. 
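The runner threads created below block on this barrier until the engine releases them.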
*/ if ( pthread_mutex_init( &e->barrier_mutex , NULL ) != 0 ) @@ -860,7 +1387,7 @@ void engine_init ( struct engine *e , struct space *s , float dt , int nr_thread e->dt = dt; /* Init the scheduler. */ - scheduler_init( &e->sched , e->s , nr_queues , scheduler_flag_steal ); + scheduler_init( &e->sched , e->s , nr_queues , scheduler_flag_steal , e->nodeID ); s->nr_queues = nr_queues; /* Append a kick1 task to each cell. */ @@ -877,28 +1404,33 @@ void engine_init ( struct engine *e , struct space *s , float dt , int nr_thread e->barrier_running += 1; if ( pthread_create( &e->runners[k].thread , NULL , &runner_main , &e->runners[k] ) != 0 ) error( "Failed to create runner thread." ); - #if defined(HAVE_SETAFFINITY) - - /* Set a reasonable queue ID. */ - e->runners[k].cpuid = cpuid[ k % nr_cores ]; - if ( nr_queues < nr_threads ) - e->runners[k].qid = cpuid[ k % nr_cores ] * nr_queues / nr_cores; - else - e->runners[k].qid = k; - - /* Set the cpu mask to zero | e->id. */ - CPU_ZERO( &cpuset ); - CPU_SET( cpuid[ k % nr_cores ] , &cpuset ); + if ( e->policy & engine_policy_setaffinity ) { + #if defined(HAVE_SETAFFINITY) - /* Apply this mask to the runner's pthread. */ - if ( pthread_setaffinity_np( e->runners[k].thread , sizeof(cpu_set_t) , &cpuset ) != 0 ) - error( "Failed to set thread affinity." ); - - #else + /* Set a reasonable queue ID. */ + e->runners[k].cpuid = cpuid[ k % nr_cores ]; + if ( nr_queues < nr_threads ) + e->runners[k].qid = cpuid[ k % nr_cores ] * nr_queues / nr_cores; + else + e->runners[k].qid = k; + + /* Set the cpu mask to zero | e->id. */ + CPU_ZERO( &cpuset ); + CPU_SET( cpuid[ k % nr_cores ] , &cpuset ); + + /* Apply this mask to the runner's pthread. */ + if ( pthread_setaffinity_np( e->runners[k].thread , sizeof(cpu_set_t) , &cpuset ) != 0 ) + error( "Failed to set thread affinity." ); + + #else + error( "SWIFT was not compiled with affinity enabled." ); + #endif + } + else { e->runners[k].cpuid = k; e->runners[k].qid = k * nr_queues / nr_threads; - #endif - // printf( "engine_init: runner %i on cpuid=%i with qid=%i.\n" , e->runners[k].id , e->runners[k].cpuid , e->runners[k].qid ); + } + // message( "runner %i on cpuid=%i with qid=%i." , e->runners[k].id , e->runners[k].cpuid , e->runners[k].qid ); } /* Wait for the runner threads to be in place. */ diff --git a/src/engine.h b/src/engine.h index b8c17f74161e94b64f0121cd6da47cc0a20782fe..401f943792aed147e73e13a1f11f1c283a23f81e 100644 --- a/src/engine.h +++ b/src/engine.h @@ -28,11 +28,17 @@ #define engine_policy_fixdt 16 #define engine_policy_multistep 32 #define engine_policy_cputight 64 +#define engine_policy_mpi 128 +#define engine_policy_setaffinity 256 #define engine_queue_scale 1.2 #define engine_maxtaskspercell 32 +/* The rank of the engine as a global variable (for messages). */ +extern int engine_rank; + + /* Data structure for the engine. */ struct engine { @@ -77,12 +83,21 @@ struct engine { pthread_cond_t barrier_cond; volatile int barrier_running, barrier_launch, barrier_launchcount; + /* ID of the node this engine lives on. */ + int nr_nodes, nodeID; + + /* Proxies for the other nodes in this simulation. */ + struct proxy *proxies; + int nr_proxies, *proxy_ind; + }; /* Function prototypes. 
*/ void engine_barrier( struct engine *e , int tid ); -void engine_init ( struct engine *e , struct space *s , float dt , int nr_threads , int nr_queues , int policy ); +void engine_init ( struct engine *e , struct space *s , float dt , int nr_threads , int nr_queues , int nr_nodes , int nodeID , int policy ); void engine_prepare ( struct engine *e ); void engine_step ( struct engine *e ); void engine_maketasks ( struct engine *e ); +void engine_split ( struct engine *e , int *grid ); +int engine_exchange_strays ( struct engine *e , struct part *parts , struct xpart *xparts , int *ind , int N ); diff --git a/src/error.h b/src/error.h index 4d4aa5c76c55ba5e1d7c2f6ba6d6b0eb1e38c5da..c6e7bc229fb65cf1cc8aa43a15a5a7ea9421e385 100644 --- a/src/error.h +++ b/src/error.h @@ -20,8 +20,26 @@ #include <stdio.h> + /** * @brief Error macro. Prints the message given in argument and aborts. * */ -#define error(s) { fprintf( stderr , "%s:%s():%i: %s\n" , __FILE__ , __FUNCTION__ , __LINE__ , s ); abort(); } +#ifdef WITH_MPI + extern int engine_rank; + #define error(s, ...) { fprintf( stderr , "[%03i] %s:%s():%i: " s "\n" , engine_rank , __FILE__ , __FUNCTION__ , __LINE__ , ##__VA_ARGS__ ); MPI_Abort( MPI_COMM_WORLD , 0 ); abort(); } +#else + #define error(s, ...) { fprintf( stderr , "%s:%s():%i: " s "\n" , __FILE__ , __FUNCTION__ , __LINE__ , ##__VA_ARGS__ ); abort(); } +#endif + + +/** + * @brief Macro to print a localized message with variable arguments. + * + */ +#ifdef WITH_MPI + extern int engine_rank; + #define message(s, ...) printf( "%s[%03i]: " s "\n" , __FUNCTION__ , engine_rank , ##__VA_ARGS__ ) +#else + #define message(s, ...) printf( "%s: " s "\n" , __FUNCTION__ , ##__VA_ARGS__ ) +#endif diff --git a/src/io.c b/src/io.c index f5773335b7c07aa6e7efdc45a7ff887048ee557d..99eaf1127b8a0485231ddd2ce5014975e50852ec 100644 --- a/src/io.c +++ b/src/io.c @@ -32,6 +32,12 @@ #include <hdf5.h> #include <math.h> +/* MPI headers. */ +#ifdef WITH_MPI + #include <mpi.h> +#endif + +#include "const.h" #include "cycle.h" #include "lock.h" #include "task.h" @@ -120,17 +126,13 @@ void readAttribute(hid_t grp, char* name, enum DATA_TYPE type, void* data) h_attr = H5Aopen(grp, name, H5P_DEFAULT); if(h_attr < 0) { - char buf[100]; - sprintf(buf, "Error while opening attribute '%s'\n", name); - error(buf); + error( "Error while opening attribute '%s'" , name ); } h_err = H5Aread(h_attr, hdf5Type(type), data); if(h_err < 0) { - char buf[100]; - sprintf(buf, "Error while reading attribute '%s'\n", name); - error(buf); + error( "Error while reading attribute '%s'" , name ); } H5Aclose(h_attr); @@ -168,21 +170,17 @@ void readArrayBackEnd(hid_t grp, char* name, enum DATA_TYPE type, int N, int dim exist = H5Lexists(grp, name, 0); if(exist < 0) { - char buf[100]; - sprintf(buf, "Error while checking the existence of data set '%s'\n", name); - error(buf); + error( "Error while checking the existence of data set '%s'." , name ); } else if(exist == 0) { if(importance == COMPULSORY) { - char buf[100]; - sprintf(buf, "Compulsory data set '%s' not present in the file.\n", name); - error(buf); + error( "Compulsory data set '%s' not present in the file." , name ); } else { - /* printf("readArray: Optional data set '%s' not present. Zeroing this particle field...\n", name); */ + /* message("Optional data set '%s' not present. 
Zeroing this particle field...", name); */ for(i=0; i<N; ++i) memset(part_c+i*partSize, 0, copySize); @@ -191,15 +189,13 @@ void readArrayBackEnd(hid_t grp, char* name, enum DATA_TYPE type, int N, int dim } } - /* printf("readArray: Reading %s '%s' array...\n", importance == COMPULSORY ? "compulsory": "optional ", name); */ + /* message( "Reading %s '%s' array...", importance == COMPULSORY ? "compulsory": "optional ", name); */ /* Open data space */ h_data = H5Dopen1(grp, name); if(h_data < 0) { - char buf[100]; - sprintf(buf, "Error while opening data space '%s'\n", name); - error(buf); + error( "Error while opening data space '%s'." , name ); } /* Check data type */ @@ -220,9 +216,7 @@ void readArrayBackEnd(hid_t grp, char* name, enum DATA_TYPE type, int N, int dim h_err = H5Dread(h_data, hdf5Type(type), H5S_ALL, H5S_ALL, H5P_DEFAULT, temp); if(h_err < 0) { - char buf[100]; - sprintf(buf, "Error while reading data array '%s'\n", name); - error(buf); + error( "Error while reading data array '%s'." , name ); } /* Copy temporary buffer to particle data */ @@ -277,17 +271,15 @@ void read_ic ( char* fileName, double dim[3], struct part **parts, int* N, int* int numParticles[6]={0}; /* GADGET has 6 particle types. We only keep the type 0*/ /* Open file */ - /* printf("read_ic: Opening file '%s' as IC.\n", fileName); */ + /* message("Opening file '%s' as IC.", fileName); */ h_file = H5Fopen(fileName, H5F_ACC_RDONLY, H5P_DEFAULT); if(h_file < 0) { - char buf[200]; - sprintf(buf, "Error while opening file '%s'", fileName); - error(buf); + error( "Error while opening file '%s'." , fileName ); } /* Open header to read simulation properties */ - /* printf("read_ic: Reading runtime parameters...\n"); */ + /* message("Reading runtime parameters..."); */ h_grp = H5Gopen1(h_file, "/RuntimePars"); if(h_grp < 0) error("Error while opening runtime parameters\n"); @@ -299,7 +291,7 @@ void read_ic ( char* fileName, double dim[3], struct part **parts, int* N, int* H5Gclose(h_grp); /* Open header to read simulation properties */ - /* printf("read_ic: Reading file header...\n"); */ + /* message("Reading file header..."); */ h_grp = H5Gopen1(h_file, "/Header"); if(h_grp < 0) error("Error while opening file header\n"); @@ -313,7 +305,7 @@ void read_ic ( char* fileName, double dim[3], struct part **parts, int* N, int* dim[1] = ( boxSize[1] < 0 ) ? boxSize[0] : boxSize[1]; dim[2] = ( boxSize[2] < 0 ) ? boxSize[0] : boxSize[2]; - /* printf("read_ic: Found %d particles in a %speriodic box of size [%f %f %f]\n", */ + /* message("Found %d particles in a %speriodic box of size [%f %f %f].", */ /* *N, (periodic ? 
"": "non-"), dim[0], dim[1], dim[2]); */ /* Close header */ @@ -324,10 +316,10 @@ void read_ic ( char* fileName, double dim[3], struct part **parts, int* N, int* error("Error while allocating memory for particles"); bzero( *parts , *N * sizeof(struct part) ); - /* printf("read_ic: Allocated %8.2f MB for particles.\n", *N * sizeof(struct part) / (1024.*1024.)); */ + /* message("Allocated %8.2f MB for particles.", *N * sizeof(struct part) / (1024.*1024.)); */ /* Open SPH particles group */ - /* printf("read_ic: Reading particle arrays...\n"); */ + /* message("Reading particle arrays..."); */ h_grp = H5Gopen1(h_file, "/PartType0"); if(h_grp < 0) error( "Error while opening particle group.\n"); @@ -346,7 +338,7 @@ void read_ic ( char* fileName, double dim[3], struct part **parts, int* N, int* /* Close particle group */ H5Gclose(h_grp); - /* printf("read_ic: Done Reading particles...\n"); */ + /* message("Done Reading particles..."); */ /* Close file */ H5Fclose(h_file); @@ -385,33 +377,25 @@ void writeAttribute(hid_t grp, char* name, enum DATA_TYPE type, void* data, int h_space = H5Screate(H5S_SIMPLE); if(h_space < 0) { - char buf[100]; - sprintf(buf, "Error while creating dataspace for attribute '%s'\n", name); - error(buf); + error( "Error while creating dataspace for attribute '%s'." , name ); } h_err = H5Sset_extent_simple(h_space, 1, dim, NULL); if(h_err < 0) { - char buf[100]; - sprintf(buf, "Error while changing dataspace shape for attribute '%s'\n", name); - error(buf); + error( "Error while changing dataspace shape for attribute '%s'." , name ); } h_attr = H5Acreate1(grp, name, hdf5Type(type), h_space, H5P_DEFAULT); if(h_attr < 0) { - char buf[100]; - sprintf(buf, "Error while creating attribute '%s'\n", name); - error(buf); + error( "Error while creating attribute '%s'.", name ); } h_err = H5Awrite(h_attr, hdf5Type(type), data); if(h_err < 0) { - char buf[100]; - sprintf(buf, "Error while reading attribute '%s'\n", name); - error(buf); + error( "Error while reading attribute '%s'." , name ); } H5Sclose(h_space); @@ -435,41 +419,31 @@ void writeStringAttribute(hid_t grp, char* name, char* str, int length) h_space = H5Screate(H5S_SCALAR); if(h_space < 0) { - char buf[100]; - sprintf(buf, "Error while creating dataspace for attribute '%s'\n", name); - error(buf); + error( "Error while creating dataspace for attribute '%s'." , name ); } h_type = H5Tcopy(H5T_C_S1); if(h_type < 0) { - char buf[100]; - sprintf(buf, "Error while copying datatype 'H5T_C_S1'\n"); - error(buf); + error( "Error while copying datatype 'H5T_C_S1'." ); } h_err = H5Tset_size(h_type, length); if(h_err < 0) { - char buf[100]; - sprintf(buf, "Error while resizing attribute tyep to '%i'\n", length); - error(buf); + error( "Error while resizing attribute tyep to '%i'." , length ); } h_attr = H5Acreate1(grp, name, h_type, h_space, H5P_DEFAULT); if(h_attr < 0) { - char buf[100]; - sprintf(buf, "Error while creating attribute '%s'\n", name); - error(buf); + error( "Error while creating attribute '%s'." , name ); } h_err = H5Awrite(h_attr, h_type, str ); if(h_err < 0) { - char buf[100]; - sprintf(buf, "Error while reading attribute '%s'\n", name); - error(buf); + error( "Error while reading attribute '%s'." 
, name ); } H5Tclose(h_type); @@ -562,7 +536,7 @@ void writeArrayBackEnd(hid_t grp, char* fileName, FILE* xmfFile, char* name, enu char* temp_c = 0; hsize_t shape[2]; - /* printf("writeArray: Writing '%s' array...\n", name); */ + /* message("Writing '%s' array...", name); */ /* Allocate temporary buffer */ temp = malloc(N * dim * sizeOfType(type)); @@ -578,9 +552,7 @@ void writeArrayBackEnd(hid_t grp, char* fileName, FILE* xmfFile, char* name, enu h_space = H5Screate(H5S_SIMPLE); if(h_space < 0) { - char buf[100]; - sprintf(buf, "Error while creating data space for field '%s'\n", name); - error(buf); + error( "Error while creating data space for field '%s'." , name ); } if(dim > 1) @@ -598,27 +570,21 @@ void writeArrayBackEnd(hid_t grp, char* fileName, FILE* xmfFile, char* name, enu h_err = H5Sset_extent_simple(h_space, rank, shape, NULL); if(h_err < 0) { - char buf[100]; - sprintf(buf, "Error while changing data space shape for field '%s'\n", name); - error(buf); + error( "Error while changing data space shape for field '%s'." , name ); } /* Create dataset */ h_data = H5Dcreate1(grp, name, hdf5Type(type), h_space, H5P_DEFAULT); if(h_data < 0) { - char buf[100]; - sprintf(buf, "Error while creating dataspace '%s'\n", name); - error(buf); + error( "Error while creating dataspace '%s'." , name ); } /* Write temporary buffer to HDF5 dataspace */ h_err = H5Dwrite(h_data, hdf5Type(type), h_space, H5S_ALL, H5P_DEFAULT, temp); if(h_err < 0) { - char buf[100]; - sprintf(buf, "Error while reading data array '%s'\n", name); - error(buf); + error( "Error while reading data array '%s'." , name ); } /* Write XMF description for this data set */ @@ -689,17 +655,15 @@ void write_output (struct engine *e) /* Open file */ - /* printf("write_output: Opening file '%s'.\n", fileName); */ + /* message("Opening file '%s'.", fileName); */ h_file = H5Fcreate(fileName, H5F_ACC_TRUNC, H5P_DEFAULT,H5P_DEFAULT); if(h_file < 0) { - char buf[200]; - sprintf(buf, "Error while opening file '%s'", fileName); - error(buf); + error( "Error while opening file '%s'." 
, fileName ); } /* Open header to read simulation properties */ - /* printf("write_output: Writing runtime parameters...\n"); */ + /* message("Writing runtime parameters..."); */ h_grp = H5Gcreate1(h_file, "/RuntimePars", 0); if(h_grp < 0) error("Error while creating runtime parameters group\n"); @@ -711,7 +675,7 @@ void write_output (struct engine *e) H5Gclose(h_grp); /* Open header to write simulation properties */ - /* printf("write_output: Writing file header...\n"); */ + /* message("Writing file header..."); */ h_grp = H5Gcreate1(h_file, "/Header", 0); if(h_grp < 0) error("Error while creating file header\n"); @@ -765,7 +729,7 @@ void write_output (struct engine *e) H5Gclose(h_grp); /* Create SPH particles group */ - /* printf("write_output: Writing particle arrays...\n"); */ + /* message("Writing particle arrays..."); */ h_grp = H5Gcreate1(h_file, "/PartType0", 0); if(h_grp < 0) error( "Error while creating particle group.\n"); @@ -787,7 +751,7 @@ void write_output (struct engine *e) /* Write LXMF file descriptor */ writeXMFfooter(xmfFile); - /* printf("write_output: Done writing particles...\n"); */ + /* message("Done writing particles..."); */ /* Close file */ H5Fclose(h_file); diff --git a/src/kernel.h b/src/kernel.h index 5c7711e2d5a22aa3f1129fbc7f27532bf3162905..f2ff4e329b42095f018c36b0878cb7bee54c47f9 100644 --- a/src/kernel.h +++ b/src/kernel.h @@ -27,7 +27,6 @@ */ #include "vector.h" -#include "const.h" /* -------------------------------------------------------------------------------------------------------------------- */ diff --git a/src/proxy.c b/src/proxy.c new file mode 100644 index 0000000000000000000000000000000000000000..ed383bf02f491ed726e1da88a9b3f825b599042f --- /dev/null +++ b/src/proxy.c @@ -0,0 +1,346 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Coypright (c) 2013 Pedro Gonnet (pedro.gonnet@durham.ac.uk) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ + +/* Config parameters. */ +#include "../config.h" + +/* Some standard headers. */ +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <pthread.h> +#include <math.h> +#include <float.h> +#include <limits.h> +#include <omp.h> +#include <sched.h> + +/* MPI headers. */ +#ifdef WITH_MPI + #include <mpi.h> +#endif + +/* Local headers. */ +#include "const.h" +#include "cycle.h" +#include "atomic.h" +#include "timers.h" +#include "const.h" +#include "vector.h" +#include "lock.h" +#include "space.h" +#include "cell.h" +#include "task.h" +#include "part.h" +#include "debug.h" +#include "proxy.h" +#include "error.h" + + +/** + * @brief Exchange cells with a remote node. + * + * @paam p The #proxy. 
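This posts non-blocking sends for the packed cell count and the pcell data, plus the matching non-blocking receive for the incoming count; proxy_cells_exch2 then posts the receive for the cell data itself.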
+ */ + +void proxy_cells_exch1 ( struct proxy *p ) { + +#ifdef WITH_MPI + + int k, ind; + + /* Get the number of pcells we will need to send. */ + p->size_pcells_out = 0; + for ( k = 0 ; k < p->nr_cells_out ; k++ ) + p->size_pcells_out += p->cells_out[k]->pcell_size; + + /* Send the number of pcells. */ + if ( MPI_Isend( &p->size_pcells_out , 1 , MPI_INT , p->nodeID , p->mynodeID*proxy_tag_shift + proxy_tag_count , MPI_COMM_WORLD , &p->req_cells_count_out ) != MPI_SUCCESS ) + error( "Failed to isend nr of pcells." ); + MPI_Request_free( &p->req_cells_count_out ); + // message( "isent pcell count (%i) from node %i to node %i." , p->size_pcells_out , p->mynodeID , p->nodeID ); fflush(stdout); + + /* Allocate and fill the pcell buffer. */ + if ( p->pcells_out != NULL ) + free( p->pcells_out ); + if ( ( p->pcells_out = malloc( sizeof(struct pcell) * p->size_pcells_out ) ) == NULL ) + error( "Failed to allocate pcell_out buffer." ); + for ( ind = 0 , k = 0 ; k < p->nr_cells_out ; k++ ) { + memcpy( &p->pcells_out[ind] , p->cells_out[k]->pcell , sizeof(struct pcell) * p->cells_out[k]->pcell_size ); + ind += p->cells_out[k]->pcell_size; + } + + /* Send the pcell buffer. */ + if ( MPI_Isend( p->pcells_out , sizeof(struct pcell)*p->size_pcells_out , MPI_BYTE , p->nodeID , p->mynodeID*proxy_tag_shift + proxy_tag_cells , MPI_COMM_WORLD , &p->req_cells_out ) != MPI_SUCCESS ) + error( "Failed to pcell_out buffer." ); + MPI_Request_free( &p->req_cells_out ); + // message( "isent pcells (%i) from node %i to node %i." , p->size_pcells_out , p->mynodeID , p->nodeID ); fflush(stdout); + + /* Receive the number of pcells. */ + if ( MPI_Irecv( &p->size_pcells_in , 1 , MPI_INT , p->nodeID , p->nodeID*proxy_tag_shift + proxy_tag_count , MPI_COMM_WORLD , &p->req_cells_count_in ) != MPI_SUCCESS ) + error( "Failed to irecv nr of pcells." ); + // message( "irecv pcells count on node %i from node %i." , p->mynodeID , p->nodeID ); fflush(stdout); + +#else + error( "SWIFT was not compiled with MPI support." ); +#endif + + } + + +void proxy_cells_exch2 ( struct proxy *p ) { + +#ifdef WITH_MPI + + /* Re-allocate the pcell_in buffer. */ + if ( p->pcells_in != NULL ) + free( p->pcells_in ); + if ( ( p->pcells_in = (struct pcell *)malloc( sizeof(struct pcell) * p->size_pcells_in ) ) == NULL ) + error( "Failed to allocate pcell_in buffer." ); + + /* Receive the particle buffers. */ + if ( MPI_Irecv( p->pcells_in , sizeof(struct pcell)*p->size_pcells_in , MPI_BYTE , p->nodeID , p->nodeID*proxy_tag_shift + proxy_tag_cells , MPI_COMM_WORLD , &p->req_cells_in ) != MPI_SUCCESS ) + error( "Failed to irecv part data." ); + // message( "irecv pcells (%i) on node %i from node %i." , p->size_pcells_in , p->mynodeID , p->nodeID ); fflush(stdout); + +#else + error( "SWIFT was not compiled with MPI support." ); +#endif + + } + + +/** + * @brief Add a cell to the given proxy's input list. + * + * @param p The #proxy. + * @param c The #cell. + */ + +void proxy_addcell_in ( struct proxy *p , struct cell *c ) { + + int k; + struct cell **temp; + + /* Check if the cell is already registered with the proxy. */ + for ( k = 0 ; k < p->nr_cells_in ; k++ ) + if ( p->cells_in[k] == c ) + return; + + /* Do we need to grow the number of in cells? */ + if ( p->nr_cells_in == p->size_cells_in ) { + p->size_cells_in *= proxy_buffgrow; + if ( ( temp = malloc( sizeof(struct cell *) * p->size_cells_in ) ) == NULL ) + error( "Failed to allocate ingoing cell list." 
); + memcpy( temp , p->cells_in , sizeof(struct cell *) * p->nr_cells_in ); + free( p->cells_in ); + p->cells_in = temp; + } + + /* Add the cell. */ + p->cells_in[ p->nr_cells_in ] = c; + p->nr_cells_in += 1; + + } + + +/** + * @brief Add a cell to the given proxy's output list. + * + * @param p The #proxy. + * @param c The #cell. + */ + +void proxy_addcell_out ( struct proxy *p , struct cell *c ) { + + int k; + struct cell **temp; + + /* Check if the cell is already registered with the proxy. */ + for ( k = 0 ; k < p->nr_cells_out ; k++ ) + if ( p->cells_out[k] == c ) + return; + + /* Do we need to grow the number of out cells? */ + if ( p->nr_cells_out == p->size_cells_out ) { + p->size_cells_out *= proxy_buffgrow; + if ( ( temp = malloc( sizeof(struct cell *) * p->size_cells_out ) ) == NULL ) + error( "Failed to allocate outgoing cell list." ); + memcpy( temp , p->cells_out , sizeof(struct cell *) * p->nr_cells_out ); + free( p->cells_out ); + p->cells_out = temp; + } + + /* Add the cell. */ + p->cells_out[ p->nr_cells_out ] = c; + p->nr_cells_out += 1; + + } + + +/** + * @brief Exchange particles with a remote node. + * + * @paam p The #proxy. + */ + +void proxy_parts_exch1 ( struct proxy *p ) { + +#ifdef WITH_MPI + + /* Send the number of particles. */ + if ( MPI_Isend( &p->nr_parts_out , 1 , MPI_INT , p->nodeID , p->mynodeID*proxy_tag_shift + proxy_tag_count , MPI_COMM_WORLD , &p->req_parts_count_out ) != MPI_SUCCESS ) + error( "Failed to isend nr of parts." ); + MPI_Request_free( &p->req_parts_count_out ); + // message( "isent particle count (%i) from node %i to node %i." , p->nr_parts_out , p->mynodeID , p->nodeID ); fflush(stdout); + + /* Send the particle buffers. */ + if ( MPI_Isend( p->parts_out , sizeof(struct part)*p->nr_parts_out , MPI_BYTE , p->nodeID , p->mynodeID*proxy_tag_shift + proxy_tag_parts , MPI_COMM_WORLD , &p->req_parts_out ) != MPI_SUCCESS || + MPI_Isend( p->xparts_out , sizeof(struct xpart)*p->nr_parts_out , MPI_BYTE , p->nodeID , p->mynodeID*proxy_tag_shift + proxy_tag_xparts , MPI_COMM_WORLD , &p->req_xparts_out ) != MPI_SUCCESS ) + error( "Failed to isend part data." ); + MPI_Request_free( &p->req_parts_out ); + MPI_Request_free( &p->req_xparts_out ); + // message( "isent particle data (%i) to node %i." , p->nr_parts_out , p->nodeID ); fflush(stdout); + /* for ( int k = 0 ; k < p->nr_parts_out ; k++ ) + message( "sending particle %lli, x=[%.3e %.3e %.3e], h=%.3e, to node %i." , + p->parts_out[k].id , p->parts_out[k].x[0] , p->parts_out[k].x[1] , p->parts_out[k].x[2] , + p->parts_out[k].h , p->nodeID ); */ + + /* Receive the number of particles. */ + if ( MPI_Irecv( &p->nr_parts_in , 1 , MPI_INT , p->nodeID , p->nodeID*proxy_tag_shift + proxy_tag_count , MPI_COMM_WORLD , &p->req_parts_count_in ) != MPI_SUCCESS ) + error( "Failed to irecv nr of parts." ); + // message( "irecv particle count on node %i from node %i." , p->mynodeID , p->nodeID ); fflush(stdout); + +#else + error( "SWIFT was not compiled with MPI support." ); +#endif + + } + + +void proxy_parts_exch2 ( struct proxy *p ) { + +#ifdef WITH_MPI + + /* Is there enough space in the buffer? 
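
The proxy_cells_exch1/exch2 and proxy_parts_exch1/exch2 pairs above all follow the same count-then-payload handshake: a non-blocking send of the element count and of the already-known outgoing payload, then, once the incoming count is known, a non-blocking receive of the payload into a buffer sized to fit. The send requests are released immediately with MPI_Request_free since nothing local needs to wait on them, while the receive requests are kept so their completion can be tested later; each tag is the sender's node ID times proxy_tag_shift plus a message-type offset, so count, part, xpart and cell messages between the same pair of ranks cannot be confused. A minimal self-contained sketch of the pattern with illustrative names (not the proxy API), collapsed into one blocking step with MPI_Waitall for brevity:

#include <mpi.h>
#include <stdlib.h>

void exchange_with ( int other , int nr_out , double *data_out ,
                     int *nr_in , double **data_in ) {

    MPI_Request req[4];

    /* Phase 1: swap the element counts. */
    MPI_Isend( &nr_out , 1 , MPI_INT , other , 0 , MPI_COMM_WORLD , &req[0] );
    MPI_Irecv( nr_in , 1 , MPI_INT , other , 0 , MPI_COMM_WORLD , &req[1] );
    MPI_Waitall( 2 , req , MPI_STATUSES_IGNORE );

    /* Phase 2: size the incoming buffer, then swap the payloads. */
    *data_in = (double *)malloc( sizeof(double) * *nr_in );
    MPI_Isend( data_out , nr_out , MPI_DOUBLE , other , 1 , MPI_COMM_WORLD , &req[2] );
    MPI_Irecv( *data_in , *nr_in , MPI_DOUBLE , other , 1 , MPI_COMM_WORLD , &req[3] );
    MPI_Waitall( 2 , &req[2] , MPI_STATUSES_IGNORE );

    }

The real code splits the handshake across the exch1/exch2 calls instead of blocking, so the counts and payloads travel while the engine carries on with local work.
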
*/ + if ( p->nr_parts_in > p->size_parts_in ) { + do { + p->size_parts_in *= proxy_buffgrow; + } while ( p->nr_parts_in > p->size_parts_in ); + free( p->parts_in ); free( p->xparts_in ); + if ( ( p->parts_in = (struct part *)malloc( sizeof(struct part) * p->size_parts_in ) ) == NULL || + ( p->xparts_in = (struct xpart *)malloc( sizeof(struct xpart) * p->size_parts_in ) ) == NULL ) + error( "Failed to re-allocate parts_in buffers." ); + } + + /* Receive the particle buffers. */ + if ( MPI_Irecv( p->parts_in , sizeof(struct part)*p->nr_parts_in , MPI_BYTE , p->nodeID , p->nodeID*proxy_tag_shift + proxy_tag_parts , MPI_COMM_WORLD , &p->req_parts_in ) != MPI_SUCCESS || + MPI_Irecv( p->xparts_in , sizeof(struct xpart)*p->nr_parts_in , MPI_BYTE , p->nodeID , p->nodeID*proxy_tag_shift + proxy_tag_xparts , MPI_COMM_WORLD , &p->req_xparts_in ) != MPI_SUCCESS ) + error( "Failed to irecv part data." ); + // message( "irecv particle data (%i) from node %i." , p->nr_parts_in , p->nodeID ); fflush(stdout); + +#else + error( "SWIFT was not compiled with MPI support." ); +#endif + + } + + +/** + * @brief Load parts onto a proxy for exchange. + * + * @param p The #proxy. + * @param parts Pointer to an array of #part to send. + * @param xparts Pointer to an array of #xpart to send. + * @param N The number of parts. + */ + +void proxy_parts_load ( struct proxy *p , struct part *parts , struct xpart *xparts , int N ) { + + /* Is there enough space in the buffer? */ + if ( p->nr_parts_out + N > p->size_parts_out ) { + do { + p->size_parts_out *= proxy_buffgrow; + } while ( p->nr_parts_out + N > p->size_parts_out ); + struct part *tp; + struct xpart *txp; + if ( ( tp = (struct part *)malloc( sizeof(struct part) * p->size_parts_out ) ) == NULL || + ( txp = (struct xpart *)malloc( sizeof(struct xpart) * p->size_parts_out ) ) == NULL ) + error( "Failed to re-allocate parts_out buffers." ); + memcpy( tp , p->parts_out , sizeof(struct part) * p->nr_parts_out ); + memcpy( txp , p->xparts_out , sizeof(struct part) * p->nr_parts_out ); + free( p->parts_out ); free( p->xparts_out ); + p->parts_out = tp; p->xparts_out = txp; + } + + /* Copy the parts and xparts data to the buffer. */ + memcpy( &p->parts_out[ p->nr_parts_out ] , parts , sizeof(struct part) * N ); + memcpy( &p->xparts_out[ p->nr_parts_out ] , xparts , sizeof(struct xpart) * N ); + + /* Increase the counters. */ + p->nr_parts_out += N; + + } + + +/** + * @brief Initialize the given proxy. + * + * @param p The #proxy. + * @param nodeID The node with which this proxy will communicate. + */ + +void proxy_init ( struct proxy *p , int mynodeID , int nodeID ) { + + /* Set the nodeID. */ + p->mynodeID = mynodeID; + p->nodeID = nodeID; + + /* Allocate the cell send and receive buffers, if needed. */ + if ( p->cells_in == NULL ) { + p->size_cells_in = proxy_buffinit; + if ( ( p->cells_in = (struct cell **)malloc( sizeof(void *) * p->size_cells_in ) ) == NULL ) + error( "Failed to allocate cells_in buffer." ); + } + p->nr_cells_in = 0; + if ( p->cells_out == NULL ) { + p->size_cells_out = proxy_buffinit; + if ( ( p->cells_out = (struct cell **)malloc( sizeof(void *) * p->size_cells_out ) ) == NULL ) + error( "Failed to allocate cells_out buffer." ); + } + p->nr_cells_out = 0; + + /* Allocate the part send and receive buffers, if needed. 
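
The staging buffers above grow geometrically by proxy_buffgrow rather than by the exact amount needed, which keeps the number of reallocations logarithmic in the total volume of exchanged data. (Note that the xparts memcpy in proxy_parts_load above copies sizeof(struct part) bytes per element; sizeof(struct xpart) is presumably what is intended.) A minimal sketch of the same grow-then-copy pattern for a generic element type, with illustrative names only:

#include <stdlib.h>
#include <string.h>

#define proxy_buffgrow 1.5   /* growth factor, as in proxy.h below */

/* Make sure buf can hold count + extra elements, growing it if needed. */
static int buffer_ensure ( void **buf , int *size , int count , int extra , size_t elsize ) {
    if ( count + extra <= *size )
        return 0;
    do {
        *size *= proxy_buffgrow;                        /* geometric growth */
    } while ( count + extra > *size );
    void *temp = malloc( elsize * (size_t)*size );
    if ( temp == NULL )
        return -1;
    memcpy( temp , *buf , elsize * (size_t)count );     /* keep the existing entries */
    free( *buf );
    *buf = temp;
    return 0;
    }
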
*/ + if ( p->parts_in == NULL ) { + p->size_parts_in = proxy_buffinit; + if ( ( p->parts_in = (struct part *)malloc( sizeof(struct part) * p->size_parts_in ) ) == NULL || + ( p->xparts_in = (struct xpart *)malloc( sizeof(struct xpart) * p->size_parts_in ) ) == NULL ) + error( "Failed to allocate parts_in buffers." ); + } + p->nr_parts_in = 0; + if ( p->parts_out == NULL ) { + p->size_parts_out = proxy_buffinit; + if ( ( p->parts_out = (struct part *)malloc( sizeof(struct part) * p->size_parts_out ) ) == NULL || + ( p->xparts_out = (struct xpart *)malloc( sizeof(struct xpart) * p->size_parts_out ) ) == NULL ) + error( "Failed to allocate parts_out buffers." ); + } + p->nr_parts_out = 0; + + } diff --git a/src/proxy.h b/src/proxy.h new file mode 100644 index 0000000000000000000000000000000000000000..4710f5a8909bbab3b85fb71e15ae762a602a9dad --- /dev/null +++ b/src/proxy.h @@ -0,0 +1,76 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Coypright (c) 2013 Pedro Gonnet (pedro.gonnet@durham.ac.uk) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ + + + +/* Some constants. */ +#define proxy_buffgrow 1.5 +#define proxy_buffinit 100 + +/* Proxy tag arithmetic. */ +#define proxy_tag_shift 8 +#define proxy_tag_count 0 +#define proxy_tag_parts 1 +#define proxy_tag_xparts 2 +#define proxy_tag_cells 3 + + +/* Data structure for the proxy. */ +struct proxy { + + /* ID of the node this proxy represents. */ + int mynodeID, nodeID; + + /* Incomming cells. */ + struct cell **cells_in; + struct pcell *pcells_in; + int nr_cells_in, size_cells_in, size_pcells_in; + + /* Outgoing cells. */ + struct cell **cells_out; + struct pcell *pcells_out; + int nr_cells_out, size_cells_out, size_pcells_out; + + /* The parts and xparts buffers for input and output. */ + struct part *parts_in, *parts_out; + struct xpart *xparts_in, *xparts_out; + int size_parts_in, size_parts_out; + int nr_parts_in, nr_parts_out; + + /* MPI request handles. */ + #ifdef WITH_MPI + MPI_Request req_parts_count_out, req_parts_count_in; + MPI_Request req_parts_out, req_parts_in; + MPI_Request req_xparts_out, req_xparts_in; + MPI_Request req_cells_count_out, req_cells_count_in; + MPI_Request req_cells_out, req_cells_in; + #endif + + }; + + +/* Function prototypes. 
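
A hedged usage sketch of how the proxy API declared below is meant to be driven for a particle exchange; the engine-side wiring is simplified, and mynodeID, othernodeID, parts, xparts and N are placeholders supplied by the caller (in the real code proxy.h is included after part.h, cell.h and the MPI header):

#include <strings.h>
#include "proxy.h"

void exchange_strays_with ( int mynodeID , int othernodeID ,
                            struct part *parts , struct xpart *xparts , int N ) {

    struct proxy p;
    bzero( &p , sizeof(struct proxy) );            /* proxy_init() expects NULL buffers. */
    proxy_init( &p , mynodeID , othernodeID );

    proxy_parts_load( &p , parts , xparts , N );   /* Stage the outgoing particles. */
    proxy_parts_exch1( &p );                       /* isend count + data, irecv the count. */

    /* ... overlap local work here ... */

    /* (p.req_parts_count_in must have completed before this call.) */
    proxy_parts_exch2( &p );                       /* Size the buffers, irecv the payload. */

    /* Wait on p.req_parts_in / p.req_xparts_in before reading p.parts_in. */

    }
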
*/ +void proxy_init ( struct proxy *p , int mynodeID , int nodeID ); +void proxy_parts_load ( struct proxy *p , struct part *parts , struct xpart *xparts , int N ); +void proxy_parts_exch1 ( struct proxy *p ); +void proxy_parts_exch2 ( struct proxy *p ); +void proxy_addcell_in ( struct proxy *p , struct cell *c ); +void proxy_addcell_out ( struct proxy *p , struct cell *c ); +void proxy_cells_exch1 ( struct proxy *p ); +void proxy_cells_exch2 ( struct proxy *p ); diff --git a/src/queue.c b/src/queue.c index 7b66ccbeeb67b2f75e63ab4cd5bcf3bdb2fc5b1f..1519cebac61216ec7529ea299eaad7ffd3a1c7bc 100644 --- a/src/queue.c +++ b/src/queue.c @@ -25,11 +25,18 @@ #include <stdlib.h> #include <string.h> +/* MPI headers. */ +#ifdef WITH_MPI + #include <mpi.h> +#endif + /* Local headers. */ +#include "const.h" #include "cycle.h" #include "lock.h" #include "task.h" #include "timers.h" +#include "space.h" #include "cell.h" #include "queue.h" #include "error.h" diff --git a/src/runner.c b/src/runner.c index 6274ccd351bac0da9601de8fe21fa4250baadfbb..2838f5b376429c1f46b762659682acadc3dc8fcc 100644 --- a/src/runner.c +++ b/src/runner.c @@ -30,7 +30,13 @@ #include <limits.h> #include <omp.h> +/* MPI headers. */ +#ifdef WITH_MPI + #include <mpi.h> +#endif + /* Local headers. */ +#include "const.h" #include "cycle.h" #include "atomic.h" #include "timers.h" @@ -38,8 +44,8 @@ #include "lock.h" #include "task.h" #include "part.h" -#include "cell.h" #include "space.h" +#include "cell.h" #include "queue.h" #include "scheduler.h" #include "engine.h" @@ -89,6 +95,43 @@ const char runner_flip[27] = { 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 #include "runner_doiact.h" +/** + * @brief Send a local cell's particle data to another node. + * + * @param r The #runner. + * @param c The #cell. + * @param nodeID The destination node's ID. + * @param tagbit bit to distinguish between xv and rho sends. + */ + +void runner_dosend ( struct runner *r , struct cell *c , int nodeID , int tag ) { + +#ifdef WITH_MPI + + MPI_Request req; + + /* First check if all the density tasks have been run. */ + if ( tag & 1 ) + if ( c->parts[0].rho == 0.0 ) + error( "Attempting to send rhos before ghost task completed." ); + + /* Emit the isend. */ + if ( MPI_Isend( c->parts , sizeof(struct part) * c->count , MPI_BYTE , nodeID , tag , MPI_COMM_WORLD , &req ) != MPI_SUCCESS ) + error( "Failed to isend particle data." ); + + message( "sending %i parts with tag=%i from %i to %i." , + c->count , tag , r->e->nodeID , nodeID ); fflush(stdout); + + /* Free the request handler as we don't care what happens next. */ + MPI_Request_free( &req ); + +#else + error( "SWIFT was not compiled with MPI support." ); +#endif + + } + + /** * @brief Sort the entries in ascending order using QuickSort. * @@ -315,7 +358,7 @@ void runner_dosort ( struct runner *r , struct cell *c , int flags , int clock ) } */ #ifdef TIMER_VERBOSE - printf( "runner_dosort[%02i]: %i parts at depth %i (flags = %i%i%i%i%i%i%i%i%i%i%i%i%i) took %.3f ms.\n" , + message( "runner %02i: %i parts at depth %i (flags = %i%i%i%i%i%i%i%i%i%i%i%i%i) took %.3f ms." 
, r->id , c->count , c->depth , (flags & 0x1000) >> 12 , (flags & 0x800) >> 11 , (flags & 0x400) >> 10 , (flags & 0x200) >> 9 , (flags & 0x100) >> 8 , (flags & 0x80) >> 7 , (flags & 0x40) >> 6 , (flags & 0x20) >> 5 , (flags & 0x10) >> 4 , (flags & 0x8) >> 3 , (flags & 0x4) >> 2 , (flags & 0x2) >> 1 , (flags & 0x1) >> 0 , ((double)TIMER_TOC(timer_dosort)) / CPU_TPS * 1000 ); fflush(stdout); @@ -385,13 +428,13 @@ void runner_doghost ( struct runner *r , struct cell *c ) { /* Is this part within the timestep? */ if ( p->dt <= dt_step ) { - /* Some smoothing length multiples. */ - h = p->h; + /* Some smoothing length multiples. */ + h = p->h; ih = 1.0f / h; ih2 = ih * ih; ih4 = ih2 * ih2; - /* Final operation on the density. */ + /* Final operation on the density. */ p->rho = rho = ih * ih2 * ( p->rho + p->mass*kernel_root ); p->rho_dh = rho_dh = ( p->rho_dh - 3.0f*p->mass*kernel_root ) * ih4; wcount = ( p->density.wcount + kernel_root ) * ( 4.0f / 3.0 * M_PI * kernel_gamma3 ); @@ -417,7 +460,7 @@ void runner_doghost ( struct runner *r , struct cell *c ) { /* Did we get the right number density? */ if ( wcount > kernel_nwneigh + const_delta_nwneigh || wcount < kernel_nwneigh - const_delta_nwneigh ) { - // printf( "runner_doghost: particle %lli (h=%e,depth=%i) has bad wcount=%.3f.\n" , p->id , p->h , c->depth , wcount ); fflush(stdout); + // message( "particle %lli (h=%e,depth=%i) has bad wcount=%.3f." , p->id , p->h , c->depth , wcount ); fflush(stdout); // p->h += ( p->density.wcount + kernel_root - kernel_nwneigh ) / p->density.wcount_dh; pid[redo] = pid[i]; redo += 1; @@ -425,15 +468,15 @@ void runner_doghost ( struct runner *r , struct cell *c ) { p->density.wcount_dh = 0.0; p->rho = 0.0; p->rho_dh = 0.0; - p->density.div_v = 0.0; - for ( k=0 ; k < 3 ; k++) - p->density.curl_v[k] = 0.0; + p->density.div_v = 0.0; + for ( k=0 ; k < 3 ; k++) + p->density.curl_v[k] = 0.0; continue; } /* Pre-compute some stuff for the balsara switch. */ - normDiv_v = fabs( p->density.div_v / rho * ih4 ); - normCurl_v = sqrtf( p->density.curl_v[0] * p->density.curl_v[0] + p->density.curl_v[1] * p->density.curl_v[1] + p->density.curl_v[2] * p->density.curl_v[2] ) / rho * ih4; + normDiv_v = fabs( p->density.div_v / rho * ih4 ); + normCurl_v = sqrtf( p->density.curl_v[0] * p->density.curl_v[0] + p->density.curl_v[1] * p->density.curl_v[1] + p->density.curl_v[2] * p->density.curl_v[2] ) / rho * ih4; /* As of here, particle force variables will be set. Do _NOT_ try to read any particle density variables! */ @@ -445,23 +488,22 @@ void runner_doghost ( struct runner *r , struct cell *c ) { /* Compute the P/Omega/rho2. 
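
The density finalisation in runner_doghost above adds kernel_root, presumably the particle's own r = 0 kernel contribution which the neighbour loops do not accumulate, before normalising the sums by 1/h^3. A hedged restatement of just that step as a self-contained helper:

/* Finish the density sum: add the self-contribution and normalise by h^-3. */
static float finish_density ( float rho_sum , float mass , float h , float kernel_root ) {
    const float ih = 1.0f / h;
    return ih * ih * ih * ( rho_sum + mass * kernel_root );
    }
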
*/ p->force.POrho2 = u * ( const_hydro_gamma - 1.0f ) / ( rho + h * rho_dh / 3.0f ); - /* Balsara switch */ - p->force.balsara = normDiv_v / ( normDiv_v + normCurl_v + 0.0001f * fc * ih ); + /* Balsara switch */ + p->force.balsara = normDiv_v / ( normDiv_v + normCurl_v + 0.0001f * fc * ih ); -#ifndef LEGACY_GADGET2_SPH + #ifndef LEGACY_GADGET2_SPH + /* Viscosity parameter decay time */ + tau = h / ( 2.f * const_viscosity_length * p->force.c ); - /* Viscosity parameter decay time */ - tau = h / ( 2.f * const_viscosity_length * p->force.c ); + /* Viscosity source term */ + S = fmaxf( -normDiv_v, 0.f ); - /* Viscosity source term */ - S = fmaxf( -normDiv_v, 0.f ); + /* Compute the particle's viscosity parameter time derivative */ + alpha_dot = ( const_viscosity_alpha_min - p->alpha ) / tau + ( const_viscosity_alpha_max - p->alpha ) * S; - /* Compute the particle's viscosity parameter time derivative */ - alpha_dot = ( const_viscosity_alpha_min - p->alpha ) / tau + ( const_viscosity_alpha_max - p->alpha ) * S; - - /* Update particle's viscosity paramter */ - p->alpha += alpha_dot * p->dt; -#endif + /* Update particle's viscosity paramter */ + p->alpha += alpha_dot * p->dt; + #endif /* Reset the acceleration. */ for ( k = 0 ; k < 3 ; k++ ) @@ -523,7 +565,7 @@ void runner_doghost ( struct runner *r , struct cell *c ) { } #ifdef TIMER_VERBOSE - printf( "runner_doghost[%02i]: %i parts at depth %i took %.3f ms.\n" , + message( "runner %02i: %i parts at depth %i took %.3f ms." , r->id , c->count , c->depth , ((double)TIMER_TOC(timer_doghost)) / CPU_TPS * 1000 ); fflush(stdout); #else @@ -657,7 +699,7 @@ void runner_dokick2 ( struct runner *r , struct cell *c ) { } #ifdef TIMER_VERBOSE - printf( "runner_dokick2[%02i]: %i parts at depth %i took %.3f ms.\n" , + message( "runner %02i: %i parts at depth %i took %.3f ms." , r->id , c->count , c->depth , ((double)TIMER_TOC(timer_kick2)) / CPU_TPS * 1000 ); fflush(stdout); #else @@ -1001,7 +1043,7 @@ void runner_dokick ( struct runner *r , struct cell *c , int timer ) { if ( timer ) { #ifdef TIMER_VERBOSE - printf( "runner_dokick2[%02i]: %i parts at depth %i took %.3f ms.\n" , + message( "runner %02i: %i parts at depth %i took %.3f ms." , r->id , c->count , c->depth , ((double)TIMER_TOC(timer_kick2)) / CPU_TPS * 1000 ); fflush(stdout); #else @@ -1117,6 +1159,12 @@ void *runner_main ( void *data ) { else runner_dokick2( r , ci ); break; + case task_type_send_xv: + case task_type_send_rho: + break; + case task_type_recv_xv: + case task_type_recv_rho: + break; default: error( "Unknown task type." ); } diff --git a/src/scheduler.c b/src/scheduler.c index a3f711fedf9948183671e80fa8173da9f5106b97..57a276f95b315b04ef9cba9027391c6d2be8938d 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -27,6 +27,11 @@ #include <math.h> #include <pthread.h> +/* MPI headers. */ +#ifdef WITH_MPI + #include <mpi.h> +#endif + /* Local headers. */ #include "error.h" #include "cycle.h" @@ -38,8 +43,8 @@ #include "task.h" #include "part.h" #include "debug.h" -#include "cell.h" #include "space.h" +#include "cell.h" #include "queue.h" #include "kernel.h" #include "scheduler.h" @@ -67,24 +72,28 @@ void scheduler_map_mkghosts ( struct cell *c , void *data ) { if ( finger->nr_tasks > 0 ) c->super = finger; + /* As of here, things are only interesting for local cells. 
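
The viscosity update above integrates a simple relaxation law: the parameter decays towards const_viscosity_alpha_min on the timescale tau and is driven towards const_viscosity_alpha_max by the source term S = max(-div v, 0) computed just before it. Restated as a self-contained step using the same symbols as the code above:

/* One explicit Euler step of the artificial-viscosity parameter. */
static float alpha_step ( float alpha , float alpha_min , float alpha_max ,
                          float tau , float S , float dt ) {
    const float alpha_dot = ( alpha_min - alpha ) / tau + ( alpha_max - alpha ) * S;
    return alpha + alpha_dot * dt;
    }
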
*/ + if ( c->nodeID != s->nodeID ) + return; + /* Make the ghost task */ if ( c->super != c || c->nr_tasks > 0 ) c->ghost = scheduler_addtask( s , task_type_ghost , task_subtype_none , 0 , 0 , c , NULL , 0 ); - /* Append a kick1 task and make sure the parent depends on it. */ - if ( c->parent == NULL || c->parent->count >= space_subsize ) { + /* Append a kick1 task if local and make sure the parent depends on it. */ + if (c->parent == NULL || c->parent->count >= space_subsize ) { c->kick1 = scheduler_addtask( s , task_type_kick1 , task_subtype_none , 0 , 0 , c , NULL , 0 ); if ( c->parent != NULL ) task_addunlock( c->kick1 , c->parent->kick1 ); } - /* Append a kick2 task if we are the active super cell. */ + /* Append a kick2 task if we are local and the active super cell. */ if ( c->super == c && c->nr_tasks > 0 ) c->kick2 = scheduler_addtask( s , task_type_kick2 , task_subtype_none , 0 , 0 , c , NULL , 0 ); /* If we are not the super cell ourselves, make our ghost depend on our parent cell. */ - if ( c->super != c ) { + if ( c->ghost != NULL && c->super != c ) { task_addunlock( c->parent->ghost , c->ghost ); c->ghost->implicit = 1; } @@ -104,6 +113,10 @@ void scheduler_map_mkghosts_nokick1 ( struct cell *c , void *data ) { if ( finger->nr_tasks > 0 ) c->super = finger; + /* As of here, things are only interesting for local cells. */ + if ( c->nodeID != s->nodeID ) + return; + /* Make the ghost task */ if ( c->super != c || c->nr_tasks > 0 ) c->ghost = scheduler_addtask( s , task_type_ghost , task_subtype_none , 0 , 0 , c , NULL , 0 ); @@ -170,8 +183,6 @@ void scheduler_splittasks ( struct scheduler *s ) { 0.5788 }; /* Loop through the tasks... */ - // #pragma omp parallel default(none) shared(s,tid,pts,space_subsize) private(ind,j,k,t,t_old,redo,ci,cj,hi,hj,sid,shift) - { redo = 0; t_old = t = NULL; while ( 1 ) { @@ -193,18 +204,26 @@ void scheduler_splittasks ( struct scheduler *s ) { t->skip = 1; continue; } - + + /* Non-local kick task? */ + if ( (t->type == task_type_kick1 || t->type == task_type_kick2 ) && + t->ci->nodeID != s->nodeID ) { + t->type = task_type_none; + t->skip = 1; + continue; + } + /* Self-interaction? */ if ( t->type == task_type_self ) { /* Get a handle on the cell involved. */ ci = t->ci; - /* Ingore this task? */ - /* if ( ci->dt_min > dt_step ) { + /* Foreign task? */ + if ( ci->nodeID != s->nodeID ) { t->skip = 1; continue; - } */ + } /* Is this cell even split? */ if ( ci->split ) { @@ -251,11 +270,11 @@ void scheduler_splittasks ( struct scheduler *s ) { hi = ci->dmin; hj = cj->dmin; - /* Ingore this task? */ - /* if ( ci->dt_min > dt_step && cj->dt_min > dt_step ) { + /* Foreign task? */ + if ( ci->nodeID != s->nodeID && cj->nodeID != s->nodeID ) { t->skip = 1; continue; - } */ + } /* Get the sort ID, use space_getsid and not t->flags to make sure we get ci and cj swapped if needed. */ @@ -433,8 +452,6 @@ void scheduler_splittasks ( struct scheduler *s ) { } /* loop over all tasks. */ - } - } @@ -473,6 +490,7 @@ struct task *scheduler_addtask ( struct scheduler *s , int type , int subtype , t->weight = 0; t->rank = 0; t->tic = 0; + t->toc = 0; t->nr_unlock_tasks = 0; /* Init the lock. */ @@ -530,7 +548,7 @@ void scheduler_ranktasks ( struct scheduler *s ) { tid[i] = t - tasks; if ( tid[i] >= nr_tasks ) error( "Task index overshoot." ); - /* printf( "scheduler_ranktasks: task %i of type %s has rank %i.\n" , i , + /* message( "task %i of type %s has rank %i." , i , (t->type == task_type_self) ? "self" : (t->type == task_type_pair) ? 
"pair" : "sort" , rank ); */ for ( k = 0 ; k < t->nr_unlock_tasks ; k++ ) t->unlock_tasks[k]->wait -= 1; @@ -643,7 +661,7 @@ void scheduler_reweight ( struct scheduler *s ) { break; } } - // printf( "scheduler_reweight: weighting tasks took %.3f ms.\n" , (double)( getticks() - tic ) / CPU_TPS * 1000 ); + // message( "weighting tasks took %.3f ms." , (double)( getticks() - tic ) / CPU_TPS * 1000 ); } @@ -672,7 +690,7 @@ void scheduler_start ( struct scheduler *s , unsigned int mask ) { for ( j = 0 ; j < t->nr_unlock_tasks ; j++ ) atomic_inc( &t->unlock_tasks[j]->wait ); } - // printf( "scheduler_reweight: waiting tasks took %.3f ms.\n" , (double)( getticks() - tic ) / CPU_TPS * 1000 ); + // message( "waiting tasks took %.3f ms." , (double)( getticks() - tic ) / CPU_TPS * 1000 ); /* Loop over the tasks and enqueue whoever is ready. */ // tic = getticks(); @@ -687,7 +705,7 @@ void scheduler_start ( struct scheduler *s , unsigned int mask ) { scheduler_enqueue( s , t ); } } - // printf( "scheduler_start: enqueueing tasks took %.3f ms.\n" , (double)( getticks() - tic ) / CPU_TPS * 1000 ); + // message( "enqueueing tasks took %.3f ms." , (double)( getticks() - tic ) / CPU_TPS * 1000 ); } @@ -719,7 +737,8 @@ void scheduler_enqueue ( struct scheduler *s , struct task *t ) { /* Otherwise, look for a suitable queue. */ else { - /* Find the previous owner for each task type. */ + /* Find the previous owner for each task type, and do + any pre-processing needed. */ switch ( t->type ) { case task_type_self: case task_type_sort: @@ -734,10 +753,37 @@ void scheduler_enqueue ( struct scheduler *s , struct task *t ) { ( qid < 0 || s->queues[qid].count > s->queues[t->cj->super->owner].count ) ) qid = t->cj->super->owner; break; + case task_type_recv_xv: + case task_type_recv_rho: + #ifdef WITH_MPI + if ( MPI_Irecv( t->ci->parts , sizeof(struct part) * t->ci->count , MPI_BYTE , t->ci->nodeID , t->flags , MPI_COMM_WORLD , &t->req ) != MPI_SUCCESS ) + error( "Failed to emit irecv for particle data." ); + // message( "recieving %i parts with tag=%i from %i to %i." , + // t->ci->count , t->flags , t->ci->nodeID , s->nodeID ); fflush(stdout); + qid = -1; + #else + error( "SWIFT was not compiled with MPI support." ); + #endif + break; + case task_type_send_xv: + case task_type_send_rho: + #ifdef WITH_MPI + if ( MPI_Isend( t->ci->parts , sizeof(struct part) * t->ci->count , MPI_BYTE , t->cj->nodeID , t->flags , MPI_COMM_WORLD , &t->req ) != MPI_SUCCESS ) + error( "Failed to emit isend for particle data." ); + // message( "sending %i parts with tag=%i from %i to %i." , + // t->ci->count , t->flags , s->nodeID , t->cj->nodeID ); fflush(stdout); + qid = -1; + #else + error( "SWIFT was not compiled with MPI support." ); + #endif + break; default: qid = -1; } - + + if ( qid >= s->nr_queues ) + error( "Bad computed qid." ); + /* If no previous owner, find the shortest queue. */ if ( qid < 0 ) qid = rand() % s->nr_queues; @@ -815,6 +861,51 @@ struct task *scheduler_done ( struct scheduler *s , struct task *t ) { } +/** + * @brief Resolve a single dependency by hand. + * + * @param s The #scheduler. + * @param t The dependent #task. + * + * @return A pointer to the next task, if a suitable one has + * been identified. + */ + +struct task *scheduler_unlock ( struct scheduler *s , struct task *t ) { + + int k, res; + struct task *t2, *next = NULL; + + /* Loop through the dependencies and add them to a queue if + they are ready. 
*/ + for ( k = 0 ; k < t->nr_unlock_tasks ; k++ ) { + t2 = t->unlock_tasks[k]; + if ( ( res = atomic_dec( &t2->wait ) ) < 1 ) + error( "Negative wait!" ); + if ( res == 1 && !t2->skip ) + scheduler_enqueue( s , t2 ); + } + + /* Task definitely done. */ + if ( !t->implicit ) { + t->toc = getticks(); + pthread_mutex_lock( &s->sleep_mutex ); + if ( next == NULL ) + atomic_dec( &s->waiting ); + pthread_cond_broadcast( &s->sleep_cond ); + pthread_mutex_unlock( &s->sleep_mutex ); + } + + /* Start the clock on the follow-up task. */ + if ( next != NULL ) + next->tic = getticks(); + + /* Return the next best task. */ + return next; + + } + + /** * @brief Get a task, preferably from the given queue. * @@ -872,12 +963,14 @@ struct task *scheduler_gettask ( struct scheduler *s , int qid , struct cell *su } /* If we failed, take a short nap. */ - if ( res == NULL ) { - pthread_mutex_lock( &s->sleep_mutex ); - if ( s->waiting > 0 ) - pthread_cond_wait( &s->sleep_cond , &s->sleep_mutex ); - pthread_mutex_unlock( &s->sleep_mutex ); - } + #ifndef WITH_MPI + if ( res == NULL ) { + pthread_mutex_lock( &s->sleep_mutex ); + if ( s->waiting > 0 ) + pthread_cond_wait( &s->sleep_cond , &s->sleep_mutex ); + pthread_mutex_unlock( &s->sleep_mutex ); + } + #endif } @@ -901,7 +994,7 @@ struct task *scheduler_gettask ( struct scheduler *s , int qid , struct cell *su * @param flags The #scheduler flags. */ -void scheduler_init ( struct scheduler *s , struct space *space , int nr_queues , unsigned int flags ) { +void scheduler_init ( struct scheduler *s , struct space *space , int nr_queues , unsigned int flags , int nodeID ) { int k; @@ -925,6 +1018,7 @@ void scheduler_init ( struct scheduler *s , struct space *space , int nr_queues s->nr_queues = nr_queues; s->flags = flags; s->space = space; + s->nodeID = nodeID; /* Init other values. */ s->tasks = NULL; diff --git a/src/scheduler.h b/src/scheduler.h index cc31da2468bee44d97e126b5029a5217827d7382..511e474358a1c0b5533dbf0e17b0dee1c94d8348 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -63,12 +63,15 @@ struct scheduler { /* The space associated with this scheduler. */ struct space *space; + + /* The node we are working on. */ + int nodeID; }; /* Function prototypes. */ -void scheduler_init ( struct scheduler *s , struct space *space , int nr_queues , unsigned int flags ); +void scheduler_init ( struct scheduler *s , struct space *space , int nr_queues , unsigned int flags , int nodeID ); struct task *scheduler_gettask ( struct scheduler *s , int qid , struct cell *super ); void scheduler_enqueue ( struct scheduler *s , struct task *t ); void scheduler_start ( struct scheduler *s , unsigned int mask ); @@ -81,4 +84,4 @@ void scheduler_map_mkghosts ( struct cell *c , void *data ); void scheduler_map_mkghosts_nokick1 ( struct cell *c , void *data ); void scheduler_map_mkkick1 ( struct cell *c , void *data ); struct task *scheduler_done ( struct scheduler *s , struct task *t ); - +struct task *scheduler_unlock ( struct scheduler *s , struct task *t ); diff --git a/src/space.c b/src/space.c index a1a933fe21852704652c4b3370a047623b5cea2e..95910efe320977b4823768d941d875ff37c8095f 100644 --- a/src/space.c +++ b/src/space.c @@ -30,15 +30,23 @@ #include <math.h> #include <omp.h> +/* MPI headers. */ +#ifdef WITH_MPI + #include <mpi.h> +#endif + /* Local headers. 
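
scheduler_unlock() above resolves dependencies by hand through the per-task wait counter: the thread whose decrement drops a dependent task's counter to zero is the one that enqueues it. The nap in scheduler_gettask is disabled under WITH_MPI presumably so idle runners keep polling task_lock(), where pending MPI requests are tested, instead of blocking on the condition variable. A minimal sketch of the wait-count bookkeeping using C11 atomics in place of SWIFT's atomic_dec() (illustrative names only):

#include <stdatomic.h>

struct mini_task { atomic_int wait; int skip; };

void resolve_dependencies ( struct mini_task **deps , int n ,
                            void (*enqueue)( struct mini_task * ) ) {
    for ( int k = 0 ; k < n ; k++ ) {
        /* atomic_fetch_sub returns the previous value, like atomic_dec above:
           a return of 1 means this was the last unresolved dependency. */
        int prev = atomic_fetch_sub( &deps[k]->wait , 1 );
        if ( prev == 1 && !deps[k]->skip )
            enqueue( deps[k] );
        }
    }
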
*/ +#include "const.h" #include "cycle.h" #include "atomic.h" #include "lock.h" #include "task.h" #include "kernel.h" #include "part.h" -#include "cell.h" #include "space.h" +#include "cell.h" +#include "scheduler.h" +#include "engine.h" #include "runner.h" #include "error.h" @@ -144,29 +152,22 @@ void space_rebuild_recycle ( struct space *s , struct cell *c ) { } } - + + /** - * @brief Re-build the cells as well as the tasks. - * - * @param s The #space in which to update the cells. - * @param cell_max Maximal cell size. + * @brief Re-build the cell grid. * + * @param s The #space. + * @param cell_max Maximum cell edge length. */ -void space_rebuild ( struct space *s , double cell_max ) { +void space_regrid ( struct space *s , double cell_max ) { float h_max = s->cell_min / kernel_gamma, dmin; int i, j, k, cdim[3], nr_parts = s->nr_parts; struct cell *restrict c; - struct part *restrict finger, *restrict p, *parts = s->parts; - struct xpart *xfinger, *xparts = s->xparts; - int *ind; - double ih[3], dim[3]; // ticks tic; - /* Be verbose about this. */ - // printf( "space_rebuild: (re)building space...\n" ); fflush(stdout); - /* Run through the parts and get the current h_max. */ // tic = getticks(); if ( s->cells != NULL ) { @@ -182,13 +183,19 @@ void space_rebuild ( struct space *s , double cell_max ) { } s->h_max = h_max; } - // printf( "space_rebuild: h_max is %.3e (cell_max=%.3e).\n" , h_max , cell_max ); - // printf( "space_rebuild: getting h_min and h_max took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 ); + // message( "h_max is %.3e (cell_max=%.3e)." , h_max , cell_max ); + // message( "getting h_min and h_max took %.3f ms." , (double)(getticks() - tic) / CPU_TPS * 1000 ); /* Get the new putative cell dimensions. */ for ( k = 0 ; k < 3 ; k++ ) cdim[k] = floor( s->dim[k] / fmax( h_max*kernel_gamma*space_stretch , cell_max ) ); + /* In MPI-Land, we're not allowed to change the top-level cell size. */ + #ifdef WITH_MPI + if ( cdim[0] < s->cdim[0] || cdim[1] < s->cdim[1] || cdim[2] < s->cdim[2] ) + error( "Root-level change of cell size not allowed." ); + #endif + /* Do we need to re-build the upper-level cells? */ // tic = getticks(); if ( s->cells == NULL || @@ -236,10 +243,10 @@ void space_rebuild ( struct space *s , double cell_max ) { } /* Be verbose about the change. */ - printf( "space_rebuild: set cell dimensions to [ %i %i %i ].\n" , cdim[0] , cdim[1] , cdim[2] ); fflush(stdout); + message( "set cell dimensions to [ %i %i %i ]." , cdim[0] , cdim[1] , cdim[2] ); fflush(stdout); } /* re-build upper-level cells? */ - // printf( "space_rebuild: rebuilding upper-level cells took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 ); + // message( "rebuilding upper-level cells took %.3f ms." , (double)(getticks() - tic) / CPU_TPS * 1000 ); /* Otherwise, just clean up the cells. */ else { @@ -250,6 +257,7 @@ void space_rebuild ( struct space *s , double cell_max ) { s->cells[k].sorts = NULL; s->cells[k].nr_tasks = 0; s->cells[k].nr_density = 0; + s->cells[k].nr_force = 0; s->cells[k].dx_max = 0.0f; s->cells[k].sorted = 0; s->cells[k].count = 0; @@ -260,9 +268,135 @@ void space_rebuild ( struct space *s , double cell_max ) { } + } + + +/** + * @brief Re-build the cells as well as the tasks. + * + * @param s The #space in which to update the cells. + * @param cell_max Maximal cell size. 
+ * + */ + +void space_rebuild ( struct space *s , double cell_max ) { + + float h_max = s->cell_min / kernel_gamma, dmin; + int i, j, k, cdim[3], nr_parts = s->nr_parts; + struct cell *restrict c, *restrict cells = s->cells; + struct part *restrict finger, *restrict p, *parts = s->parts; + struct xpart *xfinger, *xparts = s->xparts; + int *ind; + double ih[3], dim[3]; + // ticks tic; + + /* Be verbose about this. */ + // message( "re)building space..." ); fflush(stdout); + + /* Run through the parts and get the current h_max. */ + // tic = getticks(); + if ( cells != NULL ) { + for ( k = 0 ; k < s->nr_cells ; k++ ) { + if ( cells[k].h_max > h_max ) + h_max = cells[k].h_max; + } + } + else { + for ( k = 0 ; k < nr_parts ; k++ ) { + if ( s->parts[k].h > h_max ) + h_max = s->parts[k].h; + } + s->h_max = h_max; + } + // message( "h_max is %.3e (cell_max=%.3e)." , h_max , cell_max ); + // message( "getting h_min and h_max took %.3f ms." , (double)(getticks() - tic) / CPU_TPS * 1000 ); + + /* Get the new putative cell dimensions. */ + for ( k = 0 ; k < 3 ; k++ ) + cdim[k] = floor( s->dim[k] / fmax( h_max*kernel_gamma*space_stretch , cell_max ) ); + + /* In MPI-Land, we're not allowed to change the top-level cell size. */ + #ifdef WITH_MPI + if ( cdim[0] < s->cdim[0] || cdim[1] < s->cdim[1] || cdim[2] < s->cdim[2] ) + error( "Root-level change of cell size not allowed." ); + #endif + + /* Do we need to re-build the upper-level cells? */ + // tic = getticks(); + if ( cells == NULL || + cdim[0] < s->cdim[0] || cdim[1] < s->cdim[1] || cdim[2] < s->cdim[2] ) { + + /* Free the old cells, if they were allocated. */ + if ( cells != NULL ) { + for ( k = 0 ; k < s->nr_cells ; k++ ) { + space_rebuild_recycle( s , &cells[k] ); + if ( cells[k].sort != NULL ) + free( cells[k].sort ); + } + free( cells ); + s->maxdepth = 0; + } + + /* Set the new cell dimensions only if smaller. */ + for ( k = 0 ; k < 3 ; k++ ) { + s->cdim[k] = cdim[k]; + s->h[k] = s->dim[k] / cdim[k]; + s->ih[k] = 1.0 / s->h[k]; + } + dmin = fminf( s->h[0] , fminf( s->h[1] , s->h[2] ) ); + + /* Allocate the highest level of cells. */ + s->tot_cells = s->nr_cells = cdim[0] * cdim[1] * cdim[2]; + if ( posix_memalign( (void *)&cells , 64 , s->nr_cells * sizeof(struct cell) ) != 0 ) + error( "Failed to allocate cells." ); + s->cells = cells; + bzero( cells , s->nr_cells * sizeof(struct cell) ); + for ( k = 0 ; k < s->nr_cells ; k++ ) + if ( lock_init( &cells[k].lock ) != 0 ) + error( "Failed to init spinlock." ); + + /* Set the cell location and sizes. */ + for ( i = 0 ; i < cdim[0] ; i++ ) + for ( j = 0 ; j < cdim[1] ; j++ ) + for ( k = 0 ; k < cdim[2] ; k++ ) { + c = &cells[ cell_getid( cdim , i , j , k ) ]; + c->loc[0] = i*s->h[0]; c->loc[1] = j*s->h[1]; c->loc[2] = k*s->h[2]; + c->h[0] = s->h[0]; c->h[1] = s->h[1]; c->h[2] = s->h[2]; + c->dmin = dmin; + c->depth = 0; + c->count = 0; + lock_init( &c->lock ); + } + + /* Be verbose about the change. */ + message( "set cell dimensions to [ %i %i %i ]." , cdim[0] , cdim[1] , cdim[2] ); fflush(stdout); + + } /* re-build upper-level cells? */ + // message( "rebuilding upper-level cells took %.3f ms." , (double)(getticks() - tic) / CPU_TPS * 1000 ); + + /* Otherwise, just clean up the cells. */ + else { + + /* Free the old cells, if they were allocated. 
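
The grid built above has cdim[k] cells per dimension with edge lengths h[k] = dim[k] / cdim[k] and inverses ih[k]; a particle is binned by scaling its position by ih and flattening the (i, j, k) triple, as in the cell_getid() calls further down. A hedged sketch of that binning, with the row-major flattening order assumed here since cell_getid() itself is not part of this patch:

/* Bin a position into the top-level grid (flattening order assumed). */
static int grid_index ( const double x[3] , const double ih[3] , const int cdim[3] ) {
    const int i = (int)( x[0] * ih[0] );
    const int j = (int)( x[1] * ih[1] );
    const int k = (int)( x[2] * ih[2] );
    return ( i * cdim[1] + j ) * cdim[2] + k;
    }
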
*/ + for ( k = 0 ; k < s->nr_cells ; k++ ) { + space_rebuild_recycle( s , &cells[k] ); + cells[k].sorts = NULL; + cells[k].nr_tasks = 0; + cells[k].nr_density = 0; + cells[k].nr_force = 0; + cells[k].dx_max = 0.0f; + cells[k].sorted = 0; + cells[k].count = 0; + cells[k].kick1 = NULL; + cells[k].kick2 = NULL; + } + s->maxdepth = 0; + + } + /* Run through the particles and get their cell index. */ // tic = getticks(); - if ( ( ind = (int *)malloc( sizeof(int) * s->nr_parts ) ) == NULL ) + if ( ( ind = (int *)malloc( sizeof(int) * s->size_parts ) ) == NULL ) error( "Failed to allocate temporary particle indices." ); ih[0] = s->ih[0]; ih[1] = s->ih[1]; ih[2] = s->ih[2]; dim[0] = s->dim[0]; dim[1] = s->dim[1]; dim[2] = s->dim[2]; @@ -276,14 +410,44 @@ void space_rebuild ( struct space *s , double cell_max ) { else if ( p->x[j] >= dim[j] ) p->x[j] -= dim[j]; ind[k] = cell_getid( cdim , p->x[0]*ih[0] , p->x[1]*ih[1] , p->x[2]*ih[2] ); - atomic_inc( &s->cells[ ind[k] ].count ); + atomic_inc( &cells[ ind[k] ].count ); } - // printf( "space_rebuild: getting particle indices took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 ); + // message( "getting particle indices took %.3f ms." , (double)(getticks() - tic) / CPU_TPS * 1000 ); + + + /* Move non-local parts to the end of the list. */ + #ifdef WITH_MPI + int nodeID = s->e->nodeID; + for ( k = 0 ; k < nr_parts ; k++ ) + if ( cells[ ind[k] ].nodeID != nodeID ) { + cells[ ind[k] ].count -= 1; + nr_parts -= 1; + struct part tp = parts[k]; + parts[k] = parts[ nr_parts ]; + parts[ nr_parts ] = tp; + struct xpart txp = xparts[k]; + xparts[k] = xparts[ nr_parts ]; + xparts[ nr_parts ] = txp; + int t = ind[k]; + ind[k] = ind[ nr_parts ]; + ind[ nr_parts ] = t; + } + s->nr_parts = nr_parts + engine_exchange_strays( s->e , &parts[nr_parts] , &xparts[nr_parts] , &ind[nr_parts] , s->nr_parts - nr_parts ); + for ( k = nr_parts ; k < s->nr_parts ; k++ ) { + p = &parts[k]; + ind[k] = cell_getid( cdim , p->x[0]*ih[0] , p->x[1]*ih[1] , p->x[2]*ih[2] ); + cells[ ind[k] ].count += 1; + if ( cells[ ind[k] ].nodeID != nodeID ) + error( "Received part that does not belong to me (nodeID=%i)." , cells[ ind[k] ].nodeID ); + } + nr_parts = s->nr_parts; + #endif + /* Sort the parts according to their cells. */ // tic = getticks(); - parts_sort( parts , xparts , ind , s->nr_parts , 0 , s->nr_cells-1 ); - // printf( "space_rebuild: parts_sort took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 ); + parts_sort( parts , xparts , ind , nr_parts , 0 , s->nr_cells-1 ); + // message( "parts_sort took %.3f ms." , (double)(getticks() - tic) / CPU_TPS * 1000 ); /* Verify sort. */ /* for ( k = 1 ; k < nr_parts ; k++ ) { @@ -302,13 +466,13 @@ void space_rebuild ( struct space *s , double cell_max ) { finger = parts; xfinger = xparts; for ( k = 0 ; k < s->nr_cells ; k++ ) { - c = &s->cells[ k ]; + c = &cells[ k ]; c->parts = finger; c->xparts = xfinger; finger = &finger[ c->count ]; xfinger = &xfinger[ c->count ]; } - // printf( "space_rebuild: hooking up cells took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 ); + // message( "hooking up cells took %.3f ms." , (double)(getticks() - tic) / CPU_TPS * 1000 ); /* At this point, we have the upper-level cells, old or new. Now make sure that the parts in each cell are ok. 
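
The WITH_MPI branch above compacts the particle array so that everything belonging to other nodes sits at the tail, which is then handed to engine_exchange_strays(). A minimal sketch of the same swap-to-the-end partition over a plain array (illustrative types); note that it only advances the cursor when the current element is local, so the element swapped in from the tail is itself examined on the next pass:

/* Move all elements not owned by mynode to the tail; return the local count. */
static int partition_local_first ( int *owner , double *data , int n , int mynode ) {
    int k = 0;
    while ( k < n ) {
        if ( owner[k] != mynode ) {
            n -= 1;                                     /* shrink the local range */
            int to = owner[k]; owner[k] = owner[n]; owner[n] = to;
            double td = data[k]; data[k] = data[n]; data[n] = td;
            }
        else
            k += 1;                                     /* only skip past local elements */
        }
    return n;                                           /* foreigners now live in [n, old n) */
    }
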
*/ @@ -320,13 +484,13 @@ void space_rebuild ( struct space *s , double cell_max ) { while ( 1 ) { int myk = atomic_inc( &k ); if ( myk < s->nr_cells ) - space_split( s , &s->cells[myk] ); + space_split( s , &cells[myk] ); else break; } } - // printf( "space_rebuild: space_split took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 ); - + // message( "space_split took %.3f ms." , (double)(getticks() - tic) / CPU_TPS * 1000 ); + } @@ -387,7 +551,7 @@ void parts_sort ( struct part *parts , struct xpart *xparts , int *ind , int N , min = qstack[qid].min; max = qstack[qid].max; qstack[qid].ready = 0; - // printf( "parts_sort_par: thread %i got interval [%i,%i] with values in [%i,%i].\n" , omp_get_thread_num() , i , j , min , max ); + // message( "thread %i got interval [%i,%i] with values in [%i,%i]." , omp_get_thread_num() , i , j , min , max ); /* Loop over sub-intervals. */ while ( 1 ) { @@ -412,12 +576,12 @@ void parts_sort ( struct part *parts , struct xpart *xparts , int *ind , int N , /* Verify sort. */ /* for ( int k = i ; k <= jj ; k++ ) if ( ind[k] > pivot ) { - printf( "parts_sort: sorting failed at k=%i, ind[k]=%i, pivot=%i, i=%i, j=%i, N=%i.\n" , k , ind[k] , pivot , i , j , N ); + message( "sorting failed at k=%i, ind[k]=%i, pivot=%i, i=%i, j=%i, N=%i." , k , ind[k] , pivot , i , j , N ); error( "Partition failed (<=pivot)." ); } for ( int k = jj+1 ; k <= j ; k++ ) if ( ind[k] <= pivot ) { - printf( "parts_sort: sorting failed at k=%i, ind[k]=%i, pivot=%i, i=%i, j=%i, N=%i.\n" , k , ind[k] , pivot , i , j , N ); + message( "sorting failed at k=%i, ind[k]=%i, pivot=%i, i=%i, j=%i, N=%i." , k , ind[k] , pivot , i , j , N ); error( "Partition failed (>pivot)." ); } */ @@ -675,6 +839,7 @@ void space_split ( struct space *s , struct cell *c ) { temp->split = 0; temp->h_max = 0.0; temp->dx_max = 0.0; + temp->nodeID = c->nodeID; temp->parent = c; c->progeny[k] = temp; } @@ -737,7 +902,7 @@ void space_split ( struct space *s , struct cell *c ) { } /* Set ownership accorind to the start of the parts array. */ - c->owner = ( c->parts - s->parts ) * s->nr_queues / s->nr_parts; + c->owner = ( ( c->parts - s->parts ) % s->nr_parts ) * s->nr_queues / s->nr_parts; } @@ -809,17 +974,11 @@ struct cell *space_getcell ( struct space *s ) { lock_unlock_blind( &s->lock ); /* Init some things in the cell. */ - c->sorts = NULL; - c->nr_tasks = 0; - c->nr_density = 0; - c->dx_max = 0.0f; - c->sorted = 0; - c->count = 0; - c->kick1 = NULL; - c->kick2 = NULL; + bzero( c , sizeof(struct cell) ); + c->nodeID = -1; + c->owner = -1; if ( lock_init( &c->lock ) != 0 ) error( "Failed to initialize cell spinlock." ); - c->owner = -1; return c; @@ -849,6 +1008,7 @@ void space_init ( struct space *s , double dim[3] , struct part *parts , int N , s->dim[0] = dim[0]; s->dim[1] = dim[1]; s->dim[2] = dim[2]; s->periodic = periodic; s->nr_parts = N; + s->size_parts = N; s->parts = parts; s->cell_min = h_max; s->nr_queues = 1; @@ -873,7 +1033,7 @@ void space_init ( struct space *s , double dim[3] , struct part *parts , int N , error( "Failed to create space spin-lock." ); /* Build the cells and the tasks. */ - space_rebuild( s , h_max ); + space_regrid( s , h_max ); } diff --git a/src/space.h b/src/space.h index 12f0ad97c96a00142b8254841dfd7335b678b343..8fb66d564c24236d02675646a25a7f32e356aa37 100644 --- a/src/space.h +++ b/src/space.h @@ -85,7 +85,7 @@ struct space { struct xpart *xparts; /* The total number of parts in the space. */ - int nr_parts; + int nr_parts, size_parts; /* Is the space periodic? 
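
A hedged reading of the owner assignment changed above: a cell inherits its queue from the position of its particles within the global parts array, so cells adjacent in memory tend to land on the same queue, and the added modulo presumably keeps the result in range now that offsets can reach nr_parts once size_parts exceeds it. Restated as a small helper with illustrative names:

#include <stddef.h>

/* Map a cell's particle offset to a queue index in [0, nr_queues). */
static int cell_owner_of ( ptrdiff_t part_offset , int nr_parts , int nr_queues ) {
    return (int)( ( part_offset % nr_parts ) * nr_queues / nr_parts );
    }
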
*/ int periodic; @@ -96,6 +96,9 @@ struct space { /* Number of queues in the system. */ int nr_queues; + /* The associated engine. */ + struct engine *e; + }; diff --git a/src/swift.h b/src/swift.h index b46968e9d8125b471b341a9350f6df0bd1bee027..a68aa1feb70d49e07bdaae5bd8572aa2307fa7b9 100644 --- a/src/swift.h +++ b/src/swift.h @@ -21,6 +21,8 @@ #include "../config.h" /* Local headers. */ +#include "const.h" +#include "error.h" #include "cycle.h" #include "timers.h" #include "const.h" diff --git a/src/task.c b/src/task.c index 549dbd2ee241547699bfe759007752f8afd1c5df..c4836f807f2301b4ee417e8d5bd08765c487d5e0 100644 --- a/src/task.c +++ b/src/task.c @@ -31,16 +31,26 @@ #include <omp.h> #include <sched.h> +/* MPI headers. */ +#ifdef WITH_MPI + #include <mpi.h> +#endif + /* Local headers. */ +#include "const.h" #include "cycle.h" #include "atomic.h" #include "lock.h" +#include "space.h" #include "cell.h" #include "task.h" #include "error.h" /* Task type names. */ -const char *taskID_names[task_type_count] = { "none" , "sort" , "self" , "pair" , "sub" , "ghost" , "kick1" , "kick2" }; +const char *taskID_names[task_type_count] = { + "none" , "sort" , "self" , "pair" , "sub" , "ghost" , + "kick1" , "kick2" , "send_xv" , "recv_xv" , "send_rho" , + "recv_rho" }; /** @@ -79,8 +89,30 @@ int task_lock ( struct task *t ) { int type = t->type; struct cell *ci = t->ci, *cj = t->cj; + /* Communication task? */ + if ( type == task_type_recv_xv || type == task_type_recv_rho || + type == task_type_send_xv || type == task_type_send_rho ) { + + #ifdef WITH_MPI + /* Check the status of the MPI request. */ + int res, err; + MPI_Status stat; + if ( ( err = MPI_Test( &t->req , &res , &stat ) ) != MPI_SUCCESS ) { + char buff[ MPI_MAX_ERROR_STRING ]; + int len; + MPI_Error_string( err , buff , &len ); + message( "MPI error: %s\n" , buff ); fflush(stdout); + error( "Failed to test request on send/recv task." ); + } + return res; + #else + error( "SWIFT was not compiled with MPI support." ); + #endif + + } + /* Unary lock? */ - if ( type == task_type_self || + else if ( type == task_type_self || type == task_type_sort || (type == task_type_sub && cj == NULL) ) { if ( cell_locktree( ci ) != 0 ) diff --git a/src/task.h b/src/task.h index 41a5d16b2b687a2abaf74d0dc85dd72325c670b1..5921d88b05daaed2bf23e7658c4cdd41e473202a 100644 --- a/src/task.h +++ b/src/task.h @@ -20,7 +20,7 @@ /* Some constants. */ #define task_maxwait 3 -#define task_maxunlock 40 +#define task_maxunlock 46 /* The different task types. */ @@ -33,11 +33,16 @@ enum task_types { task_type_ghost, task_type_kick1, task_type_kick2, + task_type_send_xv, + task_type_recv_xv, + task_type_send_rho, + task_type_recv_rho, task_type_count }; extern const char *taskID_names[]; + /* The different task sub-types. */ enum task_subtypes { task_subtype_none = 0, @@ -48,6 +53,7 @@ enum task_subtypes { extern const char *taskID_names[]; + /* Data of a task. */ struct task { @@ -58,6 +64,10 @@ struct task { struct cell *ci, *cj; + #ifdef WITH_MPI + MPI_Request req; + #endif + int rid; ticks tic, toc; diff --git a/src/vector.h b/src/vector.h index 1ae0cae290bc100da85b7932c922c4a475aee899..a667764de0264a17d0c8139b18d9ca9315bd9385 100644 --- a/src/vector.h +++ b/src/vector.h @@ -27,7 +27,7 @@ #define VEC_MACRO(elcount, type) __attribute__((vector_size((elcount)*sizeof(type)))) type /* So what will the vector size be? */ - #ifdef NO__AVX__ + #ifdef __AVX__ #define VECTORIZE #define VEC_SIZE 8 #define VEC_FLOAT __m256
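
Taken together with the scheduler_enqueue hunk earlier, the communication tasks follow a post-then-poll life cycle: the MPI_Isend or MPI_Irecv is issued when the task is enqueued, and task_lock above only lets a runner claim a send/recv task once MPI_Test reports its request complete. A minimal sketch of that completion test (the wrapper name is illustrative):

#include <mpi.h>

/* Returns 1 once the non-blocking operation has completed, 0 if still pending,
   -1 on an MPI error. */
static int comm_task_ready ( MPI_Request *req ) {
    int flag = 0;
    MPI_Status stat;
    if ( MPI_Test( req , &flag , &stat ) != MPI_SUCCESS )
        return -1;
    return flag;
    }
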