From 0cd762493302586c900efc98d0ac6e7790d5ac97 Mon Sep 17 00:00:00 2001
From: "Peter W. Draper" <p.w.draper@durham.ac.uk>
Date: Fri, 15 May 2020 17:40:41 +0100
Subject: [PATCH] Cleanup

---
 swiftmpirdmastepsim.c | 195 ++++++++++++++----------------------------
 1 file changed, 63 insertions(+), 132 deletions(-)

diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c
index f72ddbd..cc78e8a 100644
--- a/swiftmpirdmastepsim.c
+++ b/swiftmpirdmastepsim.c
@@ -20,23 +20,22 @@
 // Fully one sided approach with passive target communication. This means only
 // the sending side updates the window buffer and since we have threaded
 // access we can only use flushes with a shared lock that is permanently open
-// to move data. The send side has no window, as it only pushes data.
+// to move data. The send side has no associated window, as it only pushes data.
 //
 // So each rank needs a receive window that has room for all the expected
-// sends, plus an additional element for controlling the readyness of the data
+// sends, plus additional elements for controlling the readiness of the data
 // (this is an atomic send that should be guaranteed to only be updated after
-// the send of the main data).
+// the send of the main data) and correctness.
 //
 // In this implementation the size of the receive buffer per rank is just the
 // sum of all the messages we know we are about to get. The order of that
-// buffer is determined by the rank and tags, which gives us a list of offsets
-// into the buffer mapped by the ranktag, which we need to share with any rank
-// that is expected to send us data. We'll send this data using normal MPI
-// (could be done either as another extension into the window, which we get,
-// but we'd need to synchronize that across all ranks, or we could use the
-// global communicator to share this in a similar fashion).
-//
-// XXX ranktag per subtype is not unique we need to include the size.
+// buffer is determined by the send and receive rank and the tag, which gives
+// us a list of offsets into the buffer mapped by the ranktag, which we need
+// to share with any rank that is expected to send us data. We'll send this
+// data using normal MPI (could be done either as another extension into the
+// window, which we get, but we'd need to synchronize that across all ranks,
+// or we could use the global communicator to share this in a similar
+// fashion).
 
 #include <limits.h>
 #include <mpi.h>
@@ -67,21 +66,24 @@ int myrank = -1;
 /* Number of ranks. */
 static int nr_ranks;
 
+/* Bit shift to accomodate all the bits of the maximum rank id. */
+static int rank_shift = 0;
+
 /* Maximum no. of messages (logs). */
 static size_t max_logs = 0;
 
-/* Flags for controlling access. */
+/* Flags for controlling access. High end of size_t. */
 static size_t UNLOCKED = (((size_t)2 << 63) - 1);
 
 /* Size of a block of memory. All addressible memory chunks need to be a
- * multiple of this as as we need to align sends in memory. */
+ * multiple of this as we need to align sends and receives in memory. */
 #define BLOCKTYPE size_t
 #define MPI_BLOCKTYPE MPI_AINT
 static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
 
-/* Size of message header in blocks. The flag, size and tag. Note size and tag
- * are just for sanity checks. The flag value controls access to the main data
- * areas. */
+/* Size of message header in blocks. The unlocked flag, size and tag. Note
+ * size and tag are just for sanity checks. The flag value controls access to
+ * the main data areas. */
 static const size_t HEADER_SIZE = 3;
 
 /* Are we verbose. */
@@ -106,7 +108,6 @@ static BLOCKTYPE *mpi_ptr[task_subtype_count] = {NULL};
 static size_t ranktag_sizes[task_subtype_count] = {0};
 static size_t *ranktag_counts;
 static size_t *ranktag_offsets;
-;
 static size_t *ranktag_lists;
 
 /* The local send queue. */
