From 71a0d978f16e5ad3f6cb9a4d21647fdcd7125352 Mon Sep 17 00:00:00 2001
From: "Peter W. Draper" <p.w.draper@durham.ac.uk>
Date: Thu, 12 Mar 2020 16:07:58 +0000
Subject: [PATCH] Abstract swiftmpiproxies into a non-SWIFT code

---
 exchange-test.c | 399 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 399 insertions(+)
 create mode 100644 exchange-test.c

diff --git a/exchange-test.c b/exchange-test.c
new file mode 100644
index 0000000..7c70c23
--- /dev/null
+++ b/exchange-test.c
@@ -0,0 +1,399 @@
+/*******************************************************************************
+ * Copyright (c) 2020 Peter W. Draper
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ ******************************************************************************/
+
+#include <errno.h>
+#include <limits.h>
+#include <mpi.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+/* Exit in error macro. Flushes stdout, reports the rank and source location
+ * on stderr, then aborts all ranks. */
+#define error(s, ...)                                               \
+  ({                                                                \
+    fflush(stdout);                                                 \
+    fprintf(stderr, "[%03i] %s:%s():%i: " s "\n", myrank, __FILE__, \
+            __FUNCTION__, __LINE__, ##__VA_ARGS__);                 \
+    MPI_Abort(MPI_COMM_WORLD, -1);                                  \
+  })
+
+/* Print a message, prefixed with our rank and the calling function. */
+#define message(s, ...)                                         \
+  ({                                                            \
+    fprintf(stdout, "[%04i] %s: " s "\n", myrank, __FUNCTION__, \
+            ##__VA_ARGS__);                                     \
+  })
+
+/* Print MPI error as a string. The whole report goes to stderr so the
+ * location line and the MPI text stay together. */
+#define mpi_error_string(res, s, ...)                               \
+  ({                                                                \
+    fprintf(stderr, "[%03i] %s:%s():%i: " s "\n", myrank, __FILE__, \
+            __FUNCTION__, __LINE__, ##__VA_ARGS__);                 \
+    int len = 0;                                                    \
+    char buf[MPI_MAX_ERROR_STRING];                                 \
+    MPI_Error_string(res, buf, &len);                               \
+    fprintf(stderr, "%s\n\n", buf);                                 \
+  })
+
+/* Global: Our rank for all to see. */
+int myrank = -1;
+
+/* Are we verbose. */
+static int verbose = 0;
+
+/* Maximum main loops. */
+static int maxloops = 1000;
+
+/* Size of data to exchange. 32 MB on 16 ranks fails on COSMA Mellanox. */
+static int datasize = 32 * 1024 * 1024;
+
+/* Tag arithmetic. Each rank owns a block of tag_shift tags starting at
+ * rank * tag_shift, tag_size and tag_data are offsets into that block. */
+#define tag_shift 8
+#define tag_size 0
+#define tag_data 1
+
+/**
+ * @brief fill a data area with a pattern that can be checked for changes.
+ *
+ * @param fill value used in fill, note data type.
+ * @param size size of data in bytes.
+ * @param data the data to fill.
+ */
+static void datacheck_fill(unsigned char fill, size_t size, void *data) {
+  unsigned char *p = (unsigned char *)data;
+  for (size_t i = 0; i < size; i++) {
+    p[i] = fill;
+  }
+}
+
+/**
+ * @brief test a filled data area for the given value. Returns 0 if not found
+ *        in all elements of data.
+ *
+ * @param fill value used in fill, note data type.
+ * @param size size of data in bytes.
+ * @param data the data to test.
+ *
+ * @result 1 on success, 0 otherwise.
+ */
+static int datacheck_test(unsigned char fill, size_t size, const void *data) {
+  const unsigned char *p = (const unsigned char *)data;
+  for (size_t i = 0; i < size; i++) {
+    if (p[i] != fill) {
+      if (verbose) {
+        message("%d != %d", p[i], fill);
+        fflush(stdout);
+      }
+      return 0;
+    }
+  }
+  return 1;
+}
+
+/**
+ * @brief check a data area reporting some statistics about the content.
+ *
+ * Assumes datacheck_test() has already failed.
+ *
+ * @param size size of data in bytes.
+ * @param data the data to check.
+ */
+static void datacheck_fulltest(size_t size, const void *data) {
+
+  const unsigned char *p = (const unsigned char *)data;
+  double sum = 0.0;
+  unsigned char pmin = 255;
+  unsigned char pmax = 0;
+  for (size_t i = 0; i < size; i++) {
+    sum += p[i];
+    if (p[i] > pmax) pmax = p[i];
+    if (p[i] < pmin) pmin = p[i];
+  }
+  message("sum: %.2f, mean: %.2f, min: %d, max: %d", sum, sum / (double)size,
+          pmin, pmax);
+}
+
+/**
+ * @brief usage help.
+ *
+ * @param argv the command-line arguments, argv[0] is the program name.
+ */
+static void usage(char *argv[]) {
+  fprintf(stderr, "Usage: %s\n", argv[0]);
+  fprintf(stderr, " options: -s size -n maxloops, -v verbose\n");
+  fflush(stderr);
+}
+
+/**
+ * @brief parse the integer argument of a command-line option.
+ *
+ * Aborts the run when the text is not a complete decimal integer in the
+ * range [min, INT_MAX].
+ *
+ * @param name the option the value belongs to, used when reporting errors.
+ * @param str the text to parse.
+ * @param min smallest acceptable value.
+ *
+ * @result the parsed value.
+ */
+static int parse_int_option(const char *name, const char *str, int min) {
+  char *end = NULL;
+  errno = 0;
+  const long value = strtol(str, &end, 10);
+  if (end == str || *end != '\0' || errno == ERANGE || value < min ||
+      value > INT_MAX)
+    error("Invalid value '%s' for option %s, expected an integer >= %d.", str,
+          name, min);
+  return (int)value;
+}
+
+/**
+ * @brief main function.
+ *
+ * Repeatedly exchanges a size message followed by a data message between
+ * all pairs of ranks, checking that received and sent buffers hold the
+ * expected fill values.
+ */
+int main(int argc, char *argv[]) {
+
+  /* Initiate MPI. */
+  int prov = 0;
+  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Init_thread failed with error %i.", res);
+
+  /* How many ranks are communicating? */
+  int nr_nodes = 0;
+  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
+  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
+
+  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Comm_rank failed with error %i.", res);
+
+  /* Handle the command-line options. */
+  int opt;
+  while ((opt = getopt(argc, argv, "vn:s:")) != -1) {
+    switch (opt) {
+      case 'n':
+        maxloops = parse_int_option("-n", optarg, 0);
+        break;
+      case 'v':
+        verbose = 1;
+        break;
+      case 's':
+        datasize = parse_int_option("-s", optarg, 1);
+        break;
+      default:
+        if (myrank == 0) usage(argv);
+
+        /* MPI is up, so shut it down cleanly before leaving. */
+        MPI_Finalize();
+        return 1;
+    }
+  }
+
+  /* Space for the in flight requests. */
+  MPI_Request req_send_size[nr_nodes];
+  MPI_Request req_recv_size[nr_nodes];
+  MPI_Request req_data_out[nr_nodes];
+  MPI_Request req_data_in[nr_nodes];
+  void *send_datavalues[nr_nodes];
+  void *recv_datavalues[nr_nodes];
+  int send_datasizes[nr_nodes];
+  int recv_datasizes[nr_nodes];
+
+  /* Loop over all the exchanges between nodes. This simulates the proxy cell
+   * exchanges in SWIFT that fails with Intel 2020. */
+  for (int nloop = 0; nloop < maxloops; nloop++) {
+    MPI_Barrier(MPI_COMM_WORLD);
+    if (myrank == 0)
+      message("*** Proxy simulation exchange loop: %d ***", nloop);
+
+    /* Note in SWIFT we use the threadpool to launch these. */
+    for (int k = 0; k < nr_nodes; k++) {
+      if (k != myrank) {
+
+        /* Base value for tags for sending size and data. */
+        int basetag = myrank * tag_shift;
+
+        /* First send the size of the data we're about to exchange. */
+        send_datasizes[k] = datasize;
+        res = MPI_Isend(&send_datasizes[k], 1, MPI_INT, k, basetag + tag_size,
+                        MPI_COMM_WORLD, &req_send_size[k]);
+        if (res != MPI_SUCCESS) error("MPI_Isend failed to send size.");
+
+        /* Start Isend of data which we fill with our rank value. Only the
+         * low byte of the rank is used, so the pattern repeats every 256
+         * ranks. */
+        send_datavalues[k] = malloc(datasize);
+        if (send_datavalues[k] == NULL)
+          error("Failed to allocate %d bytes for data to send.", datasize);
+        datacheck_fill((unsigned char)myrank, datasize, send_datavalues[k]);
+
+        res = MPI_Isend(send_datavalues[k], datasize, MPI_BYTE, k,
+                        basetag + tag_data, MPI_COMM_WORLD, &req_data_out[k]);
+        if (res != MPI_SUCCESS) error("MPI_Isend failed to send data.");
+
+        /* Start Irecv to receive datasize from the other rank. */
+        basetag = k * tag_shift;
+
+        res = MPI_Irecv(&recv_datasizes[k], 1, MPI_INT, k, basetag + tag_size,
+                        MPI_COMM_WORLD, &req_recv_size[k]);
+        if (res != MPI_SUCCESS) error("MPI_Irecv failed to listen for size.");
+      } else {
+
+        /* No requests from ourselves. */
+        req_send_size[k] = MPI_REQUEST_NULL;
+        req_data_out[k] = MPI_REQUEST_NULL;
+        req_recv_size[k] = MPI_REQUEST_NULL;
+      }
+
+      /* Clear associated request for receiving the data. */
+      req_data_in[k] = MPI_REQUEST_NULL;
+    }
+    if (verbose) message("All requests are launched");
+
+    /* Now wait for any of the datasize irecvs to complete and then create
+     * the irecv to receive the data. */
+    for (int k = 0; k < nr_nodes - 1; k++) {
+      int pid = MPI_UNDEFINED;
+      MPI_Status status;
+
+      res = MPI_Waitany(nr_nodes, req_recv_size, &pid, &status);
+      if (res != MPI_SUCCESS || pid == MPI_UNDEFINED)
+        error("MPI_Waitany failed for sizes.");
+      if (verbose) message("Size received from node %d", pid);
+
+      /* The size drives the allocation and the receive count, so reject
+       * nonsense before using it. */
+      if (recv_datasizes[pid] <= 0)
+        error("Invalid data size %d received from node %d.",
+              recv_datasizes[pid], pid);
+
+      int basetag = pid * tag_shift;
+
+      recv_datavalues[pid] = malloc(recv_datasizes[pid]);
+      if (recv_datavalues[pid] == NULL)
+        error("Failed to allocate %d bytes for data to receive.",
+              recv_datasizes[pid]);
+
+      /* Fill data with our rank, so we can test when it has been modified. */
+      datacheck_fill((unsigned char)myrank, recv_datasizes[pid],
+                     recv_datavalues[pid]);
+
+      res = MPI_Irecv(recv_datavalues[pid], recv_datasizes[pid], MPI_BYTE, pid,
+                      basetag + tag_data, MPI_COMM_WORLD, &req_data_in[pid]);
+      if (res != MPI_SUCCESS)
+        error("MPI_Irecv failed when listening for data.");
+    }
+    if (verbose)
+      message("All data sizes have arrived, data irecvs are launched");
+
+    /* Waitall for all Isend sizes to complete. */
+    res = MPI_Waitall(nr_nodes, req_send_size, MPI_STATUSES_IGNORE);
+    if (res != MPI_SUCCESS) error("Waitall for Isend completions failed.");
+    if (verbose) message("All sends of sizes have completed");
+
+    /* Now wait for the data exchange irecvs to complete. */
+    for (int k = 0; k < nr_nodes - 1; k++) {
+      int pid = MPI_UNDEFINED;
+      MPI_Status status;
+
+      /* MPI_Waitany is not required to set the error field of the status,
+       * so give it a defined value before we report it below. */
+      status.MPI_ERROR = MPI_SUCCESS;
+
+      res = MPI_Waitany(nr_nodes, req_data_in, &pid, &status);
+      if (res != MPI_SUCCESS || pid == MPI_UNDEFINED)
+        error("MPI_Waitany failed for data.");
+
+      /* Check the data received is correct. Should be filled with
+       * the rank of sender and not our rank. */
+      if (!datacheck_test((unsigned char)pid, recv_datasizes[pid],
+                          recv_datavalues[pid])) {
+        message("Received data is not correct, not filled with sender "
+                "rank value");
+
+        /* Report the tag and source of the request. */
+        int expected_tag = pid * tag_shift + tag_data;
+        message("sent from rank %d, with tag %d/%d and error code %d",
+                status.MPI_SOURCE, status.MPI_TAG, expected_tag,
+                status.MPI_ERROR);
+
+        /* Shouldn't happen, but has been seen. */
+        if (status.MPI_ERROR != MPI_SUCCESS)
+          mpi_error_string(status.MPI_ERROR, "unexpected MPI status");
+
+        /* Make a report on what the buffer contains. */
+        datacheck_fulltest(recv_datasizes[pid], recv_datavalues[pid]);
+
+        /* This call will succeed, if the receive buffer has not been
+         * updated. */
+        if (datacheck_test((unsigned char)myrank, recv_datasizes[pid],
+                           recv_datavalues[pid])) {
+          message("Received data buffer has not been modified");
+          fflush(stdout);
+          error("Failed");
+        } else {
+          message("Received data is corrupt");
+          fflush(stdout);
+          error("Failed");
+        }
+      } else {
+        if (verbose) message("Received data is correct");
+      }
+      free(recv_datavalues[pid]);
+      recv_datavalues[pid] = NULL;
+    }
+    if (verbose) message("All the data exchanges have completed");
+
+    /* Waitall for data Isends to complete. */
+    res = MPI_Waitall(nr_nodes, req_data_out, MPI_STATUSES_IGNORE);
+    if (res != MPI_SUCCESS) error("Waitall for data Isends failed.");
+    if (verbose) message("All sends of data have completed");
+
+    /* Check data is unmodified while being offloaded. */
+    for (int k = 0; k < nr_nodes; k++) {
+      if (k != myrank) {
+        if (!datacheck_test((unsigned char)myrank, send_datasizes[k],
+                            send_datavalues[k])) {
+          datacheck_fulltest(send_datasizes[k], send_datavalues[k]);
+          error("Sent data has been corrupted");
+        } else {
+          if (verbose) message("Sent data is correct");
+        }
+        free(send_datavalues[k]);
+        send_datavalues[k] = NULL;
+      }
+    }
+  } /* nloop */
+
+  /* Shutdown MPI. */
+  res = MPI_Finalize();
+  if (res != MPI_SUCCESS)
+    error("call to MPI_Finalize failed with error %i.", res);
+
+  if (myrank == 0) message("All done, no errors detected");
+
+  return 0;
+}
-- 
GitLab