From b179d6e2c20e7278d4971f7dffd1e1dc00b8e08f Mon Sep 17 00:00:00 2001
From: Aidan Chalk <d74ksy@cosma-e.cosma>
Date: Thu, 19 Nov 2015 16:18:33 +0000
Subject: [PATCH] Added all the changes to the setup made to improve scaling
 and use pthreads

---
 examples/test_qr.c     |  23 ++--
 examples/test_qr_mpi.c |   2 +-
 src/qsched.c           | 295 ++++++++++++++++++++++++-----------------
 src/queue.c            |  33 -----
 4 files changed, 188 insertions(+), 165 deletions(-)

diff --git a/examples/test_qr.c b/examples/test_qr.c
index c52db67..5c3a949 100644
--- a/examples/test_qr.c
+++ b/examples/test_qr.c
@@ -29,8 +29,8 @@
 #include <omp.h>
 #include <pthread.h>
 
-#include <cblas.h>
-#include <lapacke.h>
+//#include <cblas.h>
+//#include <lapacke.h>
 
 /* Local includes. */
 #include "quicksched.h"
@@ -41,6 +41,7 @@
  * This function is simply for validation and is implemented naively as we know
  * of no implementation to retrieve Q from the tiled QR.
  */
+#ifdef NO_COMPILE
 double* computeQ(double* HR, int size, int tilesize, double* tau, int tauNum) {
   double* Q = malloc(sizeof(double) * size * size);
   double* Qtemp = malloc(sizeof(double) * size * size);
@@ -186,7 +187,7 @@ double* tileToColumn(double* tileMatrix, int size, int m, int n, int tilesize) {
   }
   return ColumnMatrix;
 }
-
+#endif
 /* Routines for the tiled QR decomposition.*/
 
 /**
@@ -514,13 +515,13 @@ void test_qr(int m, int n, int K, int nr_threads, int runs, double* matrix) {
   bzero(tau, sizeof(double) * m * n * K);
 
   /* Dump A_orig. */
-  message( "A_orig = [" );
+/*  message( "A_orig = [" );
   for ( k = 0 ; k < m*K ; k++ ) {
       for ( j = 0 ; j < n*K ; j++ )
           printf( "%.3f " , A_orig[ j*m*K + k ] );
       printf( "\n" );
       }
-  printf( "];\n" );
+  printf( "];\n" );*/
 
   /* Initialize the scheduler. */
   qsched_init(&s, nr_threads, qsched_flag_none);
@@ -613,26 +614,28 @@ void test_qr(int m, int n, int K, int nr_threads, int runs, double* matrix) {
     qsched_run(&s, nr_threads, runner);
     toc_run = getticks();
     message("%ith run took %lli ticks...", k, toc_run - tic);
+    message("%ith run took %.3f ms...", k, (toc_run - tic)/2.67e6);
     tot_run += toc_run - tic;
   }
 
   /* Dump A. */
-  message( "A = [" );
+ /* message( "A = [" );
   for ( k = 0 ; k < m*K ; k++ ) {
       for ( j = 0 ; j < n*K ; j++ )
           printf( "%.3f " , A[ j*m*K + k ] );
       printf( "\n" );
       }
-  printf( "];\n" ); 
+  printf( "];\n" ); */
 
   /* Dump tau. */
+/*
    message( "tau = [" );
   for ( k = 0 ; k < m*K ; k++ ) {
       for ( j = 0 ; j < n ; j++ )
           printf( "%.3f " , tau[ j*m*K + k ] );
       printf( "\n" );
       }
-  printf( "];\n" ); 
+  printf( "];\n" ); */
 
   /* Dump the tasks. */
   /* for ( k = 0 ; k < s.count ; k++ ) {
@@ -655,7 +658,7 @@ void test_qr(int m, int n, int K, int nr_threads, int runs, double* matrix) {
   }
 
   /* Test if the decomposition was correct.*/
