diff --git a/.gitignore b/.gitignore index afa445084e6ff202c6d10f1a42d9af832100a8b4..85e218bbbd88df18d80801973279146558eb97c1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ swiftmpistepsim +notswiftmpistepsim +notswiftmpistepsim2 *.o *~ diff --git a/Makefile b/Makefile index 28d8855db2223d062079e0ea26085316bbd64d88..36e8cdc3d1396819321afc3af37b22dac028c945 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ CFLAGS = -g -O0 -Wall -all: swiftmpistepsim notswiftmpistepsim +all: swiftmpistepsim notswiftmpistepsim notswiftmpistepsim2 swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c mpicc $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c @@ -8,6 +8,10 @@ swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h c notswiftmpistepsim: notswiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c mpicc $(CFLAGS) -o notswiftmpistepsim notswiftmpistepsim.c mpiuse.c clocks.c +notswiftmpistepsim2: notswiftmpistepsim2.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c + mpicc $(CFLAGS) -o notswiftmpistepsim2 notswiftmpistepsim2.c mpiuse.c clocks.c + clean: rm -f swiftmpistepsim rm -f notswiftmpistepsim + rm -f notswiftmpistepsim2 diff --git a/notswiftmpistepsim2.c b/notswiftmpistepsim2.c new file mode 100644 index 0000000000000000000000000000000000000000..4e9f96c50bc0cc019e028dacb17728054762c909 --- /dev/null +++ b/notswiftmpistepsim2.c @@ -0,0 +1,616 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Copyright (c) 2020 Peter W. Draper + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ + +// XXX variation in which we join injection and completion into same threads. +// and also restrict the communicators to 2. +#include <limits.h> +#include <mpi.h> +#include <pthread.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> + +#include "atomic.h" +#include "clocks.h" +#include "error.h" +#include "mpiuse.h" + +/* Global: Our rank for all to see. */ +int myrank = -1; + +/* Are we verbose. */ +static int verbose = 0; + +/* Attempt to keep original injection time differences. */ +static int usetics = 1; + +/* Scale to apply to the size of the messages we send. */ +static float messagescale = 1.0; + +/* Set a data pattern and check we get this back, slow... */ +static int datacheck = 0; + +/* The maximum tag value in logs, rounded up to the next power of 2. */ +static int maxtag = 0; + +/* Integer types of send and recv tasks, must match log. */ +static const int task_type_send = 22; +static const int task_type_recv = 23; + +/* Global communicators for each thread. */ +static MPI_Comm subtypeMPI_comms[2]; + +/* The local queues. We need to partition these to keep the MPI communications + * separate so we can use the MPI_THREAD_SPLIT option of the Intel 2020 MPI + * library. */ +static struct mpiuse_log_entry **volatile recv_logs; +static int volatile nr_recv_logs = 0; + +static struct mpiuse_log_entry **volatile send_logs; +static int volatile nr_send_logs = 0; + +static struct mpiuse_log_entry **volatile recvs_queue; +static int volatile nr_recvs_queue = 0; + +static struct mpiuse_log_entry **volatile sends_queue; +static int volatile nr_sends_queue = 0; + +/* CPU frequency of the machine that created the MPI log. */ +// XXX need to store this in the data file. +static double log_clocks_cpufreq = 2194844448.0; + +/** + * @brief fill a data area with a pattern that can be checked for changes. + * + * @param size size of data in bytes. + * @param data the data to fill. + */ +static void datacheck_fill(size_t size, void *data) { + unsigned char *p = (unsigned char *)data; + for (size_t i = 0; i < size; i++) { + p[i] = 170; /* 10101010 in bits. */ + } +} + +/** + * @brief test a filled data area for our pattern. + * + * @param size size of data in bytes. + * @param data the data to fill. + * + * @result 1 on success, 0 otherwise. + */ +static int datacheck_test(size_t size, void *data) { + unsigned char *p = (unsigned char *)data; + for (size_t i = 0; i < size; i++) { + if (p[i] != 170) return 0; + } + return 1; +} + +/** + * @brief send nitiates MPI_Isend requests and the waits for completion. + */ +static void *send_thread(void *arg) { + + if (verbose) message("send thread starts"); + ticks starttics = getticks(); + + /* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */ + ticks basetic = send_logs[0]->tic; + ticks looptics = 0; + double deadtime = 0.0; + struct timespec sleep; + sleep.tv_sec = 0; + + for (int k = 0; k < nr_send_logs; k++) { + struct mpiuse_log_entry *log = send_logs[k]; + + if (usetics) { + /* Expect time between this request and the previous one. */ + ticks dt = log->tic - basetic; + basetic = log->tic; + + /* We guess some time below which we should not attempt to wait, + * otherwise we'll start to overrun, and just inject the next call if we + * are below that (we time the ticks this loop takes without any waiting + * and use that). Otherwise we wait a while. Note we need to convert the + * ticks of the log file into nanoseconds, that requires the original + * CPU frequency. */ + if (dt > looptics) { + + /* Remember to be fair and remove the looptics, then convert to + * nanoseconds. */ + double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9; + if (ns < 1.0e9) { + sleep.tv_nsec = (long)ns; + } else { + /* Wait more than one second. Must be an error, but complain and + * continue. */ + sleep.tv_nsec = (long)1.0e9; + message("wait greater than one second"); + } + nanosleep(&sleep, NULL); + deadtime += sleep.tv_nsec; + } + } + + /* Initialise new log elements. */ + log->done = 0; + log->nr_tests = 0; + log->tsum = 0.0; + log->tmax = 0; + log->tmin = INT_MAX; + log->endtic = 0; + log->injtic = getticks(); + + /* Differences to SWIFT: MPI_BYTE not the MPI_Type. */ + int err = 0; + log->data = calloc(log->size, 1); + + /* Fill data with pattern. */ + if (datacheck) datacheck_fill(log->size, log->data); + + /* And send. Note using different communicator so need to construct + * different, still unique and computable, tag.*/ + err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, + log->tag + maxtag * log->subtype, + subtypeMPI_comms[0], &log->req); + + /* Add a new send request. */ + sends_queue[nr_sends_queue] = log; + nr_sends_queue++; + } + if (verbose) + message("send injections completed: %d", nr_sends_queue); + + + /* Now we wait for the sends to complete. */ + /* Global MPI_Test statistics. */ + int ncalls = 0; + double sum = 0.0; + ticks mint = INT_MAX; + ticks maxt = 0; + + /* We loop while requests we still have requests to complete. */ + int todo = nr_sends_queue; + while (todo > 0) { + for (int k = 0; k < nr_sends_queue; k++) { + struct mpiuse_log_entry *log = sends_queue[k]; + if (log != NULL && !log->done) { + ticks tics = getticks(); + int res; + MPI_Status stat; + int err = MPI_Test(&log->req, &res, &stat); + if (err != MPI_SUCCESS) { + error("MPI_Test call failed"); + } + + /* Increment etc. of statistics about time in MPI_Test. */ + ticks dt = getticks() - tics; + log->tsum += (double)dt; + sum += (double)dt; + + log->nr_tests++; + ncalls++; + + if (dt < log->tmin) log->tmin = dt; + if (dt > log->tmax) log->tmax = dt; + if (dt < mint) mint = dt; + if (dt > maxt) maxt = dt; + + if (res) { + /* Check data sent data is unchanged and received data is as + * expected. */ + if (datacheck && !datacheck_test(log->size, log->data)) { + error("Data mismatch on completion"); + } + + /* Done, clean up. */ + log->done = 1; + log->endtic = getticks(); + free(log->data); + + /* One down. */ + todo--; + } + } + } + } + + /* All done. */ + message("sends completed"); + message( + "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, " + "max time %.3f (%s)", + ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), + clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit()); + if (verbose) + message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + clocks_getunit()); + + return NULL; +} + +/** + * @brief send nitiates MPI_Irecv requests and the waits for completion. + */ +static void *recv_thread(void *arg) { + + if (verbose) message("recv thread starts"); + ticks starttics = getticks(); + + /* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */ + ticks basetic = recv_logs[0]->tic; + ticks looptics = 0; + double deadtime = 0.0; + struct timespec sleep; + sleep.tv_sec = 0; + + for (int k = 0; k < nr_recv_logs; k++) { + struct mpiuse_log_entry *log = recv_logs[k]; + + if (usetics) { + /* Expect time between this request and the previous one. */ + ticks dt = log->tic - basetic; + basetic = log->tic; + + /* We guess some time below which we should not attempt to wait, + * otherwise we'll start to overrun, and just inject the next call if we + * are below that (we time the ticks this loop takes without any waiting + * and use that). Otherwise we wait a while. Note we need to convert the + * ticks of the log file into nanoseconds, that requires the original + * CPU frequency. */ + if (dt > looptics) { + + /* Remember to be fair and remove the looptics, then convert to + * nanoseconds. */ + double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9; + if (ns < 1.0e9) { + sleep.tv_nsec = (long)ns; + } else { + /* Wait more than one second. Must be an error, but complain and + * continue. */ + sleep.tv_nsec = (long)1.0e9; + message("wait greater than one second"); + } + nanosleep(&sleep, NULL); + deadtime += sleep.tv_nsec; + } + } + + /* Initialise new log elements. */ + log->done = 0; + log->nr_tests = 0; + log->tsum = 0.0; + log->tmax = 0; + log->tmin = INT_MAX; + log->endtic = 0; + log->injtic = getticks(); + + /* Differences to SWIFT: MPI_BYTE not the MPI_Type. */ + int err = 0; + log->data = calloc(log->size, 1); + + /* Fill data with pattern. */ + if (datacheck) datacheck_fill(log->size, log->data); + + /* And listen for send. Note using different communicator so need to + * construct different, still unique and computable, tag.*/ + err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, + log->tag + maxtag * log->subtype, + subtypeMPI_comms[1], &log->req); + + /* Add a new recv request. */ + recvs_queue[nr_recvs_queue] = log; + nr_recvs_queue++; + } + if (verbose) + message("recv injections completed: %d", nr_recvs_queue); + + + /* Now we wait for the recvs to complete. */ + /* Global MPI_Test statistics. */ + int ncalls = 0; + double sum = 0.0; + ticks mint = INT_MAX; + ticks maxt = 0; + + /* We loop while requests we still have requests to complete. */ + int todo = nr_recvs_queue; + while (todo > 0) { + for (int k = 0; k < nr_recvs_queue; k++) { + struct mpiuse_log_entry *log = recvs_queue[k]; + if (log != NULL && !log->done) { + ticks tics = getticks(); + int res; + MPI_Status stat; + int err = MPI_Test(&log->req, &res, &stat); + if (err != MPI_SUCCESS) { + error("MPI_Test call failed"); + } + + /* Increment etc. of statistics about time in MPI_Test. */ + ticks dt = getticks() - tics; + log->tsum += (double)dt; + sum += (double)dt; + + log->nr_tests++; + ncalls++; + + if (dt < log->tmin) log->tmin = dt; + if (dt > log->tmax) log->tmax = dt; + if (dt < mint) mint = dt; + if (dt > maxt) maxt = dt; + + if (res) { + /* Check data sent data is unchanged and received data is as + * expected. */ + if (datacheck && !datacheck_test(log->size, log->data)) { + error("Data mismatch on completion"); + } + + /* Done, clean up. */ + log->done = 1; + log->endtic = getticks(); + free(log->data); + + /* One down. */ + todo--; + } + } + } + } + + /* All done. */ + message("recvs completed"); + message( + "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, " + "max time %.3f (%s)", + ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), + clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit()); + if (verbose) + message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + clocks_getunit()); + + return NULL; +} + +/** + * @brief Comparison function for logged times. + */ +static int cmp_logs(const void *p1, const void *p2) { + struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1; + struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2; + + /* Large unsigned values, so take care. */ + if (l1->tic > l2->tic) return 1; + if (l1->tic < l2->tic) return -1; + return 0; +} + +/** + * @brief Pick out the relevant logging data for our rank, i.e. all + * activations of sends and recvs, which we separate for processing + * in their own threads. + */ +static void pick_logs(void) { + size_t nlogs = mpiuse_nr_logs(); + + /* Duplicates of logs. */ + recv_logs = (struct mpiuse_log_entry **) + calloc(nlogs, sizeof(struct mpiuse_log_entry *)); + nr_recv_logs = 0; + + send_logs = (struct mpiuse_log_entry **) + calloc(nlogs, sizeof(struct mpiuse_log_entry *)); + nr_send_logs = 0; + + sends_queue = (struct mpiuse_log_entry **) + calloc(nlogs, sizeof(struct mpiuse_log_entry *)); + nr_sends_queue = 0; + + recvs_queue = (struct mpiuse_log_entry **) + calloc(nlogs, sizeof(struct mpiuse_log_entry *)); + nr_recvs_queue = 0; + + /* Select the send and recv logs that are initiators. */ + int nr_logs = 0; + for (int k = 0; k < nlogs; k++) { + struct mpiuse_log_entry *log = mpiuse_get_log(k); + + /* Need the maximum tag for all records to match between ranks. */ + if (log->tag > maxtag) maxtag = log->tag; + + if (log->rank == myrank && log->activation) { + if (log->type == task_type_send || log->type == task_type_recv) { + + /* Scale size. */ + log->size *= messagescale ; + + /* And keep this log. */ + log->data = NULL; + if (log->type == task_type_send) { + send_logs[nr_send_logs] = log; + nr_send_logs++; + } else { + recv_logs[nr_recv_logs] = log; + nr_recv_logs++; + } + nr_logs++; + + } else { + error("task type '%d' is not a known send or recv task", log->type); + } + } + } + message("Read %d active send and recv logs from input file", nr_logs); + + /* Sort into increasing time. */ + qsort(send_logs, nr_send_logs, sizeof(struct mpiuse_log_entry *), + cmp_logs); + qsort(recv_logs, nr_recv_logs, sizeof(struct mpiuse_log_entry *), + cmp_logs); + + /* Maximum tag rounded to next power of 2. */ + int power = 1; + while (power < maxtag) power = power << 1; + if (verbose) message("raw maxtag = %d, rounded maxtag = %d", maxtag, power); + maxtag = power; +} + +/** + * @brief usage help. + */ +static void usage(char *argv[]) { + fprintf(stderr, "Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat\n", + argv[0]); + fprintf(stderr, " options: -v verbose, -f fast injections, " + "-s message size (bytes)\n"); + fflush(stderr); +} + +/** + * @brief main function. + */ +int main(int argc, char *argv[]) { + + /* Initiate MPI. */ + int prov = 0; + int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov); + if (res != MPI_SUCCESS) + error("Call to MPI_Init_thread failed with error %i.", res); + + int nr_nodes = 0; + res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes); + if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res); + + res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + if (res != MPI_SUCCESS) + error("Call to MPI_Comm_rank failed with error %i.", res); + + /* Handle the command-line, we expect a mpiuse data file to read and various + * options. */ + int opt; + while ((opt = getopt(argc, argv, "vfds:")) != -1) { + switch (opt) { + case 'd': + datacheck = 1; + break; + case 'f': + usetics = 0; + break; + case 'v': + verbose = 1; + break; + case 's': + messagescale = atof(optarg); + break; + default: + if (myrank == 0) usage(argv); + return 1; + } + } + if (optind >= argc - 1) { + if (myrank == 0) usage(argv); + return 1; + } + char *infile = argv[optind]; + char *logfile = argv[optind + 1]; + + /* Initialize time for messages and timers. */ + clocks_set_cpufreq(0); + + /* Now we read the SWIFT MPI logger output that defines the communcations + * we will undertake and the time differences between injections into the + * queues. Note this has all ranks for a single steps, SWIFT outputs one MPI + * log per rank per step, so you need to combine all ranks from a step. */ + mpiuse_log_restore(infile); + int nranks = mpiuse_nr_ranks(); + + /* This should match the expected size. */ + if (nr_nodes != nranks) + error("The number of MPI ranks %d does not match the expected value %d", + nranks, nr_nodes); + + /* Create communicators for each subtype, these are associated with the + * threads somewhat randomly, we should seek to balance the work. */ + for (int i = 0; i < 2; i++) { + MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]); + char str[8] = {0}; + sprintf(str, "%d", i); + MPI_Info info; + MPI_Info_create(&info); + MPI_Info_set(info, "thread_id", str); + MPI_Comm_set_info(subtypeMPI_comms[i], info); + MPI_Info_free(&info); + } + if (verbose) { + MPI_Barrier(MPI_COMM_WORLD); + fflush(stdout); + } + + /* Each rank requires its own queue, so extract them. */ + pick_logs(); + + /* Try to make it synchronous across the ranks and reset time to zero. */ + MPI_Barrier(MPI_COMM_WORLD); + clocks_set_cpufreq(0); + if (myrank == 0) { + message("Start of MPI tests"); + message("=================="); + if (messagescale != 1.0f) { + message(" "); + message(" Using message scale of %f", messagescale); + } + if (verbose) { + if (!usetics) message("using fast untimed injections"); + if (datacheck) + message("checking data pattern on send and recv completion"); + } + } + + + /* Need two threads, one for the sends and one for the recvs. */ + pthread_t sendthread; + pthread_t recvthread; + if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0) + error("Failed to create send thread."); + if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0) + error("Failed to create recv thread."); + + /* Wait until all threads have exited and all MPI requests have completed. */ + pthread_join(sendthread, NULL); + pthread_join(recvthread, NULL); + + /* Dump the updated MPI logs. */ + MPI_Barrier(MPI_COMM_WORLD); + fflush(stdout); + if (myrank == 0) message("Dumping updated log"); + mpiuse_dump_logs(nranks, logfile); + + /* Shutdown MPI. */ + res = MPI_Finalize(); + if (res != MPI_SUCCESS) + error("call to MPI_Finalize failed with error %i.", res); + + if (myrank == 0) message("Bye"); + + return 0; +}