queue.c 6.24 KiB
/*******************************************************************************
* This file is part of QuickSched.
* Coypright (c) 2013 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Config parameters. */
#include "../config.h"
/* Standard includes. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef WITH_MPI
#include <mpi.h>
#endif
/* Local includes. */
#include "error.h"
#include "cycle.h"
#include "atomic.h"
#include "lock.h"
#include "task.h"
#include "qsched.h"
#include "queue.h"
/**
* @brief Get a task index from the given #queue.
*
* @param q The #queue.
* @param s The #qsched in which this queue's tasks lives.
* @param insist If set, wait at the queue's lock, otherwise fail.
*
* @return The task ID or -1 if no available task could be found.
*/
int queue_get ( struct queue *q , struct qsched *s , int insist ) {
int k, j, temp, tid, *inds, count;
struct task *tasks = s->tasks;
/* Should we even try? */
if ( q->count == 0 )
return -1;
/* Lock this queue. */
TIMER_TIC
if ( insist ) {
if ( lock_lock( &q->lock ) != 0 )
error( "Failed to lock queue." );
}
else if ( lock_trylock( &q->lock ) != 0 )
return qsched_task_none;
TIMER_TOC( s , qsched_timer_qlock );
/* Get a pointer to the indices. */
inds = q->inds;
count = q->count;
/* Loop over the queue entries. */
for ( k = 0 ; k < count ; k++ ) {
/* Get the task ID. */
tid = inds[k];
/* If the task can be locked, break. */
if ( qsched_locktask( s , tid ) )
break;
}
/* Did we get a task? */
if ( k < count ) {
/* Swap the last element to the new heap position. */
q->count = ( count -= 1 );
inds[k] = inds[ count ];
/* Fix the heap. */
long long int w = tasks[ inds[k] ].weight;
if ( k > 0 && w > tasks[ inds[(k-1)/2] ].weight )
while ( k > 0 ) {
j = (k - 1) / 2;
if ( w > tasks[ inds[j] ].weight ) {
temp = inds[j];
inds[j] = inds[k];
inds[k] = temp;
k = j;
}
else
break;
}
else
while ( 1 ) {
if ( ( j = 2*k + 1 ) >= count )
break;
if ( j+1 < count && tasks[ inds[j] ].weight < tasks[ inds[j+1] ].weight )
j = j + 1;
if ( tasks[ inds[j] ].weight > w ) {
temp = inds[j];
inds[j] = inds[k];
inds[k] = temp;
k = j;
}
else
break;
}
} /* did we get a task? */
/* Otherwise, clear the task ID. */
else
tid = qsched_task_none;
/* Unlock the queue. */
lock_unlock_blind( &q->lock );
/* Return the task ID. */
return tid;
}
/**
* @brief Add a task index to the given #queue.
*
* @param q The #queue.
* @param s The #qsched in which the tasks live.
* @param tid The task index.
*/
void queue_put ( struct queue *q , struct qsched *s , int tid ) {
int ind, j, temp;
struct task *tasks = s->tasks;
int *inds, *inds_new;
/* Lock this queue. */
if ( lock_lock( &q->lock ) != 0 )
error( "Failed to lock queue." );
/* Get a pointer to the indices. */
inds = q->inds;
/* Get the index of the new task. */
ind = q->count;
/* Does the queue need to be extended? */
if ( ind >= q->size ) {
/* Increase the queue size. */
q->size *= queue_grow;
/* Allocate the new indices. */
if ( ( inds_new = (int *)malloc( sizeof(int) * q->size ) ) == NULL )
error( "Failed to allocate new indices." );
/* Copy the old indices. */
memcpy( inds_new , inds , sizeof(int) * q->count );
/* Clear the old indices and replace them with the new. */
free( inds );
q->inds = ( inds = inds_new );
}
/* Store the task index. */
q->count += 1;
inds[ind] = tid;
/* Bubble up the new entry. */
long long int w = tasks[ inds[ind] ].weight;
while ( ind > 0 ) {
j = (ind - 1)/2;
if ( tasks[ inds[j] ].weight < w ) {
temp = inds[j];
inds[j] = inds[ind];
inds[ind] = temp;
ind = j;
}
else
break;
}
/* Unlock the queue. */
lock_unlock_blind( &q->lock );
}
/**
* @brief Clean up a queue and free its memory.
*/
void queue_free ( struct queue *q ) {
/* Free the inds. */
if ( q->inds != NULL )
free( (void *)q->inds );
}
/**
* @brief Initialize the given #queue.
*
* @param q The #queue.
* @param size The maximum size of the queue.
*/
void queue_init ( struct queue *q , int size ) {
/* Allocate the task list if needed. */
if ( q->inds == NULL || q->size < size ) {
if ( q->inds != NULL )
free( (int *)q->inds );
q->size = size;
if ( ( q->inds = (int *)malloc( sizeof(int) * size ) ) == NULL )
error( "Failed to allocate queue inds." );
}
q->size = size;
/* Init the lock. */
lock_init( &q->lock );
/* Init the count. */
q->count = 0;
}