diff --git a/Makefile b/Makefile
index de4baeb7d220b451dd2e147edfcd5e897a2d8502..b202c5ed57d1540c2aa3cffbd8ec57129bc58fe6 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,11 @@
+all: mpistalls mpistalls-timed
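+# all and clean are phony targets, not real files.
+.PHONY: all clean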
+
mpistalls: mpistalls.c mpiuse.c mpiuse.h atomic.h cycle.h
$(CC) -g -O0 -o mpistalls mpistalls.c mpiuse.c -I/usr/include/mpi -lmpi -lpthread
+
+mpistalls-timed: mpistalls-timed.c mpiuse.c mpiuse.h atomic.h cycle.h error.h
+	$(CC) -g -O0 -o mpistalls-timed mpistalls-timed.c mpiuse.c -I/usr/include/mpi -lmpi -lpthread
+
clean:
- rm mpistalls
+	rm -f mpistalls mpistalls-timed
+
diff --git a/mpistalls-timed.c b/mpistalls-timed.c
new file mode 100644
index 0000000000000000000000000000000000000000..e71f5f7ffbd24cc20250ce62db42dbcdc0996ef4
--- /dev/null
+++ b/mpistalls-timed.c
@@ -0,0 +1,306 @@
+/**
+ * Attempt to reproduce the asynchronous stalling that we see for
+ * medium-busy steps in SWIFT.
+ *
+ * Timed injection version.
+ *
+ * We set up a multithreaded MPI program that performs asynchronous
+ * exchanges of various data sizes and continuously checks the requests
+ * for completion. We also need timers to record the time taken by all
+ * this...
+ */
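+
+/* Usage (the number of MPI ranks must match the restored log, see main()):
+ *
+ *    mpirun -np <nranks> ./mpistalls-timed
+ */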
+#include <mpi.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include "atomic.h"
+#include "cycle.h"
+#include "error.h"
+#include "mpiuse.h"
+
+/* Global: Our rank for all to see. */
+int myrank = -1;
+
+/* Integer types of send and recv tasks, must match log. */
+static const int task_type_send = 22;
+static const int task_type_recv = 23;
+
+/* Global communicators for each of the subtypes. */
+#define task_subtype_count 30 /* Just some upper limit on subtype. */
+static MPI_Comm subtypeMPI_comms[task_subtype_count];
+
+/* The local queues. */
+static struct mpiuse_log_entry **volatile reqs_queue;
+static int volatile ind_req = 0;
+static int volatile nr_reqs = 0;
+static int volatile injecting = 1;
+static struct mpiuse_log_entry **volatile recvs_queue;
+static int volatile nr_recvs = 0;
+static int volatile ind_recv = 0;
+static int volatile todo_recv = 0;
+static struct mpiuse_log_entry **volatile sends_queue;
+static int volatile nr_sends = 0;
+static int volatile ind_send = 0;
+static int volatile todo_send = 0;
+
+/* Delta tics that we do not wait for. XXX measure this. */
+const ticks looptics = 10000;
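+
+/* One way looptics could be measured at startup: a sketch only, not called
+ * anywhere yet. It assumes getticks() and elapsed() from cycle.h and uses an
+ * empty loop to approximate the per-iteration overhead. */
+static ticks measure_looptics(int nsamples) {
+  ticks tic = getticks();
+  for (volatile int k = 0; k < nsamples; k++)
+    ; /* Empty timed pass. */
+  return (ticks)(elapsed(getticks(), tic) / nsamples);
+}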
+
+/* CPU frequency of the machine that created the MPI log. */
+// XXX need to store this in the data file.
+static double clocks_cpufreq = 2194844448.0;
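+
+/* Helper sketch: convert a tick interval from the logging machine into
+ * nanoseconds, assuming clocks_cpufreq above is that machine's tick rate.
+ * The inline conversion in inject_thread() mirrors this. */
+static double ticks_to_ns(ticks dt) {
+  return (double)dt / clocks_cpufreq * 1.0e9;
+}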
+
+/* Injection thread, initiates MPI_Isend and MPI_Irecv requests at various
+ * times. */
+static void *inject_thread(void *arg) {
+ message("%d: injection thread starts", *((int *)arg));
+
+ /* Ticks of our last attempt. */
+ ticks basetic = reqs_queue[0]->tic;
+
+ while (ind_req < nr_reqs) {
+ struct mpiuse_log_entry *log = reqs_queue[ind_req];
+
+    /* Expected time between this request and the previous one. */
+    ticks dt = log->tic - basetic;
+    basetic = log->tic;
+
+ /* Guess some time below which we should not attempt to wait, otherwise
+ * we'll start to overrun, and just inject the next call if we are below
+ * that (basically how many ticks this loop takes without any
+ * waiting). Otherwise wait a while. */
+ if (dt > looptics) {
+      struct timespec delay;
+      delay.tv_sec = 0;
+
+      /* To be fair, subtract the looptics, then convert to nanoseconds. */
+      double ns = (double)(dt - looptics) / clocks_cpufreq * 1.0e9;
+      if (ns < 1.0e9) {
+        delay.tv_nsec = (long) ns;
+      } else {
+        /* Waits of more than one second are unexpected, so complain and cap
+         * at one second (tv_nsec must be less than 1.0e9). */
+        delay.tv_sec = 1;
+        delay.tv_nsec = 0;
+        message("wait greater than one second");
+      }
+
+      //message("injecting wait of: %ld ns", delay.tv_nsec);
+      nanosleep(&delay, NULL);
+ }
+
+    // Difference from SWIFT: the MPI_BYTE count is a plain int, so large
+    // messages could overflow it; a derived MPI datatype would be needed
+    // (see the isend_blocks() sketch after this function).
+ int err = 0;
+ if (log->type == task_type_send) {
+ err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank,
+ log->tag, subtypeMPI_comms[log->subtype], &log->req);
+
+      /* Add a new send request; atomic_inc returns the old counter value,
+       * which is our slot in the queue. */
+      int ind = atomic_inc(&nr_sends);
+      sends_queue[ind] = log;
+ atomic_inc(&todo_send);
+
+ } else {
+ err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank,
+ log->tag, subtypeMPI_comms[log->subtype], &log->req);
+
+      /* Add a new recv request; again atomic_inc gives us our slot. */
+      int ind = atomic_inc(&nr_recvs);
+      recvs_queue[ind] = log;
+ atomic_inc(&todo_recv);
+ }
+ if (err != MPI_SUCCESS) error("Failed to activate send or recv");
+
+ ind_req++;
+ }
+ message("%d injections completed, sends = %d, recvs = %d", ind_req,
+ nr_sends, nr_recvs);
+ message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
+ atomic_dec(&injecting);
+ return NULL;
+}
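+
+/* Sketch of how the MPI_BYTE count overflow noted in inject_thread() might
+ * be avoided with a derived datatype. Not called anywhere: the 1MB block
+ * size is a hypothetical choice and size is assumed to be a multiple of it. */
+static int isend_blocks(void *data, size_t size, int dest, int tag,
+                        MPI_Comm comm, MPI_Request *req) {
+  const size_t block = 1024 * 1024;
+  MPI_Datatype blocktype;
+  MPI_Type_contiguous((int)block, MPI_BYTE, &blocktype);
+  MPI_Type_commit(&blocktype);
+  int err = MPI_Isend(data, (int)(size / block), blocktype, dest, tag,
+                      comm, req);
+  /* Marking the type for freeing is safe, MPI keeps it until completion. */
+  MPI_Type_free(&blocktype);
+  return err;
+}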
+
+/* Send thread, checks if MPI_Isend requests have completed. */
+static void *send_thread(void *arg) {
+ message("%d: send thread starts (%d)", *((int *)arg), injecting);
+
+ int res;
+ MPI_Status stat;
+
+  /* Only exit when all requests have been injected and we have emptied our
+   * queue. */
+  size_t attempts = 0;
+  while (injecting || todo_send > 0) {
+ int nsends = nr_sends;
+ for (int k = 0; k < nsends; k++) {
+ struct mpiuse_log_entry *log = sends_queue[k];
+ if (log != NULL) {
+ attempts++;
+ int err = MPI_Test(&log->req, &res, &stat);
+ if (err != MPI_SUCCESS) {
+ error("MPI_Test call failed");
+ }
+ if (res) {
+ /* Done, clean up. */
+ //message("MPI_Test successful");
+ free(log->data);
+ sends_queue[k] = NULL;
+ atomic_dec(&todo_send);
+ }
+ }
+ }
+ }
+ message("sends completed, required %zd attempts (left: %d)", attempts,
+ todo_send);
+ return NULL;
+}
+
+/* Recv thread, checks if MPI_Irecv requests have completed. */
+static void *recv_thread(void *arg) {
+ message("%d: recv thread starts", *((int *)arg));
+
+ int res;
+ MPI_Status stat;
+
+ size_t attempts = 0;
+  while (injecting || todo_recv > 0) {
+ int nrecvs = nr_recvs;
+ for (int k = 0; k < nrecvs; k++) {
+ struct mpiuse_log_entry *log = recvs_queue[k];
+ if (log != NULL) {
+ attempts++;
+ int err = MPI_Test(&log->req, &res, &stat);
+ if (err != MPI_SUCCESS) {
+ error("MPI_Test call failed");
+ }
+ if (res) {
+ /* Done, clean up. */
+ //message("MPI_Test successful");
+ free(log->data);
+ recvs_queue[k] = NULL;
+ atomic_dec(&todo_recv);
+ }
+ }
+ }
+ }
+
+ message("recvs completed, required %zd attempts (left: %d)", attempts,
+ todo_recv);
+ return NULL;
+}
+
+/* Comparison function for logged times. */
+static int cmp_logs(const void *p1, const void *p2) {
+  const struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry *const *)p1;
+  const struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry *const *)p2;
+
+  /* Large unsigned values, so take care: a plain subtraction could wrap or
+   * be truncated when converted to int. */
+ return 1;
+ if (l1->tic < l2->tic)
+ return -1;
+ return 0;
+}
+
+/* Pick out the relevant logging data for our rank, i.e. all activations of
+ * sends and recvs. */
+static void pick_logs(void) {
+  size_t nlogs = mpiuse_nr_logs();
+
+  /* Duplicate of logs. XXX could loop twice to reduce memory use if needed. */
+  reqs_queue = (struct mpiuse_log_entry **)
+    malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
+  nr_reqs = 0;
+  sends_queue = (struct mpiuse_log_entry **)
+    malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
+  nr_sends = 0;
+  recvs_queue = (struct mpiuse_log_entry **)
+    malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
+  nr_recvs = 0;
+
+  for (size_t k = 0; k < nlogs; k++) {
+ struct mpiuse_log_entry *log = mpiuse_get_log(k);
+ if (log->rank == myrank && log->activation) {
+ if (log->type == task_type_send || log->type == task_type_recv) {
+
+        /* Allocate space for data. */
+        log->data = calloc(log->size, 1);
+        if (log->data == NULL) error("Failed to allocate message data");
+
+ /* And keep this log. */
+ reqs_queue[nr_reqs] = log;
+ nr_reqs++;
+
+ } else {
+ error("task type '%d' is not a known send or recv task", log->type);
+ }
+ }
+ }
+
+ /* Sort into increasing time. */
+ qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
+}
+
+int main(int argc, char *argv[]) {
+
+  /* First we read the SWIFT MPI logger output that defines the
+   * communications we will undertake and the time differences between
+   * injections into the queues. */
+ mpiuse_log_restore("testdata/mpiuse_report-step1.dat");
+ int nranks = mpiuse_nr_ranks();
+
+ /* Initiate MPI. */
+ int prov = 0;
+ int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
+ if (res != MPI_SUCCESS)
+ error("Call to MPI_Init_thread failed with error %i.", res);
+
+ int nr_nodes = 0;
+ res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
+ if (res != MPI_SUCCESS)
+ error("MPI_Comm_size failed with error %i.", res);
+
+  /* This should match the expected size. */
+  if (nr_nodes != nranks)
+    error("The number of MPI ranks %d does not match the expected value %d",
+          nr_nodes, nranks);
+
+ res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
+ if (res != MPI_SUCCESS)
+ error("Call to MPI_Comm_rank failed with error %i.", res);
+
+  /* Create communicators for each subtype of the tasks, so the same tag can
+   * be reused safely across subtypes. */
+ for (int i = 0; i < task_subtype_count; i++) {
+ MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
+ }
+
+ /* Each rank requires its own queue, so extract them. */
+ pick_logs();
+
+ message("Start of MPI tests");
+
+  /* Make three threads, one for injecting tasks and two to check for
+   * completion of the sends and recvs independently. */
+ pthread_t injectthread;
+ if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
+ error("Failed to create injection thread.");
+ pthread_t sendthread;
+ if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
+ error("Failed to create send thread.");
+ pthread_t recvthread;
+ if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
+ error("Failed to create recv thread.");
+
+ /* Wait until all threads have exited and all MPI requests have completed. */
+ pthread_join(injectthread, NULL);
+ pthread_join(sendthread, NULL);
+ pthread_join(recvthread, NULL);
+
+ /* Shutdown MPI. */
+ res = MPI_Finalize();
+ if (res != MPI_SUCCESS)
+ error("call to MPI_Finalize failed with error %i.", res);
+
+ return 0;
+}