diff --git a/src/cell.h b/src/cell.h index 0577d3c71c678ce56ca013915d3df18c87e50e2d..0f8af822bb4050a21508a6b244ca4431d4a793b3 100644 --- a/src/cell.h +++ b/src/cell.h @@ -73,6 +73,9 @@ struct cell { /* Spin lock for various uses. */ lock_type lock; + /* ID of the previous owner, e.g. runner. */ + int owner; + /* Linking pointer for "memory management". */ struct cell *next; diff --git a/src/queue.c b/src/queue.c index 4b6744c8b8083276001c2cea6ab636184649ef9c..f1845b3a2c5c835409ff4479f86291b47624fbf2 100644 --- a/src/queue.c +++ b/src/queue.c @@ -84,6 +84,34 @@ int queue_counter[ queue_counter_count ]; +/** + * @brief Insert a used tasks into the given queue. + * + * @param q The #queue. + * @param t The #task. + */ + +void queue_insert ( struct queue *q , struct task *t ) { + + int k; + + /* Lock the queue. */ + if ( lock_lock( &q->lock ) != 0 ) + error( "Failed to get queue lock." ); + + /* Bubble-up the tasks. */ + for ( k = q->count ; k > q->next ; k-- ) + q->tid[k] = q->tid[k-1]; + q->tid[ q->next ] = t - q->tasks; + q->count += 1; + q->next += 1; + + /* Unlock the queue. */ + if ( lock_unlock( &q->lock ) != 0 ) + error( "Failed to unlock queue." ); + } + + /** * @brief Initialize the given queue. * @@ -239,6 +267,152 @@ struct task *queue_gettask ( struct queue *q , int blocking , int keep ) { } +struct task *queue_gettask_new ( struct queue *q , int rid , int blocking , int keep ) { + + int k, tid = -1, qcount, *qtid = q->tid; + lock_type *qlock = &q->lock; + struct task *qtasks = q->tasks, *res = NULL; + int ind_best, score_best = -1, score; + TIMER_TIC + + /* If there are no tasks, leave immediately. */ + if ( q->next >= q->count ) { + TIMER_TOC(queue_timer_gettask); + return NULL; + } + + /* Main loop, while there are tasks... */ + while ( q->next < q->count ) { + + /* Grab the task lock. */ + // if ( blocking ) { + if ( lock_lock( qlock ) != 0 ) + error( "Locking the task_lock failed.\n" ); + // } + // else { + // if ( lock_trylock( qlock ) != 0 ) + // break; + // } + + /* Loop over the remaining task IDs. */ + qcount = q->count; ind_best = -1; + for ( k = q->next ; k < qcount ; k++ ) { + + /* Put a finger on the task. */ + res = &qtasks[ qtid[k] ]; + + /* Is this task blocked? */ + if ( res->wait ) + continue; + + /* Get the score for this task. */ + if ( res->type == tid_self || res->type == tid_sort || ( res->type == tid_sub && res->cj == NULL ) ) + score = ( res->ci->owner == rid ); + else + score = ( res->ci->owner == rid ) + ( res->cj->owner == rid ); + if ( score <= score_best ) + continue; + + /* Different criteria for different types. */ + if ( res->type == tid_self || (res->type == tid_sub && res->cj == NULL) ) { + if ( res->ci->hold || cell_locktree( res->ci ) != 0 ) + continue; + } + else if ( res->type == tid_pair || (res->type == tid_sub && res->cj != NULL) ) { + if ( res->ci->hold || res->cj->hold || res->ci->wait || res->cj->wait ) + continue; + if ( cell_locktree( res->ci ) != 0 ) + continue; + if ( cell_locktree( res->cj ) != 0 ) { + cell_unlocktree( res->ci ); + continue; + } + } + + /* If we owned a previous task, unlock it. */ + if ( ind_best >= 0 ) { + res = &qtasks[ qtid[ ind_best ] ]; + if ( res->type == tid_self || res->type == tid_pair || res->type == tid_sub ) + cell_unlocktree( res->ci ); + if ( res->type == tid_pair || (res->type == tid_sub && res->cj != NULL) ) + cell_unlocktree( res->cj ); + } + + /* If we made it this far, we're safe. */ + ind_best = k; + score_best = score; + + /* Should we bother looking any farther? */ + if ( score_best == 2 ) + break; + + } /* loop over the task IDs. */ + + /* Did we get a task? */ + if ( ind_best >= 0 ) { + + /* Do we need to swap? */ + if ( ind_best != q->next ) + COUNT(queue_counter_swap); + + /* get the task ID. */ + tid = qtid[ ind_best ]; + + /* Own the cells involved. */ + qtasks[ tid ].ci->owner = rid; + if ( qtasks[ tid ].cj != NULL ) + qtasks[ tid ].cj->owner = rid; + + /* Remove the task? */ + if ( keep ) { + + /* Bubble-up. */ + q->count = qcount - 1; + for ( k = ind_best ; k < qcount - 1 ; k++ ) + qtid[k] = qtid[k+1]; + + } + + /* No, leave it in the queue. */ + else { + + TIMER_TIC2 + + /* Bubble-down the task. */ + for ( k = ind_best ; k > q->next ; k-- ) + qtid[ k ] = qtid[ k-1 ]; + qtid[ q->next ] = tid; + + /* up the counter. */ + q->next += 1; + + TIMER_TOC2(queue_timer_bubble); + + } + + } + + /* Release the task lock. */ + if ( lock_unlock( qlock ) != 0 ) + error( "Unlocking the task_lock failed.\n" ); + + /* Leave? */ + if ( tid >= 0 ) { + TIMER_TOC(queue_timer_gettask); + return &qtasks[tid]; + } + else if ( !blocking ) + break; + + } /* while there are tasks. */ + + /* No beef. */ + TIMER_TOC(queue_timer_gettask); + return NULL; + + } + + /** * @brief Sort the tasks IDs according to their weight and constraints. * diff --git a/src/queue.h b/src/queue.h index 4f4b2670ada03cdc3fc97c3b645a744f26e56f2a..5c872e9940b33c49f6f5beb5ff60f4ee27592580 100644 --- a/src/queue.h +++ b/src/queue.h @@ -56,5 +56,7 @@ struct queue { /* Function prototypes. */ struct task *queue_gettask ( struct queue *q , int blocking , int keep ); +struct task *queue_gettask_new ( struct queue *q , int rid , int blocking , int keep ); void queue_init ( struct queue *q , int size , struct task *tasks ); +void queue_insert ( struct queue *q , struct task *t ); void queue_sort ( struct queue *q ); diff --git a/src/runner.c b/src/runner.c index 644cbed88444d4963cb52207377911d6fb827ddf..daaf68ea844edd301384aaa2110b357354806143 100644 --- a/src/runner.c +++ b/src/runner.c @@ -1030,7 +1030,6 @@ void *runner_main ( void *data ) { struct runner_thread *rt = (struct runner_thread *)data; struct runner *r = rt->r; - struct space *s = r->s; int threadID = rt->id; int k, qid, naq, keep, tpq; struct queue *queues[ r->nr_queues ], *myq; @@ -1080,11 +1079,11 @@ void *runner_main ( void *data ) { TIMER_TIC t = NULL; if ( r->nr_queues == 1 ) { - t = queue_gettask( &r->queues[0] , 1 , 0 ); + t = queue_gettask_new( &r->queues[0] , rt->id , 1 , 0 ); } else if ( r->policy & runner_policy_steal ) { if ( ( myq->next == myq->count ) || - ( t = queue_gettask( myq , 0 , 0 ) ) == NULL ) { + ( t = queue_gettask_new( myq , rt->id , 0 , 0 ) ) == NULL ) { TIMER_TIC2 qid = rand_r( &myseed ) % naq; keep = ( r->policy & runner_policy_keep ) && @@ -1093,19 +1092,9 @@ void *runner_main ( void *data ) { COUNT(runner_counter_steal_empty); else COUNT(runner_counter_steal_stall); - t = queue_gettask( queues[qid] , 0 , keep ); - if ( t != NULL && keep ) { - COUNT(runner_counter_keep); - if ( lock_lock( &myq->lock ) != 0 ) - error( "Failed to get queue lock." ); - for ( k = myq->count ; k > myq->next ; k-- ) - myq->tid[k] = myq->tid[k-1]; - myq->tid[ myq->next ] = t - s->tasks; - myq->count += 1; - myq->next += 1; - if ( lock_unlock( &myq->lock ) != 0 ) - error( "Failed to unlock queue." ); - } + t = queue_gettask_new( queues[qid] , rt->id , 0 , keep ); + if ( t != NULL && keep ) + queue_insert( myq , t ); TIMER_TOC2(runner_timer_steal); } }