From 18da8d251aeaecc4f240ea9b52dd3c09e7ed48f6 Mon Sep 17 00:00:00 2001
From: "Peter W. Draper" <p.w.draper@durham.ac.uk>
Date: Tue, 24 Nov 2020 18:10:42 +0000
Subject: [PATCH] Runs to first step, some issue with initialisation of the
 mpicache and engine_step remains

---
 src/mpicache.c | 83 +++++++++++++++++++++++++++++---------------------
 src/task.c     | 20 +++++++++---
 2 files changed, 64 insertions(+), 39 deletions(-)

diff --git a/src/mpicache.c b/src/mpicache.c
index daf7d7eb6..4e0a419e5 100644
--- a/src/mpicache.c
+++ b/src/mpicache.c
@@ -211,58 +211,73 @@ void mpicache_apply(struct mpicache *cache) {
   }
   cache->nr_entries = nr_uniq;
 
+  //for (size_t k = 0; k < cache->nr_entries; k++) {
+  //  task = cache->entries[k].task;
+  //  message("using node = %d, size = %zd, type = %d, subtype = %d, tag = %lld",
+  //          cache->entries[k].node, task->size, task->type, task->subtype,
+  //          task->flags);
+  //}
+
   /* Now we go through the entries and generate the offsets. */
-  int node = -1;
-  int subtype = -1;
   cache->window_sizes = calloc(26, sizeof(int));  // 26 *task_subtype_count?
   cache->window_nodes = calloc(26, sizeof(int));
   cache->window_subtypes = calloc(26, sizeof(int));
   cache->window_size = 26;
   cache->nr_windows = 0;
