Commit cf1c99ef authored by Pedro Gonnet's avatar Pedro Gonnet
Browse files

split-off engine type.


Former-commit-id: e13f23d1ae2d6365cb8bc7261afabb478c164c3d
parent 13344759
......@@ -26,8 +26,9 @@ AM_LDFLAGS = $(LAPACK_LIBS) $(BLAS_LIBS) -version-info 0:0:0
# Build the libgadgetsmp library
lib_LTLIBRARIES = libgadgetsmp.la
libgadgetsmp_la_SOURCES = space.c runner.c queue.c task.c cell.c
libgadgetsmp_la_SOURCES = space.c runner.c queue.c task.c cell.c engine.c
# List required headers
include_HEADERS = space.h runner.h queue.h task.h lock.h cell.h part.h gadgetsmp.h
include_HEADERS = space.h runner.h queue.h task.h lock.h cell.h part.h const.h \
engine.h gadgetsmp.h
/*******************************************************************************
* This file is part of GadgetSMP.
* Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Config parameters. */
#include "../config.h"
/* Some standard headers. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <math.h>
#include <float.h>
#include <limits.h>
#include <omp.h>
#include <sched.h>
/* Local headers. */
#include "cycle.h"
#include "const.h"
#include "lock.h"
#include "task.h"
#include "part.h"
#include "cell.h"
#include "space.h"
#include "queue.h"
#include "engine.h"
#include "runner.h"
#include "runner_iact.h"
/* Error macro: prints "file:function:line: message" to stdout and then
   calls abort(). The body is wrapped in do { } while (0) so that
   "error( ... );" expands to exactly one statement and therefore stays
   valid inside an unbraced if/else. */
#define error(s) do { printf( "%s:%s:%i: %s\n" , __FILE__ , __FUNCTION__ , __LINE__ , s ); abort(); } while (0)
/* Flatten the cell coordinates (i,j,k) into a single linear cell ID.
   Each coordinate is truncated to int first; k varies fastest and i
   slowest. Only cdim[1] and cdim[2] are read. */
#define cell_getid( cdim , i , j , k ) \
    ( ( (int)(i) * (cdim)[1] + (int)(j) ) * (cdim)[2] + (int)(k) )
/**
 * @brief Sort the tasks in topological order over all queues.
 *
 * The tasks are ranked in waves. First every task's wait counter is
 * raised once for each task that unlocks it. Then, repeatedly, all
 * not-yet-ranked tasks whose wait counter is zero receive the current
 * rank, their indices are appended to s->tasks_ind, and the wait
 * counters of the tasks they unlock are lowered again. Every increment
 * is thus matched by one decrement by the time the function returns.
 *
 * Aborts through error() if the temporary index array cannot be
 * allocated, or if a wave turns out empty while tasks remain (the
 * dependencies can then never be satisfied and the loop would not end).
 *
 * @param e The #engine.
 */
void engine_ranktasks ( struct engine *e ) {

    int i, j, k, temp, left = 0, rank;
    struct task *t;
    struct space *s = e->s;
    int *tid;

    /* Run through the tasks and get all the waits right. */
    for ( k = 0 ; k < s->nr_tasks ; k++ ) {
        for ( j = 0 ; j < s->tasks[k].nr_unlock_tasks ; j++ )
            s->tasks[k].unlock_tasks[j]->wait += 1;
        }

    /* Allocate and init the task-ID array. */
    if ( ( tid = (int *)malloc( sizeof(int) * s->nr_tasks ) ) == NULL )
        error( "Failed to allocate temporary tid array." );
    for ( k = 0 ; k < s->nr_tasks ; k++ )
        tid[k] = k;

    /* Main loop. tid[0..left) holds the tasks ranked so far and j marks
       the end of the current wave. j was used as a scratch index in the
       loop above, so it has to be reset to zero before it is used here. */
    for ( j = 0 , rank = 0 ; left < s->nr_tasks ; rank++ ) {

        /* Load the tids of tasks with no waits. */
        for ( k = left ; k < s->nr_tasks ; k++ )
            if ( s->tasks[ tid[k] ].wait == 0 ) {
                temp = tid[j]; tid[j] = tid[k]; tid[k] = temp;
                j += 1;
                }

        /* No task became ready: bail out instead of spinning forever. */
        if ( j == left ) {
            error( "Unsatisfiable task dependencies detected." );
            }

        /* Rank the tasks of this wave and release the tasks they unlock. */
        for ( i = left ; i < j ; i++ ) {
            t = &s->tasks[ tid[i] ];
            t->rank = rank;
            s->tasks_ind[i] = t - s->tasks;
            for ( k = 0 ; k < t->nr_unlock_tasks ; k++ )
                t->unlock_tasks[k]->wait -= 1;
            }

        /* Everything up to j is now ranked. */
        left = j;

        }

    /* Release the temporary array. */
    free(tid);

    }
/**
 * @brief Implements a barrier for the #runner threads.
 *
 * The barrier has two phases, both driven by e->barrier_count under
 * e->barrier_mutex. A thread checks in by raising the count and then
 * sleeps while the count is positive. Once the count has been made
 * negative (engine_run negates it), each thread checks out by raising
 * the count by one again, so the count climbs back towards zero. A
 * thread arriving while the count is still negative waits until the
 * previous round has fully drained.
 *
 * @param e The #engine.
 */
void engine_barrier( struct engine *e ) {

    pthread_mutex_t *mutex = &e->barrier_mutex;
    pthread_cond_t *cond = &e->barrier_cond;

    /* Everything below happens under the barrier mutex. */
    if ( pthread_mutex_lock( mutex ) != 0 ) {
        error( "Failed to get barrier mutex." );
        }

    /* A negative count means the previous round is still draining. */
    while ( e->barrier_count < 0 ) {
        if ( pthread_cond_wait( cond , mutex ) != 0 ) {
            error( "Eror waiting for barrier to close." );
            }
        }

    /* Check in. */
    ++e->barrier_count;

    /* The last thread to arrive wakes up whoever waits for a full barrier. */
    if ( e->barrier_count == e->nr_threads && pthread_cond_broadcast( cond ) != 0 ) {
        error( "Failed to broadcast barrier full condition." );
        }

    /* Sleep until the count is no longer positive. */
    while ( e->barrier_count > 0 ) {
        if ( pthread_cond_wait( cond , mutex ) != 0 ) {
            error( "Error waiting for barrier to be released." );
            }
        }

    /* Check out: the count is negative here, so adding one moves it
       towards zero. */
    ++e->barrier_count;

    /* The last thread out signals that the barrier is empty again. */
    if ( e->barrier_count == 0 && pthread_cond_broadcast( cond ) != 0 ) {
        error( "Failed to broadcast empty barrier condition." );
        }

    /* Done, let go of the mutex. */
    if ( pthread_mutex_unlock( mutex ) != 0 ) {
        error( "Failed to get unlock the barrier mutex." );
        }

    }
