From 2502a55ea1ef89045bf85955d5bc2e9dcbb0724d Mon Sep 17 00:00:00 2001
From: "Peter W. Draper" <p.w.draper@durham.ac.uk>
Date: Wed, 10 Jun 2020 14:39:16 +0100
Subject: [PATCH] Split the local finalization of sends across two request
 queues and use one thread per queue to handle them

---
 swiftmpirdmastepsim.c | 74 ++++++++++++++++++++++++++++++++-----------
 1 file changed, 56 insertions(+), 18 deletions(-)

diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c
index 8169235..5b4906d 100644
--- a/swiftmpirdmastepsim.c
+++ b/swiftmpirdmastepsim.c
@@ -125,10 +125,15 @@ static int volatile nr_recv = 0;
 static int volatile todo_recv = 0;
 
 /* The local requests queue. */
-static struct mpiuse_log_entry **volatile req_queue;
-static int volatile ind_req = 0;
-static int volatile nr_req = 0;
-static int volatile todo_req = 0;
+static struct mpiuse_log_entry **volatile req_queue1;
+static int volatile ind_req1 = 0;
+static int volatile nr_req1 = 0;
+static int volatile todo_req1 = 0;
+
+static struct mpiuse_log_entry **volatile req_queue2;
+static int volatile ind_req2 = 0;
+static int volatile nr_req2 = 0;
+static int volatile todo_req2 = 0;
 
 /**
  * @brief Convert two ranks and tag into a single unique value.
@@ -263,10 +268,16 @@ static void *send_thread(void *arg) {
                               mpi_window[log->subtype], &log->req);
     if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
 
-    /* Add to the requests queue. */
-    int ind = atomic_inc(&nr_req);
-    req_queue[ind] = log;
-    atomic_inc(&todo_req);
+    /* Add to one of the two requests queues. */
+    if (k%2) {
+      int ind = atomic_inc(&nr_req1);
+      req_queue1[ind] = log;
+      atomic_inc(&todo_req1);
+    } else {
+      int ind = atomic_inc(&nr_req2);
+      req_queue2[ind] = log;
+      atomic_inc(&todo_req2);
+    }
   }
 
   /* All done. */
@@ -283,11 +294,27 @@ static void *send_thread(void *arg) {
  */
 static void *req_thread(void *arg) {
 
-  message("%d: req thread starts with %d messages", *((int *)arg), nr_req);
+  /* Our thread index for selecting queue. */
+  int us = *((int *)arg);
+
+  struct mpiuse_log_entry **volatile req_queue = NULL;
+  int volatile *todo_req = NULL;
+  int volatile *nr_req = NULL;
+  if (us == 1) {
+    req_queue = req_queue1;
+    todo_req = &todo_req1;
+    nr_req = &nr_req1;
+  } else {
+    req_queue = req_queue2;
+    todo_req = &todo_req2;
+    nr_req = &nr_req2;
+  }
+
+  message("%d: req thread starts with %d messages", us, *nr_req);
   ticks starttics = getticks();
 
-  while (injecting || (!injecting && todo_req > 0)) {
-    int nlogs = nr_req;
+  while (injecting || (!injecting && *todo_req > 0)) {
+    int nlogs = *nr_req;
     for (int k = 0; k < nlogs; k++) {
       struct mpiuse_log_entry *log = req_queue[k];
       if (log != NULL && !log->done) {
@@ -326,13 +353,13 @@ static void *req_thread(void *arg) {
           log->done = 1;
           log->endtic = getticks();
           free(log->data);
-          atomic_dec(&todo_req);
+          atomic_dec(todo_req);
         }
       }
     }
   }
 
-  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+  message("%d: took %.3f %s.", us, clocks_from_ticks(getticks() - starttics),
           clocks_getunit());
 
   return NULL;
@@ -445,9 +472,12 @@ static void pick_logs() {
   int nlogs = mpiuse_nr_logs();
 
   /* Duplicate of logs. Bit large... */
-  req_queue = (struct mpiuse_log_entry **)calloc(
+  req_queue1 = (struct mpiuse_log_entry **)calloc(
       nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_req = 0;
+  nr_req1 = 0;
+  req_queue2 = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_req2 = 0;
   send_queue = (struct mpiuse_log_entry **)calloc(
       nlogs, sizeof(struct mpiuse_log_entry *));
   nr_send = 0;
@@ -643,16 +673,24 @@ int main(int argc, char *argv[]) {
   pthread_t sendthread;
   if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
     error("Failed to create send thread.");
-  pthread_t reqthread;
-  if (pthread_create(&reqthread, NULL, &req_thread, &myrank) != 0)
+
+  int threadno1 = 1;
+  pthread_t reqthread1;
+  if (pthread_create(&reqthread1, NULL, &req_thread, &threadno1) != 0)
+    error("Failed to create send thread.");
+  int threadno2 = 2;
+  pthread_t reqthread2;
+  if (pthread_create(&reqthread2, NULL, &req_thread, &threadno2) != 0)
     error("Failed to create send thread.");
+
   pthread_t recvthread;
   if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
     error("Failed to create recv thread.");
 
   /* Wait until all threads have exited and all message exchanges have
    * completed. */
-  pthread_join(reqthread, NULL);
+  pthread_join(reqthread1, NULL);
+  pthread_join(reqthread2, NULL);
   pthread_join(sendthread, NULL);
   pthread_join(recvthread, NULL);
 
-- 
GitLab