diff --git a/src/atomic.h b/src/atomic.h
index 818d210e60a7aacdf61d12b60623ce87e62c9ed2..0b87a0f77e17bafc64a2a59b3c70bda782fc14d4 100644
--- a/src/atomic.h
+++ b/src/atomic.h
@@ -26,5 +26,6 @@
 #define atomic_inc(v) atomic_add(v, 1)
 #define atomic_dec(v) atomic_add(v, -1)
 #define atomic_cas(v, o, n) __sync_val_compare_and_swap(v, o, n)
+#define atomic_swap(v, n) __sync_lock_test_and_set(v, n)
 
 #endif /* SWIFT_ATOMIC_H */
diff --git a/src/queue.c b/src/queue.c
index dfdac883f9713eb57bb2dc45eb97774983e3a9b2..6b16a58eb84ca745a4673010033c6041c98f5a31 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -34,6 +34,7 @@
 #include "queue.h"
 
 /* Local headers. */
+#include "atomic.h"
 #include "const.h"
 #include "error.h"
 
@@ -48,54 +49,78 @@
 int queue_counter[queue_counter_count];
 
 /**
- * @brief Insert a used tasks into the given queue.
+ * @brief Enqueue all tasks in the incoming DEQ.
  *
- * @param q The #queue.
- * @param t The #task.
+ * @param q The #queue, assumed to be locked.
  */
 
-void queue_insert(struct queue *q, struct task *t) {
-
-  int k, *tid;
-  struct task *tasks;
-
-  /* Lock the queue. */
-  if (lock_lock(&q->lock) != 0) error("Failed to get queue lock.");
+void queue_get_incoming(struct queue *q) {
+
+  int *tid = q->tid;
+  struct task *tasks = q->tasks;
+
+  /* Loop over the incoming DEQ. */
+  while (1) {
+
+    /* Is there a next element? */
+    const int ind = q->first_incoming % queue_incoming_size;
+    if (q->tid_incoming[ind] < 0) break;
+
+    /* Get the next offset off the DEQ. */
+    const int offset = atomic_swap(&q->tid_incoming[ind], -1);
+    atomic_inc(&q->first_incoming);
+
+    /* Does the queue need to be grown? */
+    if (q->count == q->size) {
+      int *temp;
+      q->size *= queue_sizegrow;
+      if ((temp = (int *)malloc(sizeof(int) * q->size)) == NULL)
+        error("Failed to allocate new indices.");
+      memcpy(temp, tid, sizeof(int) * q->count);
+      free(tid);
+      q->tid = tid = temp;
+    }
 
-  tid = q->tid;
-  tasks = q->tasks;
+    /* Drop the task at the end of the queue. */
+    tid[q->count] = offset;
+    q->count += 1;
+
+    /* Shuffle up. */
+    for (int k = q->count - 1; k > 0; k = (k - 1) / 2)
+      if (tasks[tid[k]].weight > tasks[tid[(k - 1) / 2]].weight) {
+        int temp = tid[k];
+        tid[k] = tid[(k - 1) / 2];
+        tid[(k - 1) / 2] = temp;
+      } else
+        break;
 
-  /* Does the queue need to be grown? */
-  if (q->count == q->size) {
-    int *temp;
-    q->size *= queue_sizegrow;
-    if ((temp = (int *)malloc(sizeof(int) * q->size)) == NULL)
-      error("Failed to allocate new indices.");
-    memcpy(temp, tid, sizeof(int) * q->count);
-    free(tid);
-    q->tid = tid = temp;
+    /* Check the queue's consistency. */
+    /* for (int k = 1; k < q->count; k++)
+       if ( tasks[ tid[(k-1)/2] ].weight < tasks[ tid[k] ].weight )
+       error( "Queue heap is disordered." ); */
   }
+}
 
-  /* Drop the task at the end of the queue. */
-  tid[q->count] = (t - tasks);
-  q->count += 1;
-
-  /* Shuffle up. */
-  for (k = q->count - 1; k > 0; k = (k - 1) / 2)
-    if (tasks[tid[k]].weight > tasks[tid[(k - 1) / 2]].weight) {
-      int temp = tid[k];
-      tid[k] = tid[(k - 1) / 2];
-      tid[(k - 1) / 2] = temp;
-    } else
-      break;
-
-  /* Check the queue's consistency. */
-  /* for ( k = 1 ; k < q->count ; k++ )
-     if ( tasks[ tid[(k-1)/2] ].weight < tasks[ tid[k] ].weight )
-     error( "Queue heap is disordered." ); */
-
-  /* Unlock the queue. */
-  if (lock_unlock(&q->lock) != 0) error("Failed to unlock queue.");
+/**
+ * @brief Insert a used tasks into the given queue.
+ *
+ * @param q The #queue.
+ * @param t The #task.
+ */
+
+void queue_insert(struct queue *q, struct task *t) {
+  /* Get an index in the DEQ. */
+  const int ind = atomic_inc(&q->last_incoming) % queue_incoming_size;
+
+  /* Spin until the new offset can be stored. */
+  while (atomic_cas(&q->tid_incoming[ind], -1, t - q->tasks) != -1) {
+    /* Try to get the queue lock, non-blocking, ensures that at
+       least somebody is working on this queue. */
+    if (lock_trylock(&q->lock) == 0) {
+      queue_get_incoming(q);
+      if (lock_unlock(&q->lock) != 0) error("Unlocking the qlock failed.\n");
+    }
+  }
 }
 
 /**
@@ -120,6 +145,15 @@ void queue_init(struct queue *q, struct task *tasks) {
 
   /* Init the queue lock. */
   if (lock_init(&q->lock) != 0) error("Failed to init queue lock.");
+
+  /* Init the incoming DEQ. */
+  if ((q->tid_incoming = (int *)malloc(sizeof(int) * queue_incoming_size)) == NULL)
+    error("Failed to allocate queue incoming buffer.");
+  for (int k = 0; k < queue_incoming_size; k++) {
+    q->tid_incoming[k] = -1;
+  }
+  q->first_incoming = 0;
+  q->last_incoming = 0;
 }
 
 /**
@@ -142,6 +176,9 @@
   } else {
     if (lock_trylock(qlock) != 0) return NULL;
   }
+
+  /* Fill any tasks from the incoming DEQ. */
+  queue_get_incoming(q);
 
   /* If there are no tasks, leave immediately. */
   if (q->count == 0) {
diff --git a/src/queue.h b/src/queue.h
index b28039a30724fa0c086f9111470af2566933e3ac..7b41fa78ca11beb76d9ab6c7938d83bf782ce0e1 100644
--- a/src/queue.h
+++ b/src/queue.h
@@ -29,6 +29,7 @@
 #define queue_sizeinit 100
 #define queue_sizegrow 2
 #define queue_search_window 8
+#define queue_incoming_size 1024
 
 /* Counters. */
 enum {
@@ -51,6 +52,10 @@ struct queue {
 
   /* The task indices. */
   int *tid;
+
+  /* DEQ for incoming tasks. */
+  int *tid_incoming;
+  volatile unsigned int first_incoming, last_incoming;
 
 } __attribute__((aligned(64)));
 