Commit b383d69e authored by Angus Lepper

Allow bit-reproducible results with 1 main and 1 runner thread

- Increment s->waiting before queue_insert (previously the insert came first)
- Decrement t->wait only if we might execute t
  i.e. it passes the mask and skip checks
- Remove unused non-blocking queue_gettask operation
- atomic.h: new atomic_load adds fences and avoids reads torn across cache lines (see the sketch below)
- Compile-time flag makes t->weight independent of the previous run's measured times
- scheduler_start compile-time flag removes race with scheduler_done
parent a7baf53d
@@ -26,5 +26,6 @@
#define atomic_inc(v) atomic_add(v, 1)
#define atomic_dec(v) atomic_add(v, -1)
#define atomic_cas(v, o, n) __sync_val_compare_and_swap(v, o, n)
+#define atomic_load(v) atomic_add(v, 0)
#endif /* SWIFT_ATOMIC_H */
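
The new atomic_load is the load-side counterpart of the macros above. A minimal sketch of why it works, assuming (as the neighbouring macros suggest) that atomic_add wraps GCC's legacy __sync_fetch_and_add builtin, which returns the old value and issues a full memory barrier; adding 0 therefore yields an untorn, fence-protected snapshot of the variable:

    /* sketch.c -- illustrative only; the atomic_add definition is assumed. */
    #include <stdio.h>

    #define atomic_add(v, i) __sync_fetch_and_add(v, i)
    #define atomic_load(v) atomic_add(v, 0)

    int main(void) {
      volatile int waiting = 42;
      /* Full barrier; returns the value before adding 0: a clean snapshot. */
      const int snapshot = atomic_load(&waiting);
      printf("%d\n", snapshot); /* prints 42 */
      return 0;
    }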
@@ -34,6 +34,7 @@
#include "queue.h"
/* Local headers. */
#include "atomic.h"
#include "const.h"
#include "error.h"
@@ -78,7 +79,7 @@ void queue_insert(struct queue *q, struct task *t) {
/* Drop the task at the end of the queue. */
tid[q->count] = (t - tasks);
-q->count += 1;
+atomic_inc(&q->count);
/* Shuffle up. */
for (k = q->count - 1; k > 0; k = (k - 1) / 2)
@@ -127,22 +128,18 @@ void queue_init(struct queue *q, struct task *tasks) {
*
* @param q The task #queue.
* @param prev The previous #task extracted from this #queue.
- * @param blocking Block until access to the queue is granted.
*/
-struct task *queue_gettask(struct queue *q, const struct task *prev, int blocking) {
+struct task *queue_gettask(struct queue *q, const struct task *prev) {
lock_type *qlock = &q->lock;
struct task *res = NULL;
/* If there are no tasks, leave immediately. */
if (q->count == 0) return NULL;
/* Grab the task lock. */
-if (blocking) {
-  if (lock_lock(qlock) != 0) error("Locking the qlock failed.\n");
-} else {
-  if (lock_trylock(qlock) != 0) return NULL;
+if (lock_trylock(qlock) != 0) return NULL;
+if (q->count == 0) {
+  if (lock_unlock(qlock) != 0) error("Unlocking the qlock failed.\n");
+  return NULL;
}
/* Set some pointers we will use often. */
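
The rewritten queue_gettask above follows the classic peek/trylock/recheck pattern: an unlocked peek at count for a cheap early exit, a non-blocking lock attempt, then a second check under the lock because another thread may have emptied the queue in between. A self-contained pthreads sketch of the same pattern (pthread_mutex_trylock standing in for SWIFT's lock_trylock):

    #include <pthread.h>

    struct tiny_queue {
      pthread_mutex_t lock;
      int count;
    };

    /* Returns an item index, or -1 if empty or currently contended. */
    int tiny_queue_pop(struct tiny_queue *q) {
      if (q->count == 0) return -1;                        /* unlocked peek */
      if (pthread_mutex_trylock(&q->lock) != 0) return -1; /* don't block */
      if (q->count == 0) {                                 /* re-check under lock */
        pthread_mutex_unlock(&q->lock);
        return -1;
      }
      const int item = --q->count;                         /* stand-in for real work */
      pthread_mutex_unlock(&q->lock);
      return item;
    }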
@@ -211,7 +208,7 @@ struct task *queue_gettask(struct queue *q, const struct task *prev, int blocking) {
if (ind >= 0) {
/* Another one bites the dust. */
-const int qcount = q->count -= 1;
+const int qcount = atomic_dec(&q->count) - 1; // returns value pre-decrement
/* Get a pointer on the task that we want to return. */
res = &qtasks[tid];
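
The `- 1` in the new line relies on atomic_dec returning the value from before the decrement. A quick check of that assumption (again taking atomic_add to wrap __sync_fetch_and_add):

    #include <assert.h>

    #define atomic_add(v, i) __sync_fetch_and_add(v, i)
    #define atomic_dec(v) atomic_add(v, -1)

    int main(void) {
      int count = 5;
      const int qcount = atomic_dec(&count) - 1; /* old value 5, minus 1 */
      assert(qcount == 4 && count == 4);
      return 0;
    }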
@@ -55,7 +55,7 @@ struct queue {
} __attribute__((aligned(64)));
/* Function prototypes. */
-struct task *queue_gettask(struct queue *q, const struct task *prev, int blocking);
+struct task *queue_gettask(struct queue *q, const struct task *prev);
void queue_init(struct queue *q, struct task *tasks);
void queue_insert(struct queue *q, struct task *t);
@@ -840,7 +840,9 @@ void scheduler_reset(struct scheduler *s, int size) {
s->nr_unlocks = 0;
/* Set the task pointers in the queues. */
-for (k = 0; k < s->nr_queues; k++) s->queues[k].tasks = s->tasks;
+for (k = 0; k < s->nr_queues; k++) {
+  s->queues[k].tasks = s->tasks;
+}
}
/**
@@ -868,8 +870,10 @@ void scheduler_reweight(struct scheduler *s) {
for (j = 0; j < t->nr_unlock_tasks; j++)
if (t->unlock_tasks[j]->weight > t->weight)
t->weight = t->unlock_tasks[j]->weight;
+#ifndef DETERMINISTIC_SCHEDULE
if (!t->implicit && t->tic > 0)
t->weight += wscale * (t->toc - t->tic);
else
+#endif
switch (t->type) {
case task_type_sort:
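
When DETERMINISTIC_SCHEDULE is defined (presumably in const.h, hence the new include above), the preprocessor strips the timing-dependent branch entirely and the weight falls through to the static per-type switch, so queue order no longer depends on the previous run's measured times. A tiny standalone illustration of the same #ifndef-around-if/else trick, with purely illustrative names:

    #include <stdio.h>

    /* #define DETERMINISTIC 1 */ /* uncomment for run-independent estimates */

    static int weight(int measured, int fallback) {
    #ifndef DETERMINISTIC
      if (measured > 0)
        return measured; /* runtime-dependent path */
      else
    #endif
        return fallback; /* compile-time-stable path */
    }

    int main(void) {
      printf("%d\n", weight(17, 5)); /* 17 normally, 5 when DETERMINISTIC */
      return 0;
    }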
@@ -957,8 +961,8 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
const int num_rewait_tasks = s->nr_queues > s->size - s->nr_tasks
? s->size - s->nr_tasks
: s->nr_queues;
-const int waiting_old =
-    s->waiting; // Remember that engine_launch may fiddle with this value.
+/* Remember that engine_launch may fiddle with this value. */
+const int waiting_old = atomic_load(&s->waiting);
for (int k = 0; k < num_rewait_tasks; k++) {
rewait_tasks[k].type = task_type_rewait;
rewait_tasks[k].ci = (struct cell *)&s->tasks[k * nr_tasks / s->nr_queues];
@@ -976,7 +980,7 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
/* Wait for the rewait tasks to have executed. */
pthread_mutex_lock(&s->sleep_mutex);
-while (s->waiting > waiting_old) {
+while (atomic_load(&s->waiting) > waiting_old) {
pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
}
pthread_mutex_unlock(&s->sleep_mutex);
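
The wait loop above is the standard condition-variable handshake: test the predicate under the mutex, re-test after every wakeup (pthread_cond_wait may wake spuriously), and have the signalling side decrement and broadcast so no wakeup is lost. A pthreads sketch under those assumptions:

    #include <pthread.h>

    static pthread_mutex_t sleep_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t sleep_cond = PTHREAD_COND_INITIALIZER;
    static volatile int waiting;

    /* Main thread: block until the counter drops back to its old value. */
    void wait_until_drained(int waiting_old) {
      pthread_mutex_lock(&sleep_mutex);
      while (__sync_fetch_and_add(&waiting, 0) > waiting_old)
        pthread_cond_wait(&sleep_cond, &sleep_mutex);
      pthread_mutex_unlock(&sleep_mutex);
    }

    /* Runner thread: finish a task, then wake any sleeper. */
    void task_done(void) {
      __sync_fetch_and_sub(&waiting, 1);
      pthread_mutex_lock(&sleep_mutex);
      pthread_cond_broadcast(&sleep_cond);
      pthread_mutex_unlock(&sleep_mutex);
    }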
@@ -987,11 +991,18 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
// tic = getticks();
for (int k = 0; k < s->nr_tasks; k++) {
t = &tasks[tid[k]];
-if (atomic_dec(&t->wait) == 1 && ((1 << t->type) & s->mask) && !t->skip) {
+if (((1 << t->type) & s->mask) && !t->skip && atomic_dec(&t->wait) == 1) {
scheduler_enqueue(s, t);
+#ifndef DETERMINISTIC_SCHEDULE
pthread_cond_broadcast(&s->sleep_cond);
+#endif
}
}
+#ifdef DETERMINISTIC_SCHEDULE
+pthread_cond_broadcast(&s->sleep_cond);
+#endif
// message( "enqueueing tasks took %.3f ms." , (double)( getticks() - tic ) /
// CPU_TPS * 1000 );
}
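
The reordered condition above leans on C's guaranteed left-to-right, short-circuit evaluation of &&: the atomic_dec now fires only after the mask and skip tests have passed, so tasks that will never run keep their wait counters untouched. A toy check of that guarantee:

    #include <assert.h>

    static int hits;
    static int decrement(int *w) { hits++; return --*w; }

    int main(void) {
      int wait = 2;
      const int masked = 0; /* task type not in this run's mask */
      if (masked && decrement(&wait) == 1) { /* would enqueue here */ }
      assert(hits == 0 && wait == 2); /* decrement was skipped entirely */
      return 0;
    }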
@@ -1041,10 +1052,13 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
case task_type_pair:
case task_type_sub:
qid = t->ci->super->owner;
-if (t->cj != NULL &&
-    (qid < 0 ||
-     s->queues[qid].count > s->queues[t->cj->super->owner].count))
-  qid = t->cj->super->owner;
+if (t->cj != NULL) {
+  /* Check qid before indexing with it: it may still be -1. */
+  const int count_j = atomic_load(&s->queues[t->cj->super->owner].count);
+  if (qid < 0 || atomic_load(&s->queues[qid].count) > count_j)
+    qid = t->cj->super->owner;
+}
break;
case task_type_recv:
#ifdef WITH_MPI
@@ -1091,11 +1105,11 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
/* If no previous owner, find the shortest queue. */
if (qid < 0) qid = rand() % s->nr_queues;
+/* Increase the waiting counter. */
+atomic_inc(&s->waiting);
/* Insert the task into that queue. */
queue_insert(&s->queues[qid], t);
-/* Increase the waiting counter. */
-atomic_inc(&s->waiting);
}
}
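
Why the swap above matters, sketched in comments (the reasoning is mine, inferred from the commit message): once queue_insert publishes the task, a runner can pop it, execute it, and decrement s->waiting straight away; if the increment came second, the counter could transiently dip and wake the main thread before every task is accounted for.

    atomic_inc(&s->waiting);          /* account for the task first...        */
    queue_insert(&s->queues[qid], t); /* ...only then let a runner pop it and
                                         decrement s->waiting when it's done. */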
@@ -1201,29 +1215,32 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
if (qid >= nr_queues || qid < 0) error("Bad queue ID.");
/* Loop as long as there are tasks... */
-while (s->waiting > 0 && res == NULL) {
+while (atomic_load(&s->waiting) > 0 && res == NULL) {
/* Try more than once before sleeping. */
-for (int tries = 0; res == NULL && s->waiting && tries < scheduler_maxtries;
+for (int tries = 0; atomic_load(&s->waiting) && tries < scheduler_maxtries;
tries++) {
/* Try to get a task from the suggested queue. */
-if (s->queues[qid].count > 0) {
-  TIMER_TIC
-  res = queue_gettask(&s->queues[qid], prev, 0);
-  TIMER_TOC(timer_qget);
-  if (res != NULL) break;
-}
+TIMER_TIC
+res = queue_gettask(&s->queues[qid], prev);
+TIMER_TOC(timer_qget);
+if (res != NULL) break;
/* If unsuccessful, try stealing from the other queues. */
if (s->flags & scheduler_flag_steal) {
int count = 0, qids[nr_queues];
-for (k = 0; k < nr_queues; k++)
+for (k = 0; k < nr_queues; k++) {
+  if (lock_lock(&s->queues[k].lock) != 0)
+    error("Locking the qlock failed.\n");
  if (s->queues[k].count > 0) qids[count++] = k;
+  if (lock_unlock(&s->queues[k].lock) != 0)
+    error("Unlocking the qlock failed.\n");
+}
for (k = 0; k < scheduler_maxsteal && count > 0; k++) {
int ind = rand_r(&seed) % count;
TIMER_TIC
-res = queue_gettask(&s->queues[qids[ind]], prev, 0);
+res = queue_gettask(&s->queues[qids[ind]], prev);
TIMER_TOC(timer_qsteal);
if (res != NULL)
break;
@@ -1241,7 +1258,8 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
if (res == NULL) {
#endif
pthread_mutex_lock(&s->sleep_mutex);
-if (s->waiting > 0) pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
+if (atomic_load(&s->waiting) > 0)
+  pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
pthread_mutex_unlock(&s->sleep_mutex);
}
}