Commit 06efb8a9 authored by Pedro Gonnet

massive refactoring, added separate scheduler, new scheduling policy based on task rank, should expand to task weight some time soon.


Former-commit-id: 40f69e4385302ae59779b3d58eac557331d5cd42
parent 76f86ea0
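
The core of the new policy is that each queue keeps its task indices in a binary min-heap keyed on the task's scheduler rank, so the lowest-rank task (i.e. earliest in the dependency order) is always handed out first. The following is a minimal, self-contained sketch of that idea only; the toy_task/toy_queue names and the textbook (k-1)/2 parent indexing are illustrative placeholders, not the SWIFT structures or the exact code in the diff below.

/* Sketch: a queue whose tid[] array is a min-heap on task rank. */

#include <stdio.h>
#include <stdlib.h>

struct toy_task {
    const char *name;
    int rank;                      /* topological rank, lower runs earlier */
};

struct toy_queue {
    int count, size;
    int *tid;                      /* heap of indices into tasks[] */
    struct toy_task *tasks;
};

/* Insert a task index and sift it up until its parent has a lower or equal rank. */
void toy_queue_insert(struct toy_queue *q, int tid) {
    if (q->count == q->size) {     /* grow the index array if needed */
        q->size *= 2;
        q->tid = (int *)realloc(q->tid, sizeof(int) * q->size);
    }
    int k = q->count++;
    q->tid[k] = tid;
    while (k > 0) {
        int p = (k - 1) / 2;
        if (q->tasks[q->tid[k]].rank >= q->tasks[q->tid[p]].rank)
            break;
        int tmp = q->tid[k]; q->tid[k] = q->tid[p]; q->tid[p] = tmp;
        k = p;
    }
}

/* Remove and return the index of the lowest-rank task (sift-down from the root). */
int toy_queue_pop(struct toy_queue *q) {
    int res = q->tid[0];
    q->tid[0] = q->tid[--q->count];
    for (int k = 0;;) {
        int c = 2 * k + 1;         /* left child */
        if (c >= q->count)
            break;
        if (c + 1 < q->count && q->tasks[q->tid[c + 1]].rank < q->tasks[q->tid[c]].rank)
            c += 1;                /* pick the smaller-rank child */
        if (q->tasks[q->tid[k]].rank <= q->tasks[q->tid[c]].rank)
            break;
        int tmp = q->tid[k]; q->tid[k] = q->tid[c]; q->tid[c] = tmp;
        k = c;
    }
    return res;
}

int main(void) {
    struct toy_task tasks[] = { {"pair", 2}, {"sort", 0}, {"self", 1} };
    struct toy_queue q = { 0, 4, (int *)malloc(sizeof(int) * 4), tasks };
    for (int i = 0; i < 3; i++)
        toy_queue_insert(&q, i);
    while (q.count > 0)            /* prints: sort self pair */
        printf("%s ", tasks[toy_queue_pop(&q)].name);
    printf("\n");
    free(q.tid);
    return 0;
}

Because scheduler_ranktasks() assigns ranks in dependency order, popping the lowest-rank runnable task means, for example, that sort tasks come off a queue before the pair and sub tasks that depend on them, which is what the comment in engine_marktasks() below relies on.
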
@@ -32,9 +32,9 @@ AM_LDFLAGS = $(LAPACK_LIBS) $(BLAS_LIBS) $(HDF5_LDFLAGS) -version-info 0:0:0
# Build the libswiftsim library
lib_LTLIBRARIES = libswiftsim.la
libswiftsim_la_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \
io.c timers.c debug.c
io.c timers.c debug.c scheduler.c
# List required headers
include_HEADERS = space.h runner.h queue.h task.h lock.h cell.h part.h const.h \
engine.h swift.h io.h timers.h debug.h
engine.h swift.h io.h timers.h debug.h scheduler.h
@@ -44,6 +44,7 @@
#include "cell.h"
#include "space.h"
#include "queue.h"
#include "scheduler.h"
#include "engine.h"
#include "runner.h"
#include "runner_iact.h"
@@ -54,67 +55,304 @@
/**
* @brief Prepare the #engine by re-building the cells and tasks.
* @brief Fill the #space's task list.
*
* @param e The #engine to prepare.
* @param s The #space we are working in.
*/
void engine_prepare ( struct engine *e ) {
void engine_maketasks ( struct engine *e ) {
int j, k, qid, rebuild;
struct space *s = e->s;
struct queue *q;
struct scheduler *sched = &e->sched;
int i, j, k, ii, jj, kk, iii, jjj, kkk, cid, cjd, sid;
int *cdim = s->cdim;
struct task *t, *t2;
struct cell *ci, *cj;
/* Re-set the scheduler. */
scheduler_reset( sched , s->tot_cells * space_maxtaskspercell );
/* Run through the highest level of cells and add pairs. */
for ( i = 0 ; i < cdim[0] ; i++ )
for ( j = 0 ; j < cdim[1] ; j++ )
for ( k = 0 ; k < cdim[2] ; k++ ) {
cid = cell_getid( cdim , i , j , k );
if ( s->cells[cid].count == 0 )
continue;
ci = &s->cells[cid];
if ( ci->count == 0 )
continue;
scheduler_addtask( sched , task_type_self , task_subtype_density , 0 , 0 , ci , NULL , 0 );
for ( ii = -1 ; ii < 2 ; ii++ ) {
iii = i + ii;
if ( !s->periodic && ( iii < 0 || iii >= cdim[0] ) )
continue;
iii = ( iii + cdim[0] ) % cdim[0];
for ( jj = -1 ; jj < 2 ; jj++ ) {
jjj = j + jj;
if ( !s->periodic && ( jjj < 0 || jjj >= cdim[1] ) )
continue;
jjj = ( jjj + cdim[1] ) % cdim[1];
for ( kk = -1 ; kk < 2 ; kk++ ) {
kkk = k + kk;
if ( !s->periodic && ( kkk < 0 || kkk >= cdim[2] ) )
continue;
kkk = ( kkk + cdim[2] ) % cdim[2];
cjd = cell_getid( cdim , iii , jjj , kkk );
cj = &s->cells[cjd];
if ( cid >= cjd || cj->count == 0 )
continue;
sid = sortlistID[ (kk+1) + 3*( (jj+1) + 3*(ii+1) ) ];
t = scheduler_addtask( sched , task_type_pair , task_subtype_density , sid , 0 , ci , cj , 1 );
}
}
}
}
/* Split the tasks. */
scheduler_splittasks( sched );
/* Count the number of tasks associated with each cell and
store the density tasks in each cell, and make each sort
depend on the sorts of its progeny. */
// #pragma omp parallel for private(t,j)
for ( k = 0 ; k < sched->nr_tasks ; k++ ) {
t = &sched->tasks[k];
if ( t->skip )
continue;
if ( t->type == task_type_sort && t->ci->split )
for ( j = 0 ; j < 8 ; j++ ) {
if ( t->ci->progeny[j] != NULL ) {
if ( t->ci->progeny[j]->sorts == NULL )
t->ci->progeny[j]->sorts = scheduler_addtask( sched , task_type_sort , task_subtype_none , t->flags , 0 , t->ci->progeny[j] , NULL , 0 );
t->ci->progeny[j]->sorts->skip = 0;
task_addunlock( t->ci->progeny[j]->sorts , t );
}
}
if ( t->type == task_type_self ) {
atomic_inc( &t->ci->nr_tasks );
if ( t->subtype == task_subtype_density ) {
t->ci->density[ atomic_inc( &t->ci->nr_density ) ] = t;
}
}
else if ( t->type == task_type_pair ) {
atomic_inc( &t->ci->nr_tasks );
atomic_inc( &t->cj->nr_tasks );
if ( t->subtype == task_subtype_density ) {
t->ci->density[ atomic_inc( &t->ci->nr_density ) ] = t;
t->cj->density[ atomic_inc( &t->cj->nr_density ) ] = t;
}
}
else if ( t->type == task_type_sub ) {
atomic_inc( &t->ci->nr_tasks );
if ( t->cj != NULL )
atomic_inc( &t->cj->nr_tasks );
if ( t->subtype == task_subtype_density ) {
t->ci->density[ atomic_inc( &t->ci->nr_density ) ] = t;
if ( t->cj != NULL )
t->cj->density[ atomic_inc( &t->cj->nr_density ) ] = t;
}
}
}
/* Append a ghost task to each cell. */
space_map_cells_pre( s , 1 , &scheduler_map_mkghosts , sched );
/* Run through the tasks and make force tasks for each density task.
Each force task depends on the cell ghosts and unlocks the kick2 task
of its super-cell. */
kk = sched->nr_tasks;
// #pragma omp parallel for private(t,t2)
for ( k = 0 ; k < kk ; k++ ) {
/* Get a pointer to the task. */
t = &sched->tasks[k];
/* Skip? */
if ( t->skip )
continue;
/* Self-interaction? */
if ( t->type == task_type_self && t->subtype == task_subtype_density ) {
task_addunlock( t , t->ci->super->ghost );
t2 = scheduler_addtask( sched , task_type_self , task_subtype_force , 0 , 0 , t->ci , NULL , 0 );
task_addunlock( t->ci->ghost , t2 );
task_addunlock( t2 , t->ci->super->kick2 );
}
/* Otherwise, pair interaction? */
else if ( t->type == task_type_pair && t->subtype == task_subtype_density ) {
task_addunlock( t , t->ci->super->ghost );
if ( t->ci->super != t->cj->super )
task_addunlock( t , t->cj->super->ghost );
t2 = scheduler_addtask( sched , task_type_pair , task_subtype_force , 0 , 0 , t->ci , t->cj , 0 );
task_addunlock( t->ci->ghost , t2 );
task_addunlock( t->cj->ghost , t2 );
task_addunlock( t2 , t->ci->super->kick2 );
if ( t->ci->super != t->cj->super )
task_addunlock( t2 , t->cj->super->kick2 );
}
/* Otherwise, sub interaction? */
else if ( t->type == task_type_sub && t->subtype == task_subtype_density ) {
task_addunlock( t , t->ci->super->ghost );
if ( t->cj != NULL && t->ci->super != t->cj->super )
task_addunlock( t , t->cj->super->ghost );
t2 = scheduler_addtask( sched , task_type_sub , task_subtype_force , t->flags , 0 , t->ci , t->cj , 0 );
task_addunlock( t->ci->ghost , t2 );
if ( t->cj != NULL )
task_addunlock( t->cj->ghost , t2 );
task_addunlock( t2 , t->ci->super->kick2 );
if ( t->cj != NULL && t->ci->super != t->cj->super )
task_addunlock( t2 , t->cj->super->kick2 );
}
}
/* Rank the tasks. */
scheduler_ranktasks( sched );
/* Count the number of each task type. */
int counts[ task_type_count+1 ];
for ( k = 0 ; k <= task_type_count ; k++ )
counts[k] = 0;
for ( k = 0 ; k < sched->nr_tasks ; k++ )
if ( !sched->tasks[k].skip )
counts[ (int)sched->tasks[k].type ] += 1;
else
counts[ task_type_count ] += 1;
printf( "engine_maketasks: task counts are [ %s=%i" , taskID_names[0] , counts[0] );
for ( k = 1 ; k < task_type_count ; k++ )
printf( " %s=%i" , taskID_names[k] , counts[k] );
printf( " skipped=%i ]\n" , counts[ task_type_count ] ); fflush(stdout);
}
TIMER_TIC
/* Rebuild the space. */
// tic = getticks();
rebuild = ( space_prepare( e->s ) || e->step == 0 );
// printf( "engine_prepare: space_prepare took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 );
/**
* @brief Mark tasks to be skipped and set the sort flags accordingly.
*
* @return 1 if the space has to be rebuilt, 0 otherwise.
*/
int engine_marktasks ( struct engine *e ) {
struct scheduler *s = &e->sched;
int k, nr_tasks = s->nr_tasks, *ind = s->tasks_ind;
struct task *t, *tasks = s->tasks;
float dt_step = e->dt_step;
struct cell *ci, *cj;
/* The queues only need to be re-built if we have variable time-steps
or the space was rebuilt. */
if ( !(e->policy & engine_policy_fixdt) || rebuild ) {
/* Run through the tasks and mark as skip or not. */
for ( k = 0 ; k < nr_tasks ; k++ ) {
// tic = getticks();
/* Init the queues (round-robin). */
for ( qid = 0 ; qid < e->nr_queues ; qid++ )
queue_init( &e->queues[qid] , s->nr_tasks , s->tasks );
/* Fill the queues (round-robin). */
for ( qid = 0 , k = 0 ; k < s->nr_tasks ; k++ ) {
if ( s->tasks[ s->tasks_ind[k] ].skip )
continue;
q = &e->queues[qid];
qid = ( qid + 1 ) % e->nr_queues;
q->tid[ q->count ] = s->tasks_ind[k];
q->count += 1;
/* Get a handle on the kth task. */
t = &tasks[ ind[k] ];
/* Sort-task? Note that due to the task ranking, the sorts
will all come before the pairs and/or subs. */
if ( t->type == task_type_sort ) {
/* Re-set the flags. */
t->flags = 0;
t->skip = 1;
}
// printf( "engine_prepare: re-filling queues took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 );
}
/* Single-cell task? */
else if ( t->type == task_type_self ||
t->type == task_type_ghost ||
( t->type == task_type_sub && t->cj == NULL ) ) {
/* Set this task's skip. */
t->skip = ( t->ci->dt_min > dt_step );
}
/* Otherwise, just re-set them. */
else {
for ( qid = 0 ; qid < e->nr_queues ; qid++ )
e->queues[qid].next = 0;
/* Pair? */
else if ( t->type == task_type_pair || ( t->type == task_type_sub && t->cj != NULL ) ) {
/* Local pointers. */
ci = t->ci;
cj = t->cj;
/* Set this task's skip. */
t->skip = ( ci->dt_min > dt_step && cj->dt_min > dt_step );
/* Too much particle movement? */
if ( t->tight &&
( fmaxf( ci->h_max , cj->h_max ) + ci->dx_max + cj->dx_max > cj->dmin ||
ci->dx_max > space_maxreldx*ci->h_max || cj->dx_max > space_maxreldx*cj->h_max ) )
return 1;
/* Set the sort flags. */
if ( !t->skip && t->type == task_type_pair ) {
ci->sorts->flags |= (1 << t->flags);
ci->sorts->skip = 0;
cj->sorts->flags |= (1 << t->flags);
cj->sorts->skip = 0;
}
}
/* Kick2? */
else if ( t->type == task_type_kick2 )
t->skip = 0;
/* None? */
else if ( t->type == task_type_none )
t->skip = 1;
}
/* All is well... */
return 0;
}
/**
* @brief Prepare the #engine by re-building the cells and tasks.
*
* @param e The #engine to prepare.
*/
void engine_prepare ( struct engine *e ) {
int rebuild;
TIMER_TIC
/* Run through the tasks and get all the waits right. */
/* Run through the tasks and mark as skip or not. */
// tic = getticks();
#pragma omp parallel for schedule(static) private(j)
for ( k = 0 ; k < s->nr_tasks ; k++ ) {
if ( s->tasks[k].skip )
continue;
for ( j = 0 ; j < s->tasks[k].nr_unlock_tasks ; j++ )
atomic_inc( &s->tasks[k].unlock_tasks[j]->wait );
}
// printf( "engine_prepare: preparing task dependencies took %.3f ms.\n" , (double)(getticks() - tic) / CPU_TPS * 1000 );
rebuild = ( e->step == 0 || engine_marktasks( e ) );
// printf( "space_prepare: space_marktasks took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 );
/* Did this not go through? */
if ( rebuild ) {
/* Re-set the queues.*/
for ( k = 0 ; k < e->nr_queues ; k++ )
e->queues[k].next = 0;
/* Re-build the space. */
tic = getticks();
space_rebuild( e->s , 0.0 );
printf( "engine_prepare: space_rebuild took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 );
/* Re-build the tasks. */
tic = getticks();
engine_maketasks( e );
printf( "engine_prepare: engine_maketasks took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 );
/* Run through the tasks and mark as skip or not. */
// tic = getticks();
if ( engine_marktasks( e ) )
error( "engine_marktasks failed after space_rebuild." );
// printf( "engine_prepare: engine_marktasks took %.3f ms.\n" , (double)(getticks() - tic)/CPU_TPS*1000 );
}
/* Start the scheduler. */
scheduler_start( &e->sched );
TIMER_TOC( timer_prepare );
}
@@ -455,7 +693,7 @@ void engine_single_force ( double *dim , long long int pid , struct part *__rest
* @param sort_queues Flag to try to sort the queues topologically.
*/
void engine_step ( struct engine *e , int sort_queues ) {
void engine_step ( struct engine *e ) {
int k;
float dt = e->dt, dt_step, dt_max = 0.0f, dt_min = FLT_MAX;
@@ -509,15 +747,6 @@ void engine_step ( struct engine *e , int sort_queues ) {
/* Prepare the space. */
engine_prepare( e );
/* Sort the queues?*/
if ( sort_queues ) {
#pragma omp parallel for default(none), shared(e)
for ( k = 0 ; k < e->nr_queues ; k++ ) {
queue_sort( &e->queues[k] );
e->queues[k].next = 0;
}
}
// engine_single_density( e->s->dim , 3392063069037 , e->s->parts , e->s->nr_parts , e->s->periodic );
/* Start the clock. */
@@ -634,7 +863,6 @@ void engine_init ( struct engine *e , struct space *s , float dt , int nr_thread
/* Store the values. */
e->s = s;
e->nr_threads = nr_threads;
e->nr_queues = nr_queues;
e->policy = policy;
e->step = 0;
e->nullstep = 0;
@@ -661,14 +889,8 @@ void engine_init ( struct engine *e , struct space *s , float dt , int nr_thread
dt *= 0.5f;
e->dt = dt;
/* Allocate the queues. */
if ( posix_memalign( (void *)(&e->queues) , 64 , nr_queues * sizeof(struct queue) ) != 0 )
error( "Failed to allocate queues." );
bzero( e->queues , nr_queues * sizeof(struct queue) );
/* Sort the queues topologically. */
// for ( k = 0 ; k < nr_queues ; k++ )
// queue_sort( &e->queues[k] );
/* Init the scheduler. */
scheduler_init( &e->sched , e->s , nr_queues , scheduler_flag_steal );
/* Allocate and init the threads. */
if ( ( e->runners = (struct runner *)malloc( sizeof(struct runner) * nr_threads ) ) == NULL )
@@ -46,11 +46,8 @@ struct engine {
/* The running policy. */
int policy;
/* The number of queues. */
int nr_queues;
/* The queues. */
struct queue *queues;
/* The task scheduler. */
struct scheduler sched;
/* The maximum dt to step (current). */
float dt_step;
@@ -85,4 +82,5 @@ struct engine {
void engine_barrier( struct engine *e );
void engine_init ( struct engine *e , struct space *s , float dt , int nr_threads , int nr_queues , int policy );
void engine_prepare ( struct engine *e );
void engine_step ( struct engine *e , int sort_queues );
void engine_step ( struct engine *e );
void engine_maketasks ( struct engine *e );
@@ -37,6 +37,7 @@
#include "task.h"
#include "part.h"
#include "space.h"
#include "scheduler.h"
#include "engine.h"
#include "error.h"
#include "kernel.h"
@@ -24,12 +24,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <math.h>
#include <float.h>
#include <limits.h>
#include <omp.h>
#include <sched.h>
/* Local headers. */
#include "cycle.h"
@@ -93,11 +87,30 @@ void queue_insert ( struct queue *q , struct task *t ) {
if ( lock_lock( &q->lock ) != 0 )
error( "Failed to get queue lock." );
/* Swap next task to end. */
q->tid[ q->count ] = q->tid[ q->next ];
/* Does the queue need to be grown? */
if ( q->count == q->size ) {
int *temp;
q->size *= queue_sizegrow;
if ( ( temp = (int *)malloc( sizeof(int) * q->size ) ) == NULL )
error( "Failed to allocate new indices." );
memcpy( temp , q->tid , sizeof(int) * q->count );
free( q->tid );
q->tid = temp;
}
/* Drop the task at the end of the queue. */
q->tid[ q->count ] = ( t - q->tasks );
q->count += 1;
q->tid[ q->next ] = t - q->tasks;
q->next += 1;
/* Shuffle up. */
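/* (A binary-heap sift-up keyed on task rank; the parent of entry k is taken to be entry k/2.) */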
for ( int k = q->count - 1 ; k > 0 ; k /= 2 )
if ( q->tasks[ q->tid[k] ].rank < q->tasks[ q->tid[k/2] ].rank ) {
int temp = q->tid[k];
q->tid[k] = q->tid[k/2];
q->tid[k/2] = temp;
}
else
break;
/* Unlock the queue. */
if ( lock_unlock( &q->lock ) != 0 )
@@ -113,23 +126,18 @@ void queue_insert ( struct queue *q , struct task *t ) {
* @param tasks List of tasks to which the queue indices refer.
*/
void queue_init ( struct queue *q , int size , struct task *tasks ) {
void queue_init ( struct queue *q , struct task *tasks ) {
/* Allocate the task list if needed. */
if ( q->tid == NULL || q->size < size ) {
if ( q->tid != NULL )
free( q->tid );
q->size = size;
if ( ( q->tid = (int *)malloc( sizeof(int) * size ) ) == NULL )
error( "Failed to allocate queue tids." );
}
q->size = queue_sizeinit;
if ( ( q->tid = (int *)malloc( sizeof(int) * q->size ) ) == NULL )
error( "Failed to allocate queue tids." );
/* Set the tasks pointer. */
q->tasks = tasks;
/* Init counters. */
q->count = 0;
q->next = 0;
/* Init the queue lock. */
if ( lock_init( &q->lock ) != 0 )
@@ -146,157 +154,29 @@ void queue_init ( struct queue *q , int size , struct task *tasks ) {
* @param keep Remove the returned task from this queue.
*/
struct task *queue_gettask_old ( struct queue *q , int blocking , int keep ) {
struct task *queue_gettask ( struct queue *q , int qid , int blocking ) {
int k, tid = -1, qcount, *qtid = q->tid;
int k, qcount, *qtid = q->tid;
lock_type *qlock = &q->lock;
struct task *qtasks = q->tasks, *res = NULL;
TIMER_TIC
/* If there are no tasks, leave immediately. */
if ( q->next >= q->count ) {
if ( q->count == 0 ) {
TIMER_TOC(queue_timer_gettask);
return NULL;
}
/* Main loop, while there are tasks... */
while ( q->next < q->count ) {
while ( q->count > 0 ) {
/* Grab the task lock. */
// if ( blocking ) {
if ( lock_lock( qlock ) != 0 )
error( "Locking the task_lock failed.\n" );
// }
// else {
// if ( lock_trylock( qlock ) != 0 )
// break;
// }
if ( lock_lock( qlock ) != 0 )
error( "Locking the qlock failed.\n" );
/* Loop over the remaining task IDs. */
qcount = q->count;
for ( k = q->next ; k < qcount ; k++ ) {
/* Put a finger on the task. */
res = &qtasks[ qtid[k] ];
/* Is this task blocked? */
if ( res->wait )
continue;
/* Different criteria for different types. */
if ( res->type == task_type_self || res->type == task_type_sort || (res->type == task_type_sub && res->cj == NULL) ) {
if ( res->ci->hold || cell_locktree( res->ci ) != 0 )
continue;
}
else if ( res->type == task_type_pair || (res->type == task_type_sub && res->cj != NULL) ) {
if ( res->ci->hold || res->cj->hold || res->ci->wait || res->cj->wait )
continue;
if ( cell_locktree( res->ci ) != 0 )
continue;
if ( cell_locktree( res->cj ) != 0 ) {
cell_unlocktree( res->ci );
continue;
}
}
/* If we made it this far, we're safe. */
break;
} /* loop over the task IDs. */
/* Did we get a task? */
if ( k < qcount ) {
/* Do we need to swap? */
if ( k != q->next )
COUNT(queue_counter_swap);
/* get the task ID. */
tid = qtid[k];
/* Remove the task? */
if ( keep ) {
/* Bubble-up. */
q->count = qcount - 1;
for ( ; k < qcount - 1 ; k++ )
qtid[k] = qtid[k+1];
}
/* No, leave it in the queue. */
else {