Commit 17f2903b authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Merge branch 'queue_fix' into 'master'

Queue fix

Separate adding and getting a task. Uses a double-ended queue to buffer incoming tasks. The buffer is then added to the actual queue whenever queue_gettask is called.

Peter, can you check if this doesn't break anything? Thanks!

See merge request !178
parents 78f0419b 21e5ce73
......@@ -26,5 +26,6 @@
#define atomic_inc(v) atomic_add(v, 1)
#define atomic_dec(v) atomic_add(v, -1)
#define atomic_cas(v, o, n) __sync_val_compare_and_swap(v, o, n)
#define atomic_swap(v, n) __sync_lock_test_and_set(v, n)
#endif /* SWIFT_ATOMIC_H */
......@@ -34,6 +34,7 @@
#include "queue.h"
/* Local headers. */
#include "atomic.h"
#include "const.h"
#include "error.h"
......@@ -48,54 +49,89 @@
int queue_counter[queue_counter_count];
/**
* @brief Insert a used tasks into the given queue.
* @brief Enqueue all tasks in the incoming DEQ.
*
* @param q The #queue.
* @param t The #task.
* @param q The #queue, assumed to be locked.
*/
void queue_insert(struct queue *q, struct task *t) {
int k, *tid;
struct task *tasks;
/* Lock the queue. */
if (lock_lock(&q->lock) != 0) error("Failed to get queue lock.");
void queue_get_incoming(struct queue *q) {
int *tid = q->tid;
struct task *tasks = q->tasks;
/* Loop over the incoming DEQ. */
while (1) {
/* Is there a next element? */
const int ind = q->first_incoming % queue_incoming_size;
if (q->tid_incoming[ind] < 0) break;
/* Get the next offset off the DEQ. */
const int offset = atomic_swap(&q->tid_incoming[ind], -1);
atomic_inc(&q->first_incoming);
/* Does the queue need to be grown? */
if (q->count == q->size) {
int *temp;
q->size *= queue_sizegrow;
if ((temp = (int *)malloc(sizeof(int) * q->size)) == NULL)
error("Failed to allocate new indices.");
memcpy(temp, tid, sizeof(int) * q->count);
free(tid);
q->tid = tid = temp;
}
tid = q->tid;
tasks = q->tasks;
/* Drop the task at the end of the queue. */
tid[q->count] = offset;
q->count += 1;
atomic_dec(&q->count_incoming);
/* Shuffle up. */
for (int k = q->count - 1; k > 0; k = (k - 1) / 2)
if (tasks[tid[k]].weight > tasks[tid[(k - 1) / 2]].weight) {
int temp = tid[k];
tid[k] = tid[(k - 1) / 2];
tid[(k - 1) / 2] = temp;
} else
break;
/* Does the queue need to be grown? */
if (q->count == q->size) {
int *temp;
q->size *= queue_sizegrow;
if ((temp = (int *)malloc(sizeof(int) * q->size)) == NULL)
error("Failed to allocate new indices.");
memcpy(temp, tid, sizeof(int) * q->count);
free(tid);
q->tid = tid = temp;
/* Check the queue's consistency. */
/* for (int k = 1; k < q->count; k++)
if ( tasks[ tid[(k-1)/2] ].weight < tasks[ tid[k] ].weight )
error( "Queue heap is disordered." ); */
}
}
/* Drop the task at the end of the queue. */
tid[q->count] = (t - tasks);
q->count += 1;
/* Shuffle up. */
for (k = q->count - 1; k > 0; k = (k - 1) / 2)
if (tasks[tid[k]].weight > tasks[tid[(k - 1) / 2]].weight) {
int temp = tid[k];
tid[k] = tid[(k - 1) / 2];
tid[(k - 1) / 2] = temp;
} else
break;
/* Check the queue's consistency. */
/* for ( k = 1 ; k < q->count ; k++ )
if ( tasks[ tid[(k-1)/2] ].weight < tasks[ tid[k] ].weight )
error( "Queue heap is disordered." ); */
/* Unlock the queue. */
if (lock_unlock(&q->lock) != 0) error("Failed to unlock queue.");
/**
* @brief Insert a used tasks into the given queue.
*
* @param q The #queue.
* @param t The #task.
*/
void queue_insert(struct queue *q, struct task *t) {
/* Get an index in the DEQ. */
const int ind = atomic_inc(&q->last_incoming) % queue_incoming_size;
/* Spin until the new offset can be stored. */
while (atomic_cas(&q->tid_incoming[ind], -1, t - q->tasks) != -1) {
/* Try to get the queue lock, non-blocking, ensures that at
least somebody is working on this queue. */
if (lock_trylock(&q->lock) == 0) {
/* Clean up the incoming DEQ. */
queue_get_incoming(q);
/* Release the queue lock. */
if (lock_unlock(&q->lock) != 0) {
error("Unlocking the qlock failed.\n");
}
}
}
/* Increase the incoming count. */
atomic_inc(&q->count_incoming);
}
/**
......@@ -120,6 +156,16 @@ void queue_init(struct queue *q, struct task *tasks) {
/* Init the queue lock. */
if (lock_init(&q->lock) != 0) error("Failed to init queue lock.");
/* Init the incoming DEQ. */
if ((q->tid_incoming = (int *)malloc(sizeof(int) * queue_incoming_size)) == NULL)
error("Failed to allocate queue incoming buffer.");
for (int k = 0; k < queue_incoming_size; k++) {
q->tid_incoming[k] = -1;
}
q->first_incoming = 0;
q->last_incoming = 0;
q->count_incoming = 0;
}
/**
......@@ -142,6 +188,9 @@ struct task *queue_gettask(struct queue *q, const struct task *prev,
} else {
if (lock_trylock(qlock) != 0) return NULL;
}
/* Fill any tasks from the incoming DEQ. */
queue_get_incoming(q);
/* If there are no tasks, leave immediately. */
if (q->count == 0) {
......
......@@ -29,6 +29,7 @@
#define queue_sizeinit 100
#define queue_sizegrow 2
#define queue_search_window 8
#define queue_incoming_size 1024
/* Counters. */
enum {
......@@ -51,6 +52,10 @@ struct queue {
/* The task indices. */
int *tid;
/* DEQ for incoming tasks. */
int *tid_incoming;
volatile unsigned int first_incoming, last_incoming, count_incoming;
} __attribute__((aligned(64)));
......
......@@ -1254,7 +1254,8 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
tries++) {
/* Try to get a task from the suggested queue. */
if (s->queues[qid].count > 0) {
if (s->queues[qid].count > 0 ||
s->queues[qid].count_incoming > 0) {
TIMER_TIC
res = queue_gettask(&s->queues[qid], prev, 0);
TIMER_TOC(timer_qget);
......@@ -1265,7 +1266,10 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
if (s->flags & scheduler_flag_steal) {
int count = 0, qids[nr_queues];
for (int k = 0; k < nr_queues; k++)
if (s->queues[k].count > 0) qids[count++] = k;
if (s->queues[k].count > 0 ||
s->queues[k].count_incoming > 0) {
qids[count++] = k;
}
for (int k = 0; k < scheduler_maxsteal && count > 0; k++) {
const int ind = rand_r(&seed) % count;
TIMER_TIC
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment