diff --git a/configure.ac b/configure.ac index f92416c97878843743cec83e3fafd88ce6a19d2b..a5323152c323f56ee104835e34b54a613cd640be 100644 --- a/configure.ac +++ b/configure.ac @@ -270,6 +270,10 @@ if test "$enable_warn" != "no"; then fi fi +# Check for libnuma. +AC_CHECK_LIB([numa], [numa_available]) + + # Check for git, needed for revision stamps. AC_PATH_PROG([GIT_CMD], [git]) AC_SUBST([GIT_CMD]) diff --git a/examples/Makefile.am b/examples/Makefile.am index df37d2cd20cc62ce9afb52dad7675d950e5d62ef..2054d83875d03916ef0245f0a86de63815da2dca 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -29,9 +29,9 @@ MPI_THREAD_LIBS = @MPI_THREAD_LIBS@ MPI_LIBS = $(METIS_LIBS) $(MPI_THREAD_LIBS) # Set-up the library -bin_PROGRAMS = test test_bh test_bh_sorted test_fmm_sorted test_bh_mpi test_qr_mpi +bin_PROGRAMS = test test_bh test_bh_sorted test_fmm_sorted test_bh_mpi test_qr_mpi test_qr if HAVECBLAS -bin_PROGRAMS += test_qr test_qr_mpi_cblas +bin_PROGRAMS += test_qr_mpi_cblas endif # Sources for test @@ -42,7 +42,8 @@ test_LDADD = ../src/.libs/libquicksched.a # Sources for test_qr test_qr_SOURCES = test_qr.c test_qr_CFLAGS = $(AM_CFLAGS) -test_qr_LDADD = ../src/.libs/libquicksched.a -llapacke -llapacke -lblas -lcblas +test_qr_LDADD = ../src/.libs/libquicksched.a +#-llapacke -llapacke -lblas -lcblas # Sources for test_bh test_bh_SOURCES = test_bh.c diff --git a/examples/test_qr_mpi.c b/examples/test_qr_mpi.c index f23dff3347cbabb4190a7064717e6749c4824b5e..ca83d75c14013be19488347ab050365855f2033a 100644 --- a/examples/test_qr_mpi.c +++ b/examples/test_qr_mpi.c @@ -36,12 +36,12 @@ #include <cblas.h> #endif +#define NO_TASK_TIMERS /* Local includes. */ #include "quicksched.h" #include "res.h" -#define NO_TASK_TIMERS #ifdef WITH_CBLAS_LIB /** @@ -472,27 +472,10 @@ void test_qr(int m, int n, int K, int nr_threads, int runs, double* matrix) { qsched_res_t *rid = NULL, *tau_id = NULL; double **rids = NULL, **taus = NULL; int data[3]; -#ifdef TASK_TIMERS - long long int MPI_data[7]; -#else long long int MPI_data[6]; -#endif // ticks tic, toc_run, tot_setup, tot_run = 0; -#ifdef TASK_TIMERS - ticks tic, toc_run; - ticks start; -#endif char processor_name[MPI_MAX_PROCESSOR_NAME]; int name_len; - #ifdef TASK_TIMERS - long long int *task_start; - long long int *task_finish; - int *task_tids; - int *task_types; - int *task_i; - int *task_j; - int *task_k; - #endif // Initialize the MPI environment int MpiThreadLevel; @@ -514,16 +497,10 @@ if(MpiThreadLevel != MPI_THREAD_MULTIPLE) printf("Task_DGEQRF = %i\n", task_DGEQRF); /* Runner function to pass to the scheduler. */ void runner(struct qsched *s, int type, void * data) { - #ifdef TASK_TIMERS - tic = getticks(); - #endif /* Decode the task data. */ long long int* idata = (long long int*)data; // int i = idata[0], j = idata[1], k = idata[2]; long long int i, j , a , t; -#ifdef TASK_TIMERS - long long int tid = -1; -#endif /* Need to pull the resources.*/ /* Decode and execute the task. */ @@ -534,9 +511,6 @@ if(MpiThreadLevel != MPI_THREAD_MULTIPLE) double *cornerTile = (double*)qsched_getresdata(s, i); double *tau = (double*)qsched_getresdata(s, j); DGEQRF(cornerTile, K, tau, k, m); - #ifdef TASK_TIMERS - tid = (idata[2] * m * m); - #endif break; case task_DLARFT: t = idata[2]; @@ -547,9 +521,6 @@ if(MpiThreadLevel != MPI_THREAD_MULTIPLE) tau = (double*)qsched_getresdata(s, t); DLARFT(cornerTile, lockedTile, K, j, k, tau, m); - #ifdef TASK_TIMERS - tid = (idata[3] * m * m) + (idata[4] * m); - #endif break; case task_DTSQRF: i = idata[0]; @@ -560,9 +531,6 @@ if(MpiThreadLevel != MPI_THREAD_MULTIPLE) tau = (double*)qsched_getresdata(s, t); DTSQRF(cornerTile, lockedTile, K, i, k, tau, m); - #ifdef TASK_TIMERS - tid = (idata[3] * m * m) + (idata[3] * m) + idata[4]; - #endif break; case task_DSSRFT: a = idata[2]; @@ -575,68 +543,14 @@ if(MpiThreadLevel != MPI_THREAD_MULTIPLE) tau = (double*)qsched_getresdata(s, t); DSSRFT(lockedTile1, usedTile, lockedTile2, K, i, j, k, tau, m); - #ifdef TASK_TIMERS - tid = (idata[4] * m * m) + (idata[6] * m) + (idata[5]); - #endif break; // default: // error("Unknown task type."); } - #ifdef TASK_TIMERS - if(type > 0){ - - /*if(kk == 0) - { - if(type == task_DGEQRF) - printf("Ran DGEQRF with k = %lli\n", kk); - else if (type == task_DLARFT) - printf("Ran DLARFT with k = %lli, j = %lli, i = %lli\n", kk ,jj ,ii); - else if (type == task_DTSQRF) - printf("Ran DTSQRF with k = %lli, j = %lli, i = %lli\n", kk, jj, ii); - else if (type == task_DSSRFT) - printf("Ran DSSRFT with k = %lli, j = %lli, i = %lli\n", kk, jj, ii); - }*/ - toc_run = getticks(); -// message("tid = %lli", tid); -// message("sizeof arrays = %i", m*n*m*n); - task_start[tid] = tic - start; - task_finish[tid] = toc_run - start; - task_tids[tid] = omp_get_thread_num(); - task_types[tid] = type; - if(type == task_DGEQRF){ - task_i[tid] = (int)idata[2]; - task_j[tid] = (int)idata[2]; - task_k[tid] = (int)idata[2]; - }else if (type == task_DLARFT) - { - task_i[tid] = (int)idata[3]; - task_j[tid] = (int)idata[4]; - task_k[tid] = (int)idata[3]; - }else if(type == task_DTSQRF) - { - task_i[tid] = (int)idata[4]; - task_j[tid] = (int)idata[3]; - task_k[tid] = (int)idata[3]; - }else{ - task_i[tid] = (int)idata[5]; - task_j[tid] = (int)idata[6]; - task_k[tid] = (int)idata[4]; - } - } - #endif } qsched_init(&s, nr_threads, qsched_flag_yield | qsched_flag_pthread, MPI_COMM_WORLD); -#ifdef TASK_TIMERS -task_start = (long long int*)calloc(sizeof(long long int), m*n*m*n); -task_finish = (long long int*)calloc(sizeof(long long int), m*n*m*n); -task_tids = (int*)calloc(sizeof(int), m*n*m*n); -task_types = (int*)calloc(sizeof(int), m*n*m*n); -task_i = (int*)calloc(sizeof(int), m*n*m*n); -task_j = (int*)calloc(sizeof(int), m*n*m*n); -task_k = (int*)calloc(sizeof(int), m*n*m*n); -#endif srand(6); /* Allocate and fill the original matrix. */ @@ -700,14 +614,8 @@ if(s.rank == 0) { data[2] = k; MPI_data[0] = rid[k*m+k]; MPI_data[1] = tau_id[k*m+k]; - #ifdef TASK_TIMERS - MPI_data[2] = k; - tid_new = qsched_addtask(&s, task_DGEQRF, task_flag_none, MPI_data, - sizeof(long long int) * 3, 200); - #else tid_new = qsched_addtask(&s, task_DGEQRF, task_flag_none, MPI_data, sizeof(long long int) * 2, 200); - #endif qsched_addlock(&s, tid_new, rid[k * m + k]); qsched_addlock(&s, tid_new, tau_id[k*m+k]); if(k == 0) @@ -725,15 +633,8 @@ if(s.rank == 0) { MPI_data[0] = rid[j*m+k]; MPI_data[1] = rid[k*m+k]; MPI_data[2] = tau_id[k*m+k]; - #ifdef TASK_TIMERS - MPI_data[3] = k; - MPI_data[4] = j; - tid_new = qsched_addtask(&s, task_DLARFT, task_flag_none, MPI_data, - sizeof(long long int) * 5, 300); - #else tid_new = qsched_addtask(&s, task_DLARFT, task_flag_none, MPI_data, sizeof(long long int) * 3, 300); - #endif if(k == 0) { memcpy(rids[j*m+k], &A_orig[(data[1]*m+data[0])*K*K], sizeof(double)*K*K); @@ -756,15 +657,8 @@ if(s.rank == 0) { MPI_data[0] = rid[k*m+i]; MPI_data[1] = rid[k*m+k]; MPI_data[2] = tau_id[k*m+i]; - #ifdef TASK_TIMERS - MPI_data[3] = k; - MPI_data[4] = i; - tid_new = qsched_addtask(&s, task_DTSQRF, task_flag_none, MPI_data, - sizeof(long long int) * 5, 300); - #else tid_new = qsched_addtask(&s, task_DTSQRF, task_flag_none, MPI_data, sizeof(long long int) * 3, 300); - #endif if(k == 0) { memcpy(rids[k*m+i], &A_orig[(data[1]*m+data[0])*K*K], sizeof(double)*K*K); @@ -784,16 +678,8 @@ if(s.rank == 0) { MPI_data[1] = rid[k*m+i]; MPI_data[2] = rid[j*m+k]; MPI_data[3] = tau_id[k*m+i]; - #ifdef TASK_TIMERS - MPI_data[4] = k; - MPI_data[5] = i; - MPI_data[6] = j; - tid_new = qsched_addtask(&s, task_DSSRFT, task_flag_none, MPI_data, - sizeof(long long int) * 7, 500); - #else tid_new = qsched_addtask(&s, task_DSSRFT, task_flag_none, MPI_data, sizeof(long long int) * 4, 500); - #endif if(k == 0) { memcpy(rids[j*m+i], &A_orig[(data[1]*m+data[0])*K*K], sizeof(double)*K*K); @@ -813,10 +699,6 @@ if(s.rank == 0) { } /* build the tasks. */ } -#ifdef TASK_TIMERS - MPI_Barrier(s.comm); - start = getticks(); -#endif qsched_run_MPI(&s, nr_threads, runner); // Print off a hello world message printf("Hello world from processor %s, rank = %i, count_ranks = %i\n", @@ -824,28 +706,30 @@ qsched_run_MPI(&s, nr_threads, runner); #ifdef TASK_TIMERS -FILE *file = NULL; - if(s.rank == 0) - file = fopen("Task_timers0.out", "w"); - else if (s.rank == 1) - file = fopen("Task_timers1.out", "w"); - else if (s.rank == 2) - file = fopen("Task_timers2.out", "w"); - else if (s.rank == 3) - file = fopen("Task_timers3.out", "w"); - - for(i = 0; i < m*n*m*n; i++) +//Each rank wants to loop through the tasks they executed and output the data, then synchronize. + FILE *file; +if(s.rank == 0) +{ + file = fopen("task_timers.tks", "w"); + fclose(file); +} +for(i = 0; i < s.count_ranks; i++) +{ + if(i == s.rank) { - if(task_types[i] > 0) - fprintf(file, "%i %i %lli %lli %i %i %i\n", task_types[i], s.rank*nr_threads+task_tids[i],task_start[i], task_finish[i] ,task_i[i], task_j[i], task_k[i] ); + file = fopen("task_timers.tks", "a"); + for(j = 0; j < s.task_ranks[s.count_ranks]; j++) + { + if(s.tasks[j].node_executed == s.rank && s.tasks[j].type != -101) + { + struct task *t = &s.tasks[j]; + fprintf(file, "%lli %i %llu %llu %i %i\n", t->id, t->type, t->task_start, t->task_finish, t->node_executed, t->thread_executed); + } + } + fclose(file); } - - - fclose(file); - free(task_types); - free(task_tids); - free(task_start); - free(task_finish); + MPI_Barrier(s.comm); +} #endif //TODO Copy all the resources back into A, synchronize them across the nodes and check correctness. diff --git a/src/qsched.c b/src/qsched.c index 09ce4b7b3d1be353d3444f66c8c898db1596d87a..e41df559a414329798031b09b47447585bc8706a 100644 --- a/src/qsched.c +++ b/src/qsched.c @@ -40,7 +40,12 @@ #include <pthread.h> #endif -#define NO_TASK_TIMERS +#ifdef HAVE_LIBNUMA +#include <unistd.h> + #include <numa.h> +#endif + +//#define NO_TASK_TIMERS /* Local includes. */ #include "cycle.h" @@ -3600,6 +3605,9 @@ void *qsched_pthread_run ( void *in ) { #endif /* Mark that task as done. */ qsched_done( s , t ); +#ifdef TASK_TIMERS + t->thread_executed = tid; +#endif } /* loop as long as there are tasks. */ @@ -3623,13 +3631,54 @@ void qsched_run_MPI ( struct qsched *s, int nr_threads, qsched_funtype fun ) { printf("Rank[%i]: qsched_prepare_mpi took %lli (= %e) ticks\n", s->rank, toc - tic, (float)(toc - tic)); - MPI_Barrier(s->comm); - #ifdef TASK_TIMERS - s->start = getticks(); - #endif - message("Beginning execution of quicksched"); - tic = getticks(); #if defined( HAVE_PTHREAD ) + +#if defined( HAVE_LIBNUMA) + int nr_cores = sysconf(_SC_NPROCESSORS_ONLN); + int i, j, cpuid[nr_cores]; + cpu_set_t cpuset; + /* Get next highest power of 2. */ + int maxint = 1; + while (maxint < nr_cores) maxint *= 2; + + cpuid[0] = 0; + int k = 1; + for (i = 1; i < maxint; i *= 2) + for (j = maxint / i / 2; j < maxint; j += maxint / i) + if (j < nr_cores && j != 0) cpuid[k++] = j; + + + if(numa_available() >= 0){ + /* Ascending NUMA distance. Use Angus' Bubblesort implementation. */ + int home = numa_node_of_cpu(sched_getcpu()), half = nr_cores / 2; + int done = 0; + while(!done) { + done = 1; + for(i = 1; i < nr_cores ; i++) + { + int node_a = numa_node_of_cpu(cpuid[i-1]); + int node_b = numa_node_of_cpu(cpuid[i]); + + + /* Try to avoid hyperthreads... */ + int thread_a = cpuid[i-1] >= half; + int thread_b = cpuid[i] >= half; + + int swap = thread_a > thread_b; + + if(swap) + { + int t = cpuid[i-1]; + cpuid[i-1] = cpuid[i]; + cpuid[i] = t; + done = 0; + } + } + } + }else{ + error("Somehow numa isn't available despite configure saying it should be."); + } +#endif /* Make sure we have enough threads. */ if ( nr_threads > s->runners_count ) { @@ -3654,10 +3703,22 @@ printf("Rank[%i]: qsched_prepare_mpi took %lli (= %e) ticks\n", s->rank, s->runners[tid].s = s; if ( pthread_create( &s->runners[tid].thread , NULL , qsched_pthread_run , (void *)&s->runners[tid] ) != 0 ) error( "Failed to create pthread." ); + #if defined( HAVE_LIBNUMA) + CPU_ZERO(&cpuset); + CPU_SET(cpuid[tid], &cpuset); + if ( pthread_setaffinity_np(s->runners[tid].thread, sizeof(cpu_set_t), &cpuset ) != 0){ + error("Failed to set thread affinity."); + } + #endif s->runners_count += 1; } } - + MPI_Barrier(s->comm); + #ifdef TASK_TIMERS + s->start = getticks(); + #endif + message("Beginning execution of quicksched"); + tic = getticks(); /* Set the runner function. */ s->fun = fun; message("Launching %i pthreads", s->runners_count); @@ -3674,6 +3735,12 @@ toc = getticks(); #else #if defined( HAVE_OPENMP ) +MPI_Barrier(s->comm); + #ifdef TASK_TIMERS + s->start = getticks(); + #endif + message("Beginning execution of quicksched"); + tic = getticks(); /* Parallel loop. */ #pragma omp parallel num_threads( nr_threads ) { @@ -3684,22 +3751,16 @@ toc = getticks(); /* Loop as long as there are tasks. */ while ( ( t = qsched_gettask( s , qid ) ) != NULL ) { - - #ifdef TASK_TIMERS - t->task_start = getticks() - s->start; - #endif + /* Call the user-supplied function on the task with its data. */ fun( s, t->type , &s->data[ t->data ] ); /* Mark that task as done. */ qsched_done( s , t ); - - #ifdef TASK_TIMERS - t->task_finish = getticks() - s->start; - t->node_executed = s->rank; - t->thread_executed = omp_get_thread_num(); - #endif } /* loop as long as there are tasks. */ +#ifdef TASK_TIMERS + t->thread_executed = omp_get_thread_num(); +#endif } /* parallel loop. */ @@ -4015,9 +4076,9 @@ void qsched_enqueue ( struct qsched *s , struct task *t ) { } /* If its a send or recv place it in queue 0. */ - /*TODO else if (t->type == task_type_send || t->type == task_type_recv){ +/* else if (t->type == task_type_send || t->type == task_type_recv){ #ifdef WITH_MPI - queue_put(&s->queues[qid], s, t->id); + queue_put(&s->queues[0], s, t->id); #else error("Task of type send/recv in non-MPI process - please don't use these reserved types."); #endif @@ -4076,6 +4137,13 @@ void qsched_done ( struct qsched *s , struct task *t ) { if (!(s->flags & qsched_flag_norecost)) t->cost = t->toc - t->tic; +#ifdef TASK_TIMERS + t->task_finish = getticks() - s->start; + t->node_executed = s->rank; + #ifndef HAVE_PTHREAD + t->thread_executed = omp_get_thread_num(); + #endif +#endif if(!(t->type == task_type_send || t->type == task_type_recv)){ /* Release this task's locks. */ for ( k = 0 ; k < t->nr_locks ; k++ ) @@ -4211,6 +4279,10 @@ int qsched_locktask ( struct qsched *s , long long int tid ) { /* Get a pointer on the task. */ t = &s->tasks[gettaskindex(tid,s)]; + +#ifdef TASK_TIMERS + t->task_start = getticks() - s->start; +#endif /* Communication task? */ if (t->type == task_type_recv || t->type == task_type_send) {