From e95cd7c13d7797eb05030b89357c8fe171e41041 Mon Sep 17 00:00:00 2001
From: "Peter W. Draper" <p.w.draper@durham.ac.uk>
Date: Mon, 23 Sep 2019 16:31:38 +0100
Subject: [PATCH] Increase no. of injection threads to 2

---
 swiftmpistepsim.c | 107 ++++++++++++++++++++++++++++++++--------------
 1 file changed, 74 insertions(+), 33 deletions(-)

diff --git a/swiftmpistepsim.c b/swiftmpistepsim.c
index 9f57fb7..02e8f67 100644
--- a/swiftmpistepsim.c
+++ b/swiftmpistepsim.c
@@ -47,14 +47,16 @@ static const int task_subtype_count = 30;  // Just some upper limit on subtype.
 static MPI_Comm subtypeMPI_comms[30];
 
 /* The local queues. */
-static struct mpiuse_log_entry **volatile reqs_queue;
-static int volatile ind_req = 0;
-static int volatile nr_reqs = 0;
-static int volatile injecting = 1;
+static struct mpiuse_log_entry **volatile reqs_queue[2];
+static int volatile ind_req[2] = {0,0};
+static int volatile nr_reqs[2] = {0,0};
+static int volatile injecting[2] = {1,1};
+
 static struct mpiuse_log_entry **volatile recvs_queue;
 static int volatile nr_recvs = 0;
 static int volatile ind_recv = 0;
 static int volatile todo_recv = 0;
+
 static struct mpiuse_log_entry **volatile sends_queue;
 static int volatile nr_sends = 0;
 static int volatile ind_send = 0;
@@ -64,25 +66,18 @@ static int volatile todo_send = 0;
 // XXX need to store this in the data file.
 static double log_clocks_cpufreq = 2194844448.0;
 
