Skip to content
Snippets Groups Projects
Commit 21e5ce73 authored by Pedro Gonnet's avatar Pedro Gonnet
Browse files

check both count and count_incoming before deciding to skip a queue.

parent 83caf9ab
No related branches found
No related tags found
1 merge request!178Queue fix
......@@ -84,6 +84,7 @@ void queue_get_incoming(struct queue *q) {
/* Drop the task at the end of the queue. */
tid[q->count] = offset;
q->count += 1;
atomic_dec(&q->count_incoming);
/* Shuffle up. */
for (int k = q->count - 1; k > 0; k = (k - 1) / 2)
......@@ -114,13 +115,23 @@ void queue_insert(struct queue *q, struct task *t) {
/* Spin until the new offset can be stored. */
while (atomic_cas(&q->tid_incoming[ind], -1, t - q->tasks) != -1) {
/* Try to get the queue lock, non-blocking, ensures that at
least somebody is working on this queue. */
if (lock_trylock(&q->lock) == 0) {
/* Clean up the incoming DEQ. */
queue_get_incoming(q);
if (lock_unlock(&q->lock) != 0) error("Unlocking the qlock failed.\n");
/* Release the queue lock. */
if (lock_unlock(&q->lock) != 0) {
error("Unlocking the qlock failed.\n");
}
}
}
/* Increase the incoming count. */
atomic_inc(&q->count_incoming);
}
/**
......@@ -154,6 +165,7 @@ void queue_init(struct queue *q, struct task *tasks) {
}
q->first_incoming = 0;
q->last_incoming = 0;
q->count_incoming = 0;
}
/**
......
......@@ -55,7 +55,7 @@ struct queue {
/* DEQ for incoming tasks. */
int *tid_incoming;
volatile unsigned int first_incoming, last_incoming;
volatile unsigned int first_incoming, last_incoming, count_incoming;
} __attribute__((aligned(64)));
......
......@@ -1254,7 +1254,8 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
tries++) {
/* Try to get a task from the suggested queue. */
if (s->queues[qid].count > 0) {
if (s->queues[qid].count > 0 ||
s->queues[qid].count_incoming > 0) {
TIMER_TIC
res = queue_gettask(&s->queues[qid], prev, 0);
TIMER_TOC(timer_qget);
......@@ -1265,7 +1266,10 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
if (s->flags & scheduler_flag_steal) {
int count = 0, qids[nr_queues];
for (int k = 0; k < nr_queues; k++)
if (s->queues[k].count > 0) qids[count++] = k;
if (s->queues[k].count > 0 ||
s->queues[k].count_incoming > 0) {
qids[count++] = k;
}
for (int k = 0; k < scheduler_maxsteal && count > 0; k++) {
const int ind = rand_r(&seed) % count;
TIMER_TIC
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment