diff --git a/src/qsched.c b/src/qsched.c index ff183cf0b4664a53200eff68d4f2a8d65d13a2d1..082d3928e15345bf00eb0fffb135d6d3d3dd2983 100644 --- a/src/qsched.c +++ b/src/qsched.c @@ -324,7 +324,115 @@ void qsched_run_openmp ( struct qsched *s , int nr_threads , qsched_funtype fun } + +/** + * @brief Barrier function when running with pthreads. + * + * @param s Pointer to the #qsched. + * @param tid ID of the calling thread. This is needed in case + * we don't want to launch all threads. + */ + +void qsched_barrier_wait ( struct qsched *s , int tid ) { + +#if defined( HAVE_PTHREAD ) + /* First, get the barrier mutex. */ + if ( pthread_mutex_lock( &s->barrier_mutex ) != 0 ) + error( "Failed to get barrier mutex." ); + /* The callee's thread stops running. */ + s->barrier_running -= 1; + + /* If all threads are in, send a signal... */ + if ( s->barrier_running == 0 ) + if ( pthread_cond_broadcast( &s->barrier_cond ) != 0 ) + error( "Failed to broadcast barrier full condition." ); + + /* Wait for the barrier to open. */ + while ( s->barrier_count == 0 || tid >= s->barrier_launchcount ) + if ( pthread_cond_wait( &s->barrier_cond , &s->barrier_mutex ) != 0 ) + error( "Eror waiting for barrier to close." ); + + /* This thread is leaving, decrease the barrier count, increase + the number of threads running. */ + s->barrier_count -= 1; + s->barrier_running += 1; + + /* If I'm the last one out, signal the condition again. */ + if ( s->barrier_count == 0 ) + if ( pthread_cond_broadcast( &s->barrier_cond ) != 0 ) + error( "Failed to broadcast empty barrier condition." ); + + /* Last but not least, release the mutex. */ + if ( pthread_mutex_unlock( &s->barrier_mutex ) != 0 ) + error( "Failed to get unlock the barrier mutex." ); +#endif + + } + + +/** + * @brief Launch a given number of threads and wait for them to finish. + * + * @param s Pointer to the #qsched. + * @param nr_threads Number of threads to let through the barrier. + * + * Lets the specified number of threads through the barrier. Note that + * this assumes that at least that many threads are waiting there, + * otherwise this function will hang indefinitely. + */ +void qsched_launch_threads ( struct qsched *s , int nr_threads ) { + +#if defined( HAVE_PTHREAD ) + /* Wait for all the runners to have entered the barrier. */ + while ( s->barrier_running ) + if ( pthread_cond_wait( &s->barrier_cond , &s->barrier_mutex ) != 0 ) + error( "Error while waiting for barrier." ); + + /* Cry havoc and let loose the dogs of war. */ + s->barrier_count = nr_threads; + s->barrier_launchcount = nr_threads; + if ( pthread_cond_broadcast( &s->barrier_cond ) != 0 ) + error( "Failed to broadcast barrier open condition." ); + + /* Lean back and wait for the runners to come home. */ + while ( s->barrier_count || s->barrier_running ) + if ( pthread_cond_wait( &s->barrier_cond , &s->barrier_mutex ) != 0 ) + error( "Error while waiting for barrier." ); +#endif + + } + + +void *qsched_pthread_run ( void *in ) { + + struct qsched_pthread_runner *r = (struct qsched_pthread_runner *)in; + struct qsched *s = r->s; + int tid = r->tid; + struct task *t; + + /* Main loop. */ + while ( 1 ) { + + /* Wait at the barrier. */ + qsched_barrier_wait( s , tid ); + + /* Loop as long as there are tasks. */ + while ( ( t = qsched_gettask( s , tid ) ) != NULL ) { + + /* Call the user-supplied function on the task with its data. */ + s->fun( t->type , &s->data[ t->data ] ); + + /* Mark that task as done. */ + qsched_done( s , t ); + + } /* loop as long as there are tasks. */ + + } /* main loop. */ + + } + + /** * @brief Execute all the tasks in the current scheduler using * pthreads. @@ -342,46 +450,41 @@ void qsched_run_pthread ( struct qsched *s , int nr_threads , qsched_funtype fun #if defined( HAVE_PTHREAD ) - pthread_t threads[ nr_threads ]; - int k; - /* Prepare the scheduler. */ qsched_prepare( s ); - - /* The runner function. */ - void *runner ( void *data ) { - /* Local variable. */ - struct task *t; - - /* Extract the runner's qid. */ - int qid = (size_t)data; - - /* Loop as long as there are tasks. */ - while ( ( t = qsched_gettask( s , qid ) ) != NULL ) { - - /* Call the user-supplied function on the task with its data. */ - fun( t->type , &s->data[ t->data ] ); - - /* Mark that task as done. */ - qsched_done( s , t ); - - } /* loop as long as there are tasks. */ - - /* Bail. */ - return NULL; - - } /* runner function. */ - - /* Init and launch the pthreads. */ - for ( k = 0 ; k < nr_threads ; k++ ) - if ( pthread_create( &threads[k] , NULL , runner , (void *)(size_t)k ) != 0 ) + /* Make sure we have enough threads. */ + if ( nr_threads > s->runners_count ) { + + /* Reallocate the threads array? */ + if ( nr_threads > s->runners_size ) { + int runners_size_new = s->runners_size; + while ( runners_size_new < nr_threads ) + runners_size_new *= qsched_stretch; + struct qsched_pthread_runner *runners_new; + if ( ( runners_new = malloc( sizeof(struct qsched_pthread_runner) * runners_size_new ) ) == NULL ) + error( "Failed to allocate new thread array." ); + memcpy( runners_new , s->runners , sizeof(pthread_t) * s->runners_count ); + free( s->runners ); + s->runners = runners_new; + s->runners_size = runners_size_new; + } + + /* Launch the missing threads. */ + for ( int tid = s->runners_count ; tid < nr_threads ; tid++ ) { + s->barrier_running += 1; + s->runners[tid].tid = tid; + s->runners[tid].s = s; + if ( pthread_create( &s->runners[tid].thread , NULL , qsched_pthread_run , (void *)&s->runners[tid] ) != 0 ) error( "Failed to create pthread." ); - - /* Wait for the pthreads to come home. */ - for ( k = 0 ; k < nr_threads ; k++ ) - if ( pthread_join( threads[k] , NULL ) != 0 ) - error( "Failed to join on pthread." ); + } + } + + /* Set the runner function. */ + s->fun = fun; + + /* Launch the threads. */ + qsched_launch_threads( s , nr_threads ); #else error( "QuickSched was not compiled with pthread support." ); @@ -501,7 +604,8 @@ void qsched_done ( struct qsched *s , struct task *t ) { /* Set the task stats. */ t->toc = getticks(); - t->cost = t->toc - t->tic; + if (!(s->flags & qsched_flag_norecost)) + t->cost = t->toc - t->tic; /* Release this task's locks. */ for ( k = 0 ; k < t->nr_locks ; k++ ) @@ -1023,7 +1127,7 @@ void qsched_prepare ( struct qsched *s ) { /* Run through the topologically sorted tasks backwards and set their weights, re-setting the waits while we're at it. */ for ( k = count-1 ; k >= 0 ; k-- ) { - int maxweight = 0; + long long int maxweight = 0; t = &tasks[ tid[k] ]; for ( j = 0 ; j < t->nr_unlocks ; j++ ) { tasks[ t->unlocks[j] ].wait += 1; @@ -1489,7 +1593,19 @@ void qsched_init ( struct qsched *s , int nr_queues , int flags ) { #ifdef HAVE_PTHREAD if ( pthread_cond_init( &s->cond , NULL ) != 0 || pthread_mutex_init( &s->mutex , NULL ) != 0 ) - error( "Error initializing cond/mutex pair." ); + error( "Error initializing yield cond/mutex pair." ); + if ( pthread_cond_init( &s->barrier_cond , NULL ) != 0 || + pthread_mutex_init( &s->barrier_mutex , NULL ) != 0 ) + error( "Error initializing barrier cond/mutex pair." ); + s->runners_count = 0; + s->runners_size = qsched_init_runners; + if ( ( s->runners = malloc( sizeof(struct qsched_pthread_runner) * s->runners_size ) ) == NULL ) + error( "Failed to allocate runners." ); + s->barrier_running = 0; + s->barrier_count = 0; + s->barrier_launchcount = 0; + if ( pthread_mutex_lock( &s->barrier_mutex ) != 0 ) + error( "Failed to lock barrier mutex." ); #endif /* Clear the timers. */ diff --git a/src/qsched.h b/src/qsched.h index bd6c09ea4ba523a3af362c54466857252c1996c4..b9ee503cc4e1d5afd08179c7a56eaed516dcb2c1 100644 --- a/src/qsched.h +++ b/src/qsched.h @@ -24,6 +24,7 @@ #define qsched_flag_yield 4 #define qsched_flag_pthread 8 #define qsched_flag_noreown 16 +#define qsched_flag_norecost 32 /* Some sched-specific constants. */ #define qsched_stretch 2 @@ -33,6 +34,7 @@ #define qsched_init_usespertask 2 #define qsched_init_respertask 2 #define qsched_init_datapertask 8 +#define qsched_init_runners 16 #define qsched_data_round 16 #define qsched_res_none (-1) #define qsched_task_none (-1) @@ -147,10 +149,15 @@ struct qsched { /* A lock for the sched itself. */ lock_type lock; - /* Pthread stuff for condition variable for yielding threads. */ + /* Pthread stuff for condition variable for the barrier and for + yielding threads. */ #ifdef HAVE_PTHREAD - pthread_cond_t cond; - pthread_mutex_t mutex; + pthread_cond_t cond, barrier_cond; + pthread_mutex_t mutex, barrier_mutex; + struct qsched_pthread_runner *runners; + int runners_count, runners_size; + int barrier_running, barrier_count, barrier_launchcount; + qsched_funtype fun; #endif /* Timers. */ @@ -160,6 +167,21 @@ struct qsched { }; +/* Data structure passed to pthread_create. */ +struct qsched_pthread_runner { + + /* The scheduler to which this thread is attached. */ + struct qsched *s; + + /* The thread itself. */ + pthread_t thread; + + /* The thread's ID. */ + int tid; + + }; + + /* Function prototypes. */ /* Internal functions. */