-
Peter W. Draper authored
Use more than one thread for send and receive as well as injection, also make it possible to use the output from swiftmpifakestepsim as the input to swiftmpistepsim (use -z flag)
Peter W. Draper authoredUse more than one thread for send and receive as well as injection, also make it possible to use the output from swiftmpifakestepsim as the input to swiftmpistepsim (use -z flag)
mpiuse.c 14.28 KiB
/* This file is part of SWIFT.
* Copyright (c) 2019 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/**
* @file mpiuse.c
* @brief file of routines to report about MPI tasks used in SWIFT.
*/
/* Standard includes. */
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* Local defines. */
#include "mpiuse.h"
/* Local includes. */
#include "atomic.h"
#include "clocks.h"
#include "cycle.h"
#include "error.h"
#include "histogram.h"
/* Our rank, defined elsewhere in the program. */
extern int myrank;
/* The initial size and increment of the log entries buffer, in entries. */
#define MPIUSE_INITLOG 1000000
/* The log of activations and handoffs. All volatile as accessed from threads
 * that use the value to synchronise. */
/* The buffer of log entries, replaced whenever mpiuse_log_reallocate() grows
 * the log. */
static struct mpiuse_log_entry *volatile mpiuse_log = NULL;
/* Number of entries the current buffer has room for. */
static volatile size_t mpiuse_log_size = 0;
/* Number of entry slots handed out so far, so also the next free index. */
static volatile size_t mpiuse_log_count = 0;
/* Number of entries that have been completely filled in. */
static volatile size_t mpiuse_log_done = 0;
/* Largest rank seen in any logged entry. */
static volatile int mpiuse_max_rank = 0;
/**
* @brief reallocate the entries log if space is needed.
*/
static void mpiuse_log_reallocate(size_t ind) {
if (ind == 0) {
/* Need to perform initialization. Be generous. */
if ((mpiuse_log = (struct mpiuse_log_entry *)malloc(
sizeof(struct mpiuse_log_entry) * MPIUSE_INITLOG)) == NULL)
error("Failed to allocate MPI use log.");
/* Last action. */
mpiuse_log_size = MPIUSE_INITLOG;
} else {
struct mpiuse_log_entry *new_log;
if ((new_log = (struct mpiuse_log_entry *)malloc(
sizeof(struct mpiuse_log_entry) *
(mpiuse_log_size + MPIUSE_INITLOG))) == NULL)
error("Failed to re-allocate MPI use log.");
/* Wait for all writes to the old buffer to complete. */
while (mpiuse_log_done < mpiuse_log_size)
;
/* Copy to new buffer. */
memcpy(new_log, mpiuse_log,
sizeof(struct mpiuse_log_entry) * mpiuse_log_size);
free(mpiuse_log);
mpiuse_log = new_log;
/* Last action, releases waiting threads. */
atomic_add(&mpiuse_log_size, MPIUSE_INITLOG);
}
}
/**
 * @brief Log an MPI request or handoff.
 *
 * Claims the next slot in the log, makes sure the log is large enough to hold
 * it and then fills the slot in. The routine is written to be called from
 * more than one thread: the thread whose slot index equals the current log
 * size grows the log, threads with larger indices spin until there is room.
 *
 * @param rank the rank
 * @param step the step
 * @param tic the ticks at time of log, will be relative.
 * @param type the task type (send or recv).
 * @param subtype the task subtype.
 * @param activation if set this is an activation (MPI_Isend or MPI_Irecv),
 *                   if not it is a handoff (a successful MPI_Test).
 * @param size the size in bytes of memory to be transfered or received.
 *             0 for a deactivation.
 * @param otherrank other rank associated with the transfer.
 * @param tag the MPI tag.
 */