@@ -120,7 +121,7 @@ static int volatile nr_recv = 0;
 static int volatile todo_recv = 0;
 
 /**
- * @brief Convert ranks and tag into a single unique value.
+ * @brief Convert two ranks and tag into a single unique value.
  *
  * Assumes there is enough space in a size_t for these values.
  *
@@ -131,21 +132,10 @@ static int volatile todo_recv = 0;
  * @result a unique value based on both values
  */
 static size_t toranktag(int sendrank, int recvrank, int tag) {
-  
-  int shift = (sizeof(int) * 8) - __builtin_clz(nr_ranks); /* XXX could precalc. */
-  //message("nr_ranks = %d, shift = %d", nr_ranks, shift);
-  size_t result = sendrank | recvrank << shift | tag << (shift * 2);
+  size_t result = sendrank | recvrank << rank_shift | tag << (rank_shift * 2);
   return result;
 }
 
-static char *tokey(struct mpiuse_log_entry *log) {
-  static char buf[256];
-  sprintf(buf, "%d/%d/%d/%zd on %d ranktags %zd/%zd", 
-          log->otherrank, log->tag, log->subtype, log->size, log->rank,
-          toranktag(log->rank, log->otherrank, log->tag), toranktag(log->rank, log->otherrank, log->tag));
-  return buf;
-}
-
 /**
  * @brief Convert a byte count into a number of blocks, rounds up.
  *
@@ -179,7 +169,8 @@ static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
 }
 
 /**
- * @brief test a filled data area for a value.
+ * @brief test a filled data area for a value, reports if any unexpected value
+ * is found.
  *
  * @param size size of data in bytes.
  * @param data the data to check.
@@ -229,37 +220,27 @@ static void *send_thread(void *arg) {
      * subtype, tag and rank. Need to search the ranktag_lists for our ranktag
      * value. XXX bisection search XXX */
     size_t ranktag = toranktag(log->rank, log->otherrank, log->tag);
-    //message("looking for %s", tokey(log));
     size_t counts = ranktag_counts[INDEX2(task_subtype_count, log->subtype,
                                           log->otherrank)];
     size_t offset = 0;
-    int found = 0;
-    struct mpiuse_log_entry *keeplog = NULL;
-
-    counts = max_logs;
 
+    int found = 0;
+    counts = max_logs;  // XXX do we still need this?
     for (size_t j = 0; j < counts; j++) {
       if (ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype,
                                log->otherrank, j)] == ranktag) {
-        if (found) message("duplicate %zd: %s of %s", ranktag, tokey(log), tokey(keeplog));
         offset = ranktag_offsets[INDEX3(task_subtype_count, nr_ranks,
                                         log->subtype, log->otherrank, j)];
-        keeplog = log;
-        //message("%d/%d located offset %zd one of %zd ranktag: %zd", log->otherrank,
-        //        log->subtype, offset, counts, ranktag);
         found = 1;
-        //break;
+        break;
       }
     }
     if (!found) {
-      fflush(stdout); fflush(stderr);
-      error("Failed sending a message of size %zd to %d/%d @ %zd\n, no offset"
-            " found for ranktag %zd, counts = %zd", datasize, log->otherrank, log->subtype,
-            offset, ranktag, counts);
+      error(
+          "Failed sending a message of size %zd to %d/%d "
+          "@ %zd\n, no offset found for ranktag %zd, counts = %zd",
+          datasize, log->otherrank, log->subtype, offset, ranktag, counts);
     }
-    //message("Sending a message of size %zd to %d/%d @ %zd", datasize,
-    //        log->otherrank, log->subtype, offset);
-
 
     /* And send data to other rank. */
     int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
@@ -286,19 +267,6 @@ static void *send_thread(void *arg) {
 
     ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
     if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
-
-    //if (verbose) {
-    //  if (oldval[0] == dataptr[0]) {
-    //    message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)",
-    //            log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
-    //            offset, k, nr_send);
-    //  } else {
-    //    message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd %d/%d",
-    //            log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
-    //            offset, k, nr_send);
-    //  }
-    //}
-    //sleep(1);
   }
 
   message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
@@ -318,19 +286,12 @@ static void *recv_thread(void *arg) {
       *((int *)arg), nr_recv, nr_ranks, task_subtype_count);
   ticks starttics = getticks();
 