/**
* @brief Let the #engine loose to compute the forces.
*
* @param e The #engine.
* @param sort_queues Flag to try to sort the queues topologically.
*/
void engine_run ( struct engine *e , int sort_queues ) {
int j, k;
struct space *s = e->s;
/* Run throught the tasks and get all the waits right. */
for ( k = 0 ; k < s->nr_tasks ; k++ ) {
s->tasks[k].done = 0;
for ( j = 0 ; j < s->tasks[k].nr_unlock_tasks ; j++ )
s->tasks[k].unlock_tasks[j]->wait += 1;
for ( j = 0 ; j < s->tasks[k].nr_unlock_cells ; j++ )
s->tasks[k].unlock_cells[j]->wait += 1;
}
/* Re-set the queues.*/
if ( sort_queues ) {
#pragma omp parallel for default(none), shared(e)
for ( k = 0 ; k < e->nr_queues ; k++ ) {
queue_sort( &e->queues[k] );
e->queues[k].next = 0;
}
}
else
for ( k = 0 ; k < e->nr_queues ; k++ )
e->queues[k].next = 0;
/* Cry havoc and let loose the dogs of war. */
e->barrier_count = -e->barrier_count;
if ( pthread_cond_broadcast( &e->barrier_cond ) != 0 )
error( "Failed to broadcast barrier open condition." );
/* Sit back and wait for the runners to come home. */
while ( e->barrier_count < e->nr_threads )
if ( pthread_cond_wait( &e->barrier_cond , &e->barrier_mutex ) != 0 )
error( "Error while waiting for barrier." );
}
/**
* @brief init an engine with the given number of threads, queues, and
* the given policy.
*
* @param e The #engine.
* @param s The #space in which this #runner will run.
* @param nr_threads The number of threads to spawn.
* @param nr_queues The number of task queues to create.
* @param policy The queueing policy to use.
*/
void engine_init ( struct engine *e , struct space *s , int nr_threads , int nr_queues , int policy ) {
#if defined(HAVE_SETAFFINITY)
cpu_set_t cpuset;
#endif
int k, qid, nrq;
/* Store the values. */
e->s = s;
e->nr_threads = nr_threads;
e->nr_queues = nr_queues;
e->policy = policy;
/* First of all, init the barrier and lock it. */
if ( pthread_mutex_init( &e->barrier_mutex , NULL ) != 0 )
error( "Failed to initialize barrier mutex." );
if ( pthread_cond_init( &e->barrier_cond , NULL ) != 0 )
error( "Failed to initialize barrier condition variable." );
if ( pthread_mutex_lock( &e->barrier_mutex ) != 0 )
error( "Failed to lock barrier mutex." );
e->barrier_count = 0;
/* Allocate the queues. */
if ( posix_memalign( (void *)(&e->queues) , 64 , nr_queues * sizeof(struct queue) ) != 0 )
error( "Failed to allocate queues." );
bzero( e->queues , nr_queues * sizeof(struct queue) );
/* Init the queues. */
for ( k = 0 ; k < nr_queues ; k++ )
queue_init( &e->queues[k] , s->nr_tasks , s->tasks );
/* Rank the tasks in topological order. */
engine_ranktasks( e );
/* How many queues to fill initially? */
for ( nrq = 0 , k = nr_queues ; k > 0 ; k = k / 2 )
nrq += 1;
/* Fill the queues (round-robin). */
for ( k = 0 ; k < s->nr_tasks ; k++ ) {
if ( s->tasks[ s->tasks_ind[k] ].type == task_type_none )
continue;
// qid = 0;
// qid = k % nrq;
qid = k % nr_queues;
e->queues[qid].tid[ e->queues[qid].count ] = s->tasks_ind[k];
e->queues[qid].count += 1;
}
/* Sort the queues topologically. */
for ( k = 0 ; k < nr_queues ; k++ )
queue_sort( &e->queues[k] );
/* Allocate and init the threads. */
if ( ( e->runners = (struct runner *)malloc( sizeof(struct runner) * nr_threads ) ) == NULL )
error( "Failed to allocate threads array." );
for ( k = 0 ; k < nr_threads ; k++ ) {
e->runners[k].id = k;
e->runners[k].e = e;
if ( pthread_create( &e->runners[k].thread , NULL , &runner_main , &e->runners[k] ) != 0 )
error( "Failed to create runner thread." );
#if defined(HAVE_SETAFFINITY)
/* Set the cpu mask to zero | e->id. */
CPU_ZERO( &cpuset );
CPU_SET( e->runners[k].id , &cpuset );
/* Apply this mask to the runner's pthread. */
if ( pthread_setaffinity_np( e->runners[k].thread , sizeof(cpu_set_t) , &cpuset ) != 0 )
error( "Failed to set thread affinity." );
#endif
}
/* Wait for the runner threads to be in place. */
while ( e->barrier_count != e->nr_threads )
if ( pthread_cond_wait( &e->barrier_cond , &e->barrier_mutex ) != 0 )
error( "Error while waiting for runner threads to get in place." );
}
/*******************************************************************************
* This file is part of GadgetSMP.
* Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Some constants. */
/* Values for the policy field of struct engine. The non-zero values are
   distinct powers of two; the runner code tests the steal and keep
   values with a bitwise AND against the policy, so they act as flags
   that can be combined. */
#define engine_policy_none 0
#define engine_policy_rand 1
#define engine_policy_steal 2
#define engine_policy_keep 4
#define engine_policy_block 8
/* NOTE(review): not referenced in the code visible here; presumably a
   scaling factor applied to queue sizes -- confirm against queue.c. */