void mpiuse_log_allocation(int rank, int step, size_t tic, int type,
                           int subtype, int activation, size_t size,
                           int otherrank, int tag) {

  /* Claim a unique slot. */
  const size_t ind = atomic_inc(&mpiuse_log_count);

  /* Wait until the log has room for our slot. Indices are unique, so exactly
   * one thread ever sees its index equal to the current size and that thread
   * does the growing. The test must be repeated after every growth: a thread
   * that started waiting with an index equal to the size the log is about to
   * reach would otherwise fall through and write one entry past the end of
   * the new buffer. */
  while (ind >= mpiuse_log_size) {
    if (ind == mpiuse_log_size) mpiuse_log_reallocate(ind);
  }

  /* Record the log. */
  mpiuse_log[ind].activation = activation;
  mpiuse_log[ind].data = NULL;
  mpiuse_log[ind].otherrank = otherrank;
  mpiuse_log[ind].rank = rank;
  mpiuse_log[ind].req = MPI_REQUEST_NULL;
  mpiuse_log[ind].size = size;
  mpiuse_log[ind].step = step;
  mpiuse_log[ind].subtype = subtype;
  mpiuse_log[ind].tag = tag;
  mpiuse_log[ind].tic = tic;
  mpiuse_log[ind].type = type;

  /* Keep number of ranks for convenience. */
  if (rank > mpiuse_max_rank) mpiuse_max_rank = rank;

  /* Tell any thread waiting to grow the log that this slot is written. */
  atomic_inc(&mpiuse_log_done);
}
/**
 * @brief restore the log from a dump.
 *
 * Reads a dump in the standard format, one entry per line with the columns
 * "stic etic dtic step rank otherrank type itype subtype isubtype activation
 * tag size sum", and logs each entry using mpiuse_log_allocation(). Lines
 * starting with '#' are comments. Only stic, step, rank, otherrank, itype,
 * isubtype, activation, tag and size are used. Lines from which these cannot
 * all be read are skipped, so that stale or uninitialised values are never
 * logged.
 *
 * If the file cannot be opened a message is issued and nothing is logged.
 *
 * @param filename name of file with the previous dump in.
 */
