Commit 11b644b4 authored by Pedro Gonnet
Browse files

simplify barrier, allow specifying how many runners to start.


Former-commit-id: 2a2a82fe0d23957746dbfaa09f3a332cb51791b1
parent 0c135c44
......@@ -326,21 +326,21 @@ void engine_prepare ( struct engine *e ) {
/* Run through the tasks and mark as skip or not. */
// tic = getticks();
rebuild = 1 || ( e->step == 0 || engine_marktasks( e ) );
rebuild = ( e->step == 0 || engine_marktasks( e ) );
// printf( "space_prepare: space_marktasks took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 );
/* Did this not go through? */
if ( rebuild ) {
/* Re-build the space. */
tic = getticks();
// tic = getticks();
space_rebuild( e->s , 0.0 );
printf( "engine_prepare: space_rebuild took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 );
// printf( "engine_prepare: space_rebuild took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 );
/* Re-build the tasks. */
tic = getticks();
// tic = getticks();
engine_maketasks( e );
printf( "engine_prepare: engine_maketasks took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 );
// printf( "engine_prepare: engine_maketasks took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 );
/* Run through the tasks and mark as skip or not. */
// tic = getticks();
......@@ -369,35 +369,31 @@ void engine_prepare ( struct engine *e ) {
* @param e The #engine.
*/
void engine_barrier( struct engine *e ) {
void engine_barrier ( struct engine *e ) {
/* First, get the barrier mutex. */
if ( pthread_mutex_lock( &e->barrier_mutex ) != 0 )
error( "Failed to get barrier mutex." );
/* Wait for the barrier to close. */
while ( e->barrier_count < 0 )
if ( pthread_cond_wait( &e->barrier_cond , &e->barrier_mutex ) != 0 )
error( "Eror waiting for barrier to close." );
/* This thread is no longer running. */
e->barrier_running -= 1;
/* Once I'm in, increase the barrier count. */
e->barrier_count += 1;
/* If all threads are in, send a signal... */
if ( e->barrier_count == e->nr_threads )
if ( e->barrier_running == 0 )
if ( pthread_cond_broadcast( &e->barrier_cond ) != 0 )
error( "Failed to broadcast barrier full condition." );
/* Wait for barrier to be released. */
while ( e->barrier_count > 0 )
/* Wait for the barrier to open. */
while ( e->barrier_launch == 0 )
if ( pthread_cond_wait( &e->barrier_cond , &e->barrier_mutex ) != 0 )
error( "Error waiting for barrier to be released." );
/* Decrease the counter before leaving... */
e->barrier_count += 1;
error( "Eror waiting for barrier to close." );
/* This thread has been launched. */
e->barrier_running += 1;
e->barrier_launch -= 1;
/* If I'm the last one out, signal the condition again. */
if ( e->barrier_count == 0 )
if ( e->barrier_launch == 0 )
if ( pthread_cond_broadcast( &e->barrier_cond ) != 0 )
error( "Failed to broadcast empty barrier condition." );
......@@ -556,6 +552,28 @@ void engine_single_force ( double *dim , long long int pid , struct part *__rest
fflush(stdout);
}
/**
 * @brief Launch the runners and wait until they are done.
 *
 * Stores @c nr_runners in @c e->barrier_launch, wakes every thread waiting
 * on @c e->barrier_cond, and then blocks on that same condition until both
 * @c e->barrier_launch and @c e->barrier_running have dropped to zero.
 *
 * @param e The #engine.
 * @param nr_runners The number of #runners to let loose.
 *
 * NOTE(review): pthread_cond_wait requires the calling thread to hold
 * @c e->barrier_mutex. This function never locks it itself, so it presumably
 * relies on a lock acquired by the caller beforehand -- confirm.
 */
void engine_launch ( struct engine *e , int nr_runners ) {

    int rc;

    /* Open the barrier for the requested number of runners and wake
       everybody waiting on the barrier condition. */
    e->barrier_launch = nr_runners;
    rc = pthread_cond_broadcast( &e->barrier_cond );
    if ( rc != 0 ) {
        error( "Failed to broadcast barrier open condition." );
        }

    /* Keep waiting while there are still launch slots to be picked up or
       runners that have not yet returned to the barrier. */
    while ( !( e->barrier_launch == 0 && e->barrier_running == 0 ) ) {
        rc = pthread_cond_wait( &e->barrier_cond , &e->barrier_mutex );
        if ( rc != 0 ) {
            error( "Error while waiting for barrier." );
            }
        }

    }
/**
......@@ -597,12 +615,7 @@ void engine_step ( struct engine *e ) {
/* First kick. */
TIMER_TIC
scheduler_start( &e->sched , (1 << task_type_kick1) );
e->barrier_count = -e->barrier_count;
if ( pthread_cond_broadcast( &e->barrier_cond ) != 0 )
error( "Failed to broadcast barrier open condition." );
while ( e->barrier_count < e->nr_threads )
if ( pthread_cond_wait( &e->barrier_cond , &e->barrier_mutex ) != 0 )
error( "Error while waiting for barrier." );
engine_launch( e , ( e->nr_threads > 16 ) ? 16 : e->nr_threads );
TIMER_TOC( timer_kick1 );
/* Check if all the kick1 threads have executed. */
......@@ -620,22 +633,11 @@ void engine_step ( struct engine *e ) {
// engine_single_density( e->s->dim , 3392063069037 , e->s->parts , e->s->nr_parts , e->s->periodic );
/* Start the clock. */
/* Send off the runners. */
TIMER_TIC_ND
/* Cry havoc and let loose the dogs of war. */
e->barrier_count = -e->barrier_count;
if ( pthread_cond_broadcast( &e->barrier_cond ) != 0 )
error( "Failed to broadcast barrier open condition." );
/* Sit back and wait for the runners to come home. */
while ( e->barrier_count < e->nr_threads )
if ( pthread_cond_wait( &e->barrier_cond , &e->barrier_mutex ) != 0 )
error( "Error while waiting for barrier." );
/* Stop the clock. */
engine_launch( e , e->nr_threads );
TIMER_TOC(timer_runners);
// engine_single_force( e->s->dim , 8328423931905 , e->s->parts , e->s->nr_parts , e->s->periodic );
// for(k=0; k<10; ++k)
......@@ -746,7 +748,8 @@ void engine_init ( struct engine *e , struct space *s , float dt , int nr_thread
error( "Failed to initialize barrier condition variable." );
if ( pthread_mutex_lock( &e->barrier_mutex ) != 0 )
error( "Failed to lock barrier mutex." );
e->barrier_count = 0;
e->barrier_running = 0;
e->barrier_launch = 0;
/* Run through the parts and get the minimum time step. */
e->dt_orig = dt;
......@@ -773,6 +776,7 @@ void engine_init ( struct engine *e , struct space *s , float dt , int nr_thread
for ( k = 0 ; k < nr_threads ; k++ ) {
e->runners[k].id = k;
e->runners[k].e = e;
e->barrier_running += 1;
if ( pthread_create( &e->runners[k].thread , NULL , &runner_main , &e->runners[k] ) != 0 )
error( "Failed to create runner thread." );
#if defined(HAVE_SETAFFINITY)
......@@ -787,7 +791,7 @@ void engine_init ( struct engine *e , struct space *s , float dt , int nr_thread
}
/* Wait for the runner threads to be in place. */
while ( e->barrier_count != e->nr_threads )
while ( e->barrier_running || e->barrier_launch )
if ( pthread_cond_wait( &e->barrier_cond , &e->barrier_mutex ) != 0 )
error( "Error while waiting for runner threads to get in place." );
......
......@@ -74,7 +74,7 @@ struct engine {
/* Data for the threads' barrier. */
pthread_mutex_t barrier_mutex;
pthread_cond_t barrier_cond;
int barrier_count;
volatile int barrier_running, barrier_launch;
};
......
......@@ -703,10 +703,10 @@ void scheduler_done ( struct scheduler *s , struct task *t ) {
}
/* Task definitely done. */
// pthread_mutex_lock( &s->sleep_mutex );
pthread_mutex_lock( &s->sleep_mutex );
atomic_dec( &s->waiting );
// pthread_cond_broadcast( &s->sleep_cond );
// pthread_mutex_unlock( &s->sleep_mutex );
pthread_cond_broadcast( &s->sleep_cond );
pthread_mutex_unlock( &s->sleep_mutex );
}
......@@ -754,10 +754,10 @@ struct task *scheduler_gettask ( struct scheduler *s , int qid ) {
}
/* If we failed, take a short nap. */
/* pthread_mutex_lock( &s->sleep_mutex );
pthread_mutex_lock( &s->sleep_mutex );
if ( s->waiting > 0 )
pthread_cond_wait( &s->sleep_cond , &s->sleep_mutex );
pthread_mutex_unlock( &s->sleep_mutex ); */
pthread_mutex_unlock( &s->sleep_mutex );
}
......
......@@ -161,10 +161,10 @@ void space_rebuild ( struct space *s , double cell_max ) {
struct part *restrict finger, *restrict p, *parts = s->parts;
int *ind;
double ih[3], dim[3];
ticks tic;
// ticks tic;
/* Be verbose about this. */
printf( "space_rebuild: (re)building space...\n" ); fflush(stdout);
// printf( "space_rebuild: (re)building space...\n" ); fflush(stdout);
/* Run through the parts and get the current h_max. */
// tic = getticks();
......@@ -260,7 +260,7 @@ void space_rebuild ( struct space *s , double cell_max ) {
}
/* Run through the particles and get their cell index. */
tic = getticks();
// tic = getticks();
if ( ( ind = (int *)malloc( sizeof(int) * s->nr_parts ) ) == NULL )
error( "Failed to allocate temporary particle indices." );
ih[0] = s->ih[0]; ih[1] = s->ih[1]; ih[2] = s->ih[2];
......@@ -277,12 +277,12 @@ void space_rebuild ( struct space *s , double cell_max ) {
ind[k] = cell_getid( cdim , p->x[0]*ih[0] , p->x[1]*ih[1] , p->x[2]*ih[2] );
atomic_inc( &s->cells[ ind[k] ].count );
}
printf( "space_rebuild: getting particle indices took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 );
// printf( "space_rebuild: getting particle indices took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 );
/* Sort the parts according to their cells. */
tic = getticks();
// tic = getticks();
parts_sort( parts , ind , s->nr_parts , 0 , s->nr_cells-1 );
printf( "space_rebuild: parts_sort took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 );
// printf( "space_rebuild: parts_sort took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 );
/* Verify sort. */
/* for ( k = 1 ; k < nr_parts ; k++ ) {
......@@ -308,7 +308,7 @@ void space_rebuild ( struct space *s , double cell_max ) {
/* At this point, we have the upper-level cells, old or new. Now make
sure that the parts in each cell are ok. */
tic = getticks();
// tic = getticks();
k = 0;
#pragma omp parallel num_threads(8) shared(s,k)
{
......@@ -320,7 +320,7 @@ void space_rebuild ( struct space *s , double cell_max ) {
break;
}
}
printf( "space_rebuild: space_rebuild_recurse took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 );
// printf( "space_rebuild: space_split took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 );
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment