diff --git a/paper/paper.tex b/paper/paper.tex index 4d1236b1b66c8a18520276d237655151ce5c0546..f59a0c40b65868d8d7154bc0458c847096501c0e 100644 --- a/paper/paper.tex +++ b/paper/paper.tex @@ -942,7 +942,7 @@ a problem efficiently. The source code of both examples is distributed with the QuickSched library, along with scripts to run the benchmarks and generate the plots used in the following. -All examples were compiled with gcc v.\,4.8.2 using the +All examples were compiled with gcc v.\,5.2.0 using the {\tt -O2 -march=native} flags and run on a 64-core AMD Opteron 6376 machine at 2.67\,GHz. @@ -1015,7 +1015,8 @@ random matrix using tiles of size $64\times 64$ floats using QuickSched as described above. The task costs were initialized to the asymptotic cost of the underlying operations. -For this matrix, a total of 11\,440 tasks with 32\,240 dependencies +For this matrix, a total of 11\,440 tasks with 21\,824 dependencies, +as well as 1\,024 resources with 21\,856 locks and 11\,408 uses were generated. For these tests, {\tt pthread} parallelism and resource re-owning diff --git a/src/qsched.c b/src/qsched.c index f6ce8b26349b91fc1f64afcab843266a5b0dc274..dc60a6c657699a6aaa37cb7c360e5be8bc216927 100644 --- a/src/qsched.c +++ b/src/qsched.c @@ -299,14 +299,17 @@ void qsched_run_openmp ( struct qsched *s , int nr_threads , qsched_funtype fun /* Parallel loop. */ #pragma omp parallel num_threads( nr_threads ) { - /* Local variable. */ - struct task *t; + /* Current task ID. */ + qsched_task_t tid = qsched_task_none; /* Get the ID of the current thread. */ int qid = omp_get_thread_num() % s->nr_queues; /* Loop as long as there are tasks. */ - while ( ( t = qsched_gettask( s , qid ) ) != NULL ) { + while ( ( tid = qsched_gettask( s , qid , tid ) ) != qsched_task_none ) { + + /* Get a hold of the task structure itself. */ + struct task *t = &s->tasks[tid]; /* Call the user-supplied function on the task with its data. 
*/ fun( t->type , &s->data[ t->data ] ); @@ -408,20 +411,25 @@ void *qsched_pthread_run ( void *in ) { struct qsched_pthread_runner *r = (struct qsched_pthread_runner *)in; struct qsched *s = r->s; - int tid = r->tid; - struct task *t; + int qid = r->tid; /* Main loop. */ while ( 1 ) { /* Wait at the barrier. */ - qsched_barrier_wait( s , tid ); + qsched_barrier_wait( s , qid ); /* If there is no function to execute, then just quit. */ if (s->fun == NULL) pthread_exit(NULL); + /* Current and last task ID. */ + qsched_task_t tid = qsched_task_none; + /* Loop as long as there are tasks. */ - while ( ( t = qsched_gettask( s , tid ) ) != NULL ) { + while ( ( tid = qsched_gettask( s , qid , tid ) ) != qsched_task_none ) { + + /* Get a hold of the task structure itself. */ + struct task *t = &s->tasks[tid]; /* Call the user-supplied function on the task with its data. */ s->fun( t->type , &s->data[ t->data ] ); @@ -798,6 +806,7 @@ void qsched_unlocktask ( struct qsched *s , int tid ) { * * @param s Pointer to the #qsched. * @param qid The queue to use. + * @param last The previously executed task or #qsched_task_none. * * @return A pointer to a task object. * @@ -806,7 +815,7 @@ void qsched_unlocktask ( struct qsched *s , int tid ) { * will require the #qsched to be re-prepared. */ -struct task *qsched_gettask ( struct qsched *s , int qid ) { +qsched_task_t qsched_gettask ( struct qsched *s , int qid , qsched_task_t last ) { int naq, k, tid, qids[ s->nr_queues ]; struct task *t; @@ -827,7 +836,7 @@ struct task *qsched_gettask ( struct qsched *s , int qid ) { /* Try to get a task from my own queue. 
*/ { TIMER_TIC - tid = queue_get( &s->queues[qid] , s , 1 ); + tid = queue_get( &s->queues[qid] , s , 1 , last ); TIMER_TOC( s , qsched_timer_queue ) if ( tid < 0 ) { @@ -838,7 +847,7 @@ struct task *qsched_gettask ( struct qsched *s , int qid ) { while ( naq > 0 ) { k = rand() % naq; TIMER_TIC2 - tid = queue_get( &s->queues[ qids[k] ] , s , 0 ); + tid = queue_get( &s->queues[ qids[k] ] , s , 0 , last ); TIMER_TOC( s , qsched_timer_queue ) if ( tid < 0 ) qids[k] = qids[ --naq ]; @@ -869,7 +878,7 @@ struct task *qsched_gettask ( struct qsched *s , int qid ) { /* Return the task. */ TIMER_TOC( s , qsched_timer_gettask ) - return t; + return tid; } @@ -889,7 +898,7 @@ struct task *qsched_gettask ( struct qsched *s , int qid ) { /* Return empty-handed. No toc here as we don't want to count the final wait when all tasks have been executed. */ - return NULL; + return qsched_task_none; } @@ -1170,12 +1179,14 @@ void qsched_prepare ( struct qsched *s ) { * * @param s Pointer to the #qsched. * @param parent ID of the parent resource or #qsched_res_none if none. - * @param owner ID of the ower + * @param owner ID of the owner. + * @param data Pointer to the resource's data, if available. + * @param size Number of bytes in this resource. * * @return The ID of the new shared resource. */ -int qsched_addres ( struct qsched *s , int owner , int parent ) { +int qsched_addres ( struct qsched *s , int owner , int parent , void *data , size_t size ) { struct res *res_new; int id; @@ -1213,6 +1224,8 @@ int qsched_addres ( struct qsched *s , int owner , int parent ) { s->res[ id ].hold = 0; s->res[ id ].owner = owner; s->res[ id ].parent = parent; + s->res[ id ].data = data; + s->res[ id ].size = size; /* Unlock the sched. */ lock_unlock_blind( &s->lock ); @@ -1626,7 +1639,7 @@ void qsched_init ( struct qsched *s , int nr_queues , int flags ) { pthread_mutex_init( &s->barrier_mutex , NULL ) != 0 ) error( "Error initializing barrier cond/mutex pair." 
); s->runners_count = 0; - s->runners_size = qsched_init_runners; + s->runners_size = nr_queues > qsched_init_runners ? nr_queues : qsched_init_runners; if ( ( s->runners = malloc( sizeof(struct qsched_pthread_runner) * s->runners_size ) ) == NULL ) error( "Failed to allocate runners." ); s->barrier_running = 0; @@ -1634,6 +1647,16 @@ void qsched_init ( struct qsched *s , int nr_queues , int flags ) { s->barrier_launchcount = 0; if ( pthread_mutex_lock( &s->barrier_mutex ) != 0 ) error( "Failed to lock barrier mutex." ); + /* Launch the missing threads. */ + for ( int tid = 0 ; tid < nr_queues ; tid++ ) { + s->barrier_running += 1; + s->runners[tid].tid = tid; + s->runners[tid].s = s; + if ( pthread_create( &s->runners[tid].thread , NULL , qsched_pthread_run , (void *)&s->runners[tid] ) != 0 ) + error( "Failed to create pthread." ); + s->runners_count += 1; + } + } #endif diff --git a/src/qsched.h b/src/qsched.h index b9ee503cc4e1d5afd08179c7a56eaed516dcb2c1..8f1246bd8e33f1223056df2a6eb7a8f2e816c3a3 100644 --- a/src/qsched.h +++ b/src/qsched.h @@ -188,7 +188,7 @@ struct qsched_pthread_runner { void qsched_sort ( int *data , int *ind , int N , int min , int max ); void qsched_quicksort ( int *data , int *ind , int N , int min , int max ); void qsched_sort_rec ( int *data , int *ind , int N , int min , int max ); -struct task *qsched_gettask ( struct qsched *s , int qid ); +qsched_task_t qsched_gettask ( struct qsched *s , int qid, qsched_task_t last ); void qsched_done ( struct qsched *s , struct task *t ); void *qsched_getdata( struct qsched *s , struct task *t ); int qsched_lockres ( struct qsched *s , int rid ); @@ -200,7 +200,7 @@ void qsched_enqueue ( struct qsched *s , struct task *t ); /* External functions. 
*/ void qsched_init ( struct qsched *s , int nr_queues , int flags ); -qsched_res_t qsched_addres ( struct qsched *s , int owner , qsched_res_t parent ); +qsched_res_t qsched_addres ( struct qsched *s , int owner , qsched_res_t parent , void *data , size_t size ); void qsched_addlock ( struct qsched *s , qsched_task_t t , qsched_res_t res ); void qsched_addunlock ( struct qsched *s , qsched_task_t ta , qsched_task_t tb ); qsched_task_t qsched_addtask ( struct qsched *s , int type , unsigned int flags , void *data , int data_size , int cost ); diff --git a/src/queue.c b/src/queue.c index 2b98592b746a71fa19a664efd7b36f0b5bdf3ae4..a5f748845eb940b9d4d43a1d942d7273a3f9f4c0 100644 --- a/src/queue.c +++ b/src/queue.c @@ -33,6 +33,73 @@ #include "task.h" #include "qsched.h" #include "queue.h" +#include "res.h" + +/** + * @brief Compute an overlap score for a pair of tasks. + * + * @param s The #qsched in which the tasks and their resources live. + * @param a Index of the first #task or #qsched_task_none. + * @param b Index of the second #task or #qsched_task_none. + * + * @return The summed overlap of the resource ranges of both tasks divided by the summed size of all their resources (described by the author as a Jaccard similarity); 0.0f if either task is #qsched_task_none. + */ + +float queue_task_overlap ( struct qsched *s , qsched_task_t a , qsched_task_t b ) { + /* Trivial case: at least one of the tasks is qsched_task_none, so the score is zero. */ + if (a == qsched_task_none || b == qsched_task_none) return 0.0f; + + /* Get a handle on the two tasks. */ + const struct task *ta = &s->tasks[a]; + const struct task *tb = &s->tasks[b]; + + /* Collect pointers to the resources locked and used by both tasks. If either task has no resources, the score is 1 when both have none and 0 otherwise. NOTE(review): the uses entries below are looked up through s->locks, the same table as the locks entries -- confirm this is the intended table. */ + const int nr_res_a = ta->nr_locks + ta->nr_uses; + const int nr_res_b = tb->nr_locks + tb->nr_uses; + if (nr_res_a == 0 || nr_res_b == 0) return nr_res_a == nr_res_b ? 
1.0f : 0.0f; + struct res *res_a[nr_res_a], *res_b[nr_res_b]; + for (int k = 0; k < ta->nr_locks; k++) + res_a[k] = &s->res[s->locks[ta->locks[k]]]; + for (int k = 0; k < ta->nr_uses; k++) + res_a[ta->nr_locks + k] = &s->res[s->locks[ta->uses[k]]]; + for (int k = 0; k < tb->nr_locks; k++) + res_b[k] = &s->res[s->locks[tb->locks[k]]]; + for (int k = 0; k < tb->nr_uses; k++) + res_b[tb->nr_locks + k] = &s->res[s->locks[tb->uses[k]]]; + + /* Compute the resource union, which is just the sum of the + resource sizes. A zero sum yields a score of 1. NOTE(review): the overlap between the two tasks is not subtracted from this sum, so two tasks holding the same single resource score 0.5, not 1 -- confirm this is intended. */ + int res_union = 0; + for (int k = 0; k < nr_res_a; k++) + res_union += res_a[k]->size; + for (int k = 0; k < nr_res_b; k++) + res_union += res_b[k]->size; + if (res_union == 0) return 1.0f; + + /* Compute the resource intersection: for every pair of resources, one from each task, add the length of the overlap of their [data, data + size) ranges. */ + int res_isect = 0; + for (int k = 0; k < nr_res_a; k++) { + const struct res *ra = res_a[k]; + for (int j = 0; j < nr_res_b; j++) { + const struct res *rb = res_b[j]; + if (ra->data <= rb->data && rb->data < ra->data + ra->size) { + if (rb->data + rb->size < ra->data + ra->size) + res_isect += rb->size; + else + res_isect += ra->data + ra->size - rb->data; + } else if (rb->data <= ra->data && ra->data < rb->data + rb->size) { + if (ra->data + ra->size < rb->data + rb->size) + res_isect += ra->size; + else + res_isect += rb->data + rb->size - ra->data; + } + } + } + + /* Return the similarity score, i.e. the ratio of the intersection + and the union computed above. */ + return ((float)res_isect) / res_union; +} /** @@ -41,14 +108,14 @@ * @param q The #queue. * @param s The #qsched in which this queue's tasks lives. * @param insist If set, wait at the queue's lock, otherwise fail. + * @param last Index of the previous task executed, or -1. * * @return The task ID or -1 if no available task could be found. 
*/ -int queue_get ( struct queue *q , struct qsched *s , int insist ) { +int queue_get ( struct queue *q , struct qsched *s , int insist , int last ) { - int k, j, temp, tid, *inds, count; - struct task *tasks = s->tasks; + const struct task *tasks = s->tasks; /* Should we even try? */ if ( q->count == 0 ) @@ -65,35 +132,74 @@ int queue_get ( struct queue *q , struct qsched *s , int insist ) { TIMER_TOC( s , qsched_timer_qlock ); /* Get a pointer to the indices. */ - inds = q->inds; - count = q->count; - - /* Loop over the queue entries. */ - for ( k = 0 ; k < count ; k++ ) { + int *inds = q->inds; + int count = q->count; - /* Get the task ID. */ - tid = inds[k]; + /* Data for the sliding window in which to try the task with the + best overlap with the previous task. */ + struct { + int ind, tid; + float score; + } window[queue_search_window]; + int window_count = 0; + qsched_task_t tid = qsched_task_none; + int ind = -1; - /* If the task can be locked, break. */ - if ( qsched_locktask( s , tid ) ) - break; + /* Loop over the queue entries. */ + for (int k = 0 ; k < count ; k++ ) { + if (k < queue_search_window) { + window[window_count].ind = k; + window[window_count].tid = inds[k]; + window[window_count].score = queue_task_overlap(s, last, inds[k]); + window_count += 1; + } else { + int ind_max = 0; + for (int i = 1; i < window_count; i++) + if (window[i].score > window[ind_max].score) ind_max = i; + if (qsched_locktask(s, window[ind_max].tid)) { + tid = window[ind_max].tid; + ind = window[ind_max].ind; + break; + } else { + window[ind_max].ind = k; + window[ind_max].tid = inds[k]; + window[ind_max].score = queue_task_overlap(s, last, inds[k]); + } + } + } + /* If we didn't get a task, loop through whatever is left in the window. 
*/ + if (tid == qsched_task_none) { + while (window_count > 0) { + int ind_max = 0; + for (int i = 1; i < window_count; i++) + if (window[i].score > window[ind_max].score) ind_max = i; + if (qsched_locktask(s, window[ind_max].tid)) { + tid = window[ind_max].tid; + ind = window[ind_max].ind; + break; + } else { + window_count -= 1; + window[ind_max] = window[window_count]; } - + } + } + /* Did we get a task? */ - if ( k < count ) { + if ( tid != qsched_task_none ) { /* Swap the last element to the new heap position. */ q->count = ( count -= 1 ); - inds[k] = inds[ count ]; + inds[ind] = inds[ count ]; /* Fix the heap. */ + int k = ind; long long int w = tasks[ inds[k] ].weight; if ( k > 0 && w > tasks[ inds[(k-1)/2] ].weight ) while ( k > 0 ) { - j = (k - 1) / 2; + int j = (k - 1) / 2; if ( w > tasks[ inds[j] ].weight ) { - temp = inds[j]; + int temp = inds[j]; inds[j] = inds[k]; inds[k] = temp; k = j; @@ -103,12 +209,13 @@ int queue_get ( struct queue *q , struct qsched *s , int insist ) { } else while ( 1 ) { - if ( ( j = 2*k + 1 ) >= count ) + int j = 2*k + 1; + if ( j >= count ) break; if ( j+1 < count && tasks[ inds[j] ].weight < tasks[ inds[j+1] ].weight ) j = j + 1; if ( tasks[ inds[j] ].weight > w ) { - temp = inds[j]; + int temp = inds[j]; inds[j] = inds[k]; inds[k] = temp; k = j; @@ -119,10 +226,6 @@ int queue_get ( struct queue *q , struct qsched *s , int insist ) { } /* did we get a task? */ - /* Otherwise, clear the task ID. */ - else - tid = qsched_task_none; - /* Unlock the queue. */ lock_unlock_blind( &q->lock ); diff --git a/src/queue.h b/src/queue.h index 10d268e5da18173d7d09ba21d9a12af1f97a3820..7d53c9c01112128cee5d60852ab670b1d242e7df 100644 --- a/src/queue.h +++ b/src/queue.h @@ -20,6 +20,7 @@ /* Some constants. */ #define queue_grow 2 +#define queue_search_window 8 /* The queue data structure. */ struct queue { @@ -40,7 +41,7 @@ struct queue { /* Function prototypes. 
*/ -int queue_get ( struct queue *q , struct qsched *s , int insist ); +int queue_get ( struct queue *q , struct qsched *s , int insist, int last ); void queue_put ( struct queue *q , struct qsched *s , int tid ); void queue_init ( struct queue *q , int size ); void queue_free ( struct queue *q );