/* mpiuse.c -- authored by Peter W. Draper. */
/* This file is part of SWIFT.
* Copyright (c) 2019 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/**
* @file mpiuse.c
* @brief file of routines to report about MPI tasks used in SWIFT.
*/
/* Standard includes. */
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* Local defines. */
#include "mpiuse.h"
/* Local includes. */
#include "atomic.h"
#include "clocks.h"
#include "cycle.h"
#include "error.h"
/* Our rank. */
extern int myrank;
/* The initial size and increment of the log entries buffer. This is a count
 * of entries, not bytes. */
#define MPIUSE_INITLOG 1000000
/* The log of activations and handoffs. All volatile as accessed from threads
 * that use the value to synchronise. */
/* The buffer of log entries, replaced by a larger one when the log grows. */
static struct mpiuse_log_entry *volatile mpiuse_log = NULL;
/* Number of entries the current buffer has room for. */
static volatile size_t mpiuse_log_size = 0;
/* Number of entry slots handed out so far, so also the next free index. */
static volatile size_t mpiuse_log_count = 0;
/* Number of entries whose fields have been completely written. */
static volatile size_t mpiuse_log_done = 0;
/* Largest rank seen in any logged entry. */
static volatile int mpiuse_max_rank = 0;
/**
* @brief reallocate the entries log if space is needed.
*/
static void mpiuse_log_reallocate(size_t ind) {
if (ind == 0) {
/* Need to perform initialization. Be generous. */
if ((mpiuse_log = (struct mpiuse_log_entry *)malloc(
sizeof(struct mpiuse_log_entry) * MPIUSE_INITLOG)) == NULL)
error("Failed to allocate MPI use log.");
/* Last action. */
mpiuse_log_size = MPIUSE_INITLOG;
} else {
struct mpiuse_log_entry *new_log;
if ((new_log = (struct mpiuse_log_entry *)malloc(
sizeof(struct mpiuse_log_entry) *
(mpiuse_log_size + MPIUSE_INITLOG))) == NULL)
error("Failed to re-allocate MPI use log.");
/* Wait for all writes to the old buffer to complete. */
while (mpiuse_log_done < mpiuse_log_size)
;
/* Copy to new buffer. */
memcpy(new_log, mpiuse_log,
sizeof(struct mpiuse_log_entry) * mpiuse_log_size);
free(mpiuse_log);
mpiuse_log = new_log;
/* Last action, releases waiting threads. */
atomic_add(&mpiuse_log_size, MPIUSE_INITLOG);
}
}
/**
* @brief Log an MPI request or handoff.
*
* @param rank the rank
* @param step the step
* @param tic the ticks at time of log, will be relative.
* @param type the task type (send or recv).
* @param subtype the task subtype.
* @param activation if not is a successful MPI_Test, not MPI_Isend or
* MPI_Irecv.
* @param size the size in bytes of memory to be transfered or received.
* 0 for a deactivation.
* @param otherrank other rank associated with the transfer.
* @param tag the MPI tag.
*/
void mpiuse_log_allocation(int rank, int step, size_t tic, int type,
int subtype, int activation, size_t size,
int otherrank, int tag) {
size_t ind = atomic_inc(&mpiuse_log_count);
/* If we are at the current size we need more space. */
if (ind == mpiuse_log_size) mpiuse_log_reallocate(ind);
/* Other threads wait for space. */
while (ind > mpiuse_log_size)
;
/* Record the log. */
mpiuse_log[ind].activation = activation;
mpiuse_log[ind].data = NULL;
mpiuse_log[ind].otherrank = otherrank;
mpiuse_log[ind].rank = rank;
mpiuse_log[ind].req = MPI_REQUEST_NULL;
mpiuse_log[ind].size = size;
mpiuse_log[ind].step = step;
mpiuse_log[ind].subtype = subtype;
mpiuse_log[ind].tag = tag;
mpiuse_log[ind].tic = tic;
mpiuse_log[ind].type = type;
/* Keep number of ranks for convenience. */
if (rank > mpiuse_max_rank) mpiuse_max_rank = rank;
atomic_inc(&mpiuse_log_done);
}
/**
 * @brief restore the log from a dump.
 *
 * Each non-comment line is expected to hold, space separated: start tic,
 * end tic, delta tic, step, rank, other rank, type name, type, subtype name,
 * subtype, activation, tag, size and a running sum. Only the numeric values
 * that mpiuse_log_allocation() takes are used. Lines starting with '#' and
 * lines on which those values cannot all be parsed are skipped.
 *
 * A file that cannot be opened is reported with message() and otherwise
 * ignored.
 *
 * @param filename name of file with the previous dump in.
 */
