Commit 20c7ed6c authored by Pedro Gonnet

more specific type names, routines for dynamic task creation, as yet untested.

parent bf44c60f
@@ -46,6 +46,190 @@
#include "queue.h"
/**
* @brief Make sure that the scheduler has enough memory
* allocated for the tasks, resources, dependencies,
* locks, uses, and data.
*
* @param s Pointer to the #qsched.
* @param nr_tasks Minimum number of tasks.
* @param nr_res Minimum number of resources.
* @param nr_deps Minimum number of dependencies.
* @param nr_locks Minimum number of locks.
* @param nr_uses Minimum number of uses.
* @param size_data Minimum size of task data.
*
* This procedure pre-allocates buffers in the scheduler. Although
* these buffers are normally grown automatically, this is not
* possible while dynamic tasks are being spawned (see
* #qsched_addtask_dynamic), and insufficient buffers may cause
* dynamic task allocation to fail!
*
* Keep in mind that the individual blocks of task data are
* rounded up to 16 bytes for alignment.
* The number of dependencies, locks, and uses refers to the
* number of times #qsched_addunlock, #qsched_addlock, and
* #qsched_adduse are called, respectively.
*/
void qsched_ensure ( struct qsched *s , int nr_tasks , int nr_res , int nr_deps , int nr_locks , int nr_uses , int size_data ) {
int dirty = 0;
/* Re-allocate tasks? */
if ( s->size < nr_tasks ) {
dirty = 1;
struct task *tasks_new;
if ( ( tasks_new = (struct task *)malloc( sizeof(struct task) * nr_tasks ) ) == NULL )
error( "Failed to allocate new task buffer." );
memcpy( tasks_new , s->tasks , sizeof(struct task) * s->count );
free( s->tasks );
s->tasks = tasks_new;
s->size = nr_tasks;
}
/* Re-allocate resources? */
if ( s->size_res < nr_res ) {
dirty = 1;
struct res *res_new;
if ( ( res_new = (struct res *)malloc( sizeof(struct res) * nr_res ) ) == NULL )
error( "Failed to allocate new res buffer." );
memcpy( res_new , s->res , sizeof(struct res) * s->count_res );
free( s->res );
s->res = res_new;
s->size_res = nr_res;
}
/* Re-allocate dependencies? */
if ( s->size_deps < nr_deps ) {
dirty = 1;
qsched_task_t *deps_new, *deps_key_new;
if ( ( deps_new = (qsched_task_t *)malloc( sizeof(qsched_task_t) * nr_deps ) ) == NULL ||
( deps_key_new = (qsched_task_t *)malloc( sizeof(qsched_task_t) * nr_deps ) ) == NULL )
error( "Failed to allocate new deps buffer." );
memcpy( deps_new , s->deps , sizeof(qsched_task_t) * s->count_deps );
memcpy( deps_key_new , s->deps_key , sizeof(qsched_task_t) * s->count_deps );
free( s->deps );
free( s->deps_key );
s->deps = deps_new;
s->deps_key = deps_key_new;
s->size_deps = nr_deps;
}
/* Re-allocate locks? */
if ( s->size_locks < nr_locks ) {
dirty = 1;
qsched_res_t *locks_new;
qsched_task_t *locks_key_new;
if ( ( locks_new = (qsched_res_t *)malloc( sizeof(qsched_res_t) * nr_locks ) ) == NULL ||
( locks_key_new = (qsched_task_t *)malloc( sizeof(qsched_task_t) * nr_locks ) ) == NULL )
error( "Failed to allocate new locks buffer." );
memcpy( locks_new , s->locks , sizeof(qsched_res_t) * s->count_locks );
memcpy( locks_key_new , s->locks_key , sizeof(qsched_task_t) * s->count_locks );
free( s->locks );
free( s->locks_key );
s->locks = locks_new;
s->locks_key = locks_key_new;
s->size_locks = nr_locks;
}
/* Re-allocate uses? */
if ( s->size_uses < nr_uses ) {
dirty = 1;
qsched_res_t *uses_new;
qsched_task_t *uses_key_new;
if ( ( uses_new = (qsched_res_t *)malloc( sizeof(qsched_res_t) * nr_uses ) ) == NULL ||
( uses_key_new = (qsched_task_t *)malloc( sizeof(qsched_task_t) * nr_uses ) ) == NULL )
error( "Failed to allocate new uses buffer." );
memcpy( uses_new , s->uses , sizeof(qsched_res_t) * s->count_uses );
memcpy( uses_key_new , s->uses_key , sizeof(qsched_task_t) * s->count_uses );
free( s->uses );
free( s->uses_key );
s->uses = uses_new;
s->uses_key = uses_key_new;
s->size_uses = nr_uses;
}
/* Re-allocate the task data buffer? */
if ( s->size_data < size_data ) {
dirty = 1;
char *data_new;
if ( ( data_new = (char *)malloc( size_data ) ) == NULL )
error( "Failed to allocate new data buffer." );
memcpy( data_new , s->data , s->count_data );
free( s->data );
s->data = data_new;
s->size_data = size_data;
}
/* Mark scheduler if dirty. */
if ( dirty )
s->flags |= qsched_flag_dirty;
}
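/* Illustrative sketch, not part of this commit: pre-sizing a scheduler up
   front so that tasks spawned later with qsched_addtask_dynamic cannot
   overflow the fixed buffers. The helper name and all counts are made-up
   placeholders and should bound the worst-case task graph. */
void setup_sched ( struct qsched *s ) {
    qsched_init( s , 4 , 0 );   /* Four queues, no special flags assumed. */
    /* Room for 10000 tasks, 1000 resources, 20000 dependencies, 20000
       locks, 20000 uses, and 64 bytes of (16-byte aligned) data per task. */
    qsched_ensure( s , 10000 , 1000 , 20000 , 20000 , 20000 , 10000 * 64 );
    /* ... add the initial tasks here, then call qsched_run as usual. */
}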
/**
* @brief Add a task to the scheduler on the fly.
*
* @param s Pointer to the #qsched.
* @param type Task type.
* @param flags Task flags.
* @param data Task data.
* @param data_size Size, in bytes, of task data.
* @param cost Relative task computational cost.
* @param locks Array of #qsched_res_t locked by this task.
* @param nr_locks Number of locked resources.
* @param uses Array of #qsched_res_t used by this task.
* @param nr_uses Number of used resources.
*/
void qsched_addtask_dynamic ( struct qsched *s , int type , unsigned int flags , void *data , int data_size , int cost , qsched_res_t *locks , int nr_locks , qsched_res_t *uses , int nr_uses ) {
int k, tid, ind, data_size2;
struct task *t;
/* Allocate a new task. */
if ( ( tid = atomic_inc( &s->count ) ) >= s->size )
error( "Task buffer overflow." );
t = &s->tasks[ tid ];
/* Set the task data. */
t->type = type;
t->flags = flags;
t->weight = t->cost = cost;
t->nr_unlocks = 0;
t->unlocks = NULL;
/* Add the data. */
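/* Round the payload size up to a multiple of qsched_data_round bytes,
   keeping each task's data block 16-byte aligned (see qsched_ensure). */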
data_size2 = ( data_size + (qsched_data_round-1) ) & ~(qsched_data_round-1);
if ( ( ind = atomic_add( &s->count_data , data_size2 ) ) + data_size2 > s->size_data )
error( "Data buffer overflow." );
memcpy( &s->data[ ind ] , data , data_size );
t->data = ind;
/* Add the locks. */
if ( ( ind = atomic_add( &s->count_locks , nr_locks ) ) + nr_locks > s->size_locks )
error( "Locks buffer overflow." );
memcpy( &s->locks[ ind ] , locks , sizeof(qsched_res_t)*nr_locks );
for ( k = 0 ; k < nr_locks ; k++ )
s->locks_key[ ind + k ] = tid;
t->locks = &s->locks[ ind ];
/* Add the uses. */
if ( ( ind = atomic_add( &s->count_uses , nr_uses ) ) + nr_uses > s->size_uses )
error( "uses buffer overflow." );
memcpy( &s->uses[ ind ] , uses , sizeof(qsched_res_t)*nr_uses );
for ( k = 0 ; k < nr_uses ; k++ )
s->uses_key[ ind + k ] = tid;
t->uses = &s->uses[ ind ];
/* The task is now ready to run, submit it. */
t->wait = 0;
qsched_enqueue( s , t );
}
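/* Illustrative sketch, not part of this commit: spawning a follow-up task
   from inside a running task. The scheduler 's', the task type
   'task_type_diffuse', the payload struct, and 'run_diffuse' are all
   hypothetical; run_diffuse would be called from the qsched_funtype
   runner passed to qsched_run. */
extern struct qsched s;
enum { task_type_diffuse = 1 };
struct diffuse_data { qsched_res_t cell , neighbour; int steps_left; };

void run_diffuse ( struct diffuse_data *d ) {
    /* ... update d->cell from values read out of d->neighbour ... */
    if ( d->steps_left > 0 ) {
        struct diffuse_data next = { d->cell , d->neighbour , d->steps_left - 1 };
        /* The next step locks the cell it writes and merely uses the
           neighbour it reads; the payload is copied into the scheduler's
           data buffer, so passing a stack variable is fine. */
        qsched_addtask_dynamic( &s , task_type_diffuse , 0 , &next ,
            sizeof(struct diffuse_data) , 1 , &next.cell , 1 , &next.neighbour , 1 );
    }
}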
/**
* @brief Clear the tasks and resources in a scheduler.
*
@@ -59,13 +59,13 @@ struct qsched {
struct task *tasks;
/* The dependency indices. */
int *deps, *deps_key;
qsched_task_t *deps;
/* The conflict/lock array. */
int *locks, *locks_key;
qsched_res_t *locks, *uses;
/* The conflict/use array. */
int *uses, *uses_key;
/* Sorting indices. */
qsched_task_t *deps_key, *locks_key, *uses_key;
/* The shared resources. */
// lock_type *res;
@@ -141,6 +141,7 @@ void qsched_unlockres ( struct qsched *s , int rid );
int qsched_locktask ( struct qsched *s , int tid );
void qsched_unlocktask ( struct qsched *s , int tid );
void qsched_prepare ( struct qsched *s );
void qsched_enqueue ( struct qsched *s , struct task *t );
/* External functions. */
void qsched_init ( struct qsched *s , int nr_queues , int flags );
@@ -152,3 +152,5 @@ void qsched_adduse ( struct qsched *s , qsched_task_t t , qsched_res_t res );
void qsched_free ( struct qsched *s );
void qsched_run ( struct qsched *s , int nr_threads , qsched_funtype fun );
void qsched_reset ( struct qsched *s );
void qsched_addtask_dynamic ( struct qsched *s , int type , unsigned int flags , void *data , int data_size , int cost , qsched_res_t *locks , int nr_locks , qsched_res_t *uses , int nr_uses );
void qsched_ensure ( struct qsched *s , int nr_tasks , int nr_res , int nr_deps , int nr_locks , int nr_uses , int size_data );