#define engine_queue_scale 1.2
/* Data structure for the engine: owns the runner threads, the task
   queues they draw from, and the barrier used to start and stop them. */
struct engine {
/* Number of threads on which to run. */
int nr_threads;
/* The space with which the engine is associated. */
struct space *s;
/* The runners, one per thread (array of nr_threads entries allocated
   in engine_init). */
struct runner *runners;
/* The running policy (see the engine_policy_* constants). */
int policy;
/* The number of queues. */
int nr_queues;
/* The queues (array of nr_queues entries allocated in engine_init). */
struct queue *queues;
/* Data for the threads' barrier. barrier_count is only accessed under
   barrier_mutex: it is raised by one for every thread checking in at
   engine_barrier, negated by engine_run to release the threads, and
   raised by one again by every thread leaving the barrier. */
pthread_mutex_t barrier_mutex;
pthread_cond_t barrier_cond;
int barrier_count;
};
/* Function prototypes. */
/* Barrier at which the runner threads wait between runs. */
void engine_barrier( struct engine *e );
/* Set up the barrier and queues, rank the tasks, and spawn the runner threads. */
void engine_init ( struct engine *e , struct space *s , int nr_threads , int nr_queues , int policy );
/* Assign a topological rank to every task and fill the space's tasks_ind. */
void engine_ranktasks ( struct engine *e );
/* Reset tasks and queues, release the runners, and wait for them to return. */
void engine_run ( struct engine *e , int sort_queues );
......@@ -30,5 +30,6 @@
#include "space.h"
#include "queue.h"
#include "runner_iact.h"
#include "engine.h"
#include "runner.h"
......@@ -40,6 +40,7 @@
#include "cell.h"
#include "space.h"
#include "queue.h"
#include "engine.h"
#include "runner.h"
#include "runner_iact.h"
......@@ -85,63 +86,6 @@ const char runner_flip[27] = { 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1
#include "runner_doiact.h"
/**
 * @brief Sort the tasks in topological order over all queues.
 *
 * The tasks are ranked in waves. First every task's wait counter is
 * raised once for each task that unlocks it. Then, repeatedly, all
 * not-yet-ranked tasks whose wait counter is zero receive the current
 * rank, their indices are appended to s->tasks_ind, and the wait
 * counters of the tasks they unlock are lowered again.
 *
 * Aborts through error() if the temporary index array cannot be
 * allocated, or if a wave turns out empty while tasks remain (the
 * dependencies can then never be satisfied and the loop would not end).
 *
 * @param r The #runner.
 */
void runner_ranktasks ( struct runner *r ) {

    int i, j, k, temp, left = 0, rank;
    struct task *t;
    struct space *s = r->s;
    int *tid;

    /* Run through the tasks and get all the waits right. */
    for ( k = 0 ; k < s->nr_tasks ; k++ ) {
        for ( j = 0 ; j < s->tasks[k].nr_unlock_tasks ; j++ )
            s->tasks[k].unlock_tasks[j]->wait += 1;
        }

    /* Allocate and init the task-ID array. */
    if ( ( tid = (int *)malloc( sizeof(int) * s->nr_tasks ) ) == NULL )
        error( "Failed to allocate temporary tid array." );
    for ( k = 0 ; k < s->nr_tasks ; k++ )
        tid[k] = k;

    /* Main loop. tid[0..left) holds the tasks ranked so far and j marks
       the end of the current wave. j was used as a scratch index in the
       loop above, so it has to be reset to zero before it is used here. */
    for ( j = 0 , rank = 0 ; left < s->nr_tasks ; rank++ ) {

        /* Load the tids of tasks with no waits. */
        for ( k = left ; k < s->nr_tasks ; k++ )
            if ( s->tasks[ tid[k] ].wait == 0 ) {
                temp = tid[j]; tid[j] = tid[k]; tid[k] = temp;
                j += 1;
                }

        /* No task became ready: bail out instead of spinning forever. */
        if ( j == left ) {
            error( "Unsatisfiable task dependencies detected." );
            }

        /* Rank the tasks of this wave and release the tasks they unlock. */
        for ( i = left ; i < j ; i++ ) {
            t = &s->tasks[ tid[i] ];
            t->rank = rank;
            s->tasks_ind[i] = t - s->tasks;
            for ( k = 0 ; k < t->nr_unlock_tasks ; k++ )
                t->unlock_tasks[k]->wait -= 1;
            }

        /* Everything up to j is now ranked. */
        left = j;

        }

    /* Release the temporary array. */
    free(tid);

    }
/**
* @brief Sort the entries in ascending order using QuickSort.
*
......@@ -282,7 +226,7 @@ inline void merge_backward ( struct entry *__restrict__ one , int none , struct
* @param c The #cell.
*/
void runner_dosort ( struct runner_thread *rt , struct cell *c , int flags ) {
void runner_dosort ( struct runner *r , struct cell *c , int flags ) {
struct entry *finger;
struct entry *fingers[8];
......@@ -478,7 +422,7 @@ void runner_dosort ( struct runner_thread *rt , struct cell *c , int flags ) {
#ifdef TIMER_VERBOSE
printf( "runner_dosort[%02i]: %i parts at depth %i (flags = %i%i%i%i%i%i%i%i%i%i%i%i%i) took %.3f ms.\n" ,
rt->id , c->count , c->depth ,
r->id , c->count , c->depth ,
(flags & 0x1000) >> 12 , (flags & 0x800) >> 11 , (flags & 0x400) >> 10 , (flags & 0x200) >> 9 , (flags & 0x100) >> 8 , (flags & 0x80) >> 7 , (flags & 0x40) >> 6 , (flags & 0x20) >> 5 , (flags & 0x10) >> 4 , (flags & 0x8) >> 3 , (flags & 0x4) >> 2 , (flags & 0x2) >> 1 , (flags & 0x1) >> 0 ,
((double)TIMER_TOC(runner_timer_dosort)) / CPU_TPS * 1000 ); fflush(stdout);
#else
......@@ -491,20 +435,16 @@ void runner_dosort ( struct runner_thread *rt , struct cell *c , int flags ) {
/**
* @brief Intermediate task between density and force
*
* @param rt The runner thread.
* @param r The runner thread.
* @param ci THe cell.
*/
void runner_doghost ( struct runner_thread *r , struct cell *c ) {
void runner_doghost ( struct runner *r , struct cell *c ) {
struct part *p;
int i, k;
TIMER_TIC
/* If this cell has progeny, don't bother. */
if ( c->split )
return;
/* Loop over the parts in this cell. */
for ( i = 0 ; i < c->count ; i++ ) {
......@@ -529,7 +469,7 @@ void runner_doghost ( struct runner_thread *r , struct cell *c ) {
#ifdef TIMER_VERBOSE
printf( "runner_doghost[%02i]: %i parts at depth %i took %.3f ms.\n" ,
rt->id , c->count , c->depth ,
r->id , c->count , c->depth ,
((double)TIMER_TOC(runner_timer_doghost)) / CPU_TPS * 1000 ); fflush(stdout);
#else
TIMER_TOC(runner_timer_doghost);
......@@ -538,50 +478,6 @@ void runner_doghost ( struct runner_thread *r , struct cell *c ) {
}
/**
 * @brief Implements a barrier for the #runner threads.
 *
 * The barrier has two phases, both driven by r->barrier_count under
 * r->barrier_mutex. A thread checks in by raising the count and then
 * sleeps while the count is positive. Once the count has become
 * negative, each thread checks out by raising the count by one again,
 * so the count climbs back towards zero. A thread arriving while the
 * count is still negative waits until the previous round has drained.
 *
 * @param r The #runner.
 */
void runner_barrier( struct runner *r ) {

    pthread_mutex_t *mutex = &r->barrier_mutex;
    pthread_cond_t *cond = &r->barrier_cond;

    /* Everything below happens under the barrier mutex. */
    if ( pthread_mutex_lock( mutex ) != 0 ) {
        error( "Failed to get barrier mutex." );
        }

    /* A negative count means the previous round is still draining. */
    while ( r->barrier_count < 0 ) {
        if ( pthread_cond_wait( cond , mutex ) != 0 ) {
            error( "Eror waiting for barrier to close." );
            }
        }

    /* Check in. */
    ++r->barrier_count;

    /* The last thread to arrive wakes up whoever waits for a full barrier. */
    if ( r->barrier_count == r->nr_threads && pthread_cond_broadcast( cond ) != 0 ) {
        error( "Failed to broadcast barrier full condition." );
        }

    /* Sleep until the count is no longer positive. */
    while ( r->barrier_count > 0 ) {
        if ( pthread_cond_wait( cond , mutex ) != 0 ) {
            error( "Error waiting for barrier to be released." );
            }
        }

    /* Check out: the count is negative here, so adding one moves it
       towards zero. */
    ++r->barrier_count;

    /* The last thread out signals that the barrier is empty again. */
    if ( r->barrier_count == 0 && pthread_cond_broadcast( cond ) != 0 ) {
        error( "Failed to broadcast empty barrier condition." );
        }

    /* Done, let go of the mutex. */
    if ( pthread_mutex_unlock( mutex ) != 0 ) {
        error( "Failed to get unlock the barrier mutex." );
        }

    }
/**
* @brief The #runner main thread routine.
*
......@@ -590,14 +486,14 @@ void runner_barrier( struct runner *r ) {
void *runner_main ( void *data ) {
struct runner_thread *rt = (struct runner_thread *)data;
struct runner *r = rt->r;
int threadID = rt->id;
struct runner *r = (struct runner *)data;
struct engine *e = r->e;
int threadID = r->id;
int k, qid, naq, keep, tpq;
struct queue *queues[ r->nr_queues ], *myq;
struct queue *queues[ e->nr_queues ], *myq;
struct task *t;
struct cell *ci, *cj;
unsigned int myseed = rand() + rt->id;
unsigned int myseed = rand() + r->id;
#ifdef TIMER
ticks stalled;
#endif
......@@ -606,23 +502,23 @@ void *runner_main ( void *data ) {
while ( 1 ) {
/* Wait at the barrier. */
runner_barrier( r );
engine_barrier( e );
/* Set some convenient local data. */
keep = r->policy & runner_policy_keep;
myq = &r->queues[ threadID % r->nr_queues ];
tpq = ceil( ((double)r->nr_threads) / r->nr_queues );
keep = e->policy & engine_policy_keep;
myq = &e->queues[ threadID % e->nr_queues ];
tpq = ceil( ((double)e->nr_threads) / e->nr_queues );
stalled = 0;
/* Set up the local list of active queues. */
naq = r->nr_queues;
naq = e->nr_queues;
for ( k = 0 ; k < naq ; k++ )
queues[k] = &r->queues[k];
queues[k] = &e->queues[k];
/* Set up the local list of active queues. */
naq = r->nr_queues;
naq = e->nr_queues;
for ( k = 0 ; k < naq ; k++ )
queues[k] = &r->queues[k];
queues[k] = &e->queues[k];
/* Loop while there are tasks... */
while ( 1 ) {
......@@ -640,32 +536,32 @@ void *runner_main ( void *data ) {
/* Get a task, how and from where depends on the policy. */
TIMER_TIC
t = NULL;
if ( r->nr_queues == 1 ) {
t = queue_gettask( &r->queues[0] , 1 , 0 );
if ( e->nr_queues == 1 ) {
t = queue_gettask( &e->queues[0] , 1 , 0 );
}
else if ( r->policy & runner_policy_steal ) {
else if ( e->policy & engine_policy_steal ) {
if ( ( myq->next == myq->count ) ||
( t = queue_gettask_new( myq , rt->id , 0 , 0 ) ) == NULL ) {
( t = queue_gettask_new( myq , r->id , 0 , 0 ) ) == NULL ) {
TIMER_TIC2
qid = rand_r( &myseed ) % naq;
keep = ( r->policy & runner_policy_keep ) &&
keep = ( e->policy & engine_policy_keep ) &&
( myq->count <= myq->size-tpq );
if ( myq->next == myq->count )
COUNT(runner_counter_steal_empty);
else
COUNT(runner_counter_steal_stall);
t = queue_gettask_new( queues[qid] , rt->id , 0 , keep );