void mpiuse_log_restore(const char *filename) {
  /* The size field is the 13th on a line and the last one we use. */
  enum { MPIUSE_RESTORE_NEEDED_FIELDS = 13 };

  /* Open the input file. */
  FILE *fd = fopen(filename, "r");
  if (fd == NULL) {
    message("Failed to open the MPI use log file '%s'.", filename);
    return;
  }

  /* Read until fgets fails. That covers the end of the file and also a read
   * error, which a feof() test alone would loop on forever. */
  char line[132];
  while (fgets(line, sizeof(line), fd) != NULL) {
    /* Skip comments. */
    if (line[0] == '#') continue;

    size_t stic, etic, dtic, size, sum;
    int step, rank, otherrank, itype, isubtype, activation, tag;
    char type[32], subtype[32];

    /* The %31s widths keep the names inside their 32 byte buffers. */
    const int nread =
        sscanf(line, "%zu %zu %zu %d %d %d %31s %d %31s %d %d %d %zu %zu",
               &stic, &etic, &dtic, &step, &rank, &otherrank, type, &itype,
               subtype, &isubtype, &activation, &tag, &size, &sum);

    /* Blank or malformed line, nothing trustworthy to log. */
    if (nread < MPIUSE_RESTORE_NEEDED_FIELDS) continue;

    mpiuse_log_allocation(rank, step, stic, itype, isubtype, activation, size,
                          otherrank, tag);
  }
  fclose(fd);
}
/**
* @brief dump the logs for all ranks to a file.
*
* @param nranks the number of ranks.
* @param dumpfile the file to write
*/
void mpiuse_dump_logs(int nranks, const char *dumpfile) {
/* Make sure output file is empty, only on one rank. */
FILE *fd;
if (myrank == 0) {
fd = fopen(dumpfile, "w");
/* Header. */
fprintf(fd,
"# logticin logtic injtic endtic dtic step rank otherrank itype "
" isubtype tag size nr_tests tsum tmin tmax\n");
fclose(fd);
}
MPI_Barrier(MPI_COMM_WORLD);
/* Loop over all ranks, one by one, getting each rank to append their
* logs. */
for (int k = 0; k < nranks; k++) {
/* Rank 0 decides the index of the writing node, this happens
* one-by-one. */
int kk = k;
MPI_Bcast(&kk, 1, MPI_INT, 0, MPI_COMM_WORLD);
if (kk == myrank) {
/* Open file and position at end. */
fd = fopen(dumpfile, "a");
/* And append our logs. Note log->tic is not necessarily from this
* machine, so the conversion to ms may be suspect. We also rebase a
* version to match the expected injection times for this new run. */
size_t nlogs = mpiuse_log_count;
ticks basetics = 0;
for (size_t k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = &mpiuse_log[k];
if (log->rank == myrank && log->endtic > 0) {
if (basetics == 0) basetics = log->tic;
fprintf(fd,
"%lld %.4f %.4f %.4f %.6f %d %d %d %d %d %d %zd %d %.4f %.6f "
"%.6f\n",
log->tic, clocks_from_ticks(log->tic - basetics),
clocks_from_ticks(log->injtic - clocks_start_ticks),
clocks_from_ticks(log->endtic - clocks_start_ticks),
clocks_from_ticks(log->endtic - log->injtic), log->step,
log->rank, log->otherrank, log->type, log->subtype, log->tag,
log->size, log->nr_tests, clocks_from_ticks(log->tsum),
clocks_from_ticks(log->tmin), clocks_from_ticks(log->tmax));
}
}
fclose(fd);
}
/* Need to stay in step. */
MPI_Barrier(MPI_COMM_WORLD);
}
}
/**
 * @brief report how many entries have been added to the log.
 *
 * @result the current value of the log entry counter, converted to int.
 */
int mpiuse_nr_logs(void) {
  const size_t nr_entries = mpiuse_log_count;
  return (int)nr_entries;
}
/**
 * @brief report how many ranks the log covers.
 *
 * @result one more than the largest rank logged so far.
 */
int mpiuse_nr_ranks(void) {
  const int highest_rank = mpiuse_max_rank;
  return highest_rank + 1;
}
/**
 * @brief look up a log entry by index.
 *
 * @param ind the index of the entry required.
 * @result pointer to the entry, or NULL when ind is negative or not below
 *         the number of entries logged.
 */
struct mpiuse_log_entry *mpiuse_get_log(int ind) {
  /* Reject anything outside of the entries logged so far. */
  if (ind < 0) return NULL;
  if ((size_t)ind >= mpiuse_log_count) return NULL;