-  for (size_t k = 0; k < cache->nr_entries; k++) {
 
-    /* New node started, so we loop until the start of the next one. */
-    node = cache->entries[k].node;
-    for (; k < cache->nr_entries; k++) {
-      if (cache->entries[k].node != node) break;
+  /* Preload the first entry into the first window. */
+  cache->window_sizes[cache->nr_windows] = 0;
+  cache->window_nodes[cache->nr_windows] = cache->entries[0].node;
+  cache->window_subtypes[cache->nr_windows] = cache->entries[0].task->subtype;
 
-      /* New subtype started, so we start a new set of offsets. */
-      subtype = cache->entries[k].task->subtype;
+  size_t offset = 0;
+  for (size_t k = 0; k < cache->nr_entries; k++) {
 
+    /* New node started, so start the accumulation. */
+    int node = cache->entries[k].node;
+    int subtype = cache->entries[k].task->subtype;
+
+    /* Window sizes are in bytes. */
+    cache->window_sizes[cache->nr_windows] += cache->entries[k].task->size +
+      scheduler_osmpi_tobytes(scheduler_osmpi_header_size);
+
+    /* Offsets are in blocks. */
+    cache->entries[k].task->offset = offset;
+    offset += scheduler_osmpi_toblocks(cache->entries[k].task->size) +
+      scheduler_osmpi_header_size;
+
+    /* Check next task to see if this breaks the run of subtypes or nodes. */
+    if (k < (cache->nr_entries - 1) &&
+        (cache->entries[k+1].task->subtype != subtype ||
+         cache->entries[k+1].node != node)) {
+
+      /* Yes, so we start a new window. */
+      cache->nr_windows++;
       if (cache->nr_windows == cache->window_size) {
         cache->window_size += 26;
         cache->window_sizes =
-            realloc(cache->window_sizes, cache->window_size * sizeof(int));
+          realloc(cache->window_sizes, cache->window_size * sizeof(int));
         cache->window_nodes =
-            realloc(cache->window_nodes, cache->window_size * sizeof(int));
+          realloc(cache->window_nodes, cache->window_size * sizeof(int));
         cache->window_subtypes =
-            realloc(cache->window_subtypes, cache->window_size * sizeof(int));
+          realloc(cache->window_subtypes, cache->window_size * sizeof(int));
       }
+
       cache->window_sizes[cache->nr_windows] = 0;
-      cache->window_nodes[cache->nr_windows] = node;
-      cache->window_subtypes[cache->nr_windows] = subtype;
-
-      size_t offset = 0;
-      for (; k < cache->nr_entries; k++) {
-        task = cache->entries[k].task;
-        if (task->subtype != subtype) break;
-
-        /* Offsets are in osmpi blocks, but window sizes are in bytes. */
-        cache->window_sizes[cache->nr_windows] += task->size +
-          scheduler_osmpi_tobytes(scheduler_osmpi_header_size);
-        task->offset = offset;
-
-        /* Make room for this message and the control header next loop. */
-        offset += scheduler_osmpi_toblocks(task->size) +
-                  scheduler_osmpi_header_size;
-      }
-      message("window: %d, node: %d, subtype: %s, size: %d", cache->nr_windows,
-              cache->window_nodes[cache->nr_windows],
-              subtaskID_names[cache->window_subtypes[cache->nr_windows]],
-              cache->window_sizes[cache->nr_windows]);
-      cache->nr_windows++;
+      cache->window_nodes[cache->nr_windows] = cache->entries[k+1].node;
+      cache->window_subtypes[cache->nr_windows] = cache->entries[k+1].task->subtype;
+      offset = 0;
     }
   }
+  cache->nr_windows++;
+
+  //for (int k = 0; k < cache->nr_windows; k++) {
+  //  message("window %d %d %d %d", k,
+  //          cache->window_sizes[k],
+  //          cache->window_nodes[k],
+  //          cache->window_subtypes[k]);
+  //}
+
 
 #endif /* WITH_MPI */
 }
diff --git a/src/task.c b/src/task.c
index b468bce5c..9c027c79e 100644
--- a/src/task.c
+++ b/src/task.c
@@ -652,9 +652,9 @@ int task_lock(struct scheduler *s, struct task *t) {
       if (s->space->e->verbose)
         message(
             "Sending message to %d from %d subtype: %d, tag %zd, size %zd"
-            " (cf %lld %zd)",
+            " (cf %lld %zd), offset: %zd",
             cj->nodeID, ci->nodeID, subtype, dataptr[2], dataptr[1], t->flags,
-            t->size);
+            t->size, t->offset);
 
       /* And send to the destination rank. Could put this into the task? */
       union key {
@@ -669,7 +669,7 @@ int task_lock(struct scheduler *s, struct task *t) {
         error("Failed to lookup osmpi window index");
       }
       int window_index = child->value;
-      message("picked window: %d, node: %d, subtype: %d, size: %d", window_index,
+      message("send picked window: %d, node: %d, subtype: %d, size: %d", window_index,
               s->send_mpicache->window_nodes[window_index],
               s->send_mpicache->window_subtypes[window_index],
               s->send_mpicache->window_sizes[window_index]);
@@ -741,6 +741,10 @@ int task_lock(struct scheduler *s, struct task *t) {
         error("Failed to lookup osmpi window index");
       }
       int window_index = child->value;
+      message("recv picked window: %d, node: %d, subtype: %d, size: %d", window_index,
+              s->send_mpicache->window_nodes[window_index],
+              s->send_mpicache->window_subtypes[window_index],
+              s->send_mpicache->window_sizes[window_index]);
 
       volatile scheduler_osmpi_blocktype *dataptr =
           &(s->osmpi_ptrs[window_index])[t->offset];
@@ -751,8 +755,9 @@ int task_lock(struct scheduler *s, struct task *t) {
         if (t->flags == (int)dataptr[2] && t->size == dataptr[1]) {
 
           if (s->space->e->verbose)
-            message("Accepted from %d, subtype: %d, tag: %lld, size %zd",
-                    ci->nodeID, subtype, t->flags, t->size);
+            message("Accepted from %d, subtype: %d, tag: %lld, size %zd,"
+                    " offset %zd", ci->nodeID, subtype, t->flags,
+                    t->size, t->offset);
 
           /* And log deactivation, if logging enabled. */
           mpiuse_log_allocation(type, subtype, &t->buff, 0, 0, 0, 0);
@@ -773,6 +778,11 @@ int task_lock(struct scheduler *s, struct task *t) {
           if (ret != MPI_SUCCESS) mpi_error(ret, "MPI_Iprobe failed");
 
           return 1;
+        } else {
+          message("missed remote send at our offset %zd from %d, "
+                  "subtype: %d, tag: %lld, size %zd, see %zd/%zd/%zd",
+                  t->offset, ci->nodeID, subtype, t->flags,
+                  t->size, dataptr[2], dataptr[1], dataptr[0]);
         }
       }
       return 0;
-- 
GitLab