-
Peter W. Draper authoredPeter W. Draper authored
swiftmpiproxies.c 12.96 KiB
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Global: Our rank for all to see. */
int myrank = -1;
/* Are we verbose. */
static int verbose = 0;
/* Maximum main loops. */
static int maxloops = 1000;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 23;
static const int task_type_recv = 24;
static const int task_subtype_count = 29;
static const int task_subtype_pcells = 35;
/* Proxy tag arithmetic. From proxy.h, must match log. */
#define proxy_tag_shift 8
#define proxy_tag_count 0
#define proxy_tag_cells 6
/* Our queues of communications. Need two to separate out the pcell sends and
* recvs. */
struct mpiuse_log_entry **send_pcells;
int nr_send_pcells = 0;
struct mpiuse_log_entry **recv_pcells;
int nr_recv_pcells = 0;
/**
* @brief fill a data area with a pattern that can be checked for changes.
*
* @param fill value used in fill, note data type.
* @param size size of data in bytes.
* @param data the data to fill.
*/
static void datacheck_fill(unsigned char fill, size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
p[i] = fill;
}
}
/**
* @brief test a filled data area for the given value. Returns 0 if not found
* in all elements of data.
*
* @param fill value used in fill, note data type.
* @param size size of data in bytes.
* @param data the data to fill.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(unsigned char fill, size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
if (p[i] != fill) {
if (verbose) {
message("%d != %d", p[i], fill);
fflush(stdout);
}
return 0;
}
}
return 1;
}
/**
* @brief check a data area reporting some statistics about the content.
*
* Assumes datacheck_test() has already failed.
*
* @param size size of data in bytes.
* @param data the data to fill.
*/
static void datacheck_fulltest(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
double sum = 0.0;
unsigned char pmin = 255;
unsigned char pmax = 0;
for (size_t i = 0; i < size; i++) {
sum += p[i];
if (p[i] > pmax) pmax = p[i];
if (p[i] < pmin) pmin = p[i];
}
message("sum: %.2f, mean: %.2f, min: %d, max: %d", sum, sum / (double)size,
pmin, pmax);
}
/**
* @brief Pick out the relevant logging data for our rank, i.e. all
* activations of sends and recvs. We ignore the original completion logs,
* those are not relevant.
*/
static void pick_logs(void) {
size_t nlogs = mpiuse_nr_logs();
if (verbose) message("Restored %zd logs", nlogs);
/* Separated logs. */
send_pcells = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_send_pcells = 0;
recv_pcells = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recv_pcells = 0;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->rank == myrank && log->activation) {
log->data = NULL;
if (log->type == task_type_send) {
if (log->subtype == task_subtype_pcells) {
send_pcells[nr_send_pcells] = log;
nr_send_pcells++;
} else if (log->subtype != task_subtype_pcells &&
log->subtype != task_subtype_count) {
error("task subtype '%d' is not a known value", log->subtype);
}
} else if (log->type == task_type_recv) {
if (log->subtype == task_subtype_pcells) {
recv_pcells[nr_recv_pcells] = log;
nr_recv_pcells++;
} else if (log->subtype != task_subtype_pcells &&
log->subtype != task_subtype_count) {
error("task subtype '%d' is not a known value", log->subtype);
}
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
}
if (verbose)
message("Read %d send and %d recv pcells logs", nr_send_pcells,
nr_recv_pcells);
}
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [-vn] SWIFT_mpiuse-log-file.dat\n", argv[0]);
fprintf(stderr, " options: -n maxloops, -v verbose\n");
fflush(stderr);
}
/**
* @brief main function.
*/
int main(int argc, char *argv[]) {
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
int nr_nodes = 0;
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Handle the command-line, we expect a mpiuse data file to read and
* various options. */
int opt;
while ((opt = getopt(argc, argv, "vn:")) != -1) {
switch (opt) {
case 'n':
maxloops = atoi(optarg);
break;
case 'v':
verbose = 1;
break;
default:
if (myrank == 0) usage(argv);
return 1;
}
}
if (optind >= argc) {
if (myrank == 0) usage(argv);
return 1;
}
char *infile = argv[optind];
/* Start time across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
clocks_set_cpufreq(0);
/* Now we read the SWIFT MPI logger output that defines the communcations we
* will undertake. Note this has all ranks for a single step, SWIFT outputs
* one MPI log per rank per step, so you need to combine all ranks from a
* step. */
mpiuse_log_restore(infile);
int nranks = mpiuse_nr_ranks();
/* This should match the expected size. */
if (nr_nodes != nranks)
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_nodes);
/* Each rank has its own queues of requests, so extract them. */
pick_logs();
/* And run our version of the proxy exchanges. */
MPI_Request req_send_counts[nr_send_pcells];
MPI_Request req_recv_counts[nr_send_pcells];
MPI_Request req_pcells_out[nr_send_pcells];
int pcells_size[nr_send_pcells];
/* Loop over next section a number of times to simulate rebuild steps in
* SWIFT. */
for (int nloop = 0; nloop < maxloops; nloop++) {
MPI_Barrier(MPI_COMM_WORLD);
if (myrank == 0)
message("*** Proxy simulation exchange loop: %d ***", nloop);
/* Note in SWIFT we use the threadpool to launch these. */
for (int k = 0; k < nr_send_pcells; k++) {
struct mpiuse_log_entry *log = send_pcells[k];
/* Need to regenerate the tags for each other communication type. */
int basetag = myrank * proxy_tag_shift;
/* Start Isend counts of pcells. Really just the size of the buffer
* we're about to send, SWIFT sends the count. */
res = MPI_Isend(&log->size, 1, MPI_INT, log->otherrank,
basetag + proxy_tag_count, MPI_COMM_WORLD,
&req_send_counts[k]);
if (res != MPI_SUCCESS) error("Counts MPI_Isend failed.");
/* Start Isend of pcells, fill with our rank value. */
log->data = calloc(log->size, 1);
datacheck_fill(myrank, log->size, log->data);
res = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank,
basetag + proxy_tag_cells, MPI_COMM_WORLD,
&req_pcells_out[k]);
if (res != MPI_SUCCESS) error("Pcell MPI_Isend failed.");
/* Start Irecv counts of pcells from other rank. */
basetag = log->otherrank * proxy_tag_shift;
res = MPI_Irecv(&pcells_size[k], 1, MPI_INT, log->otherrank,
basetag + proxy_tag_count, MPI_COMM_WORLD,
&req_recv_counts[k]);
if (res != MPI_SUCCESS) error("Counts MPI_Irecv failed.");
}
if (verbose)
message("All counts requests and pcell sends are launched");
/* Now wait for any of the counts irecvs to complete and then create the
* irecv for the pcells. */
void *pcells_in[nr_send_pcells];
MPI_Request req_pcells_in[nr_send_pcells];
for (int k = 0; k < nr_send_pcells; k++) {
int pid = MPI_UNDEFINED;
MPI_Status status;
res = MPI_Waitany(nr_send_pcells, req_recv_counts, &pid, &status);
if (res != MPI_SUCCESS || pid == MPI_UNDEFINED)
error("MPI_Waitany failed.");
if (verbose) message("Counts received for proxy %d", pid);
struct mpiuse_log_entry *log = send_pcells[pid];
int basetag = log->otherrank * proxy_tag_shift;
pcells_in[pid] = calloc(pcells_size[pid], 1);
/* Fill data with our rank. */
datacheck_fill(myrank, pcells_size[pid], pcells_in[pid]);
res = MPI_Irecv(pcells_in[pid], pcells_size[pid], MPI_BYTE,
log->otherrank, basetag + proxy_tag_cells, MPI_COMM_WORLD,
&req_pcells_in[pid]);
if (res != MPI_SUCCESS) error("Pcell MPI_Irecv failed.");
}
if (verbose)
message("All proxy cell counts have arrived, pcells irecvs are launched");
/* Waitall for all Isend counts to complete. */
res = MPI_Waitall(nr_send_pcells, req_send_counts, MPI_STATUSES_IGNORE);
if (res != MPI_SUCCESS) error("Waitall for counts Isend failed.");
if (verbose)
message("All sends of counts have completed");
/* Now wait for the pcell irecvs to complete, so we receive the pcells,
* which would be unpacked in SWIFT. */
for (int k = 0; k < nr_send_pcells; k++) {
int pid = MPI_UNDEFINED;
MPI_Status status;
res = MPI_Waitany(nr_send_pcells, req_pcells_in, &pid, &status);
if (res != MPI_SUCCESS || pid == MPI_UNDEFINED)
error("MPI_Waitany failed.");
/* Check the data received is correct. Should be filled with
* rank of sender. */
if (!datacheck_test(status.MPI_SOURCE, pcells_size[pid],
pcells_in[pid])) {
message("Received data is not correct");
/* Report the tag and source of the request. */
int expected_tag =
status.MPI_SOURCE * proxy_tag_shift + proxy_tag_cells;
message("sent from rank %d, with tag %d/%d and error code %d",
status.MPI_SOURCE, status.MPI_TAG, expected_tag,
status.MPI_ERROR);
/* Shouldn't happen, but has been seen. */
if (status.MPI_ERROR != MPI_SUCCESS)
mpi_error_string(status.MPI_ERROR, "unexpected MPI status");
/* Make a report on what the buffer contains. */
datacheck_fulltest(pcells_size[pid], pcells_in[pid]);
/* This call will succeed, if the receive buffer has not been
* updated. */
if (datacheck_test(myrank, pcells_size[pid], pcells_in[pid])) {
message("Received data buffer has not been modified");
fflush(stdout);
error("Failed");
} else {
message("Received data is corrupt");
fflush(stdout);
error("Failed");
}
} else {
if (verbose) message("Received data is correct");
}
free(pcells_in[pid]);
pcells_in[pid] = NULL;
}
if (verbose)
message("All proxy cells have arrived");
/* Waitall for Isend of pcells to complete. */
res = MPI_Waitall(nr_send_pcells, req_pcells_out, MPI_STATUSES_IGNORE);
if (res != MPI_SUCCESS) error("Waitall for pcells Isend failed.");
if (verbose)
message("All sends of pcells have completed");
/* Check data is unmodified while being offloaded. */
for (int k = 0; k < nr_send_pcells; k++) {
struct mpiuse_log_entry *log = send_pcells[k];
if (!datacheck_test(myrank, log->size, log->data)) {
datacheck_fulltest(log->size, log->data);
error("Sent data has been corrupted");
} else {
if (verbose) message("Sent data is correct");
}
free(log->data);
log->data = NULL;
}
} /* nloop */
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
if (myrank == 0) message("All done, no errors detected");
return 0;
}