void mpiuse_log_restore(const char *filename) {

  /* Open the input file. */
  FILE *fd = fopen(filename, "r");
  if (fd == NULL) {
    message("Failed to open the MPI use log file '%s'.", filename);
    return;
  }

  /* Read until the end of the file is reached.*/
  char line[132];
  char type[32], subtype[32];
  size_t stic, etic, dtic, size, sum;
  int step, rank, otherrank, itype, isubtype, activation, tag;

  while (fgets(line, sizeof(line), fd) != NULL) {
    if (line[0] == '#') continue;

    /* The string fields are bounded to the size of their buffers. */
    const int nread = sscanf(
        line, "%zu %zu %zu %d %d %d %31s %d %31s %d %d %d %zu %zu", &stic,
        &etic, &dtic, &step, &rank, &otherrank, type, &itype, subtype,
        &isubtype, &activation, &tag, &size, &sum);

    /* Everything up to and including size is needed, the trailing sum is
     * not used so its absence is tolerated. */
    if (nread < 13) {
      /* Blank lines convert nothing, no need to report those. */
      if (nread > 0) message("Skipping malformed MPI use log line: %s", line);
      continue;
    }

    mpiuse_log_allocation(rank, step, stic, itype, isubtype, activation, size,
                          otherrank, tag);
  }
  fclose(fd);
}
/**
* @brief dump the logs for all ranks to a file.
*
* @param nranks the number of ranks.
* @param standard only write a standard log, this can be used as input to
* other runs.
* @param dumpfile the file to write
*/
void mpiuse_dump_logs(int nranks, int standard, const char *dumpfile) {
/* Make sure output file is empty, only on one rank. */
FILE *fd;
if (myrank == 0) {
fd = fopen(dumpfile, "w");
/* Header. */
if (standard) {
fprintf(fd,
"# stic etic dtic step rank otherrank type itype "
" subtype isubtype activation tag size sum\n");
} else {
fprintf(fd,
"# logticin logtic injtic endtic dtic step rank otherrank itype "
" isubtype tag size nr_tests tsum tmin tmax\n");
}
fclose(fd);
}
MPI_Barrier(MPI_COMM_WORLD);
const char *types[] = {"send", "recv"};
/* Loop over all ranks, one by one, getting each rank to append their
* logs. */
for (int k = 0; k < nranks; k++) {
/* Rank 0 decides the index of the writing node, this happens
* one-by-one. */
int kk = k;
MPI_Bcast(&kk, 1, MPI_INT, 0, MPI_COMM_WORLD);
if (kk == myrank) {
/* Open file and position at end. */
fd = fopen(dumpfile, "a");
/* And append our logs. Note log->tic is not necessarily from this
* machine, so the conversion to ms may be suspect. We also rebase a
* version to match the expected injection times for this new run. */
size_t nlogs = mpiuse_log_count;
ticks basetics = 0;
long long sum = 0;
for (size_t k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = &mpiuse_log[k];
if (log->rank == myrank && log->endtic > 0) {
if (basetics == 0) basetics = log->tic;
if (standard) {
fprintf(fd, "%lld %lld %lld %d %d %d %s %d %s %d %d %d %lld %lld\n",
log->injtic, log->injtic, log->endtic - log->injtic,
log->step, log->rank, log->otherrank,
types[log->type - SEND_TYPE], log->type, "none",
log->subtype, log->activation, log->tag, log->size, sum);
sum += log->size;
} else {
fprintf(
fd,
"%lld %.4f %.4f %.4f %.6f %d %d %d %d %d %d %zd %d %.4f %.6f "
"%.6f\n",
log->tic, clocks_from_ticks(log->tic - basetics),
clocks_from_ticks(log->injtic - clocks_start_ticks),
clocks_from_ticks(log->endtic - clocks_start_ticks),
clocks_from_ticks(log->endtic - log->injtic), log->step,
log->rank, log->otherrank, log->type, log->subtype, log->tag,
log->size, log->nr_tests, clocks_from_ticks(log->tsum),
clocks_from_ticks(log->tmin), clocks_from_ticks(log->tmax));
}
}
}
fclose(fd);
}
/* Need to stay in step. */
MPI_Barrier(MPI_COMM_WORLD);
}
}
/**
 * @brief return the number of log entries.
 *
 * This is the number of entry slots handed out so far, converted to int.
 *
 * @result the number of log entries.
 */
int mpiuse_nr_logs(void) {
  const size_t nr_entries = mpiuse_log_count;
  return (int)nr_entries;
}
/**
 * @brief return the number of ranks in log.
 *
 * Ranks count from zero, so this is one more than the largest rank logged.
 *
 * @result the number of ranks we've seen.
 */
int mpiuse_nr_ranks(void) {
  const int highest_rank = mpiuse_max_rank;
  return highest_rank + 1;
}
/**
 * @brief get a log entry.
 *
 * @param ind the index of the entry required.
 * @result pointer to the entry in the log, NULL if the index is negative or
 *         not less than the number of log entries.
 */
struct mpiuse_log_entry *mpiuse_get_log(int ind) {

  /* Negative indices are never valid. */
  if (ind < 0) return NULL;

  /* Nor are those at or beyond the number of entries. */
  if ((size_t)ind >= mpiuse_log_count) return NULL;

  return &mpiuse_log[ind];
}
/**
* @brief return random number from a upper part of gaussian distribution.
*
* @result the random.
*/
static double gauss_rand_upper(void) {
double V1, V2, S;
do {
double U1 = drand48();
double U2 = drand48();
V1 = U1 - 1.0;
V2 = U2 - 1.0;
S = V1 * V1 + V2 * V2;
} while (S >= 1.0 || S == 0.0);
return fabs(V1 * sqrt(-2.0 * log(S) / S));
}
/**
 * @brief locate the CDF bin that a uniform random value falls in.
 *
 * The bin is the first one whose cumulative value is not less than the
 * random value. Values beyond the last cumulative value are assigned to the
 * last bin.
 *
 * @param value the cumulative values of the bins, must be non-decreasing.
 * @param nvals the number of bins, must be at least 1.
 * @param u the random value.
 * @result the index of the bin, in the range 0 to nvals - 1.
 */
