From ec77e6f514768c8c21ef6017e3f57183558ee8d7 Mon Sep 17 00:00:00 2001 From: "Peter W. Draper" <p.w.draper@durham.ac.uk> Date: Fri, 28 Apr 2023 15:18:50 +0100 Subject: [PATCH] Also make fake command multi-threaded --- swiftmpifakestepsim.c | 212 ++++++++++++++++++++++++++---------------- 1 file changed, 131 insertions(+), 81 deletions(-) diff --git a/swiftmpifakestepsim.c b/swiftmpifakestepsim.c index c1f3fa6..6b846ba 100644 --- a/swiftmpifakestepsim.c +++ b/swiftmpifakestepsim.c @@ -45,18 +45,20 @@ static long int default_seed = 1987654321; static MPI_Comm node_comms[512]; /* The local queues. */ -static struct mpiuse_log_entry **volatile reqs_queue; -static int volatile ind_req = 0; -static int volatile nr_reqs = 0; -static int volatile injecting = 1; -static struct mpiuse_log_entry **volatile recvs_queue; -static int volatile nr_recvs = 0; -static int volatile ind_recv = 0; -static int volatile todo_recv = 0; -static struct mpiuse_log_entry **volatile sends_queue; -static int volatile nr_sends = 0; -static int volatile ind_send = 0; -static int volatile todo_send = 0; +/* The local queues. */ +#define NITHREADS 4 +static struct mpiuse_log_entry **volatile reqs_queue[NITHREADS]; +static int volatile ind_req[NITHREADS] = {0}; +static int volatile nr_reqs[NITHREADS] = {0}; +static int volatile injecting = NITHREADS; + +static struct mpiuse_log_entry **volatile recvs_queue[NITHREADS]; +static int volatile nr_recvs[NITHREADS] = {0}; +static int volatile todo_recv[NITHREADS] = {0}; + +static struct mpiuse_log_entry **volatile sends_queue[NITHREADS]; +static int volatile nr_sends[NITHREADS] = {0}; +static int volatile todo_send[NITHREADS] = {0}; /** * @brief fill a data area with a pattern that can be checked for changes. @@ -88,22 +90,26 @@ static int datacheck_test(size_t size, void *data) { } /** - * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests. + * @brief Runner for a injection thread, initiates MPI_Isend and MPI_Irecv + * requests. */ -static void *inject_thread(void *arg) { +static void injection_runner(int tid) { - if (verbose) message("%d: injection thread starts", *((int *)arg)); + if (verbose) message("%d: injection thread starts", tid); ticks starttics = getticks(); + struct mpiuse_log_entry **reqs = reqs_queue[tid]; + struct mpiuse_log_entry **sends = sends_queue[tid]; + struct mpiuse_log_entry **recvs = recvs_queue[tid]; - while (ind_req < nr_reqs) { - struct mpiuse_log_entry *log = reqs_queue[ind_req]; + while (ind_req[tid] < nr_reqs[tid]) { + struct mpiuse_log_entry *log = reqs[ind_req[tid]]; /* Initialise new log elements. */ log->done = 0; log->nr_tests = 0; log->tsum = 0.0; log->tmax = 0; - log->tmin = INT_MAX; + log->tmin = LONG_MAX; log->endtic = 0; log->injtic = getticks(); @@ -120,9 +126,9 @@ static void *inject_thread(void *arg) { node_comms[log->rank], &log->req); /* Add a new send request. */ - int ind = atomic_inc(&nr_sends); - sends_queue[ind] = log; - atomic_inc(&todo_send); + int ind = atomic_inc(&nr_sends[tid]); + sends[ind] = log; + atomic_inc(&todo_send[tid]); } else { @@ -132,24 +138,35 @@ static void *inject_thread(void *arg) { log->tag, node_comms[log->otherrank], &log->req); /* Add a new recv request. */ - int ind = atomic_inc(&nr_recvs); - recvs_queue[ind] = log; - atomic_inc(&todo_recv); + int ind = atomic_inc(&nr_recvs[tid]); + recvs[ind] = log; + atomic_inc(&todo_recv[tid]); } if (err != MPI_SUCCESS) error("Failed to activate send or recv"); - ind_req++; + ind_req[tid]++; } /* All done, thread exiting. */ if (verbose) { - message("%d injections completed, sends = %d, recvs = %d", ind_req, - nr_sends, nr_recvs); - message("remaining sends = %d, recvs = %d", todo_send, todo_recv); + message("%d injections completed, sends = %d, recvs = %d", ind_req[tid], + nr_sends[tid], nr_recvs[tid]); + message("remaining sends = %d, recvs = %d", todo_send[tid], todo_recv[tid]); } - message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + message("%d took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics), clocks_getunit()); atomic_dec(&injecting); +} + +/** + * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests. + * + * The requests are initiated in the time order of the original log and an + * attempt to start these with the same interval gap is made if usetics is + * set, otherwise we just do them as quickly as possible. + */ +static void *inject_thread(void *arg) { + injection_runner(*(int *)arg); return NULL; } @@ -175,7 +192,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs, /* Global MPI_Test statistics. */ int lncalls = 0; double lsum = 0.0; - ticks lmint = INT_MAX; + ticks lmint = LONG_MAX; ticks lmaxt = 0; /* We loop while new requests are being injected and we still have requests @@ -236,22 +253,24 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs, */ static void *send_thread(void *arg) { - if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting); + int tid = *((int *)arg); + if (verbose) message("%d: send thread starts", tid); ticks starttics = getticks(); int ncalls; double sum; ticks mint; ticks maxt; - queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt); + queue_runner(sends_queue[tid], &nr_sends[tid], &todo_send[tid], &sum, &ncalls, + &mint, &maxt); message( - "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time " + "%d %d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time " "%.3f (%s)", - ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), + tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit()); if (verbose) - message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + message("%d took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics), clocks_getunit()); /* Thread exits. */ @@ -263,22 +282,24 @@ static void *send_thread(void *arg) { */ static void *recv_thread(void *arg) { - if (verbose) message("%d: recv thread starts", *((int *)arg)); + int tid = *((int *)arg); + if (verbose) message("%d: recv thread starts", tid); ticks starttics = getticks(); int ncalls; double sum; ticks mint; ticks maxt; - queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt); + queue_runner(recvs_queue[tid], &nr_recvs[tid], &todo_recv[tid], &sum, &ncalls, + &mint, &maxt); message( - "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time " + "%d %d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time " "%.3f (%s)", - ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), + tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit()); if (verbose) - message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + message("%d took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics), clocks_getunit()); /* Thread exits. */ @@ -312,40 +333,33 @@ static void pick_logs(int random) { size_t nlogs = mpiuse_nr_logs(); /* Duplicate of logs. */ - reqs_queue = (struct mpiuse_log_entry **)calloc( + struct mpiuse_log_entry **reqs = (struct mpiuse_log_entry **)calloc( nlogs, sizeof(struct mpiuse_log_entry *)); - nr_reqs = 0; - sends_queue = (struct mpiuse_log_entry **)calloc( - nlogs, sizeof(struct mpiuse_log_entry *)); - nr_sends = 0; - recvs_queue = (struct mpiuse_log_entry **)calloc( - nlogs, sizeof(struct mpiuse_log_entry *)); - nr_recvs = 0; + int nreqs = 0; if (random == 0 || random == 2) { for (int k = 0; k < nlogs; k++) { struct mpiuse_log_entry *log = mpiuse_get_log(k); if (log->rank == myrank && log->activation) { log->data = NULL; - reqs_queue[nr_reqs] = log; - nr_reqs++; + reqs[nreqs] = log; + nreqs++; } } if (random == 0) { /* Sort into increasing time. */ - qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs); + qsort(reqs, nreqs, sizeof(struct mpiuse_log_entry *), cmp_logs); } else { /* Randomize the order, so ranks do not all work in sequence. */ - mpiuse_shuffle_logs(reqs_queue, nr_reqs); + mpiuse_shuffle_logs(reqs, nreqs); } /* Check. */ if (random == 0) { - for (int k = 0; k < nr_reqs - 1; k++) { - if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic) - message("reqs_queue: %lld > %lld", reqs_queue[k]->tic, - reqs_queue[k + 1]->tic); + for (int k = 0; k < nreqs - 1; k++) { + if (reqs[k]->tic > reqs[k + 1]->tic) + message("reqs_queue: %lld > %lld", reqs[k]->tic, reqs[k + 1]->tic); } } } else { @@ -359,14 +373,14 @@ static void pick_logs(int random) { if (log->rank == myrank && log->activation == 1 && log->type == RECV_TYPE) { log->data = NULL; - reqs_queue[nr_reqs] = log; - nr_reqs++; + reqs[nreqs] = log; + nreqs++; nrecv++; } } /* These are sorted into log time order. */ - qsort(reqs_queue, nrecv, sizeof(struct mpiuse_log_entry *), cmp_logs); + qsort(reqs, nrecv, sizeof(struct mpiuse_log_entry *), cmp_logs); /* Now the sends. */ int nsend = 0; @@ -375,15 +389,46 @@ static void pick_logs(int random) { if (log->rank == myrank && log->activation == 1 && log->type == SEND_TYPE) { log->data = NULL; - reqs_queue[nr_reqs] = log; - nr_reqs++; + reqs[nreqs] = log; + nreqs++; nsend++; } } /* These are randomized. */ - mpiuse_shuffle_logs(&reqs_queue[nrecv], nsend); + mpiuse_shuffle_logs(&reqs[nrecv], nsend); } + + /* And partition into queues for injection. Use interleave pick so that + * close in time injects are on different queues. */ + for (int k = 0; k < NITHREADS; k++) { + reqs_queue[k] = (struct mpiuse_log_entry **)malloc( + sizeof(struct mpiuse_log_entry *) * nlogs); + nr_reqs[k] = 0; + + sends_queue[k] = (struct mpiuse_log_entry **)calloc( + nlogs, sizeof(struct mpiuse_log_entry *)); + nr_sends[k] = 0; + + recvs_queue[k] = (struct mpiuse_log_entry **)calloc( + nlogs, sizeof(struct mpiuse_log_entry *)); + nr_recvs[k] = 0; + + ind_req[k] = 0; + } + + for (int k = 0; k < nreqs; k++) { + int tid = k % NITHREADS; + reqs_queue[tid][nr_reqs[tid]] = reqs[k]; + nr_reqs[tid]++; + } + + if (verbose) { + for (int k = 0; k < NITHREADS; k++) { + message("nr_reqs[%d] = %d", k, nr_reqs[k]); + } + } + free(reqs); } /** @@ -539,28 +584,33 @@ int main(int argc, char *argv[]) { if (myrank == 0) { message("Start of MPI tests"); message("=================="); - if (verbose) { - if (datacheck) - message("checking data pattern on send and recv completion"); - } + message("Using %d threads each for injection, send and receive", NITHREADS); + if (datacheck) message("Checking data pattern on send and recv completion"); } - /* Make three threads, one for injecting tasks and two to check for + /* Make three types of thread, one for injecting tasks and two to check for * completions of the sends and recv independently. */ - pthread_t injectthread; - if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0) - error("Failed to create injection thread."); - pthread_t sendthread; - if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0) - error("Failed to create send thread."); - pthread_t recvthread; - if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0) - error("Failed to create recv thread."); - - /* Wait until all threads have exited and all MPI requests have completed. */ - pthread_join(injectthread, NULL); - pthread_join(sendthread, NULL); - pthread_join(recvthread, NULL); + static int ks[NITHREADS]; + pthread_t injectthreads[NITHREADS]; + pthread_t sendthreads[NITHREADS]; + pthread_t recvthreads[NITHREADS]; + for (int k = 0; k < NITHREADS; k++) { + ks[k] = k; + if (pthread_create(&injectthreads[k], NULL, &inject_thread, &ks[k]) != 0) + error("Failed to create injection thread."); + if (pthread_create(&sendthreads[k], NULL, &send_thread, &ks[k]) != 0) + error("Failed to create send thread."); + if (pthread_create(&recvthreads[k], NULL, &recv_thread, &ks[k]) != 0) + error("Failed to create recv thread."); + } + + /* Wait until all threads have exited and all MPI requests have completed. + */ + for (int k = 0; k < NITHREADS; k++) { + pthread_join(injectthreads[k], NULL); + pthread_join(sendthreads[k], NULL); + pthread_join(recvthreads[k], NULL); + } /* Dump the updated MPI logs. */ MPI_Barrier(MPI_COMM_WORLD); -- GitLab