Commit 71a0d978 authored by Peter W. Draper

Abstract swiftmpiproxies into a non-SWIFT code

parent 3b09861a
/*******************************************************************************
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
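/* Build and run sketch (illustrative: the compiler wrapper, source file
 * and binary names are assumptions, adjust for your MPI stack):
 *
 *   mpicc -g -O2 -o mpiproxies mpiproxies.c
 *   mpirun -np 16 ./mpiproxies -v -n 1000 -s 33554432
 */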
/* Exit in error macro. */
#define error(s, ...) \
({ \
fflush(stdout); \
fprintf(stderr, "[%03i] %s:%s():%i: " s "\n", myrank, \
__FILE__, __FUNCTION__, __LINE__, \
##__VA_ARGS__); \
MPI_Abort(MPI_COMM_WORLD, -1); \
})
/* Print a message. */
#define message(s, ...) \
({ \
fprintf(stdout, "[%04i] %s: " s "\n", myrank, \
__FUNCTION__, ##__VA_ARGS__); \
})
/* Print MPI error as a string. */
/* Print MPI error as a string. */
#define mpi_error_string(res, s, ...) \
({ \
fprintf(stderr, "[%03i] %s:%s():%i: " s "\n", myrank, \
__FILE__, __FUNCTION__, __LINE__, \
##__VA_ARGS__); \
char buf[MPI_MAX_ERROR_STRING]; \
int len = 0; \
MPI_Error_string(res, buf, &len); \
fprintf(stderr, "%s\n\n", buf); \
})
/* Global: Our rank for all to see. */
int myrank = -1;
/* Are we verbose. */
static int verbose = 0;
/* Maximum main loops. */
static int maxloops = 1000;
/* Size of data to exchange. 32 MB on 16 ranks fails on COSMA Mellanox. */
static int datasize = 32 * 1024 * 1024;
/* Tag arithmetic. */
#define tag_shift 8
#define tag_size 0
#define tag_data 1
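/* For example, with this scheme rank 3 sends its payload size with tag
 * 3 * tag_shift + tag_size = 24 and the payload itself with tag
 * 3 * tag_shift + tag_data = 25. Deriving both tags from the sender's
 * rank keeps every pairwise size/data exchange uniquely matched. */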
/**
* @brief fill a data area with a pattern that can be checked for changes.
*
* @param fill value used in fill, note data type.
* @param size size of data in bytes.
* @param data the data to fill.
*/
static void datacheck_fill(unsigned char fill, size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
p[i] = fill;
}
}
/**
 * @brief test a filled data area for the given value.
 *
 * @param fill value used in fill, note data type.
 * @param size size of data in bytes.
 * @param data the data to test.
 *
 * @result 1 if all elements of data match the fill value, 0 otherwise.
 */
static int datacheck_test(unsigned char fill, size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
if (p[i] != fill) {
if (verbose) {
message("%d != %d", p[i], fill);
fflush(stdout);
}
return 0;
}
}
return 1;
}
/**
* @brief check a data area reporting some statistics about the content.
*
* Assumes datacheck_test() has already failed.
*
* @param size size of data in bytes.
* @param data the data to check.
*/
static void datacheck_fulltest(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
double sum = 0.0;
unsigned char pmin = 255;
unsigned char pmax = 0;
for (size_t i = 0; i < size; i++) {
sum += p[i];
if (p[i] > pmax) pmax = p[i];
if (p[i] < pmin) pmin = p[i];
}
message("sum: %.2f, mean: %.2f, min: %d, max: %d", sum, sum / (double)size,
pmin, pmax);
}
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s\n", argv[0]);
fprintf(stderr, " options: -s size -n maxloops, -v verbose\n");
fflush(stderr);
}
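/* For example (binary name illustrative):
 *   mpirun -np 16 ./mpiproxies -n 100 -s 1048576
 * makes every pair of the 16 ranks exchange a 1 MB message in each of
 * 100 loops. */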
/**
* @brief main function.
*/
int main(int argc, char *argv[]) {
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
/* How many ranks are communicating? */
int nr_nodes = 0;
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Handle the command-line options. */
int opt;
while ((opt = getopt(argc, argv, "vn:s:")) != -1) {
switch (opt) {
case 'n':
maxloops = atoi(optarg);
break;
case 'v':
verbose = 1;
break;
case 's':
datasize = atoi(optarg);
break;
default:
if (myrank == 0) usage(argv);
return 1;
}
}
/* Space for the in-flight requests. */
MPI_Request req_send_size[nr_nodes];
MPI_Request req_recv_size[nr_nodes];
MPI_Request req_data_out[nr_nodes];
MPI_Request req_data_in[nr_nodes];
void *send_datavalues[nr_nodes];
void *recv_datavalues[nr_nodes];
int send_datasizes[nr_nodes];
int recv_datasizes[nr_nodes];
/* Loop over all the exchanges between nodes. This simulates the proxy cell
 * exchanges in SWIFT that fail with Intel 2020. */
for (int nloop = 0; nloop < maxloops; nloop++) {
MPI_Barrier(MPI_COMM_WORLD);
if (myrank == 0)
message("*** Proxy simulation exchange loop: %d ***", nloop);
/* Note in SWIFT we use the threadpool to launch these. */
for (int k = 0; k < nr_nodes; k++) {
if (k != myrank) {
/* Base value for tags for sending size and data. */
int basetag = myrank * tag_shift;
/* First send the size of the data we're about to exchange. */
send_datasizes[k] = datasize;
res = MPI_Isend(&send_datasizes[k], 1, MPI_INT, k,
basetag + tag_size, MPI_COMM_WORLD,
&req_send_size[k]);
if (res != MPI_SUCCESS) error("MPI_Isend failed to send size.");
/* Start Isend of data which we fill with our rank value. */
send_datavalues[k] = malloc(datasize);
datacheck_fill(myrank, datasize, send_datavalues[k]);
res = MPI_Isend(send_datavalues[k], datasize, MPI_BYTE, k,
basetag + tag_data, MPI_COMM_WORLD,
&req_data_out[k]);
if (res != MPI_SUCCESS) error("MPI_Isend failed to send data.");
/* Start Irecv to receive datasize from the other rank. */
basetag = k * tag_shift;
res = MPI_Irecv(&recv_datasizes[k], 1, MPI_INT, k,
basetag + tag_size, MPI_COMM_WORLD,
&req_recv_size[k]);
if (res != MPI_SUCCESS) error("MPI_Irecv failed to listen for size.");
} else {
/* No requests from ourselves. */
req_send_size[k] = MPI_REQUEST_NULL;
req_data_out[k] = MPI_REQUEST_NULL;
req_recv_size[k] = MPI_REQUEST_NULL;
}
/* Clear associated request for receiving the data. */
req_data_in[k] = MPI_REQUEST_NULL;
}
if (verbose)
message("All requests are launched");
/* Now wait for any of the datasize irecvs to complete and then create
* the irecv to receive the data. */
for (int k = 0; k < nr_nodes - 1; k++) {
int pid = MPI_UNDEFINED;
MPI_Status status;
res = MPI_Waitany(nr_nodes, req_recv_size, &pid, &status);
if (res != MPI_SUCCESS || pid == MPI_UNDEFINED)
error("MPI_Waitany failed for sizes.");
if (verbose) message("Size received from node %d", pid);
int basetag = pid * tag_shift;
recv_datavalues[pid] = malloc(recv_datasizes[pid]);
/* Fill data with our rank, so we can test when it has been modified. */
datacheck_fill(myrank, recv_datasizes[pid], recv_datavalues[pid]);
res = MPI_Irecv(recv_datavalues[pid], recv_datasizes[pid], MPI_BYTE,
pid, basetag + tag_data, MPI_COMM_WORLD,
&req_data_in[pid]);
if (res != MPI_SUCCESS) error("MPI_Irecv failed when listening for data.");
}
if (verbose)
message("All data sizes have arrived, data irecvs are launched");
/* Waitall for all Isend sizes to complete. */
res = MPI_Waitall(nr_nodes, req_send_size, MPI_STATUSES_IGNORE);
if (res != MPI_SUCCESS) error("Waitall for Isend completions failed.");
if (verbose)
message("All sends of sizes have completed");
/* Now wait for the data exchange irecvs to complete. */
for (int k = 0; k < nr_nodes - 1; k++) {
int pid = MPI_UNDEFINED;
MPI_Status status;
res = MPI_Waitany(nr_nodes, req_data_in, &pid, &status);
if (res != MPI_SUCCESS || pid == MPI_UNDEFINED)
error("MPI_Waitany failed for data.");
/* Check the data received is correct. It should be filled with
 * the rank of the sender, not our rank. */
if (!datacheck_test(pid, recv_datasizes[pid], recv_datavalues[pid])) {
message("Received data is not correct, not filled with sender "
"rank value");
/* Report the tag and source of the request. */
int expected_tag = pid * tag_shift + tag_data;
message("sent from rank %d, with tag %d/%d and error code %d",
status.MPI_SOURCE, status.MPI_TAG, expected_tag,
status.MPI_ERROR);
/* Shouldn't happen, but has been seen. */
if (status.MPI_ERROR != MPI_SUCCESS)
mpi_error_string(status.MPI_ERROR, "unexpected MPI status");
/* Make a report on what the buffer contains. */
datacheck_fulltest(recv_datasizes[pid], recv_datavalues[pid]);
/* This call will succeed if the receive buffer has not been
 * updated. */
if (datacheck_test(myrank, recv_datasizes[pid], recv_datavalues[pid])) {
message("Received data buffer has not been modified");
fflush(stdout);
error("Failed");
} else {
message("Received data is corrupt");
fflush(stdout);
error("Failed");
}
} else {
if (verbose) message("Received data is correct");
}
free(recv_datavalues[pid]);
recv_datavalues[pid] = NULL;
}
if (verbose)
message("All the data exchanges have completed");
/* Waitall for data Isends to complete. */
res = MPI_Waitall(nr_nodes, req_data_out, MPI_STATUSES_IGNORE);
if (res != MPI_SUCCESS) error("Waitall for data Isends failed.");
if (verbose)
message("All sends of data have completed");
/* Check our send buffers were not modified while the sends were in flight. */
for (int k = 0; k < nr_nodes; k++) {
if (k != myrank) {
if (!datacheck_test(myrank, send_datasizes[k], send_datavalues[k])) {
datacheck_fulltest(send_datasizes[k], send_datavalues[k]);
error("Sent data has been corrupted");
} else {
if (verbose) message("Sent data is correct");
}
free(send_datavalues[k]);
send_datavalues[k] = NULL;
}
}
} /* nloop */
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
if (myrank == 0) message("All done, no errors detected");
return 0;
}