diff --git a/README b/README
index 59c362415a9199d08fb6dfbb1ed044c66e647254..a010c4595a6e3a113879659390b373553c6e07d3 100644
--- a/README
+++ b/README
@@ -14,6 +14,7 @@ See INSTALL.swift for install instructions.
 Usage: swift [OPTION] PARAMFILE
 
 Valid options are:
+ -a Pin runners using processor affinity
  -c Run with cosmological time integration
  -d Dry run. Read the parameter file, allocate memory but does not read
     the particles from ICs and exit before the start of time integration.
diff --git a/configure.ac b/configure.ac
index 59eea9c4e55d5aa22fd5a2f41e7f9e0a97ad9571..497107121753f212dd8b07f5a8e8eed7acdf82b5 100644
--- a/configure.ac
+++ b/configure.ac
@@ -329,7 +329,7 @@ fi
 AM_CONDITIONAL([HAVEPARALLELHDF5],[test "$have_parallel_hdf5" = "yes"])
 
 # Check for setaffinity.
-AC_CHECK_FUNC(pthread_setaffinity_np, AC_DEFINE([HAVE_SETAFFINITY],[true],
+AC_CHECK_FUNC(pthread_setaffinity_np, AC_DEFINE([HAVE_SETAFFINITY],[1],
             [Defined if pthread_setaffinity_np exists.]) )
 AM_CONDITIONAL(HAVESETAFFINITY,
                [test "$ac_cv_func_pthread_setaffinity_np" = "yes"])
diff --git a/examples/main.c b/examples/main.c
index 19d2d7a7871200ba6e4902d5918a59511bf32b47..a35cf8776c78b6cc7fcab40163fea0b9ea8f1439 100644
--- a/examples/main.c
+++ b/examples/main.c
@@ -52,6 +52,7 @@ void print_help_message() {
 
   printf("\nUsage: swift [OPTION] PARAMFILE\n\n");
   printf("Valid options are:\n");
+  printf("  %2s %8s %s\n", "-a", "", "Pin runners using processor affinity");
   printf("  %2s %8s %s\n", "-c", "", "Run with cosmological time integration");
   printf(
       "  %2s %8s %s\n", "-d", "",
@@ -88,6 +89,7 @@ void print_help_message() {
  * @brief Main routine that loads a few particles and generates some output.
  *
  */
+
 int main(int argc, char *argv[]) {
 
   struct clocks_time tic, toc;
@@ -121,24 +123,17 @@ int main(int argc, char *argv[]) {
   fflush(stdout);
 #endif
 
+/* Let's pin the main thread */
 #if defined(HAVE_SETAFFINITY) && defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE)
-  if ((ENGINE_POLICY) & engine_policy_setaffinity) {
-    /* Ensure the NUMA node on which we initialise (first touch) everything
-     * doesn't change before engine_init allocates NUMA-local workers.
-     * Otherwise, we may be scheduled elsewhere between the two times.
-     */
-    cpu_set_t affinity;
-    CPU_ZERO(&affinity);
-    CPU_SET(sched_getcpu(), &affinity);
-    if (sched_setaffinity(0, sizeof(cpu_set_t), &affinity) != 0) {
-      error("failed to set entry thread's affinity");
-    }
-  }
+  if (((ENGINE_POLICY) & engine_policy_setaffinity) ==
+      engine_policy_setaffinity)
+    engine_pin();
 #endif
 
   /* Welcome to SWIFT, you made the right choice */
   if (myrank == 0) greetings();
 
+  int with_aff = 0;
   int dry_run = 0;
   int dump_tasks = 0;
   int with_cosmology = 0;
@@ -153,7 +148,10 @@ int main(int argc, char *argv[]) {
 
   /* Parse the parameters */
   int c;
-  while ((c = getopt(argc, argv, "cdef:gGhst:v:y:")) != -1) switch (c) {
+  while ((c = getopt(argc, argv, "acdef:gGhst:v:y:")) != -1) switch (c) {
+    case 'a':
+      with_aff = 1;
+      break;
     case 'c':
       with_cosmology = 1;
       break;
@@ -393,6 +391,7 @@ int main(int argc, char *argv[]) {
     message("%zi parts in %i cells.", s.nr_parts, s.tot_cells);
     message("%zi gparts in %i cells.", s.nr_gparts, s.tot_cells);
     message("maximum depth is %d.", s.maxdepth);
+    fflush(stdout);
   }
 
   /* Verify that each particle is in it's proper cell. */
@@ -419,8 +418,9 @@ int main(int argc, char *argv[]) {
   /* Initialize the engine with the space and policies. */
   if (myrank == 0) clocks_gettime(&tic);
   struct engine e;
-  engine_init(&e, &s, params, nr_nodes, myrank, nr_threads, engine_policies,
-              talking, &prog_const, &hydro_properties, &potential);
+  engine_init(&e, &s, params, nr_nodes, myrank, nr_threads, with_aff,
+              engine_policies, talking, &prog_const, &hydro_properties,
+              &potential);
   if (myrank == 0) {
     clocks_gettime(&toc);
     message("engine_init took %.3f %s.", clocks_diff(&tic, &toc),
diff --git a/src/engine.c b/src/engine.c
index 819e6907c5e62864fb69517502143101d90d442c..cd5c253c5a2cc5d2968a8b99262660a3b59978df 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -72,6 +72,11 @@ const char *engine_policy_names[13] = {
 /** The rank of the engine as a global variable (for messages). */
 int engine_rank;
 
+#ifdef HAVE_SETAFFINITY
+/** The initial affinity of the main thread (set by engine_pin()) */
+static cpu_set_t entry_affinity;
+#endif
+
 /**
  * @brief Link a density/force task to a cell.
  *
@@ -2381,23 +2386,58 @@ void engine_dump_snapshot(struct engine *e) {
             (float)clocks_diff(&time1, &time2), clocks_getunit());
 }
 
-#if defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE)
-static bool hyperthreads_present(void) {
-#ifdef __linux__
-  FILE *f =
-      fopen("/sys/devices/system/cpu/cpu0/topology/thread_siblings_list", "r");
+#ifdef HAVE_SETAFFINITY
+/**
+ * @brief Returns the initial affinity the main thread is using.
+ */
+static cpu_set_t *engine_entry_affinity() {
+
+  static int use_entry_affinity = 0;
+
+  if (!use_entry_affinity) {
+    pthread_t engine = pthread_self();
+    pthread_getaffinity_np(engine, sizeof(entry_affinity), &entry_affinity);
+    use_entry_affinity = 1;
+  }
+
+  return &entry_affinity;
+}
+#endif
+
+/**
+ * @brief Ensure the NUMA node on which we initialise (first touch) everything
+ * doesn't change before engine_init allocates NUMA-local workers.
+ */
+void engine_pin() {
 
-  int c;
-  while ((c = fgetc(f)) != EOF && c != ',')
+#ifdef HAVE_SETAFFINITY
+  cpu_set_t *entry_affinity = engine_entry_affinity();
+  int pin;
+  for (pin = 0; pin < CPU_SETSIZE && !CPU_ISSET(pin, entry_affinity); ++pin)
     ;
-  fclose(f);
-  return c == ',';
+
+  cpu_set_t affinity;
+  CPU_ZERO(&affinity);
+  CPU_SET(pin, &affinity);
+  if (sched_setaffinity(0, sizeof(affinity), &affinity) != 0) {
+    error("failed to set engine's affinity");
+  }
 #else
-  return true;  // just guess
+  error("SWIFT was not compiled with support for pinning.");
 #endif
 }
+
+/**
+ * @brief Unpins the main thread.
+ */
+void engine_unpin() {
+#ifdef HAVE_SETAFFINITY
+  pthread_t main_thread = pthread_self();
+  pthread_setaffinity_np(main_thread, sizeof(entry_affinity), &entry_affinity);
+#else
+  error("SWIFT was not compiled with support for pinning.");
 #endif
+}
 
 /**
  * @brief init an engine with the given number of threads, queues, and
@@ -2409,6 +2449,7 @@ static bool hyperthreads_present(void) {
  * @param nr_nodes The number of MPI ranks.
  * @param nodeID The MPI rank of this node.
  * @param nr_threads The number of threads per MPI rank.
+ * @param with_aff Use processor affinity, if supported.
  * @param policy The queuing policy to use.
  * @param verbose Is this #engine talkative ?
  * @param physical_constants The #phys_const used for this run.
@@ -2418,7 +2459,7 @@ static bool hyperthreads_present(void) {
 
 void engine_init(struct engine *e, struct space *s,
                  const struct swift_params *params, int nr_nodes, int nodeID,
-                 int nr_threads, int policy, int verbose,
+                 int nr_threads, int with_aff, int policy, int verbose,
                  const struct phys_const *physical_constants,
                  const struct hydro_props *hydro,
                  const struct external_potential *potential) {
@@ -2474,74 +2515,118 @@ void engine_init(struct engine *e, struct space *s,
   if (nr_queues <= 0) nr_queues = e->nr_threads;
   s->nr_queues = nr_queues;
 
+/* Deal with affinity. For now, just figure out the number of cores. */
 #if defined(HAVE_SETAFFINITY)
   const int nr_cores = sysconf(_SC_NPROCESSORS_ONLN);
-  int cpuid[nr_cores];
+  cpu_set_t *entry_affinity = engine_entry_affinity();
+  const int nr_affinity_cores = CPU_COUNT(entry_affinity);
+
+  if (nr_cores > CPU_SETSIZE) /* Unlikely, except on e.g. SGI UV. */
+    error("must allocate dynamic cpu_set_t (too many cores per node)");
+
+  char *buf = malloc((nr_cores + 1) * sizeof(char));
+  buf[nr_cores] = '\0';
+  for (int j = 0; j < nr_cores; ++j) {
+    /* Reversed bit order from convention, but same as e.g. Intel MPI's
+     * I_MPI_PIN_DOMAIN explicit mask: left-to-right, LSB-to-MSB. */
+    buf[j] = CPU_ISSET(j, entry_affinity) ? '1' : '0';
+  }
+
+  if (verbose && with_aff) message("Affinity at entry: %s", buf);
+
+  int *cpuid = malloc(nr_affinity_cores * sizeof(int));
   cpu_set_t cpuset;
-  if ((policy & engine_policy_cputight) == engine_policy_cputight) {
-    for (int k = 0; k < nr_cores; k++) cpuid[k] = k;
-  } else {
-    /* Get next highest power of 2. */
-    int maxint = 1;
-    while (maxint < nr_cores) maxint *= 2;
-    cpuid[0] = 0;
-    int k = 1;
-    for (int i = 1; i < maxint; i *= 2)
-      for (int j = maxint / i / 2; j < maxint; j += maxint / i)
-        if (j < nr_cores && j != 0) cpuid[k++] = j;
+  int skip = 0;
+  for (int k = 0; k < nr_affinity_cores; k++) {
+    int c;
+    for (c = skip; c < CPU_SETSIZE && !CPU_ISSET(c, entry_affinity); ++c)
+      ;
+    cpuid[k] = c;
+    skip = c + 1;
+  }
+
+  if (with_aff) {
 #if defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE)
-    /* Ascending NUMA distance. Bubblesort(!) for stable equidistant CPUs. */
-    if (numa_available() >= 0) {
-      if (nodeID == 0) message("prefer NUMA-local CPUs");
-
-      const int home = numa_node_of_cpu(sched_getcpu());
-      const int half = nr_cores / 2;
-      const bool swap_hyperthreads = hyperthreads_present();
-      bool done = false;
-      if (swap_hyperthreads && nodeID == 0)
-        message("prefer physical cores to hyperthreads");
-
-      while (!done) {
-        done = true;
-        for (int i = 1; i < nr_cores; i++) {
-          const int node_a = numa_node_of_cpu(cpuid[i - 1]);
-          const int node_b = numa_node_of_cpu(cpuid[i]);
-
-          /* Avoid using local hyperthreads over unused remote physical cores.
-           * Assume two hyperthreads, and that cpuid >= half partitions them.
-           */
-          const int thread_a = swap_hyperthreads && cpuid[i - 1] >= half;
-          const int thread_b = swap_hyperthreads && cpuid[i] >= half;
-
-          bool swap = thread_a > thread_b;
-          if (thread_a == thread_b)
-            swap = numa_distance(home, node_a) > numa_distance(home, node_b);
-
-          if (swap) {
-            const int t = cpuid[i - 1];
-            cpuid[i - 1] = cpuid[i];
-            cpuid[i] = t;
-            done = false;
+    if ((policy & engine_policy_cputight) != engine_policy_cputight) {
+
+      if (numa_available() >= 0) {
+        if (nodeID == 0) message("prefer NUMA-distant CPUs");
+
+        /* Get list of numa nodes of all available cores. */
+        int *nodes = malloc(nr_affinity_cores * sizeof(int));
+        int nnodes = 0;
+        for (int i = 0; i < nr_affinity_cores; i++) {
+          nodes[i] = numa_node_of_cpu(cpuid[i]);
+          if (nodes[i] > nnodes) nnodes = nodes[i];
+        }
+        nnodes += 1;
+
+        /* Count cores per node. */
+        int *core_counts = malloc(nnodes * sizeof(int));
+        for (int i = 0; i < nr_affinity_cores; i++) {
+          core_counts[nodes[i]] = 0;
+        }
+        for (int i = 0; i < nr_affinity_cores; i++) {
+          core_counts[nodes[i]] += 1;
+        }
+
+        /* Index cores within each node. */
+        int *core_indices = malloc(nr_affinity_cores * sizeof(int));
+        for (int i = nr_affinity_cores - 1; i >= 0; i--) {
+          core_indices[i] = core_counts[nodes[i]];
+          core_counts[nodes[i]] -= 1;
+        }
+
+        /* Now sort so that we pick adjacent cpuids from different nodes
+         * by sorting internal node core indices. */
+        int done = 0;
+        while (!done) {
+          done = 1;
+          for (int i = 1; i < nr_affinity_cores; i++) {
+            if (core_indices[i] < core_indices[i - 1]) {
+              int t = cpuid[i - 1];
+              cpuid[i - 1] = cpuid[i];
+              cpuid[i] = t;
+
+              t = core_indices[i - 1];
+              core_indices[i - 1] = core_indices[i];
+              core_indices[i] = t;
+              done = 0;
+            }
           }
         }
+
+        free(nodes);
+        free(core_counts);
+        free(core_indices);
       }
     }
 #endif
+  }
+  else {
+    if (nodeID == 0) message("no processor affinity used");
+
+  } /* with_aff */
 
-    if (nodeID == 0) {
+  /* Avoid (unexpected) interference between engine and runner threads. We can
+   * do this once we've made at least one call to engine_entry_affinity and
+   * maybe numa_node_of_cpu(sched_getcpu()), even if the engine isn't already
+   * pinned. Also unpin this when asked to not pin at all (!with_aff). */
+  engine_unpin();
+#endif
+
+  if (with_aff) {
 #ifdef WITH_MPI
-      printf("[%04i] %s engine_init: cpu map is [ ", nodeID,
-             clocks_get_timesincestart());
+    printf("[%04i] %s engine_init: cpu map is [ ", nodeID,
+           clocks_get_timesincestart());
 #else
-      printf("%s engine_init: cpu map is [ ", clocks_get_timesincestart());
+    printf("%s engine_init: cpu map is [ ", clocks_get_timesincestart());
 #endif
-      for (int i = 0; i < nr_cores; i++) printf("%i ", cpuid[i]);
-      printf("].\n");
-    }
+    for (int i = 0; i < nr_affinity_cores; i++) printf("%i ", cpuid[i]);
+    printf("].\n");
   }
-#endif
 
   /* Are we doing stuff in parallel? */
   if (nr_nodes > 1) {
@@ -2684,19 +2769,23 @@ void engine_init(struct engine *e, struct space *s,
     if (pthread_create(&e->runners[k].thread, NULL, &runner_main,
                        &e->runners[k]) != 0)
       error("Failed to create runner thread.");
-    if (e->policy & engine_policy_setaffinity) {
+
+    /* Try to pin the runner to a given core */
+    if (with_aff && (e->policy & engine_policy_setaffinity) == engine_policy_setaffinity) {
 #if defined(HAVE_SETAFFINITY)
 
       /* Set a reasonable queue ID. */
-      e->runners[k].cpuid = cpuid[k % nr_cores];
+      int coreid = k % nr_affinity_cores;
+      e->runners[k].cpuid = cpuid[coreid];
+
       if (nr_queues < e->nr_threads)
-        e->runners[k].qid = cpuid[k % nr_cores] * nr_queues / nr_cores;
+        e->runners[k].qid = cpuid[coreid] * nr_queues / nr_affinity_cores;
       else
         e->runners[k].qid = k;
 
       /* Set the cpu mask to zero | e->id. */
       CPU_ZERO(&cpuset);
-      CPU_SET(cpuid[k % nr_cores], &cpuset);
+      CPU_SET(cpuid[coreid], &cpuset);
 
       /* Apply this mask to the runner's pthread. */
       if (pthread_setaffinity_np(e->runners[k].thread, sizeof(cpu_set_t),
@@ -2710,9 +2799,23 @@ void engine_init(struct engine *e, struct space *s,
       e->runners[k].cpuid = k;
       e->runners[k].qid = k * nr_queues / e->nr_threads;
     }
-    /* message( "runner %i on cpuid=%i with qid=%i." , e->runners[k].id , */
-    /*          e->runners[k].cpuid , e->runners[k].qid ); */
+    if (verbose) {
+      if (with_aff)
+        message("runner %i on cpuid=%i with qid=%i.", e->runners[k].id,
+                e->runners[k].cpuid, e->runners[k].qid);
+      else
+        message("runner %i using qid=%i no cpuid.", e->runners[k].id,
+                e->runners[k].qid);
+    }
+  }
+
+/* Free the affinity stuff */
+#if defined(HAVE_SETAFFINITY)
+  if (with_aff) {
+    free(cpuid);
+    free(buf);
   }
+#endif
 
   /* Wait for the runner threads to be in place. */
   while (e->barrier_running || e->barrier_launch)
diff --git a/src/engine.h b/src/engine.h
index cc6ed9bb038667d4bd548f33dafad07176be0750..f1e9add6a903d5939ce717d9bb43c42dfca0675c 100644
--- a/src/engine.h
+++ b/src/engine.h
@@ -197,7 +197,7 @@ void engine_compute_next_snapshot_time(struct engine *e);
 void engine_dump_snapshot(struct engine *e);
 void engine_init(struct engine *e, struct space *s,
                  const struct swift_params *params, int nr_nodes, int nodeID,
-                 int nr_threads, int policy, int verbose,
+                 int nr_threads, int with_aff, int policy, int verbose,
                  const struct phys_const *physical_constants,
                  const struct hydro_props *hydro,
                  const struct external_potential *potential);
@@ -219,5 +219,7 @@ void engine_redistribute(struct engine *e);
 struct link *engine_addlink(struct engine *e, struct link *l, struct task *t);
 void engine_print_policy(struct engine *e);
 int engine_is_done(struct engine *e);
+void engine_pin();
+void engine_unpin();
 
 #endif /* SWIFT_ENGINE_H */
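
Note (not part of the patch): the pinning pattern introduced above can be exercised in isolation. The idea is to capture the main thread's entry affinity once (what engine_entry_affinity() caches), pin the main thread to the first permitted CPU (engine_pin()), hand each worker thread one CPU from the entry mask, and finally restore the entry mask on the main thread (engine_unpin()). The following is a minimal standalone sketch, assuming a Linux/glibc host with _GNU_SOURCE so that pthread_getaffinity_np/pthread_setaffinity_np and the CPU_* macros are available; the helper pick_nth_cpu, the file name pin_sketch.c, and the fixed thread count are illustrative only and are not part of SWIFT.

/* pin_sketch.c: standalone sketch of the entry-affinity / per-thread pinning
 * pattern. Build with: cc -pthread pin_sketch.c -o pin_sketch */
#define _GNU_SOURCE
#include <pthread.h>
#include <sched.h>
#include <stdio.h>

/* Affinity mask the process started with, captured once (hypothetical
 * stand-in for engine_entry_affinity()). */
static cpu_set_t entry_affinity;

/* Return the index of the n-th set bit in the entry mask (illustrative helper). */
static int pick_nth_cpu(int n) {
  int seen = 0;
  for (int c = 0; c < CPU_SETSIZE; ++c)
    if (CPU_ISSET(c, &entry_affinity) && seen++ == n) return c;
  return -1;
}

/* Worker body: report which CPU we are running on. The message may race with
 * the pinning done in main(), which is fine for a sketch. */
static void *runner(void *arg) {
  printf("runner %d on cpu %d\n", (int)(long)arg, sched_getcpu());
  return NULL;
}

int main(void) {
  /* Capture the affinity we entered with. */
  pthread_getaffinity_np(pthread_self(), sizeof(entry_affinity), &entry_affinity);
  const int ncpus = CPU_COUNT(&entry_affinity);

  /* Create each worker, then pin it to one CPU of the entry mask, round robin,
   * mirroring the create-then-pin order used for the runners above. */
  pthread_t threads[4];
  for (long k = 0; k < 4; ++k) {
    pthread_create(&threads[k], NULL, runner, (void *)k);
    cpu_set_t one;
    CPU_ZERO(&one);
    CPU_SET(pick_nth_cpu((int)k % ncpus), &one);
    pthread_setaffinity_np(threads[k], sizeof(one), &one);
  }
  for (int k = 0; k < 4; ++k) pthread_join(threads[k], NULL);

  /* Restore the entry mask on the main thread (what engine_unpin() does). */
  pthread_setaffinity_np(pthread_self(), sizeof(entry_affinity), &entry_affinity);
  return 0;
}

Running the sketch under an external mask, for example "taskset -c 2,3 ./pin_sketch", should report only CPUs 2 and 3, which mirrors how the patched engine only hands out CPUs from the affinity mask it was started with.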