From 9c25a2e1e9666cb7622cd111d13336c1484de087 Mon Sep 17 00:00:00 2001
From: "Peter W. Draper" <p.w.draper@durham.ac.uk>
Date: Thu, 7 May 2020 19:30:22 +0100
Subject: [PATCH] Only poll for active ranks and subtypes

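The recv thread now polls only the ranks and subtypes that pick_logs()
flagged as active, rather than every rank and every subtype.
Doesn't gain much.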
---
 swiftmpirdmastepsim.c | 78 +++++++++++++++++++++++++++++++------------
 1 file changed, 57 insertions(+), 21 deletions(-)

diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c
index f2e21e3..7911b39 100644
--- a/swiftmpirdmastepsim.c
+++ b/swiftmpirdmastepsim.c
@@ -92,6 +92,12 @@ static struct mpiuse_log_entry **volatile recv_queue;
 static int volatile nr_recv = 0;
 static int volatile todo_recv = 0;
 
+/* Active ranks and subtypes: per-index flags, compacted to index lists. */
+static int *volatile active_ranks = NULL;
+static int volatile nr_active_ranks = 0;
+static int *volatile active_subtypes = NULL;
+static int volatile nr_active_subtypes = 0;
+
 /**
  * @brief Convert a byte count into a number of blocks, rounds up.
  *
@@ -207,7 +213,6 @@ static void *send_thread(void *arg) {
      * on a get, as the local window is only used for receiving. Use an Rget
      * so we can use MPI_Test to get some local progression. */
     newval[0] = UNLOCKED;
-    size_t counter = 0;
     while (newval[0] != LOCKED) {
 
       MPI_Request request;
@@ -218,21 +223,13 @@ static void *send_thread(void *arg) {
       if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Rget failed");
 
       /* After the rget to make sure we get a chance at completion. */
-      ret =
-          MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);  // emergency
+      ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
       if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
 
       int flag = 0;
       while (flag == 0) {
         ret = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
         if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Test failed");
-
-        counter++;
-        if (counter > 1000000) {
-          message("message sent to %d/%d %zd %d/%d stuck in test",
-                  log->otherrank, log->subtype, newval[0], k, nr_send);
-          counter = 0;
-        }
       }
     }
 
@@ -253,8 +250,8 @@ static void *send_thread(void *arg) {
 static void *recv_thread(void *arg) {
 
   message(
-      "%d: recv thread starts, checking for %d messages %d ranks "
-      " %d communicators",
+      "%d: recv thread starts, checking for %d messages %d "
+      "ranks %d communicators",
       *((int *)arg), nr_recv, nr_ranks, task_subtype_count);
   ticks starttics = getticks();
 
@@ -270,34 +267,37 @@ static void *recv_thread(void *arg) {
   /* We loop while new requests are being send and we still have messages
    * to receive. */
   while (todo_recv > 0) {
-    for (int n = 0; n < nr_ranks; n++) {
+    for (int n = 0; n < nr_active_ranks; n++) {
       if (todo_recv <= 0) break;
-      if (n == myrank) continue;
 
-      for (int j = 0; j < task_subtype_count; j++) {
+      int rank = active_ranks[n];
+      if (rank == myrank) continue;
+
+      for (int j = 0; j < nr_active_subtypes; j++) {
         if (todo_recv <= 0) break;
+        int subtype = active_subtypes[j];
 
-        BLOCKTYPE *dataptr = &mpi_ptr[j][n * MESSAGE_SIZE];
+        BLOCKTYPE *dataptr = &mpi_ptr[subtype][rank * MESSAGE_SIZE];
         if (dataptr[0] == UNLOCKED) {
 
           /* We have a message waiting to be handled, find the log. */
           int found = 0;
           for (int k = 0; k < nr_recv; k++) {
             struct mpiuse_log_entry *log = recv_queue[k];
-            if (log != NULL && !log->done && log->otherrank == n &&
-                log->subtype == j && log->size == dataptr[1] &&
+            if (log != NULL && !log->done && log->otherrank == rank &&
+                log->subtype == subtype && log->size == dataptr[1] &&
                 log->tag == dataptr[2]) {
               found = 1;
 
               if (verbose)
                 message("receive message %d/%d from %d @ %zd: dataptr[0] %zd",
-                        log->rank, log->subtype, n, n * MESSAGE_SIZE,
+                        log->rank, log->subtype, rank, rank * MESSAGE_SIZE,
                         dataptr[0]);
 
               /* Check data sent data is unchanged and received data is as
                * expected. */
               if (datacheck && !datacheck_test(toblocks(log->size),
-                                               &dataptr[HEADER_SIZE], n)) {
+                                               &dataptr[HEADER_SIZE], rank)) {
                 message("Data mismatch on completion");
               }
 
@@ -363,6 +363,12 @@ static size_t pick_logs() {
       nlogs, sizeof(struct mpiuse_log_entry *));
   nr_recv = 0;
 
+  /* Flags for active elements. */
+  active_ranks = (int *)calloc(nr_ranks, sizeof(int));
+  nr_active_ranks = 0;
+  active_subtypes = (int *)calloc(task_subtype_count, sizeof(int));
+  nr_active_subtypes = 0;
+
   for (int k = 0; k < nlogs; k++) {
     struct mpiuse_log_entry *log = mpiuse_get_log(k);
     if (log->activation) {
@@ -378,19 +384,47 @@ static size_t pick_logs() {
         } else {
           error("task type '%d' is not a known send or recv task", log->type);
         }
+
+        /* Flag this rank and subtype as active. */
+        active_ranks[log->otherrank] = 1;
+        active_subtypes[log->subtype] = 1;
       }
+
       /* Across all ranks. */
       if (log->size > maxsize) maxsize = log->size;
     }
   }
 
+  /* Compact the active rank and subtype flags into lists of indices. */
+  nr_active_ranks = 0;
+  for (int k = 0; k < nr_ranks; k++) {
+    if (active_ranks[k]) {
+      active_ranks[nr_active_ranks] = k;
+      message("active rank: %d", k);
+      nr_active_ranks++;
+    }
+  }
+
+  nr_active_subtypes = 0;
+  for (int k = 0; k < task_subtype_count; k++) {
+    if (active_subtypes[k]) {
+      active_subtypes[nr_active_subtypes] = k;
+      message("active subtype: %d", k);
+      nr_active_subtypes++;
+    }
+  }
+
   /* Sort into increasing tag. */
   qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
   qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
 
-  if (verbose)
+  if (verbose) {
     message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
             nr_recv);
+    message("active ranks = %d, active subtypes = %d", nr_active_ranks,
+            nr_active_subtypes);
+  }
+
   return maxsize;
 }
 
@@ -459,6 +493,8 @@ int main(int argc, char *argv[]) {
 
   /* Extract the send and recv messages for our rank. */
   size_t maxsize = pick_logs();
+  message("active ranks = %d, active subtypes = %d", nr_active_ranks,
+          nr_active_subtypes);
 
   /* Size of a message board. Needs to align on size_t. */
   MESSAGE_SIZE = toblocks(maxsize) + HEADER_SIZE;
-- 
GitLab