/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2021 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
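
/* SWIFT support headers: atomics, ticks-based clocks, error handling and the
 * mpiuse log machinery. */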
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Global: Our rank for all to see. */
int myrank = -1;
/* Are we verbose? */
static int verbose = 0;
/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
/* Default seed for pseudorandoms. */
static long int default_seed = 1987654321;
/* MPI communicator for each rank. XXX statically sized, so limited to 512
 * ranks XXX. */
static MPI_Comm node_comms[512];
/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue;
static int volatile ind_req = 0;
static int volatile nr_reqs = 0;
static int volatile injecting = 1;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0;
static int volatile ind_recv = 0;
static int volatile todo_recv = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends = 0;
static int volatile ind_send = 0;
static int volatile todo_send = 0;
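
/* Note: cross-thread visibility for these queues relies on the volatile
 * qualifiers and the atomic operations from atomic.h, following SWIFT
 * practice, rather than C11 atomics. */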
/**
* @brief fill a data area with a pattern that can be checked for changes.
*
* @param size size of data in bytes.
* @param data the data to fill.
*/
static void datacheck_fill(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
p[i] = 170; /* 10101010 in bits. */
}
}
/**
* @brief test a filled data area for our pattern.
*
* @param size size of data in bytes.
 * @param data the data to check.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
if (p[i] != 170) return 0;
}
return 1;
}
/**
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
*/
static void *inject_thread(void *arg) {
if (verbose) message("%d: injection thread starts", *((int *)arg));
ticks starttics = getticks();
while (ind_req < nr_reqs) {
struct mpiuse_log_entry *log = reqs_queue[ind_req];
/* Initialise new log elements. */
log->done = 0;
log->nr_tests = 0;
log->tsum = 0.0;
log->tmax = 0;
log->tmin = INT_MAX;
log->endtic = 0;
log->injtic = getticks();
    /* Difference from SWIFT: we transfer MPI_BYTE, not the registered MPI
     * type. */
int err = 0;
if (log->type == SEND_TYPE) {
log->data = calloc(log->size, 1);
/* Fill data with pattern. */
if (datacheck) datacheck_fill(log->size, log->data);
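      /* Both ends of an exchange use the communicator of the sending rank
       * (node_comms[log->rank] here, node_comms[log->otherrank] in the recv
       * branch), so each sending rank effectively gets its own tag space. */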
/* And send. */
err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
node_comms[log->rank], &log->req);
      /* Add a new send request; atomic_inc returns the value before the
       * increment, giving this log a unique slot. */
int ind = atomic_inc(&nr_sends);
sends_queue[ind] = log;
atomic_inc(&todo_send);
} else {
/* Ready to receive. */
log->data = calloc(log->size, 1);
err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
node_comms[log->otherrank], &log->req);
/* Add a new recv request. */
int ind = atomic_inc(&nr_recvs);
recvs_queue[ind] = log;
atomic_inc(&todo_recv);
}
if (err != MPI_SUCCESS) error("Failed to activate send or recv");
ind_req++;
}
/* All done, thread exiting. */
if (verbose) {
message("%d injections completed, sends = %d, recvs = %d", ind_req,
nr_sends, nr_recvs);
message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
atomic_dec(&injecting);
return NULL;
}
/**
 * @brief main loop to run over a queue of MPI requests and test for when
 * they complete. The total time spent in calls to MPI_Test and the number
 * of times it was called are returned through the arguments.
*
* @param logs the list of logs pointing to requests.
* @param nr_logs pointer to the variable containing the current number of
* logs.
* @param todos pointer to the variable containing the number of requests that
* are still active.
* @param sum the total number of ticks spent in calls to MPI_Test.
* @param ncalls the total number of calls to MPI_Test.
* @param mint the minimum ticks an MPI_Test call took.
* @param maxt the maximum ticks an MPI_Test call took.
*/
static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
int volatile *todos, double *sum, int *ncalls,
ticks *mint, ticks *maxt) {
/* Global MPI_Test statistics. */
int lncalls = 0;
double lsum = 0.0;
ticks lmint = INT_MAX;
ticks lmaxt = 0;
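
  /* Note: we busy-poll MPI_Test over the whole queue with no back-off, so
   * each queue runner fully occupies a core while it is active. */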
  /* We loop while new requests are being injected and while we still have
   * requests to complete. */
  while (injecting || *todos > 0) {
int nlogs = *nr_logs;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = logs[k];
if (log != NULL && !log->done) {
ticks tics = getticks();
int res;
MPI_Status stat;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
/* Increment etc. of statistics about time in MPI_Test. */
ticks dt = getticks() - tics;
log->tsum += (double)dt;
lsum += (double)dt;
log->nr_tests++;
lncalls++;
if (dt < log->tmin) log->tmin = dt;
if (dt > log->tmax) log->tmax = dt;
if (dt < lmint) lmint = dt;
if (dt > lmaxt) lmaxt = dt;
if (res) {
          /* Check that sent data is unchanged and received data is as
           * expected. */
if (datacheck && !datacheck_test(log->size, log->data)) {
error("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
log->endtic = getticks();
free(log->data);
atomic_dec(todos);
}
}
}
}
/* All done. */
*sum = lsum;
*ncalls = lncalls;
*mint = lmint;
*maxt = lmaxt;
return;
}
/**
* @brief Send thread, checks if MPI_Isend requests have completed.
*/
static void *send_thread(void *arg) {
if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
ticks starttics = getticks();
int ncalls;
double sum;
ticks mint;
ticks maxt;
queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt);
message(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
"%.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
}
/**
* @brief Recv thread, checks if MPI_Irecv requests have completed.
*/
static void *recv_thread(void *arg) {
if (verbose) message("%d: recv thread starts", *((int *)arg));
ticks starttics = getticks();
int ncalls;
double sum;
ticks mint;
ticks maxt;
queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt);
message(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
"%.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
}
/**
* @brief Comparison function for logged times.
*/
static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
/* Large unsigned values, so take care. */
if (l1->tic > l2->tic) return 1;
if (l1->tic < l2->tic) return -1;
return 0;
}
/**
* @brief Pick out the relevant logging data for our rank, i.e. all
* activations of sends and recvs. We ignore the original completions.
* The final list is sorted into increasing time of activation if required,
* otherwise the order is randomized.
*
* @param random randomize injection order, otherwise use order of the
* original logs.
*/
static void pick_logs(int random) {
size_t nlogs = mpiuse_nr_logs();
/* Duplicate of logs. */
reqs_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_reqs = 0;
sends_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_sends = 0;
recvs_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recvs = 0;
  for (size_t k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->rank == myrank && log->activation) {
log->data = NULL;
reqs_queue[nr_reqs] = log;
nr_reqs++;
}
}
  if (!random) {
    /* Sort into increasing time. */
    qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);

    /* Check the sort. */
    for (int k = 0; k < nr_reqs - 1; k++) {
      if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
        message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
                reqs_queue[k + 1]->tic);
    }
  } else {
    /* Randomize the order, so ranks do not all work in sequence. */
    mpiuse_shuffle_logs(reqs_queue, nr_reqs);
  }
}
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [options] nr_messages logfile.dat\n", argv[0]);
  fprintf(stderr,
          " options: -v verbose, -d data check, -s size (bytes/scale),\n"
          "\t -f randomize injection order,\n"
          "\t[-r uniform random from 1 to size, |\n"
          "\t -r -g half-gaussian random from 1 with size as 2.5 sigma, |\n"
          "\t -r -c <file> use CDF from file, size is a scale factor, |\n"
          "\t -r -o <file> use occurrence sample of values in a file, size is "
          "a scale factor,]\n"
          "\t -x random seed\n");
fflush(stderr);
}
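
/* Example invocation (binary name illustrative): exchange 1000 messages per
 * rank with uniform random sizes up to 1024 bytes over 4 ranks:
 *
 *   mpirun -np 4 ./swiftmpifakestepsim -r -s 1024 1000 logfile.dat
 */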
/**
* @brief main function.
*/
int main(int argc, char *argv[]) {
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
int nr_nodes = 0;
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
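  /* The per-rank communicators live in a statically sized array (see
   * node_comms above), so refuse rank counts that would overrun it. */
  if (nr_nodes > (int)(sizeof(node_comms) / sizeof(node_comms[0])))
    error("Too many ranks, only %zu are supported.",
          sizeof(node_comms) / sizeof(node_comms[0]));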
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
  /* Handle the command line. We expect the number of messages to exchange
   * per rank, an output log and some options; the interesting ones are a
   * size and whether to use random selections of various kinds. */
int size = 1024;
int random = 0;
int randomorder = 0;
int uniform = 1;
char *cdf = NULL;
char *odata = NULL;
int opt;
unsigned int seed = default_seed;
while ((opt = getopt(argc, argv, "vds:rgx:c:o:f")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
break;
case 'c':
cdf = optarg;
break;
case 'f':
randomorder = 1;
break;
case 'g':
uniform = 0;
break;
case 's':
size = atoi(optarg);
break;
case 'r':
random = 1;
break;
case 'o':
odata = optarg;
break;
case 'v':
verbose = 1;
break;
case 'x':
seed = atol(optarg);
break;
default:
if (myrank == 0) usage(argv);
return 1;
}
}
if (optind >= argc - 1) {
if (myrank == 0) usage(argv);
return 1;
}
if (cdf != NULL && odata != NULL)
error("Cannot use -c and -o options together");
int nr_logs = atoi(argv[optind]);
if (nr_logs == 0)
error("Expected number of messages to exchange, got: %s", argv[optind]);
char *logfile = argv[optind + 1];
/* Generate the fake logs for the exchanges. */
if (myrank == 0) {
if (random) {
if (cdf != NULL) {
message(
"Generating %d fake logs for %d ranks with randoms"
" based on cdf %s scaled by factor %d",
nr_logs, nr_nodes, cdf, size);
    } else if (odata != NULL) {
      message(
          "Generating %d fake logs for %d ranks with randoms"
          " based on occurrence data %s scaled by factor %d",
          nr_logs, nr_nodes, odata, size);
} else if (uniform) {
message(
"Generating %d fake logs for %d ranks with random distribution"
" using size %d",
nr_logs, nr_nodes, size);
} else {
message(
"Generating %d fake logs for %d ranks with gaussian random "
"distribution using size %d as 2.5 sigma",
nr_logs, nr_nodes, size);
}
} else {
message("Generating %d fake logs for %d ranks of size %d", nr_logs,
nr_nodes, size);
}
}
mpiuse_log_generate(nr_nodes, nr_logs, size, random, seed, uniform, cdf,
odata);
int nranks = mpiuse_nr_ranks();
/* Create communicators for each MPI rank. */
for (int i = 0; i < nr_nodes; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &node_comms[i]);
}
/* Each rank requires its own queue, so extract them. */
pick_logs(randomorder);
  /* Time to start timing. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
clocks_set_cpufreq(0);
if (myrank == 0) {
message("Start of MPI tests");
message("==================");
if (verbose) {
if (datacheck)
message("checking data pattern on send and recv completion");
}
}
  /* Make three threads: one for injecting tasks and two to check for
   * completions of the sends and recvs independently. */
pthread_t injectthread;
if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
error("Failed to create injection thread.");
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread, NULL);
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Dump the updated MPI logs. */
MPI_Barrier(MPI_COMM_WORLD);
fflush(stdout);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
if (myrank == 0) message("Bye");
return 0;
}