diff --git a/swiftmpistepsim.c b/swiftmpistepsim.c
index f6aaadc396c731fd99130ba42b7b9e380eaca39f..bbfe53c7264ba352f8bf562e50385a676af9d5ac 100644
--- a/swiftmpistepsim.c
+++ b/swiftmpistepsim.c
@@ -1,6 +1,6 @@
 /*******************************************************************************
  * This file is part of SWIFT.
- * Copyright (c) 2019 Peter W. Draper
+ * Copyright (c) 2020 Peter W. Draper
  *
  * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published
@@ -29,6 +29,9 @@
 #include "error.h"
 #include "mpiuse.h"
 
+/* Number of threads used to partition the requests. */
+#define NTHREADS 2
+
 /* Global: Our rank for all to see. */
 int myrank = -1;
 
@@ -47,21 +50,23 @@ static const int task_type_recv = 23;
 
 /* Global communicators for each of the subtypes. */
 static const int task_subtype_count = 30;  // Just some upper limit on subtype.
-static MPI_Comm subtypeMPI_comms[30];
-
-/* The local queues. */
-static struct mpiuse_log_entry **volatile reqs_queue;
-static int volatile ind_req = 0;
-static int volatile nr_reqs = 0;
-static int volatile injecting = 1;
-static struct mpiuse_log_entry **volatile recvs_queue;
-static int volatile nr_recvs = 0;
-static int volatile ind_recv = 0;
-static int volatile todo_recv = 0;
-static struct mpiuse_log_entry **volatile sends_queue;
-static int volatile nr_sends = 0;
-static int volatile ind_send = 0;
-static int volatile todo_send = 0;
+static MPI_Comm subtypeMPI_comms[NTHREADS][30];
+
+/* The local queues. We need to partition these to keep the MPI communications
+ * separate so we can use the MPI_THREAD_SPLIT option of the Intel 2020 MPI
+ * library. */
+static struct mpiuse_log_entry **volatile reqs_queue[NTHREADS];
+static int volatile ind_req[NTHREADS] = {0};
+static int volatile nr_reqs[NTHREADS] = {0};
+static int volatile injecting[NTHREADS] = {0};
+static struct mpiuse_log_entry **volatile recvs_queue[NTHREADS];
+static int volatile nr_recvs[NTHREADS] = {0};
+static int volatile ind_recv[NTHREADS] = {0};
+static int volatile todo_recv[NTHREADS] = {0};
+static struct mpiuse_log_entry **volatile sends_queue[NTHREADS];
+static int volatile nr_sends[NTHREADS] = {0};
+static int volatile ind_send[NTHREADS] = {0};
+static int volatile todo_send[NTHREADS] = {0};
 
 /* CPU frequency of the machine that created the MPI log. */
 // XXX need to store this in the data file.
@@ -76,7 +81,7 @@ static double log_clocks_cpufreq = 2194844448.0;
 static void datacheck_fill(size_t size, void *data) {
   unsigned char *p = (unsigned char *)data;
   for (size_t i = 0; i < size; i++) {
-    p[i] = 170; /* 10101010 in bits. */
+    p[i] = 170; /* 10101010 in bits. */
   }
 }
 