-  /* Global statistics. */
-  //int lncalls = 0;
-  //double lsum = 0.0;
-  //ticks lmint = INT_MAX;
-  //ticks lmaxt = 0;
-
   /* No. of receives to process. */
   todo_recv = nr_recv;
 
   /* We loop while new requests are being send and we still have messages
    * to receive. */
   while (todo_recv > 0) {
-    //sleep(5);
     for (int k = 0; k < nr_recv; k++) {
       struct mpiuse_log_entry *log = recv_queue[k];
       if (log != NULL && !log->done) {
@@ -341,28 +302,18 @@ static void *recv_thread(void *arg) {
         /* Check if that part of the window has been unlocked. */
         BLOCKTYPE *dataptr = &mpi_ptr[log->subtype][offset];
         BLOCKTYPE volatile lock = dataptr[0];
-        //message("Checking: %s @ %zd", (dataptr[0] == UNLOCKED?"unlocked":"locked"), offset);
-        //if (atomic_cas(&dataptr[0], UNLOCKED, UNLOCKED)) {
         if (lock == UNLOCKED) {
-          //message("Checked: %s @ %zd", (dataptr[0] == UNLOCKED?"unlocked":"locked"), offset);
 
           /* OK, so data should be ready for use, check the tag and size. */
-          if ((size_t)log->size == dataptr[1] && (size_t)log->tag == dataptr[2]) {
-            if (verbose)
-              //message("receive message %d/%d from %d @ offset %zd: "
-              //        "dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
-              //        log->otherrank, offset, dataptr[0], log->size,
-              //        toblocks(log->size) + HEADER_SIZE);
-
-            /* Check data sent data is unchanged. */
-            if (datacheck) {
-              if (!datacheck_test(toblocks(log->size),
-                                  &dataptr[HEADER_SIZE], log->otherrank)) {
-                fflush(stdout);
-                fflush(stderr);
-                error("Data mismatch on completion");
+          if ((size_t)log->size == dataptr[1] &&
+              (size_t)log->tag == dataptr[2]) {
+            if (verbose) /* Check data sent data is unchanged. */
+              if (datacheck) {
+                if (!datacheck_test(toblocks(log->size), &dataptr[HEADER_SIZE],
+                                    log->otherrank)) {
+                  error("Data mismatch on completion");
+                }
               }
-            }
 
             /* Done, clean up. */
             log->done = 1;
@@ -370,20 +321,10 @@ static void *recv_thread(void *arg) {
             if (todo_recv == 0) break;
 
           } else {
-            message("bad unlocked message %d/%d from %d @ offset %zd: "
-                    "dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
-                    log->otherrank, offset, dataptr[0], log->size,
-                    toblocks(log->size) + HEADER_SIZE);
             error("Unlocked data has incorrect tag or size: %zd/%zd %d/%zd",
                   log->size, dataptr[1], log->tag, dataptr[2]);
           }
-        } 
-        //else {
-        //  message("unlocked message %d/%d from %d @ offset %zd: "
-        //          "dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
-        //          log->otherrank, offset, dataptr[0], log->size,
-        //          toblocks(log->size) + HEADER_SIZE);
-        //}
+        }
 
         /* Need to allow for some MPI progession. Since we make no MPI calls
          * (by intent receive is a passive target so only the sender should
@@ -448,7 +389,6 @@ static void pick_logs() {
         log->done = 0;
         log->data = NULL;
         log->ranktag = toranktag(log->otherrank, log->rank, log->tag);
-        //message("add %s", tokey(log));
         if (log->type == task_type_send) {
           send_queue[nr_send] = log;
           nr_send++;
@@ -478,20 +418,18 @@ static void pick_logs() {
    * of the windows. */
   for (int k = 0; k < nr_recv; k++) {
     struct mpiuse_log_entry *log = recv_queue[k];
-    ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank, k)] = log->ranktag;
-    ranktag_offsets[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank, k)] = ranktag_sizes[log->subtype];
+    ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank,
+                         k)] = log->ranktag;
+    ranktag_offsets[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank,
+                           k)] = ranktag_sizes[log->subtype];
     log->offset = ranktag_sizes[log->subtype];
 
     /* Need to use a multiple of blocks to keep alignment. */
     size_t size = toblocks(log->size) + HEADER_SIZE;
-    //message("%d increment offset by %zd blocks", log->subtype, size);
     ranktag_sizes[log->subtype] += size;
 
     ranktag_counts[INDEX2(task_subtype_count, log->subtype, myrank)]++;
   }
