From e64712c9728c04f8786b38c1c83cd33cc31e62a6 Mon Sep 17 00:00:00 2001
From: "Peter W. Draper" <p.w.draper@durham.ac.uk>
Date: Thu, 19 May 2016 16:25:25 +0100
Subject: [PATCH] Change affinity so that cores from different NUMA nodes are
 selected alternately in the cpuid map. These are now offered in NUMA node
 order rather than core order, so hyperthreads should be picked last.

With this fix we appear to work well with Intel MPI when running multiple
ranks on the same workstation, but PLATFORM MPI remains uncooperative.
To help with that, a new command-line option, -a [01], has been added:
-a 0 disables processor affinity, even when the NUMA policy is set
and available.
---
 examples/main.c |  16 +++++--
 src/engine.c    | 119 +++++++++++++++++++++++++++++++++---------------
 src/engine.h    |   2 +-
 3 files changed, 96 insertions(+), 41 deletions(-)

diff --git a/examples/main.c b/examples/main.c
index b607791729..ca3a696816 100644
--- a/examples/main.c
+++ b/examples/main.c
@@ -52,6 +52,7 @@ void print_help_message() {
 
   printf("\nUsage: swift [OPTION] PARAMFILE\n\n");
   printf("Valid options are:\n");
+  printf("  %2s %8s %s\n", "-a", "[01]", "Use processor affinity");
   printf("  %2s %8s %s\n", "-c", "", "Run with cosmological time integration");
   printf(
       "  %2s %8s %s\n", "-d", "",
@@ -132,6 +133,7 @@ int main(int argc, char *argv[]) {
   /* Welcome to SWIFT, you made the right choice */
   if (myrank == 0) greetings();
 
+  int with_aff = 1;
   int dry_run = 0;
   int dump_tasks = 0;
   int with_cosmology = 0;
@@ -146,7 +148,14 @@ int main(int argc, char *argv[]) {
 
   /* Parse the parameters */
   int c;
-  while ((c = getopt(argc, argv, "cdef:gGhst:v:y:")) != -1) switch (c) {
+  while ((c = getopt(argc, argv, "a:cdef:gGhst:v:y:")) != -1) switch (c) {
+      case 'a':
+        if (sscanf(optarg, "%d", &with_aff) != 1) {
+          if (myrank == 0) printf("Error parsing affinity switch (-a).\n");
+          if (myrank == 0) print_help_message();
+          return 1;
+        }
+        break;
       case 'c':
         with_cosmology = 1;
         break;
@@ -411,8 +420,9 @@ int main(int argc, char *argv[]) {
   /* Initialize the engine with the space and policies. */
   if (myrank == 0) clocks_gettime(&tic);
   struct engine e;
-  engine_init(&e, &s, params, nr_nodes, myrank, nr_threads, engine_policies,
-              talking, &prog_const, &hydro_properties, &potential);
+  engine_init(&e, &s, params, nr_nodes, myrank, nr_threads, with_aff,
+              engine_policies, talking, &prog_const, &hydro_properties,
+              &potential);
   if (myrank == 0) {
     clocks_gettime(&toc);
     message("engine_init took %.3f %s.", clocks_diff(&tic, &toc),
diff --git a/src/engine.c b/src/engine.c
index 31cc9abd62..9751b93fc8 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -2445,6 +2445,7 @@ void engine_unpin() {
  * @param nr_nodes The number of MPI ranks.
  * @param nodeID The MPI rank of this node.
  * @param nr_threads The number of threads per MPI rank.
+ * @param with_aff use processor affinity, if supported.
  * @param policy The queuing policy to use.
  * @param verbose Is this #engine talkative ?
  * @param physical_constants The #phys_const used for this run.
@@ -2454,7 +2455,7 @@ void engine_unpin() {
 
 void engine_init(struct engine *e, struct space *s,
                  const struct swift_params *params, int nr_nodes, int nodeID,
-                 int nr_threads, int policy, int verbose,
+                 int nr_threads, int with_aff, int policy, int verbose,
                  const struct phys_const *physical_constants,
                  const struct hydro_props *hydro,
                  const struct external_potential *potential) {
@@ -2527,7 +2528,7 @@ void engine_init(struct engine *e, struct space *s,
     buf[j] = CPU_ISSET(j, entry_affinity) ? '1' : '0';
   }
 
-  if (verbose) message("Affinity at entry: %s", buf);
+  if (verbose && with_aff) message("Affinity at entry: %s", buf);
 
   int *cpuid = malloc(nr_affinity_cores * sizeof(int));
   cpu_set_t cpuset;
@@ -2541,51 +2542,87 @@ void engine_init(struct engine *e, struct space *s,
     skip = c + 1;
   }
 
+  if (with_aff) {
+
 #if defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE)
-  if ((policy & engine_policy_cputight) != engine_policy_cputight) {
-    /* Ascending NUMA distance. Bubblesort(!) for stable equidistant CPUs. */
-    if (numa_available() >= 0) {
-      if (nodeID == 0) message("prefer NUMA-local CPUs");
-
-      const int home = numa_node_of_cpu(sched_getcpu());
-      int done = 0;
-
-      while (!done) {
-        done = 1;
-        for (int i = 1; i < nr_affinity_cores; i++) {
-          const int node_a = numa_node_of_cpu(cpuid[i - 1]);
-          const int node_b = numa_node_of_cpu(cpuid[i]);
-
-          const int swap =
-              numa_distance(home, node_a) > numa_distance(home, node_b);
-
-          if (swap) {
-            const int t = cpuid[i - 1];
-            cpuid[i - 1] = cpuid[i];
-            cpuid[i] = t;
-            done = 0;
+    if ((policy & engine_policy_cputight) != engine_policy_cputight) {
+
+      if (numa_available() >= 0) {
+        if (nodeID == 0) message("prefer NUMA-distant CPUs");
+
+        /* Get list of numa nodes of all available cores. */
+        int *nodes = malloc( nr_affinity_cores * sizeof(int));
+        int nnodes = 0;
+        for (int i = 0; i < nr_affinity_cores; i++) {
+          nodes[i] = numa_node_of_cpu(cpuid[i]);
+          if (nodes[i] > nnodes) nnodes = nodes[i];
+        }
+        nnodes += 1;
+
+        /* Count cores per node. */
+        int *core_counts = malloc( nnodes * sizeof(int));
+        for (int i = 0; i < nr_affinity_cores; i++) {
+          core_counts[nodes[i]] = 0;
+        }
+        for (int i = 0; i < nr_affinity_cores; i++) {
+          core_counts[nodes[i]] += 1;
+        }
+
+        /* Index cores within each node. */
+        int *core_indices = malloc( nr_affinity_cores * sizeof(int));
+        for (int i = nr_affinity_cores - 1; i >= 0; i--) {
+          core_indices[i] = core_counts[nodes[i]];
+          core_counts[nodes[i]] -= 1;
+        }
+
+        /* Now sort so that we pick adjacent cpuids from different nodes
+         * by sorting internal node core indices. */
+        int done = 0;
+        while (!done) {
+          done = 1;
+          for (int i = 1; i < nr_affinity_cores; i++) {
+            if ( core_indices[i] < core_indices[i-1] ) {
+              int t = cpuid[i-1];
+              cpuid[i-1] = cpuid[i];
+              cpuid[i] = t;
+
+              t = core_indices[i-1];
+              core_indices[i-1] = core_indices[i];
+              core_indices[i] = t;
+              done = 0;
+            }
           }
         }
+
+        free(nodes);
+        free(core_counts);
+        free(core_indices);
       }
     }
-  }
 #endif
+  }
+  else {
+    if (nodeID == 0) message("no processor affinity used");
+
+  }/* with_aff */
 
   /* Avoid (unexpected) interference between engine and runner threads. We can
    * do this once we've made at least one call to engine_entry_affinity and
    * maybe numa_node_of_cpu(sched_getcpu()), even if the engine isn't already
-   * pinned. */
+   * pinned. Also unpin this when asked to not pin at all (!with_aff). */
   engine_unpin();
 #endif
 
+  if (with_aff) {
 #ifdef WITH_MPI
-  printf("[%04i] %s engine_init: cpu map is [ ", nodeID,
-         clocks_get_timesincestart());
+    printf("[%04i] %s engine_init: cpu map is [ ", nodeID,
+           clocks_get_timesincestart());
 #else
-  printf("%s engine_init: cpu map is [ ", clocks_get_timesincestart());
+    printf("%s engine_init: cpu map is [ ", clocks_get_timesincestart());
 #endif
-  for (int i = 0; i < nr_affinity_cores; i++) printf("%i ", cpuid[i]);
-  printf("].\n");
+    for (int i = 0; i < nr_affinity_cores; i++) printf("%i ", cpuid[i]);
+    printf("].\n");
+  }
 
   /* Are we doing stuff in parallel? */
   if (nr_nodes > 1) {
@@ -2730,12 +2767,13 @@ void engine_init(struct engine *e, struct space *s,
       error("Failed to create runner thread.");
 
     /* Try to pin the runner to a given core */
-    if ((e->policy & engine_policy_setaffinity) == engine_policy_setaffinity) {
+    if (with_aff && (e->policy & engine_policy_setaffinity) == engine_policy_setaffinity) {
 #if defined(HAVE_SETAFFINITY)
 
       /* Set a reasonable queue ID. */
       int coreid = k % nr_affinity_cores;
       e->runners[k].cpuid = cpuid[coreid];
+
       if (nr_queues < e->nr_threads)
         e->runners[k].qid = cpuid[coreid] * nr_queues / nr_affinity_cores;
       else
@@ -2757,15 +2795,22 @@ void engine_init(struct engine *e, struct space *s,
       e->runners[k].cpuid = k;
       e->runners[k].qid = k * nr_queues / e->nr_threads;
     }
-    if (verbose)
-      message("runner %i on cpuid=%i with qid=%i.", e->runners[k].id,
-              e->runners[k].cpuid, e->runners[k].qid);
+    if (verbose) {
+      if (with_aff) 
+        message("runner %i on cpuid=%i with qid=%i.", e->runners[k].id,
+                e->runners[k].cpuid, e->runners[k].qid);
+      else
+        message("runner %i using qid=%i no cpuid.", e->runners[k].id,
+                e->runners[k].qid);
+    }
   }
 
 /* Free the affinity stuff */
 #if defined(HAVE_SETAFFINITY)
-  free(cpuid);
-  free(buf);
+  if (with_aff) {
+    free(cpuid);
+    free(buf);
+  }
 #endif
 
   /* Wait for the runner threads to be in place. */
diff --git a/src/engine.h b/src/engine.h
index 15abc4b639..6e7987ee9e 100644
--- a/src/engine.h
+++ b/src/engine.h
@@ -197,7 +197,7 @@ void engine_compute_next_snapshot_time(struct engine *e);
 void engine_dump_snapshot(struct engine *e);
 void engine_init(struct engine *e, struct space *s,
                  const struct swift_params *params, int nr_nodes, int nodeID,
-                 int nr_threads, int policy, int verbose,
+                 int nr_threads, int with_aff, int policy, int verbose, 
                  const struct phys_const *physical_constants,
                  const struct hydro_props *hydro,
                  const struct external_potential *potential);
-- 
GitLab