static int mpiuse_cdf_find_bin(const double *value, int nvals, double u) {
  int lower = 0;
  int upper = nvals;
  while (lower < upper) {
    const int middle = lower + (upper - lower) / 2;
    if (u > value[middle])
      lower = middle + 1;
    else
      upper = middle;
  }

  /* The search converges on lower, which is nvals when u is larger than all
   * the cumulative values. */
  if (lower >= nvals) lower = nvals - 1;
  return lower;
}

/**
 * @brief generate a list of fake exchanges as mpiuse logs.
 *
 * For each of the nr_logs "logs" every rank sends a message to, and receives
 * a message from, every other rank. All the messages of a log have the same
 * size and tag, the tag increments by one for each log starting at 1. The
 * entries are created using mpiuse_log_allocation() with a step of 1 and the
 * log index as the tic.
 *
 * @param nr_nodes the number of ranks that will be used.
 * @param nr_logs the number of logs to generate per rank.
 * @param size bytes per message, unless random when this is the maximum
 *             and the minimum is 1 for uniform, if using a gaussian
 *             distribution the value is a 2.5 sigma, for a CDF based
 *             selection this is just a scale factor of the values.
 *             NOTE(review): the gaussian branch scales by 0.25 * size and
 *             the CDF branch does not use size at all -- confirm which of
 *             the code or this description is intended.
 * @param random whether to use random sizes.
 * @param seed the random seed, use same for fixed sequences.
 * @param uniform whether to use a uniform distribution or gaussian, unless
 *                cdf or odata is defined, in which case this parameter is
 *                ignored.
 * @param cdf text file containing a normalized CDF to use as a basis for
 *            inverse transform sampling of the randoms. NULL for no file.
 * @param odata text file containing values representing occurrences of the
 *              expected distribution -- converted into a normalised CDF to
 *              use as a basis for inverse transform sampling of the
 *              randoms. NULL for no file. Not used if cdf is not NULL.
 */
