From 9b5b4d58efa96f0a923f613b64171d78343d397b Mon Sep 17 00:00:00 2001
From: "Peter W. Draper" <p.w.draper@durham.ac.uk>
Date: Tue, 17 Sep 2019 14:36:32 +0100
Subject: [PATCH] Add timed variant that attempts to mimic the original
 injection time

---
 Makefile          |   8 +-
 mpistalls-timed.c | 306 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 313 insertions(+), 1 deletion(-)
 create mode 100644 mpistalls-timed.c

diff --git a/Makefile b/Makefile
index de4baeb..b202c5e 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,11 @@
+all: mpistalls mpistalls-timed
+
 mpistalls: mpistalls.c mpiuse.c mpiuse.h atomic.h cycle.h
 	$(CC) -g -O0 -o mpistalls mpistalls.c mpiuse.c -I/usr/include/mpi -lmpi -lpthread
 
+mpistalls-timed: mpistalls-timed.c mpiuse.c mpiuse.h atomic.h cycle.h
+	$(CC) -g -O0 -o mpistalls-timed mpistalls-timed.c mpiuse.c -I/usr/include/mpi -lmpi -lpthread
+
 clean:
-	rm mpistalls
+	rm -f mpistalls mpistalls-timed
+
diff --git a/mpistalls-timed.c b/mpistalls-timed.c
new file mode 100644
index 0000000..e71f5f7
--- /dev/null
+++ b/mpistalls-timed.c
@@ -0,0 +1,306 @@
+/**
+ * Attempt to reproduce the asynchronous stalling that we see for medium
+ * busy steps in SWIFT.
+ *
+ * Timed injection version.
+ *
+ * To do this we set up a multithreaded MPI program that performs
+ * asynchronous exchanges of various data sizes and continuously checks the
+ * requests for completion. We also need timers to record the time taken.
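+ *
+ * Usage: mpirun -np <number of ranks in the log> ./mpistalls-timed
+ * The rank count must match the logged run; this is checked at startup.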
+ */
+#include <stdio.h>
+#include <mpi.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <time.h>   /* For nanosleep() and struct timespec. */
+
+#include "atomic.h"
+#include "error.h"
+#include "mpiuse.h"
+
+/* Global: Our rank for all to see. */
+int myrank = -1;
+
+/* Integer types of send and recv tasks, must match log. */
+static const int task_type_send = 22;
+static const int task_type_recv = 23;
+
+/* Global communicators for each of the subtypes. */
+static const int task_subtype_count = 30; // Just some upper limit on subtype.
+static MPI_Comm subtypeMPI_comms[30];     // Size must match task_subtype_count.
+
+/* The local queues. reqs_queue holds all of this rank's requests in
+ * injection order; sends_queue and recvs_queue hold the in-flight requests
+ * that still need testing for completion. */
+static struct mpiuse_log_entry **volatile reqs_queue;
+static int volatile ind_req = 0;
+static int volatile nr_reqs = 0;
+static int volatile injecting = 1;
+static struct mpiuse_log_entry **volatile recvs_queue;
+static int volatile nr_recvs = 0;
+static int volatile ind_recv = 0;
+static int volatile todo_recv = 0;
+static struct mpiuse_log_entry **volatile sends_queue;
+static int volatile nr_sends = 0;
+static int volatile ind_send = 0;
+static int volatile todo_send = 0;
+
+/* Delta in ticks below which we do not wait, roughly the cost of one loop
+ * iteration. XXX measure this. */
+const ticks looptics = 10000;
+
+/* CPU frequency of the machine that created the MPI log. */
+// XXX need to store this in the data file.
+static double clocks_cpufreq = 2194844448.0;
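+// A tick delta dt corresponds to dt / clocks_cpufreq seconds on that machine.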
+
+/* Injection thread, initiates MPI_Isend and MPI_Irecv requests at various
+ * times. */
+static void *inject_thread(void *arg) {
+  message("%d: injection thread starts", *((int *)arg));
+
+  /* Ticks of our last attempt. Guard against an empty queue. */
+  if (nr_reqs == 0) {
+    message("no requests to inject");
+    atomic_dec(&injecting);
+    return NULL;
+  }
+  ticks basetic = reqs_queue[0]->tic;
+
+  while (ind_req < nr_reqs) {
+    struct mpiuse_log_entry *log = reqs_queue[ind_req];
+
+    /* Expected time between this request and the previous one. */
+    ticks dt = log->tic - basetic;
+    basetic = log->tic;
+
+    /* Guess some time below which we should not attempt to wait, otherwise
+     * we start to overrun: if the gap is smaller than that (basically how
+     * many ticks this loop takes without any waiting) just inject the next
+     * call. Otherwise wait a while. */
+    if (dt > looptics) {
+      struct timespec sleep;
+      sleep.tv_sec = 0;
+
+      /* Remember to be fair and remove the looptics, then convert to
+       * nanoseconds. */
+      double ns = (double)(dt - looptics) / clocks_cpufreq * 1.0e9;
+      if (ns < 1.0e9) {
+        sleep.tv_nsec = (long) ns;
+      } else {
+        /* A wait of more than one second is suspect, so complain, but
+         * continue with the wait capped at one second (tv_nsec must be
+         * below 1e9 for nanosleep() to accept it). */
+        sleep.tv_sec = 1;
+        sleep.tv_nsec = 0;
+        message("wait greater than one second");
+      }
+
+      //message("injecting wait of: %ld ns", sleep.tv_nsec);
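+      /* Note: nanosleep() can return early if interrupted by a signal; we
+       * only need approximate pacing, so we do not resume the wait. */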
+      nanosleep(&sleep, NULL);
+    }
+
+    // Differences from SWIFT: MPI_BYTE might overflow, should use MPI_Type(?).
+    int err = 0;
+    if (log->type == task_type_send) {
+      err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank,
+                      log->tag, subtypeMPI_comms[log->subtype], &log->req);
+
+      /* Publish the new send request; queue slots are NULL until written
+       * (calloc in pick_logs), so the checker thread skips slots that are
+       * not yet visible. */
+      int ind = atomic_inc(&nr_sends);
+      sends_queue[ind] = log;
+      atomic_inc(&todo_send);
+
+    } else {
+      err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank,
+                      log->tag, subtypeMPI_comms[log->subtype], &log->req);
+
+      /* Publish the new recv request (see the note in the send branch). */
+      int ind = atomic_inc(&nr_recvs);
+      recvs_queue[ind] = log;
+      atomic_inc(&todo_recv);
+    }
+    if (err != MPI_SUCCESS) error("Failed to activate send or recv");
+
+    ind_req++;
+  }
+  message("%d injections completed, sends = %d, recvs = %d", ind_req,
+          nr_sends, nr_recvs);
+  message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
+  atomic_dec(&injecting);
+  return NULL;
+}
+
+/* Send thread, checks if MPI_Isend requests have completed. */
+static void *send_thread(void *arg) {
+  message("%d: send thread starts (%d)", *((int *)arg), injecting);
+
+  int res;
+  MPI_Status stat;
+
+  /* Only exit once all requests have been injected and we have emptied our
+   * queue. */
+  size_t attempts = 0;
+  while (injecting || todo_send > 0) {
+    int nsends = nr_sends;
+    for (int k = 0; k < nsends; k++) {
+      struct mpiuse_log_entry *log = sends_queue[k];
+      if (log != NULL) {
+        attempts++;
+        int err = MPI_Test(&log->req, &res, &stat);
+        if (err != MPI_SUCCESS) {
+          error("MPI_Test call failed");
+        }
+        if (res) {
+          /* Done, clean up. */
+          //message("MPI_Test successful");
+          free(log->data);
+          sends_queue[k] = NULL;
+          atomic_dec(&todo_send);
+        }
+      }
+    }
+  }
+  message("sends completed, required %zd attempts (left: %d)", attempts,
+          todo_send);
+  return NULL;
+}
+
+/* Recv thread, checks if MPI_Irecv requests have completed. */
+static void *recv_thread(void *arg) {
+  message("%d: recv thread starts", *((int *)arg));
+
+  int res;
+  MPI_Status stat;
+
+  size_t attempts = 0;
+  while (injecting || todo_recv > 0) {
+    int nrecvs = nr_recvs;
+    for (int k = 0; k < nrecvs; k++) {
+      struct mpiuse_log_entry *log = recvs_queue[k];
+      if (log != NULL) {
+        attempts++;
+        int err = MPI_Test(&log->req, &res, &stat);
+        if (err != MPI_SUCCESS) {
+          error("MPI_Test call failed");
+        }
+        if (res) {
+          /* Done, clean up. */
+          //message("MPI_Test successful");
+          free(log->data);
+          recvs_queue[k] = NULL;
+          atomic_dec(&todo_recv);
+        }
+      }
+    }
+  }
+
+  message("recvs completed, required %zd attempts (left: %d)", attempts,
+          todo_recv);
+  return NULL;
+}
+
+/* Comparison function for logged times. */
+static int cmp_logs(const void *p1, const void *p2) {
+  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
+  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
+
+  /* Large unsigned values, so take care. */
+  if (l1->tic > l2->tic)
+    return 1;
+  if (l1->tic < l2->tic)
+    return -1;
+  return 0;
+}
+
+/* Pick out the relevant logging data for our rank, i.e. all activations of
+ * sends and recvs. */
+static void pick_logs(void) {
+  size_t nlogs = mpiuse_nr_logs();
+
+  /* Duplicate of logs. XXX could loop twice to reduce memory use if needed.
+   * The send and recv queues are calloc'd so that slots not yet written
+   * read as NULL in the checker threads. */
+  reqs_queue = (struct mpiuse_log_entry **)
+    malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
+  nr_reqs = 0;
+  sends_queue = (struct mpiuse_log_entry **)
+    calloc(nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_sends = 0;
+  recvs_queue = (struct mpiuse_log_entry **)
+    calloc(nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_recvs = 0;
+
+  for (size_t k = 0; k < nlogs; k++) {
+    struct mpiuse_log_entry *log = mpiuse_get_log(k);
+    if (log->rank == myrank && log->activation) {
+      if (log->type == task_type_send || log->type == task_type_recv) {
+
+        /* Allocate zeroed space for the message data. */
+        log->data = calloc(log->size, 1);
+        if (log->data == NULL) error("Failed to allocate message data");
+
+        /* And keep this log. */
+        reqs_queue[nr_reqs] = log;
+        nr_reqs++;
+
+      } else {
+        error("task type '%d' is not a known send or recv task", log->type);
+      }
+    }
+  }
+
+  /* Sort into increasing time. */
+  qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
+}
+
+int main(int argc, char *argv[]) {
+
+  /* First we read the SWIFT MPI logger output that defines the
+   * communications we will undertake and the time differences between
+   * injections into the queues. */
+  mpiuse_log_restore("testdata/mpiuse_report-step1.dat");
+  int nranks = mpiuse_nr_ranks();
+
+  /* Initiate MPI. */
+  int prov = 0;
+  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Init_thread failed with error %i.", res);
+
+  int nr_nodes = 0;
+  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
+  if (res != MPI_SUCCESS)
+    error("MPI_Comm_size failed with error %i.", res);
+
+  /* This should match the expected size. */
+  if (nr_nodes != nranks)
+    error("The number of MPI ranks %d does not match the expected value %d",
+          nranks, nr_nodes);
+
+  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Comm_rank failed with error %i.", res);
+
+  /* Create communicators for each subtype of the tasks. */
+  for (int i = 0; i < task_subtype_count; i++) {
+    MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
+  }
+
+  /* Each rank requires its own queue, so extract them. */
+  pick_logs();
+
+  message("Start of MPI tests");
+
+  /* Make three threads, one for injecting tasks and two to check for
+   * completions of the sends and recvs independently. */
+  pthread_t injectthread;
+  if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
+    error("Failed to create injection thread.");
+  pthread_t sendthread;
+  if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
+    error("Failed to create send thread.");
+  pthread_t recvthread;
+  if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
+    error("Failed to create recv thread.");
+
+  /* Wait until all threads have exited and all MPI requests have completed. */
+  pthread_join(injectthread, NULL);
+  pthread_join(sendthread, NULL);
+  pthread_join(recvthread, NULL);
+
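+  /* Tidy up: free the duplicated communicators before finalizing. */
+  for (int i = 0; i < task_subtype_count; i++)
+    MPI_Comm_free(&subtypeMPI_comms[i]);
+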
+  /* Shutdown MPI. */
+  res = MPI_Finalize();
+  if (res != MPI_SUCCESS)
+    error("call to MPI_Finalize failed with error %i.", res);
+
+  return 0;
+}
-- 
GitLab