  return &mpiuse_log[ind];
}
/**
* @brief return random number from a upper part of gaussian distribution.
*
* @result the random.
*/
static double gauss_rand_upper(void) {
double V1, V2, S;
do {
double U1 = drand48();
double U2 = drand48();
V1 = U1 - 1.0;
V2 = U2 - 1.0;
S = V1 * V1 + V2 * V2;
} while (S >= 1.0 || S == 0.0);
return fabs(V1 * sqrt(-2.0 * log(S) / S));
}
/**
 * @brief generate a list of fake exchanges as mpiuse logs.
 *
 * For each of the nr_logs rounds one message size is chosen and every
 * ordered pair of different ranks logs a matching send and receive of that
 * size. All messages of one round share a tag, which increases per round.
 *
 * @param nr_nodes the number of ranks that will be used.
 * @param nr_logs the number of logs to generate per rank.
 * @param size bytes per message when not random. For uniform randoms the
 *             sizes are drawn from 1 up to about this value. For gaussian
 *             randoms it scales the width of the distribution. For a CDF
 *             based selection the centre of the selected bin is used as the
 *             size. NOTE(review): this parameter was documented as a scale
 *             factor for the CDF case, but no scaling is applied here --
 *             confirm which is intended.
 * @param random whether to use random sizes.
 * @param seed the random seed, use same for fixed sequences.
 * @param uniform whether to use a uniform distribution rather than a
 *                gaussian one. Ignored when cdf is given.
 * @param cdf text file containing a normalized CDF to use as a basis for
 *            inverse transform sampling of the randoms. NULL for no file.
 */
void mpiuse_log_generate(int nr_nodes, int nr_logs, int size, int random,
                         long int seed, int uniform, const char *cdf) {
  /* Capacity of the CDF tables, increase if a CDF has more bins. */
  enum { MPIUSE_MAX_CDF_BINS = 1024 };

  /* Only used for CDF. */
  int nvals = 0;
  double imin[MPIUSE_MAX_CDF_BINS];
  double imax[MPIUSE_MAX_CDF_BINS];
  double value[MPIUSE_MAX_CDF_BINS];

  /* Note that each rank exchanges messages with all the others and each "log"
   * has the same size. */

  /* Set seed. */
  if (random) srand48(seed);

  /* Check for CDF. This should be based on some real distribution, the format
   * is same as output from TOPCAT, i.e. bin-low, bin-high, value space
   * separated values. Note the value column should be normalised into the
   * range 0 to 1 so that it maps into a uniform random distribution. */
  if (cdf != NULL) {
    FILE *infile = fopen(cdf, "r");
    if (infile == NULL) error("Failed to open CDF file: %s", cdf);

    /* Loop on fgets itself so that a read error also ends the loop. */
    char line[132];
    while (fgets(line, sizeof(line), infile) != NULL) {
      if (line[0] == '#') continue;

      /* Parse into temporaries so that a line beyond the table capacity
       * never writes outside of the tables. */
      double lo, hi, val;
      if (sscanf(line, "%lf %lf %lf", &lo, &hi, &val) != 3) continue;

      if (nvals >= MPIUSE_MAX_CDF_BINS)
        error("Too many bins in CDF file: %s (maximum is %d)", cdf,
              MPIUSE_MAX_CDF_BINS);
      imin[nvals] = lo;
      imax[nvals] = hi;
      value[nvals] = val;
      nvals++;
    }
    fclose(infile);

    /* Sampling from an empty table would read uninitialised values. */
    if (random && nvals == 0) error("No CDF bins found in file: %s", cdf);
  }

  /* Message tags increment with across rank logs. */
  int tag = 1;
  for (int k = 0; k < nr_logs; k++) {
    /* Set size for this messages. */
    int logsize = size;
    if (random) {
      if (cdf) {
        /* CDF randoms. */
        const double u = drand48();

        /* Binary search for the first bin whose CDF value is not below u.
         * The answer is where the bounds meet, not the last midpoint that
         * was probed. */
        int lower = 0;
        int upper = nvals;
        while (lower < upper) {
          const int middle = lower + (upper - lower) / 2;
          if (u > value[middle])
            lower = middle + 1;
          else
            upper = middle;
        }

        /* If u is above the last CDF value (table not reaching 1) use the
         * last bin. */
        if (lower >= nvals) lower = nvals - 1;

        logsize = (int)(0.5 * (imax[lower] + imin[lower]));
      } else if (uniform) {
        /* Uniform randoms scaled by size and offset by one, so at least 1. */
        logsize = (int)((drand48() * (double)size) + 1);
      } else {
        /* Gaussian randoms so no maximum.
         * NOTE(review): this was documented as size being 2.5 sigma, but the
         * 0.25 factor makes size 4 sigma -- confirm which is intended. */
        logsize = (int)((gauss_rand_upper() * (double)size * 0.25) + 1);
      }
    }

    /* Every ordered pair of different ranks exchanges one message. */
    for (int i = 0; i < nr_nodes; i++) {
      for (int j = 0; j < nr_nodes; j++) {
        if (i != j) {
          mpiuse_log_allocation(i, 1, k, SEND_TYPE, NO_SUBTYPE, 1, logsize, j,
                                tag);
          mpiuse_log_allocation(j, 1, k, RECV_TYPE, NO_SUBTYPE, 1, logsize, i,
                                tag);
        }
      }
    }
    tag++;
  }
}