-/**
- * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
- *
- * The requests are initiated in the time order of the original log and an
- * attempt to start these with the same interval gap is made if usetics is
- * set, otherwise we just do them as quickly as possible.
- */
-static void *inject_thread(void *arg) {
-
-  if (verbose) message("%d: injection thread starts", *((int *)arg));
+static void injection_runner(int qid) {
+  if (verbose) message("%d: injection thread starts", qid);
   ticks starttics = getticks();
+  struct mpiuse_log_entry **reqs = reqs_queue[qid];
 
   /* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
-  ticks basetic = reqs_queue[0]->tic;
+  ticks basetic = reqs[0]->tic;
   ticks looptics = 0;
   double deadtime = 0.0;
 
-  while (ind_req < nr_reqs) {
-    struct mpiuse_log_entry *log = reqs_queue[ind_req];
+  while (ind_req[qid] < nr_reqs[qid]) {
+    struct mpiuse_log_entry *log = reqs[ind_req[qid]];
 
     if (usetics) {
       /* Expect time between this request and the previous one. */
@@ -101,7 +96,8 @@ static void *inject_thread(void *arg) {
 
         /* Remember to be fair and remove the looptics, then convert to
          * nanoseconds. */
-        double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
+        //double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
+        double ns = (double)(looptics) / log_clocks_cpufreq * 1.0e9;
         if (ns < 1.0e9) {
           sleep.tv_nsec = (long)ns;
         } else {
@@ -146,12 +142,12 @@ static void *inject_thread(void *arg) {
     }
     if (err != MPI_SUCCESS) error("Failed to activate send or recv");
 
-    ind_req++;
+    ind_req[qid]++;
 
     /* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
      * equally timed. */
     if (looptics == 0 && usetics) {
-      looptics = getticks() - starttics;
+        looptics = getticks() - starttics;
       if (verbose)
         message("injection loop took %.3f %s.", clocks_from_ticks(looptics),
                 clocks_getunit());
@@ -160,16 +156,39 @@ static void *inject_thread(void *arg) {
 
   /* All done, thread exiting. */
   if (verbose) {
-    message("%d injections completed, sends = %d, recvs = %d", ind_req,
+    message("%d injections completed, sends = %d, recvs = %d", ind_req[qid],
             nr_sends, nr_recvs);
     message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
     if (usetics) message("deadtime %.3f ms", deadtime / 1.0e6);
   }
   message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
           clocks_getunit());
-  atomic_dec(&injecting);
+  atomic_dec(&injecting[qid]);
+}
+
+/**
+ * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
+ *
+ * The requests are initiated in the time order of the original log and an
+ * attempt to start these with the same interval gap is made if usetics is
+ * set, otherwise we just do them as quickly as possible.
+ */
+static void *inject_thread1(void *arg) {
+  injection_runner(0);
   return NULL;
 }
+/**
+ * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
+ *
+ * The requests are initiated in the time order of the original log and an
+ * attempt to start these with the same interval gap is made if usetics is
+ * set, otherwise we just do them as quickly as possible.
+ */
+static void *inject_thread2(void *arg) {
+  injection_runner(1);
+  return NULL;
+}
+
 
 /**
  * @brief main loop to run over a queue of MPI requests and test for when they
@@ -198,7 +217,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
 
   /* We loop while new requests are being injected and we still have requests
    * to complete. */
-  while (injecting || (!injecting && *todos > 0)) {
+  while ((injecting[0] || injecting[1]) || (!injecting[0] && !injecting[1] && *todos > 0)) {
     int nlogs = *nr_logs;
     for (int k = 0; k < nlogs; k++) {
       struct mpiuse_log_entry *log = logs[k];
@@ -248,7 +267,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
  */
 static void *send_thread(void *arg) {
 
-  if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
+  if (verbose) message("%d: send thread starts", *((int *)arg));
   ticks starttics = getticks();
 
   int ncalls;
@@ -319,9 +338,8 @@ static void pick_logs(void) {
   size_t nlogs = mpiuse_nr_logs();
 
   /* Duplicate of logs. */
-  reqs_queue = (struct mpiuse_log_entry **)malloc(
-      sizeof(struct mpiuse_log_entry *) * nlogs);
-  nr_reqs = 0;
+  struct mpiuse_log_entry **reqs = (struct mpiuse_log_entry **)malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
+  int nreqs = 0;
   sends_queue = (struct mpiuse_log_entry **)malloc(
       sizeof(struct mpiuse_log_entry *) * nlogs);
   nr_sends = 0;
@@ -338,8 +356,8 @@ static void pick_logs(void) {
         log->data = calloc(log->size, 1);
 
         /* And keep this log. */
-        reqs_queue[nr_reqs] = log;
-        nr_reqs++;
+        reqs[nreqs] = log;
+        nreqs++;
 
       } else {
         error("task type '%d' is not a known send or recv task", log->type);
@@ -348,7 +366,26 @@ static void pick_logs(void) {
   }
 
   /* Sort into increasing time. */
-  qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
+  qsort(reqs, nreqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
+
+  /* And partition into queues for injection. Use interleave pick so that
+   * close in time injects are on different queues. */
+  reqs_queue[0] = (struct mpiuse_log_entry **)malloc(
+      sizeof(struct mpiuse_log_entry *) * nlogs);
+  nr_reqs[0] = 0;
+  reqs_queue[1] = (struct mpiuse_log_entry **)malloc(
+      sizeof(struct mpiuse_log_entry *) * nlogs);
+  nr_reqs[1] = 0;
+
+  for (int k = 0; k < nreqs; k++) {
+    int qid = k % 2;
+    reqs_queue[qid][nr_reqs[qid]] = reqs[k];
+    nr_reqs[qid]++;
+  }
+
+  message("nr_reqs = %d, %d", nr_reqs[0], nr_reqs[1]);
+
+  free(reqs);
 }
 
 /**
@@ -432,8 +469,11 @@ int main(int argc, char *argv[]) {
 
   /* Make three threads, one for injecting tasks and two to check for
    * completions of the sends and recv independently. */
-  pthread_t injectthread;
-  if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
+  pthread_t injectthread1;
+  if (pthread_create(&injectthread1, NULL, &inject_thread1, &myrank) != 0)
+    error("Failed to create injection thread.");
+  pthread_t injectthread2;
+  if (pthread_create(&injectthread2, NULL, &inject_thread2, &myrank) != 0)
     error("Failed to create injection thread.");
   pthread_t sendthread;
   if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
@@ -443,7 +483,8 @@ int main(int argc, char *argv[]) {
     error("Failed to create recv thread.");
 
   /* Wait until all threads have exited and all MPI requests have completed. */
-  pthread_join(injectthread, NULL);
+  pthread_join(injectthread1, NULL);
+  pthread_join(injectthread2, NULL);
   pthread_join(sendthread, NULL);
   pthread_join(recvthread, NULL);
 
-- 
GitLab