@@ -104,19 +109,20 @@ static int datacheck_test(size_t size, void *data) {
  * set, otherwise we just do them as quickly as possible. */
 static void *inject_thread(void *arg) {
+  int tid = *((int *)arg);
 
-  if (verbose) message("%d: injection thread starts", *((int *)arg));
+  if (verbose) message("%d: injection thread starts", tid);
   ticks starttics = getticks();
 
   /* Ticks of our last attempt and ticks the first loop takes (usetics == 1).
    */
-  ticks basetic = reqs_queue[0]->tic;
+  ticks basetic = reqs_queue[tid][0]->tic;
   ticks looptics = 0;
   double deadtime = 0.0;
   struct timespec sleep;
   sleep.tv_sec = 0;
 
-  while (ind_req < nr_reqs) {
-    struct mpiuse_log_entry *log = reqs_queue[ind_req];
+  while (ind_req[tid] < nr_reqs[tid]) {
+    struct mpiuse_log_entry *log = reqs_queue[tid][ind_req[tid]];
 
     if (usetics) {
       /* Expect time between this request and the previous one. */
@@ -166,28 +172,28 @@ static void *inject_thread(void *arg) {
 
       /* And send. */
       err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
-                      subtypeMPI_comms[log->subtype], &log->req);
+                      subtypeMPI_comms[tid][log->subtype], &log->req);
 
       /* Add a new send request. */
-      int ind = atomic_inc(&nr_sends);
-      sends_queue[ind] = log;
-      atomic_inc(&todo_send);
+      int ind = atomic_inc(&nr_sends[tid]);
+      sends_queue[tid][ind] = log;
+      atomic_inc(&todo_send[tid]);
 
     } else {
 
       /* Ready to receive. */
       log->data = calloc(log->size, 1);
       err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
-                      subtypeMPI_comms[log->subtype], &log->req);
+                      subtypeMPI_comms[tid][log->subtype], &log->req);
 
       /* Add a new recv request. */
-      int ind = atomic_inc(&nr_recvs);
-      recvs_queue[ind] = log;
-      atomic_inc(&todo_recv);
+      int ind = atomic_inc(&nr_recvs[tid]);
+      recvs_queue[tid][ind] = log;
+      atomic_inc(&todo_recv[tid]);
     }
     if (err != MPI_SUCCESS) error("Failed to activate send or recv");
 
-    ind_req++;
+    ind_req[tid]++;
 
     /* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
      * equally timed. Note we include a nanosleep, they are slow. */
@@ -196,42 +202,45 @@ static void *inject_thread(void *arg) {
       nanosleep(&sleep, NULL);
       looptics = (getticks() - starttics);
       if (verbose)
-        message("injection loop took %.3f %s.", clocks_from_ticks(looptics),
-                clocks_getunit());
+        message("%d: injection loop took %.3f %s.", tid,
+                clocks_from_ticks(looptics), clocks_getunit());
     }
   }
 
   /* All done, thread exiting. */
   if (verbose) {
-    message("%d injections completed, sends = %d, recvs = %d", ind_req,
-            nr_sends, nr_recvs);
-    message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
-    if (usetics) message("deadtime %.3f ms", deadtime / 1.0e6);
+    message("%d: %d injections completed, sends = %d, recvs = %d", tid,
+            ind_req[tid], nr_sends[tid], nr_recvs[tid]);
+    message("%d: remaining sends = %d, recvs = %d", tid, todo_send[tid],
+            todo_recv[tid]);
+    if (usetics) message("%d: deadtime %.3f ms", tid, deadtime / 1.0e6);
   }
-  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+  message("%d: took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
           clocks_getunit());
-  atomic_dec(&injecting);
+  atomic_dec(&injecting[tid]);
   return NULL;
 }
 
 /**
  * @brief main loop to run over a queue of MPI requests and test for when they
- * complete. Returns the total amount of time spent in calls to MPI_Test and
- * the number of times it was called.
+ * complete for a thread. Returns the total amount of time spent in calls to
+ * MPI_Test and the number of times it was called.
  *
  * @param logs the list of logs pointing to requests.
  * @param nr_logs pointer to the variable containing the current number of
  * logs.
  * @param todos pointer to the variable containing the number of requests that
  * are still active.
+ * @param injecting pointer to the variable indicating if there are still some
+ * injections.
  * @param sum the total number of ticks spent in calls to MPI_Test.
  * @param ncalls the total number of calls to MPI_Test.
  * @param mint the minimum ticks an MPI_Test call took.
  * @param maxt the maximum ticks an MPI_Test call took.
  */
 static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
-                         int volatile *todos, double *sum, int *ncalls,
-                         ticks *mint, ticks *maxt) {
+                         int volatile *todos, int volatile *injecting,
+                         double *sum, int *ncalls, ticks *mint, ticks *maxt) {
 
   /* Global MPI_Test statistics. */
   int lncalls = 0;
@@ -241,7 +250,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
 
   /* We loop while new requests are being injected and we still have requests
    * to complete. */
-  while (injecting || (!injecting && *todos > 0)) {
+  while (*injecting || (!*injecting && *todos > 0)) {
     int nlogs = *nr_logs;
     for (int k = 0; k < nlogs; k++) {
       struct mpiuse_log_entry *log = logs[k];
@@ -293,26 +302,28 @@
 }
 
 /**
- * @brief Send thread, checks if MPI_Isend requests have completed.
+ * @brief A send thread, checks if MPI_Isend requests have completed.
  */
 static void *send_thread(void *arg) {
+  int tid = *((int *)arg);
 
-  if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
+  if (verbose) message("%d: send thread starts (%d)", tid, injecting[tid]);
   ticks starttics = getticks();
 
   int ncalls;
   double sum;
   ticks mint;
   ticks maxt;
-  queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt);
+  queue_runner(sends_queue[tid], &nr_sends[tid], &todo_send[tid],
+               &injecting[tid], &sum, &ncalls, &mint, &maxt);
 
   message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
-      "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
+      "%d: %d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, "
+      "max time %.3f (%s)",
+      tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
       clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
   if (verbose)
-    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+    message("%d: took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
            clocks_getunit());
 
   /* Thread exits. */
@@ -320,26 +331,28 @@
 }
 
 /**
- * @brief Recv thread, checks if MPI_Irecv requests have completed.
+ * @brief A recv thread, checks if MPI_Irecv requests have completed.
 */
 static void *recv_thread(void *arg) {
+  int tid = *((int *)arg);
 
-  if (verbose) message("%d: recv thread starts", *((int *)arg));
+  if (verbose) message("%d: recv thread starts (%d)", tid, injecting[tid]);
   ticks starttics = getticks();
 
   int ncalls;
   double sum;
   ticks mint;
   ticks maxt;
-  queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt);
+  queue_runner(recvs_queue[tid], &nr_recvs[tid], &todo_recv[tid],
+               &injecting[tid], &sum, &ncalls, &mint, &maxt);
 
   message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
-      "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
+      "%d: %d MPI_Test calls took: %.3f, mean time %.3f, "
+      "min time %.3f, max time %.3f (%s)",
+      tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
   if (verbose)
-    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+    message("%d: took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
            clocks_getunit());
 
   /* Thread exits. */
@@ -361,23 +374,30 @@ static int cmp_logs(const void *p1, const void *p2) {
 
 /**
  * @brief Pick out the relevant logging data for our rank, i.e. all
- * activations of sends and recvs. We ignore the original completions.
- * The final list is sorted into increasing time of activation.
+ * activations of sends and recvs and split into NTHREADS parts.
+ * We ignore the original completions. The final lists are sorted into
+ * increasing time of activation.
  */
 static void pick_logs(void) {
 
   size_t nlogs = mpiuse_nr_logs();
 
-  /* Duplicate of logs. */
-  reqs_queue = (struct mpiuse_log_entry **)calloc(
-      nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_reqs = 0;
-  sends_queue = (struct mpiuse_log_entry **)calloc(
-      nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_sends = 0;
-  recvs_queue = (struct mpiuse_log_entry **)calloc(
-      nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_recvs = 0;
+  /* Duplicates of logs. */
+  int size = nlogs / NTHREADS + 1;
+  for (int k = 0; k < NTHREADS; k++) {
+    reqs_queue[k] = (struct mpiuse_log_entry **)calloc(
+        size, sizeof(struct mpiuse_log_entry *));
+    nr_reqs[k] = 0;
+    sends_queue[k] = (struct mpiuse_log_entry **)calloc(
+        size, sizeof(struct mpiuse_log_entry *));
+    nr_sends[k] = 0;
+    recvs_queue[k] = (struct mpiuse_log_entry **)calloc(
+        size, sizeof(struct mpiuse_log_entry *));
+    nr_recvs[k] = 0;
+  }
+  /* Index of the thread logs to update. */
+  int nt = 0;
+  int nr_logs = 0;
   for (int k = 0; k < nlogs; k++) {
     struct mpiuse_log_entry *log = mpiuse_get_log(k);
     if (log->rank == myrank && log->activation) {
@@ -385,23 +405,31 @@ static void pick_logs(void) {
 
         /* And keep this log. */
         log->data = NULL;
-        reqs_queue[nr_reqs] = log;
-        nr_reqs++;
+        reqs_queue[nt][nr_reqs[nt]] = log;
+        nr_reqs[nt]++;
+
+        nt++;
+        if (nt == NTHREADS) nt = 0;
+        nr_logs++;
 
       } else {
        error("task type '%d' is not a known send or recv task", log->type);
      }
    }
  }
+  message("Read %d active send and recv logs from input file", nr_logs);
 
   /* Sort into increasing time. */
-  qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
-
-  /* Check. */
-  for (int k = 0; k < nr_reqs - 1; k++) {
-    if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
-      message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
-              reqs_queue[k + 1]->tic);
+  for (int j = 0; j < NTHREADS; j++) {
+    qsort(reqs_queue[j], nr_reqs[j], sizeof(struct mpiuse_log_entry *),
+          cmp_logs);
+
+    /* Check. */
+    for (int k = 0; k < nr_reqs[j] - 1; k++) {
+      if (reqs_queue[j][k]->tic > reqs_queue[j][k + 1]->tic)
+        message("reqs_queue: %lld > %lld", reqs_queue[j][k]->tic,
+                reqs_queue[j][k + 1]->tic);
+    }
   }
 }
 
@@ -472,9 +500,11 @@ int main(int argc, char *argv[]) {
     error("The number of MPI ranks %d does not match the expected value %d",
           nranks, nr_nodes);
 
-  /* Create communicators for each subtype of the tasks. */
+  /* Create communicators for each subtype of the tasks and each thread. */
   for (int i = 0; i < task_subtype_count; i++) {
-    MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
+    for (int j = 0; j < NTHREADS; j++) {
+      MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[j][i]);
+    }
   }
 
   /* Each rank requires its own queue, so extract them. */
@@ -488,26 +518,35 @@ int main(int argc, char *argv[]) {
     message("==================");
     if (verbose) {
       if (!usetics) message("using fast untimed injections");
-      if (datacheck) message("checking data pattern on send and recv completion");
+      if (datacheck)
+        message("checking data pattern on send and recv completion");
     }
   }
 
-  /* Make three threads, one for injecting tasks and two to check for
-   * completions of the sends and recv independently. */
-  pthread_t injectthread;
-  if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
-    error("Failed to create injection thread.");
-  pthread_t sendthread;
-  if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
-    error("Failed to create send thread.");
-  pthread_t recvthread;
-  if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
-    error("Failed to create recv thread.");
+  /* Make three batches of NTHREADS threads, one batch for injecting tasks and
+   * two batches to check for completions of the sends and recv
+   * independently. */
+  pthread_t injectthread[NTHREADS];
+  pthread_t sendthread[NTHREADS];
+  pthread_t recvthread[NTHREADS];
+  static int tids[NTHREADS];
+  for (int j = 0; j < NTHREADS; j++) {
+    tids[j] = j;
+    injecting[j] = 1;
+    if (pthread_create(&injectthread[j], NULL, &inject_thread, &tids[j]) != 0)
+      error("Failed to create injection thread.");
+    if (pthread_create(&sendthread[j], NULL, &send_thread, &tids[j]) != 0)
+      error("Failed to create send thread.");
+    if (pthread_create(&recvthread[j], NULL, &recv_thread, &tids[j]) != 0)
+      error("Failed to create recv thread.");
+  }
 
   /* Wait until all threads have exited and all MPI requests have completed. */
-  pthread_join(injectthread, NULL);
-  pthread_join(sendthread, NULL);
-  pthread_join(recvthread, NULL);
+  for (int j = 0; j < NTHREADS; j++) {
+    pthread_join(injectthread[j], NULL);
+    pthread_join(sendthread[j], NULL);
+    pthread_join(recvthread[j], NULL);
+  }
 
   /* Dump the updated MPI logs. */
   MPI_Barrier(MPI_COMM_WORLD);