diff --git a/Makefile b/Makefile index 75a2e49df9978b6383cf26aa2ad88be12d501164..baf96f8bfdf00761304cf284993ed56ba0064369 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,13 @@ CFLAGS = -g -O0 -Wall -all: swiftmpistepsim +all: swiftmpistepsim swiftmpifakestepsim swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c mpicc $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c +swiftmpifakestepsim: swiftmpifakestepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c + mpicc $(CFLAGS) -o swiftmpifakestepsim swiftmpifakestepsim.c mpiuse.c clocks.c + clean: rm -f swiftmpistepsim + rm -f swiftmpifakestepsim diff --git a/mpiuse.c b/mpiuse.c index 6bd7b3116e0910b152e49112be86057bad80a0f6..8c52cb5efc717d1ea3c3b1209b8ec8c3764d2161 100644 --- a/mpiuse.c +++ b/mpiuse.c @@ -210,7 +210,7 @@ void mpiuse_dump_logs(int nranks, const char *dumpfile) { ticks basetics = 0; for (size_t k = 0; k < nlogs; k++) { struct mpiuse_log_entry *log = &mpiuse_log[k]; - if (log->rank == myrank && log->endtic > 0) { + if (log->rank == myrank) { if (basetics == 0) basetics = log->tic; fprintf(fd, "%lld %.4f %.4f %.4f %.6f %d %d %d %d %d %d %zd %d %.4f %.6f " @@ -257,3 +257,27 @@ struct mpiuse_log_entry *mpiuse_get_log(int ind) { if (ind < mpiuse_log_count && ind >= 0) return &mpiuse_log[ind]; return NULL; } + +/** + * @brief generate a list of fake exchanges as mpiuse logs. + * + * @param nr_nodes the number of ranks that will be used. + * @param nr_logs the number of logs to generate per rank. + * @param size bytes per message + */ +void mpiuse_log_generate(int nr_nodes, int nr_logs, int size) { + + /* Each rank exchange messages with all the others with a fixed size. */ + int tag = 1; + for (int k = 0; k < nr_logs; k++) { + for (int i = 0; i < nr_nodes; i++) { + for (int j = 0; j < nr_nodes; j++) { + if (i != j) { + mpiuse_log_allocation(i, 1, k, SEND_TYPE, NO_SUBTYPE, 1, size, j, tag); + mpiuse_log_allocation(j, 1, k, RECV_TYPE, NO_SUBTYPE, 1, size, i, tag); + } + } + } + tag++; + } +} diff --git a/mpiuse.h b/mpiuse.h index 707ab27547b14fc701fcfd53254d29d603007891..6f201aa460cbd728183930391bbcabacb91ebd68 100644 --- a/mpiuse.h +++ b/mpiuse.h @@ -84,6 +84,13 @@ struct mpiuse_log_entry { ticks tmin; }; +/* Flags for the types of request when generating fakes. */ +#ifndef SEND_TYPE +#define SEND_TYPE 22 +#define RECV_TYPE 23 +#define NO_SUBTYPE 0 +#endif + /* API. */ void mpiuse_log_allocation(int rank, int step, size_t tic, int type, int subtype, int activation, size_t size, @@ -94,4 +101,6 @@ int mpiuse_nr_logs(void); int mpiuse_nr_ranks(void); void mpiuse_dump_logs(int nranks, const char *logfile); +void mpiuse_log_generate(int nr_nodes, int nr_logs, int size); + #endif /* SWIFT_MPIUSE_H */ diff --git a/swiftmpifakestepsim.c b/swiftmpifakestepsim.c new file mode 100644 index 0000000000000000000000000000000000000000..de832390607765c7ca6afb9238a0ed91f4698307 --- /dev/null +++ b/swiftmpifakestepsim.c @@ -0,0 +1,445 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Copyright (c) 2021 Peter W. Draper + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ + +#include <limits.h> +#include <mpi.h> +#include <pthread.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> + +#include "atomic.h" +#include "clocks.h" +#include "error.h" +#include "mpiuse.h" + +/* Global: Our rank for all to see. */ +int myrank = -1; + +/* Are we verbose. */ +static int verbose = 0; + +/* Set a data pattern and check we get this back, slow... */ +static int datacheck = 0; + +/* The local queues. */ +static struct mpiuse_log_entry **volatile reqs_queue; +static int volatile ind_req = 0; +static int volatile nr_reqs = 0; +static int volatile injecting = 1; +static struct mpiuse_log_entry **volatile recvs_queue; +static int volatile nr_recvs = 0; +static int volatile ind_recv = 0; +static int volatile todo_recv = 0; +static struct mpiuse_log_entry **volatile sends_queue; +static int volatile nr_sends = 0; +static int volatile ind_send = 0; +static int volatile todo_send = 0; + +/** + * @brief fill a data area with a pattern that can be checked for changes. + * + * @param size size of data in bytes. + * @param data the data to fill. + */ +static void datacheck_fill(size_t size, void *data) { + unsigned char *p = (unsigned char *)data; + for (size_t i = 0; i < size; i++) { + p[i] = 170; /* 10101010 in bits. */ + } +} + +/** + * @brief test a filled data area for our pattern. + * + * @param size size of data in bytes. + * @param data the data to fill. + * + * @result 1 on success, 0 otherwise. + */ +static int datacheck_test(size_t size, void *data) { + unsigned char *p = (unsigned char *)data; + for (size_t i = 0; i < size; i++) { + if (p[i] != 170) return 0; + } + return 1; +} + +/** + * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests. + */ +static void *inject_thread(void *arg) { + + if (verbose) message("%d: injection thread starts", *((int *)arg)); + ticks starttics = getticks(); + + while (ind_req < nr_reqs) { + struct mpiuse_log_entry *log = reqs_queue[ind_req]; + + /* Initialise new log elements. */ + log->done = 0; + log->nr_tests = 0; + log->tsum = 0.0; + log->tmax = 0; + log->tmin = INT_MAX; + log->endtic = 0; + log->injtic = getticks(); + + /* Differences to SWIFT: MPI_BYTE not the MPI_Type. */ + int err = 0; + if (log->type == SEND_TYPE) { + log->data = calloc(log->size, 1); + + /* Fill data with pattern. */ + if (datacheck) datacheck_fill(log->size, log->data); + + /* And send. */ + err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag, + MPI_COMM_WORLD, &log->req); + + /* Add a new send request. */ + int ind = atomic_inc(&nr_sends); + sends_queue[ind] = log; + atomic_inc(&todo_send); + + } else { + + /* Ready to receive. */ + log->data = calloc(log->size, 1); + err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag, + MPI_COMM_WORLD, &log->req); + + /* Add a new recv request. */ + int ind = atomic_inc(&nr_recvs); + recvs_queue[ind] = log; + atomic_inc(&todo_recv); + } + if (err != MPI_SUCCESS) error("Failed to activate send or recv"); + + ind_req++; + } + + /* All done, thread exiting. */ + if (verbose) { + message("%d injections completed, sends = %d, recvs = %d", ind_req, + nr_sends, nr_recvs); + message("remaining sends = %d, recvs = %d", todo_send, todo_recv); + } + message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + clocks_getunit()); + atomic_dec(&injecting); + return NULL; +} + +/** + * @brief main loop to run over a queue of MPI requests and test for when they + * complete. Returns the total amount of time spent in calls to MPI_Test and + * the number of times it was called. + * + * @param logs the list of logs pointing to requests. + * @param nr_logs pointer to the variable containing the current number of + * logs. + * @param todos pointer to the variable containing the number of requests that + * are still active. + * @param sum the total number of ticks spent in calls to MPI_Test. + * @param ncalls the total number of calls to MPI_Test. + * @param mint the minimum ticks an MPI_Test call took. + * @param maxt the maximum ticks an MPI_Test call took. + */ +static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs, + int volatile *todos, double *sum, int *ncalls, + ticks *mint, ticks *maxt) { + + /* Global MPI_Test statistics. */ + int lncalls = 0; + double lsum = 0.0; + ticks lmint = INT_MAX; + ticks lmaxt = 0; + + /* We loop while new requests are being injected and we still have requests + * to complete. */ + while (injecting || (!injecting && *todos > 0)) { + int nlogs = *nr_logs; + for (int k = 0; k < nlogs; k++) { + struct mpiuse_log_entry *log = logs[k]; + if (log != NULL && !log->done) { + ticks tics = getticks(); + int res; + MPI_Status stat; + int err = MPI_Test(&log->req, &res, &stat); + if (err != MPI_SUCCESS) { + error("MPI_Test call failed"); + } + + /* Increment etc. of statistics about time in MPI_Test. */ + ticks dt = getticks() - tics; + log->tsum += (double)dt; + lsum += (double)dt; + + log->nr_tests++; + lncalls++; + + if (dt < log->tmin) log->tmin = dt; + if (dt > log->tmax) log->tmax = dt; + if (dt < lmint) lmint = dt; + if (dt > lmaxt) lmaxt = dt; + + if (res) { + /* Check data sent data is unchanged and received data is as + * expected. */ + if (datacheck && !datacheck_test(log->size, log->data)) { + error("Data mismatch on completion"); + } + + /* Done, clean up. */ + log->done = 1; + log->endtic = getticks(); + free(log->data); + atomic_dec(todos); + } + } + } + } + + /* All done. */ + *sum = lsum; + *ncalls = lncalls; + *mint = lmint; + *maxt = lmaxt; + return; +} + +/** + * @brief Send thread, checks if MPI_Isend requests have completed. + */ +static void *send_thread(void *arg) { + + if (verbose) message("%d: send thread starts (%d)", *((int *)arg), + injecting); + ticks starttics = getticks(); + + int ncalls; + double sum; + ticks mint; + ticks maxt; + queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt); + + message( + "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time " + "%.3f (%s)", + ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), + clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit()); + if (verbose) + message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + clocks_getunit()); + + /* Thread exits. */ + return NULL; +} + +/** + * @brief Recv thread, checks if MPI_Irecv requests have completed. + */ +static void *recv_thread(void *arg) { + + if (verbose) message("%d: recv thread starts", *((int *)arg)); + ticks starttics = getticks(); + + int ncalls; + double sum; + ticks mint; + ticks maxt; + queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt); + + message( + "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time " + "%.3f (%s)", + ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), + clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit()); + if (verbose) + message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + clocks_getunit()); + + /* Thread exits. */ + return NULL; +} + +/** + * @brief Comparison function for logged times. + */ +static int cmp_logs(const void *p1, const void *p2) { + struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1; + struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2; + + /* Large unsigned values, so take care. */ + if (l1->tic > l2->tic) return 1; + if (l1->tic < l2->tic) return -1; + return 0; +} + +/** + * @brief Pick out the relevant logging data for our rank, i.e. all + * activations of sends and recvs. We ignore the original completions. + * The final list is sorted into increasing time of activation. + */ +static void pick_logs(void) { + size_t nlogs = mpiuse_nr_logs(); + + /* Duplicate of logs. */ + reqs_queue = (struct mpiuse_log_entry **)calloc( + nlogs, sizeof(struct mpiuse_log_entry *)); + nr_reqs = 0; + sends_queue = (struct mpiuse_log_entry **)calloc( + nlogs, sizeof(struct mpiuse_log_entry *)); + nr_sends = 0; + recvs_queue = (struct mpiuse_log_entry **)calloc( + nlogs, sizeof(struct mpiuse_log_entry *)); + nr_recvs = 0; + + for (int k = 0; k < nlogs; k++) { + struct mpiuse_log_entry *log = mpiuse_get_log(k); + if (log->rank == myrank && log->activation) { + log->data = NULL; + reqs_queue[nr_reqs] = log; + nr_reqs++; + } + } + + /* Sort into increasing time. */ + qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs); + + /* Check. */ + for (int k = 0; k < nr_reqs - 1; k++) { + if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic) + message("reqs_queue: %lld > %lld", reqs_queue[k]->tic, + reqs_queue[k + 1]->tic); + } +} + +/** + * @brief usage help. + */ +static void usage(char *argv[]) { + fprintf(stderr, "Usage: %s [-vf] nr_messages logfile.dat\n", + argv[0]); + fprintf(stderr, " options: -v verbose, -d data check, -s size (bytes)\n"); + fflush(stderr); +} + +/** + * @brief main function. + */ +int main(int argc, char *argv[]) { + + /* Initiate MPI. */ + int prov = 0; + int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov); + if (res != MPI_SUCCESS) + error("Call to MPI_Init_thread failed with error %i.", res); + + int nr_nodes = 0; + res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes); + if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res); + + res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + if (res != MPI_SUCCESS) + error("Call to MPI_Comm_rank failed with error %i.", res); + + /* Handle the command-line, we expect the number of messages to exchange per + * rank an output log and some options. */ + int size = 1024; + int opt; + while ((opt = getopt(argc, argv, "vds:")) != -1) { + switch (opt) { + case 'd': + datacheck = 1; + break; + case 's': + size = atoi(optarg); + break; + case 'v': + verbose = 1; + break; + default: + if (myrank == 0) usage(argv); + return 1; + } + } + if (optind >= argc -1) { + if (myrank == 0) usage(argv); + return 1; + } + + int nr_logs = atoi(argv[optind]); + if (nr_logs == 0) + error("Expected number of messages to exchange, got: %s", argv[optind]); + char *logfile = argv[optind + 1]; + + /* Generate the fake logs for the exchanges. */ + if (myrank == 0) message("Generating %d fake logs for %d ranks of size %d", + nr_logs, nr_nodes, size); + mpiuse_log_generate(nr_nodes, nr_logs, size); + int nranks = mpiuse_nr_ranks(); + + /* Each rank requires its own queue, so extract them. */ + pick_logs(); + + /* Time to start time. Try to make it synchronous across the ranks. */ + MPI_Barrier(MPI_COMM_WORLD); + clocks_set_cpufreq(0); + if (myrank == 0) { + message("Start of MPI tests"); + message("=================="); + if (verbose) { + if (datacheck) message("checking data pattern on send and recv completion"); + } + } + + /* Make three threads, one for injecting tasks and two to check for + * completions of the sends and recv independently. */ + pthread_t injectthread; + if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0) + error("Failed to create injection thread."); + pthread_t sendthread; + if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0) + error("Failed to create send thread."); + pthread_t recvthread; + if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0) + error("Failed to create recv thread."); + + /* Wait until all threads have exited and all MPI requests have completed. */ + pthread_join(injectthread, NULL); + pthread_join(sendthread, NULL); + pthread_join(recvthread, NULL); + + /* Dump the updated MPI logs. */ + MPI_Barrier(MPI_COMM_WORLD); + fflush(stdout); + if (myrank == 0) message("Dumping updated log"); + mpiuse_dump_logs(nranks, logfile); + + /* Shutdown MPI. */ + res = MPI_Finalize(); + if (res != MPI_SUCCESS) + error("call to MPI_Finalize failed with error %i.", res); + + if (myrank == 0) message("Bye"); + + return 0; +}