Commit aac1ca3c authored by Peter W. Draper

An attempt at SWIFT-like communications that respect the MPI_THREAD_SPLIT model

Would mean we need to not queue MPI requests except on the engine thread for sends or recvs, and not allow stealing
parent f674aeee
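
For context, the thread-split model also has to be enabled in the Intel MPI runtime before the per-communicator thread binding set up below takes effect. A minimal launch sketch, assuming the Intel MPI 2019+ environment variables and placeholder file names:

    export I_MPI_THREAD_SPLIT=1
    export I_MPI_THREAD_RUNTIME=generic
    mpirun -np 2 ./notswiftmpistepsim combined-step1.dat timings.dat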
Makefile:

CFLAGS = -g -O0 -Wall

all: swiftmpistepsim notswiftmpistepsim
swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c
mpicc $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c
notswiftmpistepsim: notswiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c
mpicc $(CFLAGS) -o notswiftmpistepsim notswiftmpistepsim.c mpiuse.c clocks.c
clean:
rm -f swiftmpistepsim
rm -f notswiftmpistepsim
notswiftmpistepsim.c:

/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
// XXX variation in which we join injection and completion into the same threads.
#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Global: Our rank for all to see. */
int myrank = -1;
/* Are we verbose. */
static int verbose = 0;
/* Attempt to keep original injection time differences. */
static int usetics = 1;
/* Scale to apply to the size of the messages we send. */
static float messagescale = 1.0;
/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
/* Global communicators for each of the subtypes. */
static const int task_subtype_count = 30; // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[30];
/* The local queues. We need to partition these to keep the MPI communications
* separate so we can use the MPI_THREAD_SPLIT option of the Intel 2020 MPI
* library. */
static struct mpiuse_log_entry **volatile recv_logs;
static int volatile nr_recv_logs = 0;
static struct mpiuse_log_entry **volatile send_logs;
static int volatile nr_send_logs = 0;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs_queue = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends_queue = 0;
/* CPU frequency of the machine that created the MPI log. */
// XXX need to store this in the data file.
static double log_clocks_cpufreq = 2194844448.0;
/**
* @brief fill a data area with a pattern that can be checked for changes.
*
* @param size size of data in bytes.
* @param data the data to fill.
*/
static void datacheck_fill(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
p[i] = 170; /* 10101010 in bits. */
}
}
/**
* @brief test a filled data area for our pattern.
*
* @param size size of data in bytes.
 * @param data the data to check.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
if (p[i] != 170) return 0;
}
return 1;
}
/**
 * @brief Send thread: initiates MPI_Isend requests and then waits for completion.
*/
static void *send_thread(void *arg) {
if (verbose) message("send thread starts");
ticks starttics = getticks();
/* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
ticks basetic = send_logs[0]->tic;
ticks looptics = 0;
double deadtime = 0.0;
struct timespec sleep;
sleep.tv_sec = 0;
for (int k = 0; k < nr_send_logs; k++) {
struct mpiuse_log_entry *log = send_logs[k];
if (usetics) {
      /* Expected time between this request and the previous one. */
ticks dt = log->tic - basetic;
basetic = log->tic;
      /* We guess some time below which we should not attempt to wait,
       * otherwise we'll start to overrun; if we are below that threshold we
       * just inject the next call (the threshold being the ticks this loop
       * takes without any waiting). Otherwise we wait a while. Note we need
       * to convert the ticks of the log file into nanoseconds, which
       * requires the original CPU frequency. */
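      /* For example, with the logged 2.19 GHz clock a gap of dt = 2194844
       * ticks converts to roughly 1.0e6 ns, i.e. a 1 ms sleep. */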
if (dt > looptics) {
/* Remember to be fair and remove the looptics, then convert to
* nanoseconds. */
double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
if (ns < 1.0e9) {
sleep.tv_nsec = (long)ns;
} else {
        /* A wait of more than one second is surely an error, so complain
         * but continue anyway. */
sleep.tv_nsec = (long)1.0e9;
message("wait greater than one second");
}
nanosleep(&sleep, NULL);
deadtime += sleep.tv_nsec;
}
}
/* Initialise new log elements. */
log->done = 0;
log->nr_tests = 0;
log->tsum = 0.0;
log->tmax = 0;
log->tmin = INT_MAX;
log->endtic = 0;
log->injtic = getticks();
    /* Difference from SWIFT: we use MPI_BYTE, not the task's MPI type. */
int err = 0;
log->data = calloc(log->size, 1);
/* Fill data with pattern. */
if (datacheck) datacheck_fill(log->size, log->data);
/* And send. */
    err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
                    subtypeMPI_comms[log->subtype], &log->req);
    if (err != MPI_SUCCESS) error("Failed to send a message.");
/* Add a new send request. */
sends_queue[nr_sends_queue] = log;
nr_sends_queue++;
}
if (verbose)
message("send injections completed: %d", nr_sends_queue);
/* Now we wait for the sends to complete. */
/* Global MPI_Test statistics. */
int ncalls = 0;
double sum = 0.0;
ticks mint = INT_MAX;
ticks maxt = 0;
  /* We loop while we still have requests to complete. */
int todo = nr_sends_queue;
while (todo > 0) {
for (int k = 0; k < nr_sends_queue; k++) {
struct mpiuse_log_entry *log = sends_queue[k];
if (log != NULL && !log->done) {
ticks tics = getticks();
int res;
MPI_Status stat;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
        /* Update the statistics for time spent in MPI_Test. */
ticks dt = getticks() - tics;
log->tsum += (double)dt;
sum += (double)dt;
log->nr_tests++;
ncalls++;
if (dt < log->tmin) log->tmin = dt;
if (dt > log->tmax) log->tmax = dt;
if (dt < mint) mint = dt;
if (dt > maxt) maxt = dt;
if (res) {
          /* Check the sent data is unchanged. */
if (datacheck && !datacheck_test(log->size, log->data)) {
error("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
log->endtic = getticks();
free(log->data);
/* One down. */
todo--;
}
}
}
}
/* All done. */
message("sends completed");
message(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, "
"max time %.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
return NULL;
}
/**
 * @brief Recv thread: initiates MPI_Irecv requests and then waits for completion.
*/
static void *recv_thread(void *arg) {
if (verbose) message("recv thread starts");
ticks starttics = getticks();
/* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
ticks basetic = recv_logs[0]->tic;
ticks looptics = 0;
double deadtime = 0.0;
struct timespec sleep;
sleep.tv_sec = 0;
for (int k = 0; k < nr_recv_logs; k++) {
struct mpiuse_log_entry *log = recv_logs[k];
if (usetics) {
      /* Expected time between this request and the previous one. */
ticks dt = log->tic - basetic;
basetic = log->tic;
      /* We guess some time below which we should not attempt to wait,
       * otherwise we'll start to overrun; if we are below that threshold we
       * just inject the next call (the threshold being the ticks this loop
       * takes without any waiting). Otherwise we wait a while. Note we need
       * to convert the ticks of the log file into nanoseconds, which
       * requires the original CPU frequency. */
if (dt > looptics) {
/* Remember to be fair and remove the looptics, then convert to
* nanoseconds. */
double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
if (ns < 1.0e9) {
sleep.tv_nsec = (long)ns;
} else {
        /* A wait of more than one second is surely an error, so complain
         * but continue anyway. */
sleep.tv_nsec = (long)1.0e9;
message("wait greater than one second");
}
nanosleep(&sleep, NULL);
deadtime += sleep.tv_nsec;
}
}
/* Initialise new log elements. */
log->done = 0;
log->nr_tests = 0;
log->tsum = 0.0;
log->tmax = 0;
log->tmin = INT_MAX;
log->endtic = 0;
log->injtic = getticks();
    /* Difference from SWIFT: we use MPI_BYTE, not the task's MPI type. */
int err = 0;
log->data = calloc(log->size, 1);
/* Fill data with pattern. */
if (datacheck) datacheck_fill(log->size, log->data);
/* And start to listen for send. */
    err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
                    subtypeMPI_comms[log->subtype], &log->req);
    if (err != MPI_SUCCESS) error("Failed to post a receive.");
/* Add a new recv request. */
recvs_queue[nr_recvs_queue] = log;
nr_recvs_queue++;
}
if (verbose)
message("recv injections completed: %d", nr_recvs_queue);
/* Now we wait for the recvs to complete. */
/* Global MPI_Test statistics. */
int ncalls = 0;
double sum = 0.0;
ticks mint = INT_MAX;
ticks maxt = 0;
  /* We loop while we still have requests to complete. */
int todo = nr_recvs_queue;
while (todo > 0) {
for (int k = 0; k < nr_recvs_queue; k++) {
struct mpiuse_log_entry *log = recvs_queue[k];
if (log != NULL && !log->done) {
ticks tics = getticks();
int res;
MPI_Status stat;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
        /* Update the statistics for time spent in MPI_Test. */
ticks dt = getticks() - tics;
log->tsum += (double)dt;
sum += (double)dt;
log->nr_tests++;
ncalls++;
if (dt < log->tmin) log->tmin = dt;
if (dt > log->tmax) log->tmax = dt;
if (dt < mint) mint = dt;
if (dt > maxt) maxt = dt;
if (res) {
          /* Check the received data matches the expected pattern. */
if (datacheck && !datacheck_test(log->size, log->data)) {
error("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
log->endtic = getticks();
free(log->data);
/* One down. */
todo--;
}
}
}
}
/* All done. */
message("recvs completed");
message(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, "
"max time %.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
return NULL;
}
/**
* @brief Comparison function for logged times.
*/
static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
/* Large unsigned values, so take care. */
if (l1->tic > l2->tic) return 1;
if (l1->tic < l2->tic) return -1;
return 0;
}
/**
* @brief Pick out the relevant logging data for our rank, i.e. all
* activations of sends and recvs, which we separate for processing
* in their own threads.
*/
static void pick_logs(void) {
size_t nlogs = mpiuse_nr_logs();
/* Duplicates of logs. */
recv_logs = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_recv_logs = 0;
send_logs = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_send_logs = 0;
sends_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_sends_queue = 0;
recvs_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_recvs_queue = 0;
/* Index of the thread logs to update. */
int nr_logs = 0;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->rank == myrank && log->activation) {
if (log->type == task_type_send || log->type == task_type_recv) {
/* Scale size. */
        log->size *= messagescale;
/* And keep this log. */
log->data = NULL;
if (log->type == task_type_send) {
send_logs[nr_send_logs] = log;
nr_send_logs++;
} else {
recv_logs[nr_recv_logs] = log;
nr_recv_logs++;
}
nr_logs++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
}
message("Read %d active send and recv logs from input file", nr_logs);
/* Sort into increasing time. */
qsort(send_logs, nr_send_logs, sizeof(struct mpiuse_log_entry *),
cmp_logs);
qsort(recv_logs, nr_recv_logs, sizeof(struct mpiuse_log_entry *),
cmp_logs);
}
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat\n",
argv[0]);
fprintf(stderr, " options: -v verbose, -f fast injections, "
"-s message size (bytes)\n");
fflush(stderr);
}
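/* Example invocation (hypothetical file names; combined-step1.dat would be
 * the merged per-rank mpiuse logs for one step):
 *
 *   mpirun -np 2 ./notswiftmpistepsim -v -d combined-step1.dat timings.dat
 */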
/**
* @brief main function.
*/
int main(int argc, char *argv[]) {
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
int nr_nodes = 0;
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Handle the command-line, we expect a mpiuse data file to read and various
* options. */
int opt;
while ((opt = getopt(argc, argv, "vfds:")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
break;
case 'f':
usetics = 0;
break;
case 'v':
verbose = 1;
break;
case 's':
messagescale = atof(optarg);
break;
default:
if (myrank == 0) usage(argv);
return 1;
}
}
if (optind >= argc - 1) {
if (myrank == 0) usage(argv);
return 1;
}
char *infile = argv[optind];
char *logfile = argv[optind + 1];
/* Initialize time for messages and timers. */
clocks_set_cpufreq(0);
  /* Now we read the SWIFT MPI logger output that defines the communications
   * we will undertake and the time differences between injections into the
   * queues. Note this has all ranks for a single step; SWIFT outputs one MPI
   * log per rank per step, so you need to combine all ranks from a step. */
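  /* For example (hypothetical file names; the actual naming depends on the
   * SWIFT run), a combined input could be made with:
   *   cat mpiuse_report-step1-rank*.dat > combined-step1.dat */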
mpiuse_log_restore(infile);
int nranks = mpiuse_nr_ranks();
/* This should match the expected size. */
if (nr_nodes != nranks)
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_nodes);
/* Create communicators for each subtype, these are associated with the
* threads somewhat randomly, we should seek to balance the work. */
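  /* Note: in the Intel MPI thread-split model the "thread_id" info key pins
   * a communicator to one logical thread, and all operations on
   * communicators sharing a thread_id value are expected to be issued from
   * that same thread. Here we simply alternate subtypes between ids 0 and 1,
   * except for the fudged subtype 11. */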
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
char str[8] = {0};
if (i == 11) {
sprintf(str, "%d", 0); // XXX fudge for xv
} else {
sprintf(str, "%d", i % 2);
}
MPI_Info info;
MPI_Info_create(&info);
MPI_Info_set(info, "thread_id", str);
MPI_Comm_set_info(subtypeMPI_comms[i], info);
if (verbose) message("subtype %d assigned to thread_id %s", i, str);
MPI_Info_free(&info);
}
if (verbose) {
MPI_Barrier(MPI_COMM_WORLD);
fflush(stdout);
}
/* Each rank requires its own queue, so extract them. */
pick_logs();
/* Try to make it synchronous across the ranks and reset time to zero. */
MPI_Barrier(MPI_COMM_WORLD);
clocks_set_cpufreq(0);
if (myrank == 0) {
message("Start of MPI tests");
message("==================");
if (messagescale != 1.0f) {
message(" ");
message(" Using message scale of %f", messagescale);
}
if (verbose) {
if (!usetics) message("using fast untimed injections");
if (datacheck)
message("checking data pattern on send and recv completion");
}
}
/* Need two threads, one for the sends and one for the recvs. */
pthread_t sendthread;
pthread_t recvthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Dump the updated MPI logs. */
MPI_Barrier(MPI_COMM_WORLD);
fflush(stdout);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
if (myrank == 0) message("Bye");
return 0;
}