void mpiuse_log_generate(int nr_nodes, int nr_logs, int size, int random,
                         long int seed, int uniform, const char *cdf,
                         const char *odata) {

  /* Only used for CDFs, NHIST may need to be increased. */
  int nvals = 0;
  double imin[NHIST], imax[NHIST], value[NHIST];

  /* Note that each rank exchanges messages with all the others and each "log"
   * has the same size. */

  /* Set seed. */
  if (random) srand48(seed);

  /* Check for CDF. This should be based on some real distribution, the format
   * is same as output from TOPCAT, i.e. bin-low, bin-high, value space
   * separated values. Note the value column should be normalised into the
   * range 0 to 1 so that it maps into a uniform random distribution. */
  if (cdf != NULL) {
    FILE *infile = fopen(cdf, "r");
    if (infile == NULL) error("Failed to open CDF file: %s", cdf);

    char line[132];
    while (fgets(line, sizeof(line), infile) != NULL) {
      if (line[0] == '#') continue;

      /* Lines that do not supply all three values are ignored. */
      double low, high, cumulative;
      if (sscanf(line, "%lf %lf %lf", &low, &high, &cumulative) != 3) continue;

      /* Never write past the end of the fixed size arrays. */
      if (nvals >= NHIST)
        error("Too many bins in CDF file: %s, the maximum is %d", cdf,
              (int)NHIST);

      imin[nvals] = low;
      imax[nvals] = high;
      value[nvals] = cumulative;
      nvals++;
    }
    fclose(infile);

  } else if (odata != NULL) {
    double *values;
    int nvalues;
    if (histread(odata, &values, &nvalues)) {

      /* Convert the occurrences into a cumulative histogram. */
      struct histogram *h = calloc(1, sizeof(struct histogram));
      if (h == NULL) error("Failed to allocate histogram for file: %s", odata);
      histmake(nvalues, values, h);

      /* Defensive, the bins are copied into fixed size arrays. */
      if (h->nvalues > NHIST)
        error("Too many bins in histogram of file: %s, the maximum is %d",
              odata, (int)NHIST);

      /* The first bin starts at zero, the others where the previous ended. */
      imin[0] = 0.0;
      imax[0] = h->values[0];
      value[0] = h->sums[0];
      for (int k = 1; k < h->nvalues; k++) {
        imin[k] = h->values[k - 1];
        imax[k] = h->values[k];
        value[k] = h->sums[k];
      }
      nvals = h->nvalues;

      free(h);
      free(values);
    } else {
      error("Failed to read occurrence data from file: %s", odata);
    }
  }

  /* CDF sampling reads the bin arrays, so there must be at least one bin. */
  if (random && (cdf != NULL || odata != NULL) && nvals < 1)
    error("No CDF bins available, cannot generate random message sizes.");

  /* Message tags increment with across rank logs. */
  int tag = 1;
  for (int k = 0; k < nr_logs; k++) {

    /* Set size for this messages. */
    double logsize = size;
    if (random) {
      if (cdf || odata) {

        /* CDF based randoms, use the centre of the bin that contains a
         * uniform random. */
        const int bin = mpiuse_cdf_find_bin(value, nvals, drand48());
        logsize = 0.5 * (imax[bin] + imin[bin]);

      } else if (uniform) {

        /* Uniform randoms scaled by size, offset so the minimum is 1. */
        logsize = (drand48() * (double)size) + 1;

      } else {

        /* Gaussian randoms so no maximum. NOTE(review): the description of
         * size says it is a 2.5 sigma value, the factor used here is 0.25
         * -- confirm. */
        logsize = (gauss_rand_upper() * (double)size * 0.25) + 1;
      }
    }

    /* Cannot send more than 2^31-1 bytes at a time, so truncate. */
    if (logsize > 2147483647.0) {
      message("CDF size too large : %f, truncating", logsize);
      logsize = 2147483647.0;
    }

    /* Same whole number of bytes for all the messages of this log. */
    const size_t nbytes = (size_t)logsize;

    /* Every rank sends to every other rank, with the matching receive. */
    for (int i = 0; i < nr_nodes; i++) {
      for (int j = 0; j < nr_nodes; j++) {
        if (i == j) continue;
        mpiuse_log_allocation(i, 1, k, SEND_TYPE, NO_SUBTYPE, 1, nbytes, j,
                              tag);
        mpiuse_log_allocation(j, 1, k, RECV_TYPE, NO_SUBTYPE, 1, nbytes, i,
                              tag);
      }
    }
    tag++;
  }
}
/**
 * @brief shuffle log pointers randomizing the order.
 *
 * Works from the end of the array towards the start, swapping each element
 * with one picked at random from those at or before it. Only the pointers
 * are moved, the log entries themselves are not touched. Arrays with fewer
 * than two elements are left as they are.
 *
 * Note assumes drand48() has been seeded.
 *
 * @param logs the log pointers to shuffle.
 * @param nlogs the number of logs.
 */
void mpiuse_shuffle_logs(struct mpiuse_log_entry **logs, int nlogs) {
  for (int last = nlogs - 1; last > 0; last--) {

    /* Pick from 0 to last inclusive, so an element can stay where it is. */
    const unsigned int pick = (unsigned int)(drand48() * (last + 1));

    /* Swap the pointers directly, this is also safe when pick == last. */
    struct mpiuse_log_entry *held = logs[pick];
    logs[pick] = logs[last];
    logs[last] = held;
  }
}