From 4e2e7b570706d25543f7a116329c40bc6687637c Mon Sep 17 00:00:00 2001
From: "Peter W. Draper" <p.w.draper@durham.ac.uk>
Date: Tue, 5 May 2020 21:25:16 +0100
Subject: [PATCH] Passes datacheck tests

---
 swiftmpirdmastepsim.c | 71 ++++++++++++++++++++++++-------------------
 1 file changed, 40 insertions(+), 31 deletions(-)

diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c
index 49c39fe..2833780 100644
--- a/swiftmpirdmastepsim.c
+++ b/swiftmpirdmastepsim.c
@@ -55,10 +55,10 @@ static int UNLOCKED = -3;
  * as we need to align in memory. */
 #define BLOCKTYPE size_t
 #define MPI_BLOCKTYPE MPI_AINT
-static int BYTESINBLOCK = sizeof(BLOCKTYPE);
+static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
 
 /* Size of message header in blocks. The flag, size and tag. */
-static size_t HEADER_SIZE = 3;
+static const size_t HEADER_SIZE = 3;
 
 /* Size of a message board in blocks, we have one of these per rank per
  * communicator (i.e. per window). */
@@ -100,7 +100,7 @@ static int volatile todo_recv = 0;
  * @result the number of blocks needed.
  */
 static int toblocks(BLOCKTYPE nr_bytes) {
-  return (nr_bytes * (BYTESINBLOCK - 1)) / (BYTESINBLOCK * BYTESINBLOCK);
+  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
 }
 
 /**
@@ -110,35 +110,35 @@ static int toblocks(BLOCKTYPE nr_bytes) {
  *
  * @result the number of bytes.
  */
-static BLOCKTYPE tobytes(int nr_blocks) {
-  return (nr_blocks * BYTESINBLOCK);
-}
+static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
 
 /**
- * @brief fill a data area with a pattern that can be checked for changes.
+ * @brief fill a data area with our rank.
  *
  * @param size size of data in bytes.
  * @param data the data to fill.
  */
-static void datacheck_fill(size_t size, void *data) {
-  unsigned char *p = (unsigned char *)data;
-  for (size_t i = 0; i < size; i++) {
-    p[i] = 170; /* 10101010 in bits. */
+static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
+  for (BLOCKTYPE i = 0; i < size; i++) {
+    data[i] = myrank; 
   }
 }
 
 /**
- * @brief test a filled data area for our pattern.
+ * @brief test a filled data area for a value.
  *
  * @param size size of data in bytes.
- * @param data the data to fill.
+ * @param data the data to check.
+ * @param rank the value to, i.e. original rank.
  *
  * @result 1 on success, 0 otherwise.
  */
-static int datacheck_test(size_t size, void *data) {
-  unsigned char *p = (unsigned char *)data;
+static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
   for (size_t i = 0; i < size; i++) {
-    if (p[i] != 170) return 0;
+    if (data[i] != rank) {
+      message("see %zd expected %d @ %zd", data[i], rank, i);
+      return 0;
+    }
   }
   return 1;
 }
@@ -162,7 +162,8 @@ static void *send_thread(void *arg) {
     log->data = dataptr;
 
     /* Fill data with pattern. */
-    if (datacheck) datacheck_fill(datasize, dataptr);
+    if (datacheck) datacheck_fill(toblocks(log->size),
+                                  &dataptr[HEADER_SIZE]);
 
     /* First element is marked as LOCKED, so only we can update. */
     dataptr[0] = LOCKED;
@@ -170,10 +171,12 @@ static void *send_thread(void *arg) {
     dataptr[2] = log->tag;
 
     /* And send data to other rank. */
-    MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
-                   MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE,
-                   MPI_REPLACE, mpi_window[log->subtype]);
-
+    int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
+                             MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE,
+                             MPI_REPLACE, mpi_window[log->subtype]);
+    if (ret != MPI_SUCCESS)
+      error("Failed to accumulate data: %d", ret);
+    
     /* Now we change the last element to UNLOCKED so that the remote end can
      * find out that the data has arrived. */
     BLOCKTYPE newval[1];
@@ -186,7 +189,7 @@ static void *send_thread(void *arg) {
 
     MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
 
-    //if (oldval[0] == dataptr[0]) {
+    // if (oldval[0] == dataptr[0]) {
     //  message("sent a message to %d/%d (%zd:%zd:%zd @ %zd)", log->otherrank,
     //          log->subtype, dataptr[0], oldval[0], newval[0],
     //          MESSAGE_SIZE * myrank);
@@ -201,7 +204,7 @@ static void *send_thread(void *arg) {
      * so we can use MPI_Test to get some local progression. */
     newval[0] = UNLOCKED;
     while (newval[0] != LOCKED) {
-      //MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
+      // MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
 
       MPI_Request request;
       MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
@@ -213,7 +216,8 @@ static void *send_thread(void *arg) {
       }
     }
 
-    message("sent and received... %d/%d/%d", k, nr_send, ((char *)log->data)[0]);
+    //message("sent and received... %d/%d/%d", k, nr_send,
+    //        ((char *)log->data)[0]);
     /* Ready the next send. */
   }
 
@@ -251,7 +255,8 @@ static void *recv_thread(void *arg) {
         if (todo_recv <= 0) break;
 
         //MPI_Win_flush(n, mpi_window[j]); // XXX emergency measure
-        BLOCKTYPE lockval = mpi_ptr[j][n * MESSAGE_SIZE];
+        BLOCKTYPE *dataptr = &mpi_ptr[j][n * MESSAGE_SIZE];
+        BLOCKTYPE lockval = dataptr[0];
 
         if (lockval == UNLOCKED) {
 
@@ -260,16 +265,19 @@ static void *recv_thread(void *arg) {
           for (int k = 0; k < nr_recv; k++) {
             struct mpiuse_log_entry *log = recv_queue[k];
             if (log != NULL && !log->done && log->otherrank == n &&
-                log->subtype == j) {
+                log->subtype == j &&
+                log->size == dataptr[1] && log->tag == dataptr[2]) {
               found = 1;
 
               //message("We have a ready message %d/%d at %zd: lockval %zd",
-              //        log->rank, log->subtype, log->otherrank * MESSAGE_SIZE,
+              //        log->rank, log->subtype, n * MESSAGE_SIZE,
               //        lockval);
 
               /* Check data sent data is unchanged and received data is as
                * expected. */
-              if (datacheck && !datacheck_test(log->size, &mpi_ptr[j][n * MESSAGE_SIZE])) {
+              if (datacheck &&
+                  !datacheck_test(toblocks(log->size),
+                                  &dataptr[HEADER_SIZE], n)) {
                 message("Data mismatch on completion");
               }
 
@@ -279,7 +287,7 @@ static void *recv_thread(void *arg) {
               atomic_dec(&todo_recv);
 
               /* Now ready for next message. */
-              mpi_ptr[j][n * MESSAGE_SIZE] = LOCKED;
+              dataptr[0] = LOCKED;
 
               break;
             }
@@ -358,8 +366,9 @@ static size_t pick_logs() {
   qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
   qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
 
-  if (verbose) message("maxsize = %zd, nr_send = %d, nr_recv = %d",
-                       maxsize, nr_send, nr_recv);
+  if (verbose)
+    message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
+            nr_recv);
   return maxsize;
 }
 
-- 
GitLab