diff --git a/src/atomic.h b/src/atomic.h
index 818d210e60a7aacdf61d12b60623ce87e62c9ed2..21cf3822b2e6b3df66aba7be9f8143174b8cfc56 100644
--- a/src/atomic.h
+++ b/src/atomic.h
@@ -26,5 +26,6 @@
 #define atomic_inc(v) atomic_add(v, 1)
 #define atomic_dec(v) atomic_add(v, -1)
 #define atomic_cas(v, o, n) __sync_val_compare_and_swap(v, o, n)
+#define atomic_load(v) atomic_add(v, 0)
 
 #endif /* SWIFT_ATOMIC_H */
diff --git a/src/queue.c b/src/queue.c
index af84c4b1f5e7e80fac05e5b122703fa718b7fead..617b74866c213898c2635224e9342d19d23f8459 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -34,6 +34,7 @@
 #include "queue.h"
 
 /* Local headers. */
+#include "atomic.h"
 #include "const.h"
 #include "error.h"
@@ -78,7 +79,7 @@ void queue_insert(struct queue *q, struct task *t) {
 
   /* Drop the task at the end of the queue. */
   tid[q->count] = (t - tasks);
-  q->count += 1;
+  atomic_inc(&q->count);
 
   /* Shuffle up. */
   for (k = q->count - 1; k > 0; k = (k - 1) / 2)
@@ -127,22 +128,18 @@ void queue_init(struct queue *q, struct task *tasks) {
  *
  * @param q The task #queue.
  * @param prev The previous #task extracted from this #queue.
- * @param blocking Block until access to the queue is granted.
  */
-struct task *queue_gettask(struct queue *q, const struct task *prev, int blocking) {
+struct task *queue_gettask(struct queue *q, const struct task *prev) {
 
   lock_type *qlock = &q->lock;
   struct task *res = NULL;
 
   /* If there are no tasks, leave immediately. */
-  if (q->count == 0) return NULL;
-
-  /* Grab the task lock. */
-  if (blocking) {
-    if (lock_lock(qlock) != 0) error("Locking the qlock failed.\n");
-  } else {
-    if (lock_trylock(qlock) != 0) return NULL;
+  if (lock_trylock(qlock) != 0) return NULL;
+  if (q->count == 0) {
+    if (lock_unlock(qlock) != 0) error("Unlocking the qlock failed.\n");
+    return NULL;
   }
 
   /* Set some pointers we will use often. */
@@ -211,7 +208,7 @@ struct task *queue_gettask(struct queue *q, const struct task *prev, int blocking) {
   if (ind >= 0) {
 
     /* Another one bites the dust. */
-    const int qcount = q->count -= 1;
+    const int qcount = atomic_dec(&q->count) - 1;  // atomic_dec returns the pre-decrement value.
 
     /* Get a pointer on the task that we want to return. */
     res = &qtasks[tid];
diff --git a/src/queue.h b/src/queue.h
index 7f6d13c425f80cac20125bf422fe9da1ed06361f..0bcf10d5027e3b8bc94ebf827b2c632a78fdb020 100644
--- a/src/queue.h
+++ b/src/queue.h
@@ -55,7 +55,7 @@ struct queue {
 } __attribute__((aligned(64)));
 
 /* Function prototypes. */
-struct task *queue_gettask(struct queue *q, const struct task *prev, int blocking);
+struct task *queue_gettask(struct queue *q, const struct task *prev);
 void queue_init(struct queue *q, struct task *tasks);
 void queue_insert(struct queue *q, struct task *t);
diff --git a/src/scheduler.c b/src/scheduler.c
index a83416aad4ed07ee55107775221e0c5b6b38b0f7..a8840a787ad5ecbdb3a2dd89a8488e687eeb01ff 100644
--- a/src/scheduler.c
+++ b/src/scheduler.c
@@ -840,7 +840,9 @@ void scheduler_reset(struct scheduler *s, int size) {
   s->nr_unlocks = 0;
 
   /* Set the task pointers in the queues. */
-  for (k = 0; k < s->nr_queues; k++) s->queues[k].tasks = s->tasks;
+  for (k = 0; k < s->nr_queues; k++) {
+    s->queues[k].tasks = s->tasks;
+  }
 }
 
 /**
@@ -868,8 +870,10 @@ void scheduler_reweight(struct scheduler *s) {
       for (j = 0; j < t->nr_unlock_tasks; j++)
         if (t->unlock_tasks[j]->weight > t->weight)
           t->weight = t->unlock_tasks[j]->weight;
+#ifndef DETERMINISTIC_SCHEDULE
       if (!t->implicit && t->tic > 0)
         t->weight += wscale * (t->toc - t->tic);
       else
+#endif
        switch (t->type) {
          case task_type_sort:
@@ -957,8 +961,8 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
   const int num_rewait_tasks = s->nr_queues > s->size - s->nr_tasks
                                    ? s->size - s->nr_tasks
                                    : s->nr_queues;
-  const int waiting_old =
-      s->waiting;  // Remember that engine_launch may fiddle with this value.
+  /* Remember that engine_launch may fiddle with this value. */
+  const int waiting_old = atomic_load(&s->waiting);
   for (int k = 0; k < num_rewait_tasks; k++) {
     rewait_tasks[k].type = task_type_rewait;
     rewait_tasks[k].ci = (struct cell *)&s->tasks[k * nr_tasks / s->nr_queues];
@@ -976,7 +980,7 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
 
   /* Wait for the rewait tasks to have executed. */
   pthread_mutex_lock(&s->sleep_mutex);
-  while (s->waiting > waiting_old) {
+  while (atomic_load(&s->waiting) > waiting_old) {
     pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
   }
   pthread_mutex_unlock(&s->sleep_mutex);
@@ -987,11 +991,18 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
   // tic = getticks();
   for (int k = 0; k < s->nr_tasks; k++) {
     t = &tasks[tid[k]];
-    if (atomic_dec(&t->wait) == 1 && ((1 << t->type) & s->mask) && !t->skip) {
+    if (((1 << t->type) & s->mask) && !t->skip && atomic_dec(&t->wait) == 1) {
       scheduler_enqueue(s, t);
+#ifndef DETERMINISTIC_SCHEDULE
       pthread_cond_broadcast(&s->sleep_cond);
+#endif
     }
   }
+
+#ifdef DETERMINISTIC_SCHEDULE
+  pthread_cond_broadcast(&s->sleep_cond);
+#endif
+
   // message( "enqueueing tasks took %.3f ms." , (double)( getticks() - tic ) /
   // CPU_TPS * 1000 );
 }
@@ -1041,10 +1052,13 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
       case task_type_pair:
       case task_type_sub:
         qid = t->ci->super->owner;
-        if (t->cj != NULL &&
-            (qid < 0 ||
-             s->queues[qid].count > s->queues[t->cj->super->owner].count))
-          qid = t->cj->super->owner;
+        if (t->cj != NULL) {
+          /* Keep the short-circuit on qid < 0: qid may still be -1 here. */
+          const int qid_j = t->cj->super->owner;
+          if (qid < 0 || atomic_load(&s->queues[qid].count) >
+                             atomic_load(&s->queues[qid_j].count))
+            qid = qid_j;
+        }
         break;
       case task_type_recv:
 #ifdef WITH_MPI
@@ -1091,11 +1105,11 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
 
     /* If no previous owner, find the shortest queue. */
     if (qid < 0) qid = rand() % s->nr_queues;
 
-    /* Increase the waiting counter. */
-    atomic_inc(&s->waiting);
-
     /* Insert the task into that queue. */
     queue_insert(&s->queues[qid], t);
+
+    /* Increase the waiting counter. */
+    atomic_inc(&s->waiting);
   }
 }
@@ -1201,29 +1215,32 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
   if (qid >= nr_queues || qid < 0) error("Bad queue ID.");
 
   /* Loop as long as there are tasks... */
-  while (s->waiting > 0 && res == NULL) {
+  while (atomic_load(&s->waiting) > 0 && res == NULL) {
 
     /* Try more than once before sleeping. */
-    for (int tries = 0; res == NULL && s->waiting && tries < scheduler_maxtries;
+    for (int tries = 0; atomic_load(&s->waiting) && tries < scheduler_maxtries;
          tries++) {
 
       /* Try to get a task from the suggested queue. */
-      if (s->queues[qid].count > 0) {
-        TIMER_TIC
-        res = queue_gettask(&s->queues[qid], prev, 0);
-        TIMER_TOC(timer_qget);
-        if (res != NULL) break;
-      }
+      TIMER_TIC
+      res = queue_gettask(&s->queues[qid], prev);
+      TIMER_TOC(timer_qget);
+      if (res != NULL) break;
 
       /* If unsuccessful, try stealing from the other queues. */
       if (s->flags & scheduler_flag_steal) {
         int count = 0, qids[nr_queues];
-        for (k = 0; k < nr_queues; k++)
+        for (k = 0; k < nr_queues; k++) {
+          if (lock_lock(&s->queues[k].lock) != 0)
+            error("Locking the qlock failed.\n");
           if (s->queues[k].count > 0) qids[count++] = k;
+          if (lock_unlock(&s->queues[k].lock) != 0)
+            error("Unlocking the qlock failed.\n");
+        }
         for (k = 0; k < scheduler_maxsteal && count > 0; k++) {
           int ind = rand_r(&seed) % count;
           TIMER_TIC
-          res = queue_gettask(&s->queues[qids[ind]], prev, 0);
+          res = queue_gettask(&s->queues[qids[ind]], prev);
           TIMER_TOC(timer_qsteal);
           if (res != NULL) break;
@@ -1241,7 +1258,8 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
     if (res == NULL) {
 #endif
       pthread_mutex_lock(&s->sleep_mutex);
-      if (s->waiting > 0) pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
+      if (atomic_load(&s->waiting) > 0)
+        pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
       pthread_mutex_unlock(&s->sleep_mutex);
     }
   }
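
Note on the new atomic_load() macro: it is defined as atomic_add(v, 0), and the queue code above relies on atomic_dec() returning the value held *before* the decrement (hence the "- 1" in queue_gettask). The stand-alone sketch below illustrates both behaviours; it assumes that atomic_add(v, i) expands to GCC's __sync_fetch_and_add(v, i), as in the part of src/atomic.h that is not shown in this diff.

/* Minimal sketch, not part of the patch; the real macros live in src/atomic.h. */
#include <assert.h>
#include <stdio.h>

#define atomic_add(v, i) __sync_fetch_and_add(v, i) /* returns the OLD value (assumption) */
#define atomic_inc(v) atomic_add(v, 1)
#define atomic_dec(v) atomic_add(v, -1)
#define atomic_load(v) atomic_add(v, 0) /* read implemented as an add of zero */

int main(void) {
  volatile int count = 3;

  /* atomic_load() returns the current value; adding zero leaves it unchanged. */
  assert(atomic_load(&count) == 3);

  /* atomic_dec() returns the value *before* the decrement, hence the
     "atomic_dec(&q->count) - 1" idiom in queue_gettask to obtain the new count. */
  const int qcount = atomic_dec(&count) - 1;
  assert(qcount == 2 && count == 2);

  atomic_inc(&count);
  printf("count=%d qcount=%d\n", (int)count, qcount);
  return 0;
}

Because __sync_fetch_and_add is a full read-modify-write with barrier semantics, atomic_load() implemented this way also dirties the cache line it reads; that is presumably acceptable here since it is only used on the scheduler's slower paths (queue-length comparisons and the sleep/wake checks on s->waiting).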