Commit 7b8772df authored by Peter W. Draper

First attempt at fully mpi_split compliant code

Rendezvous exchanges are locking up...
parent aac1ca3c
swiftmpistepsim
notswiftmpistepsim
notswiftmpistepsim2
*.o
*~
CFLAGS = -g -O0 -Wall
all: swiftmpistepsim notswiftmpistepsim notswiftmpistepsim2
swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c
mpicc $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c
@@ -8,6 +8,10 @@ swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h c
notswiftmpistepsim: notswiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c
mpicc $(CFLAGS) -o notswiftmpistepsim notswiftmpistepsim.c mpiuse.c clocks.c
notswiftmpistepsim2: notswiftmpistepsim2.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c
mpicc $(CFLAGS) -o notswiftmpistepsim2 notswiftmpistepsim2.c mpiuse.c clocks.c
clean:
rm -f swiftmpistepsim
rm -f notswiftmpistepsim
rm -f notswiftmpistepsim2
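# Example build and run (illustrative; assumes mpirun is available and the
# input is a combined per-step mpiuse log):
#   make notswiftmpistepsim2
#   mpirun -np 2 ./notswiftmpistepsim2 -v mpiuse-log.dat output.dat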
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
// XXX variation in which we join injection and completion into the same
// threads, and also restrict the communicators to 2.
#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Global: Our rank for all to see. */
int myrank = -1;
/* Are we verbose. */
static int verbose = 0;
/* Attempt to keep original injection time differences. */
static int usetics = 1;
/* Scale to apply to the size of the messages we send. */
static float messagescale = 1.0;
/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
/* The maximum tag value in logs, rounded up to the next power of 2. */
static int maxtag = 0;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
/* Global communicators for each thread. */
static MPI_Comm subtypeMPI_comms[2];
/* The local queues. We need to partition these to keep the MPI communications
* separate so we can use the MPI_THREAD_SPLIT option of the Intel 2020 MPI
* library. */
static struct mpiuse_log_entry **volatile recv_logs;
static int volatile nr_recv_logs = 0;
static struct mpiuse_log_entry **volatile send_logs;
static int volatile nr_send_logs = 0;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs_queue = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends_queue = 0;
/* CPU frequency of the machine that created the MPI log. */
// XXX need to store this in the data file.
static double log_clocks_cpufreq = 2194844448.0;
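/* Tick deltas from the log are converted to wall-clock time by dividing by
 * this frequency, i.e. dt ticks correspond to dt / log_clocks_cpufreq
 * seconds (see the nanosleep conversions below). */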
/**
* @brief fill a data area with a pattern that can be checked for changes.
*
* @param size size of data in bytes.
* @param data the data to fill.
*/
static void datacheck_fill(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
p[i] = 170; /* 10101010 in bits. */
}
}
/**
* @brief test a filled data area for our pattern.
*
* @param size size of data in bytes.
* @param data the data to check.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
if (p[i] != 170) return 0;
}
return 1;
}
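/* Example: a 4-byte buffer filled by datacheck_fill() holds the bytes
 * 0xAA 0xAA 0xAA 0xAA (170 == 10101010 binary), and datacheck_test()
 * returns 1 until any byte changes. */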
/**
 * @brief Send thread. Initiates MPI_Isend requests and then waits for their
 * completion.
 */
static void *send_thread(void *arg) {
if (verbose) message("send thread starts");
ticks starttics = getticks();
/* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
ticks basetic = send_logs[0]->tic;
ticks looptics = 0;
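/* Note: looptics is never measured in this variant and stays zero, so any
 * positive gap between successive log tics triggers a sleep below. */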
double deadtime = 0.0;
struct timespec sleep;
sleep.tv_sec = 0;
for (int k = 0; k < nr_send_logs; k++) {
struct mpiuse_log_entry *log = send_logs[k];
if (usetics) {
/* Expect time between this request and the previous one. */
ticks dt = log->tic - basetic;
basetic = log->tic;
/* We guess some time below which we should not attempt to wait,
* otherwise we'll start to overrun, and just inject the next call if we
* are below that (we time the ticks this loop takes without any waiting
* and use that). Otherwise we wait a while. Note we need to convert the
* ticks of the log file into nanoseconds, that requires the original
* CPU frequency. */
if (dt > looptics) {
/* Remember to be fair and remove the looptics, then convert to
* nanoseconds. */
double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
if (ns < 1.0e9) {
sleep.tv_nsec = (long)ns;
} else {
/* A wait of more than one second is surely an error, so cap just below
 * one second (tv_nsec must be less than 1.0e9), complain and continue. */
sleep.tv_nsec = 999999999L;
message("wait greater than one second");
}
nanosleep(&sleep, NULL);
deadtime += sleep.tv_nsec;
}
}
/* Initialise new log elements. */
log->done = 0;
log->nr_tests = 0;
log->tsum = 0.0;
log->tmax = 0;
log->tmin = INT_MAX;
log->endtic = 0;
log->injtic = getticks();
/* Differences to SWIFT: MPI_BYTE not the MPI_Type. */
int err = 0;
log->data = calloc(log->size, 1);
/* Fill data with pattern. */
if (datacheck) datacheck_fill(log->size, log->data);
/* And send. Note we use a different communicator, so we need to construct
 * a different, still unique and computable, tag. */
err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank,
log->tag + maxtag * log->subtype,
subtypeMPI_comms[0], &log->req);
if (err != MPI_SUCCESS) error("Failed to activate send");
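/* The tag offset maxtag * log->subtype keeps tags from different subtypes
 * distinct on the shared communicator, assuming all raw log tags are below
 * the rounded maxtag. */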
/* Add a new send request. */
sends_queue[nr_sends_queue] = log;
nr_sends_queue++;
}
if (verbose)
message("send injections completed: %d", nr_sends_queue);
/* Now we wait for the sends to complete. */
/* Global MPI_Test statistics. */
int ncalls = 0;
double sum = 0.0;
ticks mint = INT_MAX;
ticks maxt = 0;
/* We loop while we still have requests to complete. */
int todo = nr_sends_queue;
while (todo > 0) {
for (int k = 0; k < nr_sends_queue; k++) {
struct mpiuse_log_entry *log = sends_queue[k];
if (log != NULL && !log->done) {
ticks tics = getticks();
int res;
MPI_Status stat;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
/* Increment etc. of statistics about time in MPI_Test. */
ticks dt = getticks() - tics;
log->tsum += (double)dt;
sum += (double)dt;
log->nr_tests++;
ncalls++;
if (dt < log->tmin) log->tmin = dt;
if (dt > log->tmax) log->tmax = dt;
if (dt < mint) mint = dt;
if (dt > maxt) maxt = dt;
if (res) {
/* Check that the sent data is unchanged. */
if (datacheck && !datacheck_test(log->size, log->data)) {
error("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
log->endtic = getticks();
free(log->data);
/* One down. */
todo--;
}
}
}
}
/* All done. */
message("sends completed");
message(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, "
"max time %.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
return NULL;
}
/**
 * @brief Recv thread. Initiates MPI_Irecv requests and then waits for their
 * completion.
 */
static void *recv_thread(void *arg) {
if (verbose) message("recv thread starts");
ticks starttics = getticks();
/* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
ticks basetic = recv_logs[0]->tic;
ticks looptics = 0;
double deadtime = 0.0;
struct timespec sleep;
sleep.tv_sec = 0;
for (int k = 0; k < nr_recv_logs; k++) {
struct mpiuse_log_entry *log = recv_logs[k];
if (usetics) {
/* Expect time between this request and the previous one. */
ticks dt = log->tic - basetic;
basetic = log->tic;
/* We guess some time below which we should not attempt to wait,
* otherwise we'll start to overrun, and just inject the next call if we
* are below that (we time the ticks this loop takes without any waiting
* and use that). Otherwise we wait a while. Note we need to convert the
* ticks of the log file into nanoseconds, that requires the original
* CPU frequency. */
if (dt > looptics) {
/* Remember to be fair and remove the looptics, then convert to
* nanoseconds. */
double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
if (ns < 1.0e9) {
sleep.tv_nsec = (long)ns;
} else {
/* A wait of more than one second is surely an error, so cap just below
 * one second (tv_nsec must be less than 1.0e9), complain and continue. */
sleep.tv_nsec = 999999999L;
message("wait greater than one second");
}
nanosleep(&sleep, NULL);
deadtime += sleep.tv_nsec;
}
}
/* Initialise new log elements. */
log->done = 0;
log->nr_tests = 0;
log->tsum = 0.0;
log->tmax = 0;
log->tmin = INT_MAX;
log->endtic = 0;
log->injtic = getticks();
/* Differences to SWIFT: MPI_BYTE not the MPI_Type. */
int err = 0;
log->data = calloc(log->size, 1);
/* Fill data with pattern. */
if (datacheck) datacheck_fill(log->size, log->data);
/* And listen for the send. Note we use a different communicator, so we
 * need to construct a different, still unique and computable, tag. */
err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank,
log->tag + maxtag * log->subtype,
subtypeMPI_comms[1], &log->req);
if (err != MPI_SUCCESS) error("Failed to activate recv");
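/* Caveat: a receive can only match a send posted on the same communicator,
 * and the send thread posts on subtypeMPI_comms[0], so receives posted on
 * subtypeMPI_comms[1] here will never match; this is consistent with the
 * rendezvous lockups noted in the commit message. */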
/* Add a new recv request. */
recvs_queue[nr_recvs_queue] = log;
nr_recvs_queue++;
}
if (verbose)
message("recv injections completed: %d", nr_recvs_queue);
/* Now we wait for the recvs to complete. */
/* Global MPI_Test statistics. */
int ncalls = 0;
double sum = 0.0;
ticks mint = INT_MAX;
ticks maxt = 0;
/* We loop while we still have requests to complete. */
int todo = nr_recvs_queue;
while (todo > 0) {
for (int k = 0; k < nr_recvs_queue; k++) {
struct mpiuse_log_entry *log = recvs_queue[k];
if (log != NULL && !log->done) {
ticks tics = getticks();
int res;
MPI_Status stat;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
/* Increment etc. of statistics about time in MPI_Test. */
ticks dt = getticks() - tics;
log->tsum += (double)dt;
sum += (double)dt;
log->nr_tests++;
ncalls++;
if (dt < log->tmin) log->tmin = dt;
if (dt > log->tmax) log->tmax = dt;
if (dt < mint) mint = dt;
if (dt > maxt) maxt = dt;
if (res) {
/* Check that the received data matches the expected pattern. */
if (datacheck && !datacheck_test(log->size, log->data)) {
error("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
log->endtic = getticks();
free(log->data);
/* One down. */
todo--;
}
}
}
}
/* All done. */
message("recvs completed");
message(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, "
"max time %.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
return NULL;
}
/**
* @brief Comparison function for logged times.
*/
static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
/* Large unsigned values, so take care. */
if (l1->tic > l2->tic) return 1;
if (l1->tic < l2->tic) return -1;
return 0;
}
/**
* @brief Pick out the relevant logging data for our rank, i.e. all
* activations of sends and recvs, which we separate for processing
* in their own threads.
*/
static void pick_logs(void) {
size_t nlogs = mpiuse_nr_logs();
/* Duplicates of logs. */
recv_logs = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_recv_logs = 0;
send_logs = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_send_logs = 0;
sends_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_sends_queue = 0;
recvs_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_recvs_queue = 0;
/* Select the send and recv logs that are initiators. */
int nr_logs = 0;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
/* Need the maximum tag for all records to match between ranks. */
if (log->tag > maxtag) maxtag = log->tag;
if (log->rank == myrank && log->activation) {
if (log->type == task_type_send || log->type == task_type_recv) {
/* Scale size. */
log->size *= messagescale;
/* And keep this log. */
log->data = NULL;
if (log->type == task_type_send) {
send_logs[nr_send_logs] = log;
nr_send_logs++;
} else {
recv_logs[nr_recv_logs] = log;
nr_recv_logs++;
}
nr_logs++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
}
message("Read %d active send and recv logs from input file", nr_logs);
/* Sort into increasing time. */
qsort(send_logs, nr_send_logs, sizeof(struct mpiuse_log_entry *),
cmp_logs);
qsort(recv_logs, nr_recv_logs, sizeof(struct mpiuse_log_entry *),
cmp_logs);
/* Maximum tag rounded to next power of 2. */
int power = 1;
while (power < maxtag) power = power << 1;
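/* For example, a raw maxtag of 5 rounds up to 8, so subtype s is given the
 * tag block [8 * s, 8 * (s + 1)). */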
if (verbose) message("raw maxtag = %d, rounded maxtag = %d", maxtag, power);
maxtag = power;
}
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat\n",
argv[0]);
fprintf(stderr, " options: -v verbose, -f fast injections, "
"-s message size (bytes)\n");
fflush(stderr);
}
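/* Example invocation (file names are illustrative):
 *   mpirun -np 2 ./notswiftmpistepsim2 -v mpiuse-step1.dat timings.dat */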
/**
* @brief main function.
*/
int main(int argc, char *argv[]) {
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
int nr_nodes = 0;
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Handle the command-line, we expect a mpiuse data file to read and various
* options. */
int opt;
while ((opt = getopt(argc, argv, "vfds:")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
break;
case 'f':
usetics = 0;
break;
case 'v':
verbose = 1;
break;
case 's':
messagescale = atof(optarg);
break;
default:
if (myrank == 0) usage(argv);
return 1;
}
}
if (optind >= argc - 1) {
if (myrank == 0) usage(argv);
return 1;
}
char *infile = argv[optind];
char *logfile = argv[optind + 1];
/* Initialize time for messages and timers. */
clocks_set_cpufreq(0);
/* Now we read the SWIFT MPI logger output that defines the communications
 * we will undertake and the time differences between injections into the
 * queues. Note this has all ranks for a single step; SWIFT outputs one MPI
 * log per rank per step, so you need to combine all ranks from a step. */
mpiuse_log_restore(infile);
int nranks = mpiuse_nr_ranks();
/* This should match the expected size. */
if (nr_nodes != nranks)
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_nodes);
/* Create global communicators, one per thread. These are associated with
 * the threads somewhat arbitrarily, so we should seek to balance the work. */
for (int i = 0; i < 2; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
char str[8] = {0};
sprintf(str, "%d", i);
MPI_Info info;
MPI_Info_create(&info);
MPI_Info_set(info, "thread_id", str);
MPI_Comm_set_info(subtypeMPI_comms[i], info);
MPI_Info_free(&info);
}
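/* Note: per the Intel MPI thread-split model, this must also be enabled at
 * runtime (e.g. I_MPI_THREAD_SPLIT=1) for the "thread_id" hint to take
 * effect; other MPI libraries simply ignore the unknown info key. */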
if (verbose) {
MPI_Barrier(MPI_COMM_WORLD);
fflush(stdout);
}
/* Each rank requires its own queue, so extract them. */
pick_logs();
/* Try to make it synchronous across the ranks and reset time to zero. */
MPI_Barrier(MPI_COMM_WORLD);
clocks_set_cpufreq(0);
if (myrank == 0) {
message("Start of MPI tests");
message("==================");
if (messagescale != 1.0f) {
message(" ");
message(" Using message scale of %f", messagescale);
}
if (verbose) {
if (!usetics) message("using fast untimed injections");
if (datacheck)
message("checking data pattern on send and recv completion");
}
}
/* Need two threads, one for the sends and one for the recvs. */
pthread_t sendthread;
pthread_t recvthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Dump the updated MPI logs. */
MPI_Barrier(MPI_COMM_WORLD);
fflush(stdout);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
if (myrank == 0) message("Bye");
return 0;
}