-  double *tempMatrix = tileToColumn(A, m*n*K*K, m, n, K);
+  /*double *tempMatrix = tileToColumn(A, m*n*K*K, m, n, K);
    double *Q = computeQ(tempMatrix, m*K, K, tau, m);
    double *R = getR(tempMatrix, m*K);
    cblas_dgemm(CblasColMajor, CblasNoTrans, CblasNoTrans, m*K, m*K, m*K, 1.0, Q,
@@ -671,7 +674,7 @@ void test_qr(int m, int n, int K, int nr_threads, int runs, double* matrix) {
    }
    free(tempMatrix);
    free(Q);
-   free(R);
+   free(R);*/
 
   /* Clean up. */
   free(A);
diff --git a/examples/test_qr_mpi.c b/examples/test_qr_mpi.c
index 3e5b45c..f23dff3 100644
--- a/examples/test_qr_mpi.c
+++ b/examples/test_qr_mpi.c
@@ -626,7 +626,7 @@ if(MpiThreadLevel != MPI_THREAD_MULTIPLE)
     #endif
   }
 
-    qsched_init(&s, nr_threads, qsched_flag_none, MPI_COMM_WORLD);
+    qsched_init(&s, nr_threads, qsched_flag_yield | qsched_flag_pthread, MPI_COMM_WORLD);
 
 #ifdef TASK_TIMERS
 task_start = (long long int*)calloc(sizeof(long long int), m*n*m*n);
diff --git a/src/qsched.c b/src/qsched.c
index a127279..09ce4b7 100644
--- a/src/qsched.c
+++ b/src/qsched.c
@@ -3494,9 +3494,126 @@ if(tid == NULL)
 
 }
 