-
-  if (verbose)
-    message("local logs picked, nr_send = %d, nr_recv = %d ", nr_send, nr_recv);
 }
 
 /**
@@ -557,39 +495,36 @@ int main(int argc, char *argv[]) {
     error("The number of MPI ranks %d does not match the expected value %d",
           nranks, nr_ranks);
 
+  /* Index of most significant bit in the maximum rank id. Assumes GCC
+   * intrinsic. */
+  rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks);
+
   /* We all need to agree on a maximum count of logs, so we can exchange the
-   * offset arrays (would be ragged otherwise). */
+   * offset arrays (would be ragged otherwise and difficult to exchange). */
   max_logs = mpiuse_nr_logs() / 2 + 1;
   MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
-  if (verbose) message("maximum log count = %zd", max_logs);
 
-  /* Extract the send and recv messages for our rank. */
+  /* Extract the send and recv messages for our rank and populate the queues. */
   pick_logs();
 
   /* Now for the one-sided setup... Each rank needs a buffer per communicator
    * with space for all the expected messages. */
   for (int i = 0; i < task_subtype_count; i++) {
-      MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
-      size_t size = tobytes(ranktag_sizes[i]);
-      if (size == 0) size = BYTESINBLOCK;
-      MPI_Win_allocate(size, BYTESINBLOCK, MPI_INFO_NULL,
-                       subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]);
-
-      memset(mpi_ptr[i], 170, tobytes(ranktag_sizes[i]));
-
-
-      //if (verbose)
-      //  message("Allocated window of size %zd for subtype %d",
-      //          ranktag_sizes[i], i);
-
-      /* Assert a shared lock with all the other processes on this
-       * window. Needed as we use threads, so cannot lock or unlock as a means
-       * of synchronization. */
-      MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
+    MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
+    size_t size = tobytes(ranktag_sizes[i]);
+    if (size == 0) size = BYTESINBLOCK;
+    MPI_Win_allocate(size, BYTESINBLOCK, MPI_INFO_NULL, subtypeMPI_comms[i],
+                     &mpi_ptr[i], &mpi_window[i]);
+
+    memset(mpi_ptr[i], 170, tobytes(ranktag_sizes[i]));
+
+    /* Assert a shared lock with all the other processes on this window.
+     * Strictly needed as we use threads, so cannot lock or unlock as
+     * a means of synchronization. */
+    MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
   }
-  //message("Windows allocated");
-  
-  /* We need to share all the offsets for each communucator with all the other
+
+  /* We need to share all the offsets for each communicator with all the other
    * ranks so they can push data into the correct parts of our receive
    * window. */
   MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
@@ -600,7 +535,6 @@ int main(int argc, char *argv[]) {
   MPI_Allreduce(MPI_IN_PLACE, ranktag_lists,
                 task_subtype_count * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
                 MPI_COMM_WORLD);
-  //message("check: local logs picked, nr_send = %d, nr_recv = %d ", nr_send, nr_recv);
 
   /* Time to start time. Try to make it synchronous across the ranks. */
   MPI_Barrier(MPI_COMM_WORLD);
@@ -618,9 +552,6 @@ int main(int argc, char *argv[]) {
   pthread_t sendthread;
   if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
     error("Failed to create send thread.");
-
-  /* XXX could have more than one of these... With a partition of the
-   * subtypes. */
   pthread_t recvthread;
   if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
     error("Failed to create recv thread.");
@@ -630,7 +561,7 @@ int main(int argc, char *argv[]) {
   pthread_join(sendthread, NULL);
   pthread_join(recvthread, NULL);
 
-  /* Free the window locks. Once we all arrive. */
+  /* Free the window locks. Only after we all arrive. */
   MPI_Barrier(MPI_COMM_WORLD);
   for (int i = 0; i < task_subtype_count; i++) {
     MPI_Win_unlock_all(mpi_window[i]);
-- 
GitLab