Skip to content
Snippets Groups Projects
Commit 83caf9ab authored by Pedro Gonnet's avatar Pedro Gonnet
Browse files

add an incoming waiting list for the queue, makes adding new tasks faster as...

add an incoming waiting list for the queue, makes adding new tasks faster as it does not always require the queue lock.
parent 778c4cca
No related branches found
No related tags found
1 merge request!178Queue fix
......@@ -26,5 +26,6 @@
#define atomic_inc(v) atomic_add(v, 1)
#define atomic_dec(v) atomic_add(v, -1)
#define atomic_cas(v, o, n) __sync_val_compare_and_swap(v, o, n)
#define atomic_swap(v, n) __sync_lock_test_and_set(v, n)
#endif /* SWIFT_ATOMIC_H */
......@@ -34,6 +34,7 @@
#include "queue.h"
/* Local headers. */
#include "atomic.h"
#include "const.h"
#include "error.h"
......@@ -48,54 +49,78 @@
int queue_counter[queue_counter_count];
/**
* @brief Insert a used tasks into the given queue.
* @brief Enqueue all tasks in the incoming DEQ.
*
* @param q The #queue.
* @param t The #task.
* @param q The #queue, assumed to be locked.
*/
void queue_insert(struct queue *q, struct task *t) {
int k, *tid;
struct task *tasks;
/* Lock the queue. */
if (lock_lock(&q->lock) != 0) error("Failed to get queue lock.");
void queue_get_incoming(struct queue *q) {
int *tid = q->tid;
struct task *tasks = q->tasks;
/* Loop over the incoming DEQ. */
while (1) {
/* Is there a next element? */
const int ind = q->first_incoming % queue_incoming_size;
if (q->tid_incoming[ind] < 0) break;
/* Get the next offset off the DEQ. */
const int offset = atomic_swap(&q->tid_incoming[ind], -1);
atomic_inc(&q->first_incoming);
/* Does the queue need to be grown? */
if (q->count == q->size) {
int *temp;
q->size *= queue_sizegrow;
if ((temp = (int *)malloc(sizeof(int) * q->size)) == NULL)
error("Failed to allocate new indices.");
memcpy(temp, tid, sizeof(int) * q->count);
free(tid);
q->tid = tid = temp;
}
tid = q->tid;
tasks = q->tasks;
/* Drop the task at the end of the queue. */
tid[q->count] = offset;
q->count += 1;
/* Shuffle up. */
for (int k = q->count - 1; k > 0; k = (k - 1) / 2)
if (tasks[tid[k]].weight > tasks[tid[(k - 1) / 2]].weight) {
int temp = tid[k];
tid[k] = tid[(k - 1) / 2];
tid[(k - 1) / 2] = temp;
} else
break;
/* Does the queue need to be grown? */
if (q->count == q->size) {
int *temp;
q->size *= queue_sizegrow;
if ((temp = (int *)malloc(sizeof(int) * q->size)) == NULL)
error("Failed to allocate new indices.");
memcpy(temp, tid, sizeof(int) * q->count);
free(tid);
q->tid = tid = temp;
/* Check the queue's consistency. */
/* for (int k = 1; k < q->count; k++)
if ( tasks[ tid[(k-1)/2] ].weight < tasks[ tid[k] ].weight )
error( "Queue heap is disordered." ); */
}
}
/* Drop the task at the end of the queue. */
tid[q->count] = (t - tasks);
q->count += 1;
/* Shuffle up. */
for (k = q->count - 1; k > 0; k = (k - 1) / 2)
if (tasks[tid[k]].weight > tasks[tid[(k - 1) / 2]].weight) {
int temp = tid[k];
tid[k] = tid[(k - 1) / 2];
tid[(k - 1) / 2] = temp;
} else
break;
/* Check the queue's consistency. */
/* for ( k = 1 ; k < q->count ; k++ )
if ( tasks[ tid[(k-1)/2] ].weight < tasks[ tid[k] ].weight )
error( "Queue heap is disordered." ); */
/* Unlock the queue. */
if (lock_unlock(&q->lock) != 0) error("Failed to unlock queue.");
/**
* @brief Insert a used tasks into the given queue.
*
* @param q The #queue.
* @param t The #task.
*/
void queue_insert(struct queue *q, struct task *t) {
/* Get an index in the DEQ. */
const int ind = atomic_inc(&q->last_incoming) % queue_incoming_size;
/* Spin until the new offset can be stored. */
while (atomic_cas(&q->tid_incoming[ind], -1, t - q->tasks) != -1) {
/* Try to get the queue lock, non-blocking, ensures that at
least somebody is working on this queue. */
if (lock_trylock(&q->lock) == 0) {
queue_get_incoming(q);
if (lock_unlock(&q->lock) != 0) error("Unlocking the qlock failed.\n");
}
}
}
/**
......@@ -120,6 +145,15 @@ void queue_init(struct queue *q, struct task *tasks) {
/* Init the queue lock. */
if (lock_init(&q->lock) != 0) error("Failed to init queue lock.");
/* Init the incoming DEQ. */
if ((q->tid_incoming = (int *)malloc(sizeof(int) * queue_incoming_size)) == NULL)
error("Failed to allocate queue incoming buffer.");
for (int k = 0; k < queue_incoming_size; k++) {
q->tid_incoming[k] = -1;
}
q->first_incoming = 0;
q->last_incoming = 0;
}
/**
......@@ -142,6 +176,9 @@ struct task *queue_gettask(struct queue *q, const struct task *prev,
} else {
if (lock_trylock(qlock) != 0) return NULL;
}
/* Fill any tasks from the incoming DEQ. */
queue_get_incoming(q);
/* If there are no tasks, leave immediately. */
if (q->count == 0) {
......
......@@ -29,6 +29,7 @@
#define queue_sizeinit 100
#define queue_sizegrow 2
#define queue_search_window 8
#define queue_incoming_size 1024
/* Counters. */
enum {
......@@ -51,6 +52,10 @@ struct queue {
/* The task indices. */
int *tid;
/* DEQ for incoming tasks. */
int *tid_incoming;
volatile unsigned int first_incoming, last_incoming;
} __attribute__((aligned(64)));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment