diff --git a/Makefile b/Makefile
index 57a0353bb52f7f2bb21f7f5a9ee9d5a82fc5a051..38b9759e03e50b26dce48ef87239ad0467c611d8 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,6 @@
 #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
-CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_ASSERT_ON
+CFLAGS = -g -O3 -Wall -Iinfinity/include
+#CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_ASSERT_ON
 #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
 
 INCLUDES = mpiuse.h atomic.h cycle.h clocks.h error.h
@@ -9,11 +10,16 @@ DEPS = Makefile $(SOURCES) $(INCLUDES)
 
 INFINITY = -Linfinity -linfinity -libverbs
 
-all: swiftmpistepsim swiftmpirdmastepsim swiftmpirdmaonestepsim swiftmpirdmastepsim2 swiftmpirdmastepsim3
+all: swiftmpistepsim swiftmpistepsim2 \
+	swiftmpirdmastepsim swiftmpirdmastepsim2 swiftmpirdmastepsim3 \
+	swiftmpirdmaonestepsim
 
 swiftmpistepsim: swiftmpistepsim.c $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c $(SOURCES)
 
+swiftmpistepsim2: swiftmpistepsim2.c $(DEPS)
+	mpicxx $(CFLAGS) -o swiftmpistepsim2 swiftmpistepsim2.c $(SOURCES)
+
 swiftmpirdmastepsim: swiftmpirdmastepsim.c $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpirdmastepsim swiftmpirdmastepsim.c $(SOURCES) $(INFINITY)
@@ -28,6 +34,7 @@ swiftmpirdmastepsim3: swiftmpirdmastepsim3.c $(DEPS)
 
 clean:
 	rm -f swiftmpistepsim
+	rm -f swiftmpistepsim2
 	rm -f swiftmpirdmastepsim
 	rm -f swiftmpirdmaonestepsim
 	rm -f swiftmpirdmastepsim2
diff --git a/swiftmpistepsim2.c b/swiftmpistepsim2.c
new file mode 100644
index 0000000000000000000000000000000000000000..c5fc3d60899285d7e16467807e647c5d1ba31c17
--- /dev/null
+++ b/swiftmpistepsim2.c
@@ -0,0 +1,478 @@
+/*******************************************************************************
+ * This file is part of SWIFT.
+ * Copyright (c) 2019 Peter W. Draper
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ ******************************************************************************/
+
+#include <limits.h>
+#include <mpi.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "atomic.h"
+#include "clocks.h"
+#include "error.h"
+#include "mpiuse.h"
+
+/* Global: Our rank for all to see. */
+int myrank = -1;
+
+/* Are we verbose. */
+static int verbose = 0;
+
+/* Scale to apply to the size of the messages we send. */
+static float messagescale = 1.0;
+
+/* Set a data pattern and check we get this back, slow... */
+static int datacheck = 0;
+
+/* Integer types of send and recv tasks, must match log. */
+static const int task_type_send = 22;
+static const int task_type_recv = 23;
+
+/* Global communicators for each of the subtypes. */
+static const int task_subtype_count = 30;  // Just some upper limit on subtype.
+static MPI_Comm subtypeMPI_comms[30];
+
+/* The local queues. */
+static struct mpiuse_log_entry **volatile reqs_queue;
+static int volatile ind_req = 0;
+static int volatile nr_reqs = 0;
+static struct mpiuse_log_entry **volatile recvs_queue;
+static int volatile nr_recvs = 0;
+static int volatile ind_recv = 0;
+static int volatile todo_recv = 0;
+static struct mpiuse_log_entry **volatile sends_queue;
+static int volatile nr_sends = 0;
+static int volatile ind_send = 0;
+
+/* CPU frequency of the machine that created the MPI log. */
+// XXX need to store this in the data file.
+static double log_clocks_cpufreq = 2194844448.0;
+
+/**
+ * @brief Fill a data area with a pattern that can be checked for changes.
+ *
+ * @param size size of data in bytes.
+ * @param data the data to fill.
+ */
+static void datacheck_fill(size_t size, void *data) {
+  unsigned char *p = (unsigned char *)data;
+  for (size_t i = 0; i < size; i++) {
+    p[i] = 170; /* 10101010 in bits. */
+  }
+}
+
+/**
+ * @brief Test a filled data area for our pattern.
+ *
+ * @param size size of data in bytes.
+ * @param data the data to check.
+ *
+ * @result 1 on success, 0 otherwise.
+ */
+static int datacheck_test(size_t size, void *data) {
+  unsigned char *p = (unsigned char *)data;
+  for (size_t i = 0; i < size; i++) {
+    if (p[i] != 170) return 0;
+  }
+  return 1;
+}
+
+/**
+ * @brief Injection thread, initiates MPI_Irecv requests and queues the sends.
+ */
+static void *inject_thread(void *arg) {
+
+  if (verbose) message("%d: injection thread starts", *((int *)arg));
+  ticks starttics = getticks();
+
+  while (ind_req < nr_reqs) {
+    struct mpiuse_log_entry *log = reqs_queue[ind_req];
+
+    /* Initialise new log elements. */
+    log->injtic = getticks();
+    log->done = 0;
+    log->nr_tests = 0;
+    log->tsum = 0.0;
+    log->tmax = 0;
+    log->tmin = INT_MAX;
+    log->endtic = 0;
+
+    /* Difference to SWIFT: MPI_BYTE is used, not the MPI_Type. */
+    int err = 0;
+    if (log->type == task_type_recv) {
+
+      /* Start ready to receive. */
+      log->data = calloc(log->size, 1);
+      // message("recv from %d, tag %d, size %zd", log->otherrank, log->tag,
+      //         log->size);
+      err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
+                      subtypeMPI_comms[log->subtype], &log->req);
+
+      /* Add a new recv request. */
+      int ind = atomic_inc(&nr_recvs);
+      recvs_queue[ind] = log;
+      atomic_inc(&todo_recv);
+    } else {
+
+      log->data = calloc(log->size, 1);
+
+      /* Fill data with pattern. */
+      if (datacheck) datacheck_fill(log->size, log->data);
+
+      /* And add to queue for sending. */
+      int ind = atomic_inc(&nr_sends);
+      sends_queue[ind] = log;
+    }
+
+    if (err != MPI_SUCCESS) error("Failed to activate send or recv");
+
+    ind_req++;
+  }
+
+  /* All done, thread exiting. */
+  if (verbose) {
+    message("%d injections completed recvs = %d, sends = %d", ind_req,
+            nr_recvs, nr_sends);
+  }
+  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());
+  return NULL;
+}
+
+/**
+ * @brief Send thread, sends messages using synchronous MPI_Send. Cannot
+ * start until the injections are complete, so this differs from the optimal
+ * behaviour.
+ */
+static void *send_thread(void *arg) {
+
+  if (verbose) message("%d: send thread starts", *((int *)arg));
+  ticks starttics = getticks();
+
+  for (int k = 0; k < nr_sends; k++) {
+    struct mpiuse_log_entry *log = sends_queue[k];
+    log->injtic = getticks();
+    // message("sending to %d, tag %d, size %zd", log->otherrank, log->tag,
+    //         log->size);
+    MPI_Send(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
+             subtypeMPI_comms[log->subtype]);
+
+    log->done = 1;
+    log->endtic = getticks();
+    free(log->data);
+  }
+
+  /* All done, thread exiting. */
+  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());
+  return NULL;
+}
+
+/**
+ * @brief Main loop to run over a queue of MPI requests and test for when they
+ * complete. Returns the total amount of time spent in calls to MPI_Test and
+ * the number of times it was called.
+ *
+ * @param logs the list of logs pointing to requests.
+ * @param nr_logs pointer to the variable containing the current number of
+ *                logs.
+ * @param todos pointer to the variable containing the number of requests that
+ *              are still active.
+ * @param sum the total number of ticks spent in calls to MPI_Test.
+ * @param ncalls the total number of calls to MPI_Test.
+ * @param mint the minimum ticks an MPI_Test call took.
+ * @param maxt the maximum ticks an MPI_Test call took.
+ */
+static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
+                         int volatile *todos, double *sum, int *ncalls,
+                         ticks *mint, ticks *maxt) {
+
+  /* Global MPI_Test statistics. */
+  int lncalls = 0;
+  double lsum = 0.0;
+  ticks lmint = INT_MAX;
+  ticks lmaxt = 0;
+
+  /* We loop while new requests are being injected and we still have requests
+   * to complete. */
+  while (*todos > 0) {
+    int nlogs = *nr_logs;
+    for (int k = 0; k < nlogs; k++) {
+      struct mpiuse_log_entry *log = logs[k];
+      if (log != NULL && !log->done) {
+        ticks tics = getticks();
+        int res;
+        MPI_Status stat;
+        int err = MPI_Test(&log->req, &res, &stat);
+        if (err != MPI_SUCCESS) {
+          error("MPI_Test call failed");
+        }
+
+        /* Increment etc. of statistics about time in MPI_Test. */
+        ticks dt = getticks() - tics;
+        log->tsum += (double)dt;
+        lsum += (double)dt;
+
+        log->nr_tests++;
+        lncalls++;
+
+        if (dt < log->tmin) log->tmin = dt;
+        if (dt > log->tmax) log->tmax = dt;
+        if (dt < lmint) lmint = dt;
+        if (dt > lmaxt) lmaxt = dt;
+
+        if (res) {
+          /* Check that sent data is unchanged and received data is as
+           * expected. */
+          if (datacheck && !datacheck_test(log->size, log->data)) {
+            error("Data mismatch on completion");
+          }
+
+          /* Done, clean up. */
+          log->done = 1;
+          log->endtic = getticks();
+          free(log->data);
+          atomic_dec(todos);
+        }
+      }
+    }
+  }
+
+  /* All done. */
+  *sum = lsum;
+  *ncalls = lncalls;
+  *mint = lmint;
+  *maxt = lmaxt;
+  return;
+}
+
+/**
+ * @brief Recv thread, checks if MPI_Irecv requests have completed.
+ */
+static void *recv_thread(void *arg) {
+
+  if (verbose) message("%d: recv thread starts", *((int *)arg));
+  ticks starttics = getticks();
+
+  int ncalls;
+  double sum;
+  ticks mint;
+  ticks maxt;
+  queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint,
+               &maxt);
+
+  message(
+      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
+      "%.3f (%s)",
+      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
+      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
+  if (verbose)
+    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+            clocks_getunit());
+
+  /* Thread exits. */
+  return NULL;
+}
+
+/**
+ * @brief Comparison function for logged times.
+ */
+static int cmp_logs(const void *p1, const void *p2) {
+  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
+  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
+
+  /* Large unsigned values, so take care. */
+  if (l1->tic > l2->tic) return 1;
+  if (l1->tic < l2->tic) return -1;
+  return 0;
+}
+
+/**
+ * @brief Pick out the relevant logging data for our rank, i.e. all
+ * activations of sends and recvs. We ignore the original completions.
+ * The final list is sorted into increasing time of activation.
+ */
+static void pick_logs(void) {
+  size_t nlogs = mpiuse_nr_logs();
+
+  /* Duplicate of logs. */
+  reqs_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_reqs = 0;
+  sends_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_sends = 0;
+  recvs_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_recvs = 0;
+
+  for (size_t k = 0; k < nlogs; k++) {
+    struct mpiuse_log_entry *log = mpiuse_get_log(k);
+    if (log->rank == myrank && log->activation) {
+      if (log->type == task_type_send || log->type == task_type_recv) {
+
+        /* And keep this log. */
+        log->data = NULL;
+        reqs_queue[nr_reqs] = log;
+        nr_reqs++;
+
+        /* Scale size. */
+        log->size *= messagescale;
+
+      } else {
+        error("task type '%d' is not a known send or recv task", log->type);
+      }
+    }
+  }
+
+  /* Sort into increasing time. */
+  qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
+
+  /* Check. */
+  for (int k = 0; k < nr_reqs - 1; k++) {
+    if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
+      message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
+              reqs_queue[k + 1]->tic);
+  }
+}
+
+/**
+ * @brief Usage help.
+ */
+static void usage(char *argv[]) {
+  fprintf(stderr, "Usage: %s [-vd] [-s scale] "
+          "SWIFT_mpiuse-log-file.dat logfile.dat\n", argv[0]);
+  fprintf(stderr, "  options: -v verbose, -d check data pattern, "
+          "-s scale message sizes\n");
+  fflush(stderr);
+}
+
+/**
+ * @brief Main function.
+ */
+int main(int argc, char *argv[]) {
+
+  /* Initiate MPI. */
+  int prov = 0;
+  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Init_thread failed with error %i.", res);
+
+  int nr_nodes = 0;
+  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
+  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
+
+  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Comm_rank failed with error %i.", res);
+
+  /* Handle the command-line, we expect an mpiuse data file to read and
+   * various options. */
+  int opt;
+  while ((opt = getopt(argc, argv, "vds:")) != -1) {
+    switch (opt) {
+      case 'd':
+        datacheck = 1;
+        break;
+      case 's':
+        messagescale = atof(optarg);
+        break;
+      case 'v':
+        verbose = 1;
+        break;
+      default:
+        if (myrank == 0) usage(argv);
+        return 1;
+    }
+  }
+  if (optind >= argc - 1) {
+    if (myrank == 0) usage(argv);
+    return 1;
+  }
+  char *infile = argv[optind];
+  char *logfile = argv[optind + 1];
+
+  /* Now we read the SWIFT MPI logger output that defines the communications
+   * we will undertake and the time differences between injections into the
+   * queues. Note this has all ranks for a single step; SWIFT outputs one MPI
+   * log per rank per step, so you need to combine all ranks from a step. */
+  mpiuse_log_restore(infile);
+  int nranks = mpiuse_nr_ranks();
+
+  /* This should match the expected size. */
+  if (nr_nodes != nranks)
+    error("The number of MPI ranks %d does not match the expected value %d",
+          nranks, nr_nodes);
+
+  /* Create communicators for each subtype of the tasks. */
+  for (int i = 0; i < task_subtype_count; i++) {
+    MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
+  }
+
+  /* Each rank requires its own queue, so extract them. */
+  pick_logs();
+
+  /* Time to start. Try to make it synchronous across the ranks. */
+  clocks_set_cpufreq(0);
+  long long freq = clocks_get_cpufreq();
+  MPI_Barrier(MPI_COMM_WORLD);
+  clocks_set_cpufreq(freq);
+  if (myrank == 0) {
+    message("Start of MPI tests");
+    message("==================");
+    if (verbose) {
+      if (datacheck)
+        message("checking data pattern on send and recv completion");
+    }
+  }
+
+  /* Make three threads: one to inject the requests, one to do the sends and
+   * one to poll for the recv completions. */
+
+  pthread_t injectthread;
+  if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
+    error("Failed to create injection thread.");
+
+  // XXX need to wait for this phase to stop.
+  pthread_join(injectthread, NULL);
+  fflush(stdout);
+  MPI_Barrier(MPI_COMM_WORLD);
+
+  pthread_t sendthread;
+  if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
+    error("Failed to create send thread.");
+  pthread_t recvthread;
+  if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
+    error("Failed to create recv thread.");
+
+  /* Wait until all threads have exited and all MPI requests have
+   * completed. */
+  pthread_join(sendthread, NULL);
+  pthread_join(recvthread, NULL);
+
+  /* Dump the updated MPI logs. */
+  MPI_Barrier(MPI_COMM_WORLD);
+  fflush(stdout);
+  if (myrank == 0) message("Dumping updated log");
+  mpiuse_dump_logs(nranks, logfile);
+
+  /* Shutdown MPI. */
+  res = MPI_Finalize();
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Finalize failed with error %i.", res);
+
+  if (myrank == 0) message("Bye");
+
+  return 0;
+}
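
A note on the lock-free queues in the patch above: inject_thread() relies on
atomic_inc() returning the value *before* the increment, so each injection
claims a unique slot in recvs_queue/sends_queue without a lock. Below is a
minimal standalone sketch of that idiom, assuming GCC-style __sync builtins;
SWIFT's atomic.h is not part of this patch, so the atomic_inc macro here is an
illustrative stand-in, not necessarily its actual definition.

/* sketch.c: the fetch-then-increment enqueue idiom used by inject_thread().
 * Build with: gcc -Wall -pthread sketch.c */
#include <pthread.h>
#include <stdio.h>

/* Assumed semantics: atomically add one and return the old value. */
#define atomic_inc(v) __sync_fetch_and_add(v, 1)

#define NITEMS 1000

static int queue[NITEMS];
static int volatile nr_items = 0;

static void *producer(void *arg) {
  int id = *((int *)arg);
  for (int k = 0; k < NITEMS / 2; k++) {
    int ind = atomic_inc(&nr_items); /* Claim a unique slot. */
    queue[ind] = id;                 /* Safe: no other thread got this index. */
  }
  return NULL;
}

int main(void) {
  pthread_t threads[2];
  int ids[2] = {1, 2};
  for (int i = 0; i < 2; i++)
    pthread_create(&threads[i], NULL, &producer, &ids[i]);
  for (int i = 0; i < 2; i++) pthread_join(threads[i], NULL);
  printf("enqueued %d items\n", nr_items); /* Always NITEMS, no lost slots. */
  return 0;
}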