+/**
+ * @brief Barrier function when running with pthreads.
+ *
+ * @param s Pointer to the #qsched.
+ * @param tid ID of the calling thread. This is needed in case
+ *        we don't want to launch all threads.
+ */
+ 
+void qsched_barrier_wait ( struct qsched *s , int tid ) {
+
+#if defined( HAVE_PTHREAD )
+    /* First, get the barrier mutex. */
+    if ( pthread_mutex_lock( &s->barrier_mutex ) != 0 )
+        error( "Failed to get barrier mutex." );
+        
+    /* The callee's thread stops running. */
+    s->barrier_running -= 1;
+        
+    /* If all threads are in, send a signal... */
+    if ( s->barrier_running == 0 )
+        if ( pthread_cond_broadcast( &s->barrier_cond ) != 0 )
+            error( "Failed to broadcast barrier full condition." );
+            
+    /* Wait for the barrier to open. */
+    while ( s->barrier_count == 0 || tid >= s->barrier_launchcount )
+        if ( pthread_cond_wait( &s->barrier_cond , &s->barrier_mutex ) != 0 )
+            error( "Error waiting for barrier to open." );
+            
+    /* This thread is leaving, decrease the barrier count, increase
+       the number of threads running. */
+    s->barrier_count -= 1;
+    s->barrier_running += 1;
+        
+    /* If I'm the last one out, signal the condition again. */
+    if ( s->barrier_count == 0 )
+        if ( pthread_cond_broadcast( &s->barrier_cond ) != 0 )
+            error( "Failed to broadcast empty barrier condition." );
+            
+    /* Last but not least, release the mutex. */
+    if ( pthread_mutex_unlock( &s->barrier_mutex ) != 0 )
+        error( "Failed to unlock the barrier mutex." );
+#endif
+
+  }
+    
+
+/**
+ * @brief Launch a given number of threads and wait for them to finish.
+ *
+ * @param s Pointer to the #qsched.
+ * @param nr_threads Number of threads to let through the barrier.
+ *
+ * Lets the specified number of threads through the barrier. Note that
+ * this assumes that at least that many threads are waiting there,
+ * otherwise this function will hang indefinitely.
+ */
+void qsched_launch_threads ( struct qsched *s , int nr_threads ) {
+
+#if defined( HAVE_PTHREAD )
+    /* Wait for all the runners to have entered the barrier. */
+    while ( s->barrier_running )
+        if ( pthread_cond_wait( &s->barrier_cond , &s->barrier_mutex ) != 0 )
+            error( "Error while waiting for barrier." );
+
+    /* Cry havoc and let loose the dogs of war. */
+    s->barrier_count = nr_threads;
+    s->barrier_launchcount = nr_threads;
+    if ( pthread_cond_broadcast( &s->barrier_cond ) != 0 )
+        error( "Failed to broadcast barrier open condition." );
+
+    /* Lean back and wait for the runners to come home. */
+    while ( s->barrier_count || s->barrier_running )
+        if ( pthread_cond_wait( &s->barrier_cond , &s->barrier_mutex ) != 0 )
+            error( "Error while waiting for barrier." );
+#endif
+            
+    }
+    
+
+void *qsched_pthread_run ( void *in ) {
+
+  struct qsched_pthread_runner *r = (struct qsched_pthread_runner *)in;
+  struct qsched *s = r->s;
+  int tid = r->tid;
+  struct task *t;
+  
+  /* Main loop. */
+  while ( 1 ) {
+  
+    /* Wait at the barrier. */
+    qsched_barrier_wait( s , tid );
+    
+    /* If there is no function to execute, then just quit. */
+    if (s->fun == NULL) pthread_exit(NULL);
+  
+    /* Loop as long as there are tasks. */
+    while ( ( t = qsched_gettask( s , tid ) ) != NULL ) {
+
+        /* Call the user-supplied function on the task with its data. */
+        #ifndef WITH_MPI
+        s->fun( t->type , &s->data[ t->data ] );
+        #else
+        s->fun( s,  t->type , &s->data[ t->data ] );        
+        #endif
+        /* Mark that task as done. */
+        qsched_done( s , t );
+
+        } /* loop as long as there are tasks. */
+            
+    } /* main loop. */
+
+  }
+    
+    
+
 
 void qsched_run_MPI ( struct qsched *s, int nr_threads, qsched_funtype fun ) {
 #ifdef WITH_MPI
+
+
 #if defined( HAVE_MPI )
 
     ticks tic = getticks();
@@ -3512,6 +3629,50 @@ printf("Rank[%i]: qsched_prepare_mpi took %lli (= %e) ticks\n", s->rank,
     #endif
     message("Beginning execution of quicksched");
     tic = getticks();
+#if defined( HAVE_PTHREAD )
+    /* Make sure we have enough threads. */
+    if ( nr_threads > s->runners_count ) {
+    
+      /* Reallocate the threads array? */
+      if ( nr_threads > s->runners_size ) {
+        int runners_size_new = s->runners_size;
+        while ( runners_size_new < nr_threads )
+          runners_size_new *= qsched_stretch;
+        struct qsched_pthread_runner *runners_new;
+        if ( ( runners_new = malloc( sizeof(struct qsched_pthread_runner) * runners_size_new ) ) == NULL )
+          error( "Failed to allocate new thread array." );
+        memcpy( runners_new , s->runners , sizeof(struct qsched_pthread_runner) * s->runners_count );
+        free( s->runners );
+        s->runners = runners_new;
+        s->runners_size = runners_size_new;
+      }
+      
+      /* Launch the missing threads. */
+      for ( int tid = s->runners_count ; tid < nr_threads ; tid++ ) {
+        s->barrier_running += 1;
+        s->runners[tid].tid = tid;
+        s->runners[tid].s = s;
+        if ( pthread_create( &s->runners[tid].thread , NULL , qsched_pthread_run , (void *)&s->runners[tid] ) != 0 )
+            error( "Failed to create pthread." );
+        s->runners_count += 1;
+      }
+    }
+    
+    /* Set the runner function. */
+    s->fun = fun;
+    message("Launching %i pthreads", s->runners_count);
+    /* Launch the threads. */
+    qsched_launch_threads( s , nr_threads );
+
+toc = getticks();
+
+        printf("Rank[%i]: Execution took %lli (= %e) ticks\n", s->rank,
+         toc - tic, (float)(toc - tic));
+    //TIMINGS BASED ON COSMA4!
+    message("Execution took %.3f milliseconds.", (toc-tic)/2.67e6);
+    MPI_Barrier(s->comm);
+            
+#else
     #if defined( HAVE_OPENMP )
 /* Parallel loop. */
     #pragma omp parallel num_threads( nr_threads )
@@ -3542,16 +3703,17 @@ printf("Rank[%i]: qsched_prepare_mpi took %lli (= %e) ticks\n", s->rank,
             
         } /* parallel loop. */
 
-    MPI_Barrier(s->comm);
+
     toc = getticks();
 
         printf("Rank[%i]: Execution took %lli (= %e) ticks\n", s->rank,
          toc - tic, (float)(toc - tic));
     //TIMINGS BASED ON COSMA4!
     message("Execution took %.3f milliseconds.", (toc-tic)/2.67e6);
-
+    MPI_Barrier(s->comm);
 #else
-    error( "QuickSched was not compiled with OpenMP support." );
+    error( "QuickSched was not compiled with Pthread or OpenMP support." );
+#endif
 #endif
 #else
 
@@ -3615,120 +3777,7 @@ void qsched_run_openmp ( struct qsched *s , int nr_threads , qsched_funtype fun
     }
     
     
-/**
- * @brief Barrier function when running with pthreads.
- *
- * @param s Pointer to the #qsched.
- * @param tid ID of the calling thread. This is needed in case
- *        we don't want to launch all threads.
- */
- 
-void qsched_barrier_wait ( struct qsched *s , int tid ) {
-
-#if defined( HAVE_PTHREAD )
-    /* First, get the barrier mutex. */
-    if ( pthread_mutex_lock( &s->barrier_mutex ) != 0 )
-        error( "Failed to get barrier mutex." );
-        
-    /* The callee's thread stops running. */
-    s->barrier_running -= 1;
-        
-    /* If all threads are in, send a signal... */
-    if ( s->barrier_running == 0 )
-        if ( pthread_cond_broadcast( &s->barrier_cond ) != 0 )
-            error( "Failed to broadcast barrier full condition." );
-            
-    /* Wait for the barrier to open. */
-    while ( s->barrier_count == 0 || tid >= s->barrier_launchcount )
-        if ( pthread_cond_wait( &s->barrier_cond , &s->barrier_mutex ) != 0 )
-            error( "Eror waiting for barrier to close." );
-            
-    /* This thread is leaving, decrease the barrier count, increase
-       the number of threads running. */
-    s->barrier_count -= 1;
-    s->barrier_running += 1;
-        
-    /* If I'm the last one out, signal the condition again. */
-    if ( s->barrier_count == 0 )
-        if ( pthread_cond_broadcast( &s->barrier_cond ) != 0 )
-            error( "Failed to broadcast empty barrier condition." );
-            
-    /* Last but not least, release the mutex. */
-    if ( pthread_mutex_unlock( &s->barrier_mutex ) != 0 )
-        error( "Failed to get unlock the barrier mutex." );
-#endif
-
-  }
-    
-
-/**
- * @brief Launch a given number of threads and wait for them to finish.
- *
- * @param s Pointer to the #qsched.
- * @param nr_threads Number of threads to let through the barrier.
- *
- * Lets the specified number of threads through the barrier. Note that
- * this assumes that at least that many threads are waiting there,
- * otherwise this function will hang indefinitely.
- */
-void qsched_launch_threads ( struct qsched *s , int nr_threads ) {
-
-#if defined( HAVE_PTHREAD )
-    /* Wait for all the runners to have entered the barrier. */
-    while ( s->barrier_running )
-        if ( pthread_cond_wait( &s->barrier_cond , &s->barrier_mutex ) != 0 )
-            error( "Error while waiting for barrier." );
-
-    /* Cry havoc and let loose the dogs of war. */
-    s->barrier_count = nr_threads;
-    s->barrier_launchcount = nr_threads;
-    if ( pthread_cond_broadcast( &s->barrier_cond ) != 0 )
-        error( "Failed to broadcast barrier open condition." );
-
-    /* Lean back and wait for the runners to come home. */
-    while ( s->barrier_count || s->barrier_running )
-        if ( pthread_cond_wait( &s->barrier_cond , &s->barrier_mutex ) != 0 )
-            error( "Error while waiting for barrier." );
-#endif
-            
-    }
-    
-
-void *qsched_pthread_run ( void *in ) {
-
-  struct qsched_pthread_runner *r = (struct qsched_pthread_runner *)in;
-  struct qsched *s = r->s;
-  int tid = r->tid;
-  struct task *t;
-  
-  /* Main loop. */
-  while ( 1 ) {
-  
-    /* Wait at the barrier. */
-    qsched_barrier_wait( s , tid );
-    
-    /* If there is no function to execute, then just quit. */
-    if (s->fun == NULL) pthread_exit(NULL);
-  
-    /* Loop as long as there are tasks. */
-    while ( ( t = qsched_gettask( s , tid ) ) != NULL ) {
 
-        /* Call the user-supplied function on the task with its data. */
-        #ifndef WITH_MPI
-        s->fun( t->type , &s->data[ t->data ] );
-        #else
-        s->fun( s,  t->type , &s->data[ t->data ] );        
-        #endif
-        /* Mark that task as done. */
-        qsched_done( s , t );
-
-        } /* loop as long as there are tasks. */
-            
-    } /* main loop. */
-
-  }
-    
-    
 /**
  * @brief Execute all the tasks in the current scheduler using
  *        pthreads.
@@ -3847,10 +3896,6 @@ void qsched_enqueue ( struct qsched *s , struct task *t ) {
     int j, qid, scores[ s->nr_queues ], oid;
 
 
-   if(t->type < -150 || t->type == 0)
-    {
-        message("Enqueueing a wierd task with type %i", t->type);
-    }
     /* When we enqueue the send or recv tasks we emit the Isend/Irecv commands. */
     if(t->type == task_type_send ){
         #ifdef WITH_MPI
@@ -3969,6 +4014,14 @@ void qsched_enqueue ( struct qsched *s , struct task *t ) {
         qsched_done( s , t );
         
         }
+    /* If its a send or recv place it in queue 0. */
+    /*TODO else if (t->type == task_type_send || t->type == task_type_recv){
+        #ifdef WITH_MPI
+        queue_put(&s->queues[qid], s, t->id);
+        #else
+        error("Task of type send/recv in non-MPI process - please don't use these reserved types.");
+        #endif
+    }*/
     /* Otherwise, find a home (queue) for it. */
     else {
     
@@ -4313,7 +4366,7 @@ struct task *qsched_gettask ( struct qsched *s , int qid ) {
             
         /* Otherwise, take a nap? */
         #ifdef HAVE_PTHREAD
-        else if ( s->flags & qsched_flag_yield ) {
+        else if ( s->flags & qsched_flag_yield  && qid != 0) {
             TIMER_TOC( s , qsched_timer_gettask );
             pthread_mutex_lock( &s->mutex );
             if ( s->waiting )
diff --git a/src/queue.c b/src/queue.c
index 10f1eb1..6306c33 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -139,39 +139,6 @@ long long int queue_get ( struct queue *q , struct qsched *s , int insist ) {
     /* Otherwise, clear the task ID. */
     else
     {
-//        message("q->count = %i", q->count);
-       /* for(k = 0; k < q->count; k++)
-        {
-            if(s->tasks[gettaskindex(q->inds[k],s)].type == task_type_recv)
-            {
-                struct task *t = &s->tasks[gettaskindex(q->inds[k], s)];
-                if(t == NULL)
-                    error("We got a null task?");
-               // int *data = (int*)(&s->data[t->data]);
-                //int *data = (int*)(&s->data[s->tasks[gettaskindex(q->inds[k],s)].data]);
-             //   int tag = data[4];
-             //   message("Failed to receive data (yet) with tag %i", tag);
-            }
-            else if(s->tasks[gettaskindex(q->inds[k],s)].type == task_type_send)
-            {
-                struct task *t = &s->tasks[gettaskindex(q->inds[k], s)];
-                if(t == NULL)
-                    error("We got a null task?");
-                int *data = (int*)(&s->data[t->data]);
-                int tag = data[4];
-            }
-            else if (s->tasks[gettaskindex(q->inds[k],s)].type < 0)
-            {
-                int i;
-                for(i = 0; i < q->count; i++)
-                {
-                    message("q[%i] = task %lli, index %i, type %i", i, q->inds[i],gettaskindex(q->inds[i],s ), s->tasks[gettaskindex(q->inds[i],s)].type);
-                }
-                #ifdef WITH_MPI
-                error("Got here somehow, type is %i, nr_locks = %i. k = %i, count = %i. s->count = %i", s->tasks[gettaskindex(q->inds[k],s)].type, s->tasks[gettaskindex(q->inds[k],s)].nr_locks, k, q->count, s->task_ranks[s->count_ranks]);
-#endif
-            }
-        } */  
         tid = qsched_task_none;
     }     
        
-- 
GitLab