Skip to content
Snippets Groups Projects
Commit d7e82b66 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Merge branch 'master' into fixed-size

Conflicts:
	swiftmpistepsim.c
parents 2ffa242b 615283c3
No related branches found
No related tags found
1 merge request!5Add an option to fix the message size
swiftmpistepsim
swiftmpifakestepsim
*.o
*~
CFLAGS = -g -O0 -Wall
all: swiftmpistepsim swiftmpifakestepsim
all: swiftmpistepsim
SRC = mpiuse.c clocks.c histogram.c
INC = mpiuse.h atomic.h cycle.h clocks.h histogram.h
swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c
$(CC) $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c -I/usr/include/mpi -lmpi -lpthread
swiftmpistepsim: swiftmpistepsim.c $(SRC) $(INC)
mpicc $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c $(SRC) -lpthread -lm
swiftmpifakestepsim: swiftmpifakestepsim.c $(SRC) $(INC)
mpicc $(CFLAGS) -o swiftmpifakestepsim swiftmpifakestepsim.c $(SRC) -lpthread -lm
clean:
rm swiftmpistepsim
rm -f swiftmpistepsim
rm -f swiftmpifakestepsim
swiftmpifakestepsim
===================
The swiftmpifakestepsim program works much like swiftmpistepsim, except that
it generates fake logs:
```
Usage: ./swiftmpifakestepsim [options] nr_messages logfile.dat
options: -v verbose, -d data check, -s size (bytes/scale),
-f <1|2> randomize injection order, 1 == just sends, 2 == sends and recvs
[-r uniform random from 1 to size, |
-r -g half gaussian random from 1 with 2.5 sigma size., |
-r -c <file> use cdf from file, size is a scale factor., |
-r -o <file> use occurence sample of values in a file, size is a scale factor.,]
-x random seed
```
To use a fixed message size just use `-s`.
To use a uniform distribution in the range 1 to N:
```
-s N -r
```
To use a half gaussian (so biased towards smaller packets) use:
```
-r -s <scale> -g
```
The cdf option reads a simple text file with a sampling of a cumulative
distribution function, where each line has three values, the minimum and
maximum range of the current bin and the value. Note that the value column
should be normalized into the range 0 to 1.
```
-r -s <scale> -c <cdf_file>
```
The occurrence file has just one value per line, these should present the
sizes of the packets, this is used to form a cdf:
```
-r -s <scale> -o <occurrence_file>
```
Other options are useful to make sure that the randoms are different, `-x` and
that they run in different order `-f <1|2>`.
Peter W. Draper 24 Apr 2023
---------------------------
......@@ -64,3 +64,14 @@ eager exchanges are working and what effect the size of the packets has.
---------------------------
Peter W. Draper 24 Sep 2019.
Continuing from the notes above, there are also RDMA based versions of
swiftmpistepsim, which are found in the various branches of the
repository. That work and a re-working of SWIFT to use RDMA revealed that the
main driver for the delays wasn't the scatter in MPI completion, but the time
taken to copy data into and out of registered memory, the best solution to
which was to make better use of the memory bandwidth by copying using multiple
threads.
-----------------------------
Peter W. Draper 24 April 2023.
#include <mpi.h>
#include "clocks.h"
extern int myrank;
......
#!/bin/bash
# Clang format command, can be overridden using CLANG_FORMAT_CMD.
# We currrently use version 5.0 so any overrides should provide that.
clang=${CLANG_FORMAT_CMD:="clang-format-5.0"}
# We currrently use version 13.0 so any overrides should provide that.
clang=${CLANG_FORMAT_CMD:="clang-format-13.0"}
# Formatting command
cmd="$clang -style=file $(git ls-files | grep '\.[ch]$')"
......
#include "histogram.h"
#include <float.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/*
* Simple histogram routines.
int main(int argc, char *argv[]) {
double *values;
int nvalues;
if (histread(argv[1], &values, &nvalues)) {
printf("## Read %d values from %s\n", nvalues, argv[1]);
struct histogram *h = calloc(1, sizeof(struct histogram));
histmake(nvalues, values, h);
printf("## Created cumulative histogram with %d values:\n", h->nvalues);
printf("# value sum\n");
for (int k = 0; k < h->nvalues; k++) {
printf("%f %24.17g\n", h->values[k], h->sums[k]);
}
free(h);
free(values);
return 0;
}
return 1;
}
*/
/**
* Create a histogram by binning a number of data values.
*
* The histogram is returned in a histogram struct, which includes the basic
* histogram (with NHIST values) and a normalized cumulative version (with
* h->nvals values, as the zero bins are coalesced).
*
* @param nvalues number of values.
* @param values the data values to bin into a histogram.
* @param h the #histogram.
*/
void histmake(int nvalues, double *values, struct histogram *h) {
/* Find the minimum and maximum values. */
double dmin = DBL_MAX;
double dmax = -DBL_MAX;
for (int i = 0; i < nvalues; i++) {
dmin = fmin(dmin, values[i]);
dmax = fmax(dmax, values[i]);
}
/* Form the fixed width bin histogram. */
double scale = (double)NHIST / (dmax - dmin);
h->width = 1.0 / scale;
h->zero = dmin - h->width / 2.0;
double sum = 0.0;
for (int i = 0; i < nvalues; i++) {
int idiff = (int)round(scale * ((double)values[i] - dmin));
h->hist[idiff] += 1;
sum++;
}
double norm = 1.0 / sum;
/* Form cumulative sums and count used bins. */
sum = 0.0;
int lastbin = 0;
int usedbins = 0;
for (int i = 0; i < NHIST; i++) {
/* Zero bins are folded into a range. */
if (h->hist[i] > 0) {
/* Value is mid of bin. */
h->values[usedbins] = 0.5 * ((lastbin * h->width + h->zero) +
((i + 1) * h->width + h->zero));
sum += h->hist[i] * norm;
h->sums[usedbins] = sum;
usedbins++;
lastbin = i + 1;
}
}
h->nvalues = usedbins;
}
/**
* Read in data to histogram. Assumes a simple text file with one value per
* line.
*
* @param file the name of the file to read.
* @param values the extracted values. Free when not needed.
* @param nvalues the number of values.
*
* @result 1 for successful, 0 otherwise.
*/
int histread(const char *file, double **values, int *nvalues) {
*values = NULL;
*nvalues = 0;
FILE *infile = fopen(file, "r");
if (infile == NULL) {
printf("Failed to open sizes file: %s\n", file);
return 0;
}
/* Initial space for data. */
int nvals = 0;
int maxvals = 1024;
double *vals = malloc(maxvals * sizeof(double));
char line[132];
while (!feof(infile)) {
if (fgets(line, 132, infile) != NULL) {
if (line[0] != '#') {
int n = sscanf(line, "%lf", &vals[nvals]);
if (n != 1) {
printf("Incorrect no. of values %s\n", line);
fclose(infile);
free(vals);
return 0;
}
nvals++;
/* Make more space. */
if (nvals > maxvals) {
maxvals += 1024;
vals = realloc(vals, maxvals * sizeof(double));
}
}
}
}
fclose(infile);
*values = vals;
*nvalues = nvals;
return 1;
}
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2021 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#ifndef SWIFT_HISTOGRAM_H
#define SWIFT_HISTOGRAM_H
/* Bins in a histogram. Note this must be an even number. */
#define NHIST 65536
/** Histogram structure. */
struct histogram {
/* Raw histogram. NHIST counts in hist and the value associated with a bin
* is index*width+zero. */
double width;
double zero;
int hist[NHIST];
/* Normalized cumulative histogram. Empty bins are joined into a larger one,
* so values are the bin centre, sums the sum to that bin and nvalues the
* number of bins that have been populated, can be less than NHIST. */
double values[NHIST];
double sums[NHIST];
int nvalues;
};
int histread(const char *file, double **values, int *nvalues);
void histmake(int nvalues, double *values, struct histogram *h);
#endif
......@@ -22,6 +22,7 @@
*/
/* Standard includes. */
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
......@@ -34,6 +35,7 @@
#include "clocks.h"
#include "cycle.h"
#include "error.h"
#include "histogram.h"
/* Our rank. */
extern int myrank;
......@@ -257,3 +259,173 @@ struct mpiuse_log_entry *mpiuse_get_log(int ind) {
if (ind < mpiuse_log_count && ind >= 0) return &mpiuse_log[ind];
return NULL;
}
/**
* @brief return random number from a upper part of gaussian distribution.
*
* @result the random.
*/
static double gauss_rand_upper(void) {
double V1, V2, S;
do {
double U1 = drand48();
double U2 = drand48();
V1 = U1 - 1.0;
V2 = U2 - 1.0;
S = V1 * V1 + V2 * V2;
} while (S >= 1.0 || S == 0.0);
return fabs(V1 * sqrt(-2.0 * log(S) / S));
}
/**
* @brief generate a list of fake exchanges as mpiuse logs.
*
* @param nr_nodes the number of ranks that will be used.
* @param nr_logs the number of logs to generate per rank.
* @param size bytes per message, unless random when this is the maximum
* and the minimum is 1 for uniform, if using a gaussian
* distribution the value is a 2.5 sigma, for a CDF based
* selection this is just a scale factor of the values.
* @param random whether to use random sizes.
* @param seed the random seed, use same for fixed sequences.
* @param uniform whether to use a uniform distribution or gaussian, unless
* cdf is defined, in which case this parameter is ignored.
* @param cdf text file containing a normalized CDF to use as a basis for
* inverse transform sampling of the randoms. NULL for no file.
* @param odata text file containing a values representing occurences of the
* expected distribution -- converted into a normalised CDF to
* use as a basis for inverse transform sampling of the
* randoms. NULL for no file. Not used if cdf is not NULL.
*/
void mpiuse_log_generate(int nr_nodes, int nr_logs, int size, int random,
long int seed, int uniform, const char *cdf,
const char *odata) {
/* Only used for CDFs, may need to increase these. */
int nvals = 0;
double imin[NHIST], imax[NHIST], value[NHIST];
/* Note that each rank exchanges messages with all the others and each "log"
* has the same size. */
/* Set seed. */
if (random) srand48(seed);
/* Check for CDF. This should be based on some real distribution, the format
* is same as output from TOPCAT, i.e. bin-low, bin-high, value space
* separated values. Note the value column should be normalised into the
* range 0 to 1 so that it maps into a uniform random distribution. */
if (cdf != NULL) {
FILE *infile = fopen(cdf, "r");
if (infile == NULL) error("Failed to open CDF file: %s", cdf);
char line[132];
while (!feof(infile)) {
if (fgets(line, 132, infile) != NULL) {
if (line[0] != '#') {
int nread = sscanf(line, "%lf %lf %lf", &imin[nvals], &imax[nvals],
&value[nvals]);
if (nread == 3) nvals++;
}
}
}
fclose(infile);
} else if (odata != NULL) {
double *values;
int nvalues;
if (histread(odata, &values, &nvalues)) {
// printf("## Read %d occurence values from %s\n", nvalues, odata);
struct histogram *h = calloc(1, sizeof(struct histogram));
histmake(nvalues, values, h);
// printf("## Created cumulative histogram with %d values:\n",
// h->nvalues); printf("# value sum\n");
imin[0] = 0.0;
imax[0] = h->values[0];
value[0] = h->sums[0];
for (int k = 1; k < h->nvalues; k++) {
imin[k] = h->values[k - 1];
imax[k] = h->values[k];
value[k] = h->sums[k];
// printf("%f %24.17g\n", h->values[k], h->sums[k]);
}
nvals = h->nvalues;
free(h);
free(values);
} else {
error("Failed to read occurrence data from file: %s", odata);
}
}
/* Message tags increment with across rank logs. */
int tag = 1;
for (int k = 0; k < nr_logs; k++) {
/* Set size for this messages. */
double logsize = size;
if (random) {
if (cdf || odata) {
/* CDF based randoms. */
double rand = drand48();
/* Binary search for containing bin for this rand. */
unsigned int lower = 0;
unsigned int upper = nvals;
unsigned int middle = 0;
while (lower < upper) {
middle = (upper + lower) / 2;
if (rand > value[middle])
lower = middle + 1;
else
upper = middle;
}
logsize = 0.5 * (imax[middle] + imin[middle]);
} else if (uniform) {
/* Uniform randoms in the range 0 to 1 */
logsize = (drand48() * (double)size) + 1;
} else {
// Gaussian randoms so no maximum, assume size is 2.5 sigma.
logsize = (gauss_rand_upper() * (double)size * 0.25) + 1;
}
}
/* Cannot send more than 2^31-1 bytes at a time, so truncate. */
if (logsize > 2147483647.0) {
message("CDF size too large : %f, truncating", logsize);
logsize = 2147483647.0;
}
for (int i = 0; i < nr_nodes; i++) {
for (int j = 0; j < nr_nodes; j++) {
if (i != j) {
mpiuse_log_allocation(i, 1, k, SEND_TYPE, NO_SUBTYPE, 1,
(size_t)logsize, j, tag);
mpiuse_log_allocation(j, 1, k, RECV_TYPE, NO_SUBTYPE, 1,
(size_t)logsize, i, tag);
}
}
}
tag++;
}
}
/**
* Shuffle log pointers randomizing the order.
*
* Note assumes dran48() has been seeded.
*
* @param logs the log pointers to shuffle.
* @param nlogs the number of logs.
*/
void mpiuse_shuffle_logs(struct mpiuse_log_entry **logs, int nlogs) {
struct mpiuse_log_entry tmp;
for (int k = nlogs - 1; k > 0; k--) {
unsigned int j = (unsigned int)(drand48() * (k + 1));
memcpy(&tmp, &logs[j], sizeof(struct mpiuse_log_entry *));
memcpy(&logs[j], &logs[k], sizeof(struct mpiuse_log_entry *));
memcpy(&logs[k], &tmp, sizeof(struct mpiuse_log_entry *));
}
}
......@@ -84,6 +84,13 @@ struct mpiuse_log_entry {
ticks tmin;
};
/* Flags for the types of request when generating fakes. */
#ifndef SEND_TYPE
#define SEND_TYPE 25
#define RECV_TYPE 26
#define NO_SUBTYPE 0
#endif
/* API. */
void mpiuse_log_allocation(int rank, int step, size_t tic, int type,
int subtype, int activation, size_t size,
......@@ -94,4 +101,9 @@ int mpiuse_nr_logs(void);
int mpiuse_nr_ranks(void);
void mpiuse_dump_logs(int nranks, const char *logfile);
void mpiuse_log_generate(int nr_nodes, int nr_logs, int size, int random,
long int seed, int uniform, const char *cdf,
const char *odata);
void mpiuse_shuffle_logs(struct mpiuse_log_entry **logs, int nlogs);
#endif /* SWIFT_MPIUSE_H */
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2021 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Global: Our rank for all to see. */
int myrank = -1;
/* Are we verbose. */
static int verbose = 0;
/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
/* Default seed for pseudorandoms. */
static long int default_seed = 1987654321;
/* MPI communicator for each rank. XXX static XXX. */
static MPI_Comm node_comms[512];
/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue;
static int volatile ind_req = 0;
static int volatile nr_reqs = 0;
static int volatile injecting = 1;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0;
static int volatile ind_recv = 0;
static int volatile todo_recv = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends = 0;
static int volatile ind_send = 0;
static int volatile todo_send = 0;
/**
* @brief fill a data area with a pattern that can be checked for changes.
*
* @param size size of data in bytes.
* @param data the data to fill.
*/
static void datacheck_fill(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
p[i] = 170; /* 10101010 in bits. */
}
}
/**
* @brief test a filled data area for our pattern.
*
* @param size size of data in bytes.
* @param data the data to fill.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
if (p[i] != 170) return 0;
}
return 1;
}
/**
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
*/
static void *inject_thread(void *arg) {
if (verbose) message("%d: injection thread starts", *((int *)arg));
ticks starttics = getticks();
while (ind_req < nr_reqs) {
struct mpiuse_log_entry *log = reqs_queue[ind_req];
/* Initialise new log elements. */
log->done = 0;
log->nr_tests = 0;
log->tsum = 0.0;
log->tmax = 0;
log->tmin = INT_MAX;
log->endtic = 0;
log->injtic = getticks();
/* Differences to SWIFT: MPI_BYTE not the MPI_Type. */
int err = 0;
if (log->type == SEND_TYPE) {
log->data = calloc(log->size, 1);
/* Fill data with pattern. */
if (datacheck) datacheck_fill(log->size, log->data);
/* And send. */
err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
node_comms[log->rank], &log->req);
/* Add a new send request. */
int ind = atomic_inc(&nr_sends);
sends_queue[ind] = log;
atomic_inc(&todo_send);
} else {
/* Ready to receive. */
log->data = calloc(log->size, 1);
err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
node_comms[log->otherrank], &log->req);
/* Add a new recv request. */
int ind = atomic_inc(&nr_recvs);
recvs_queue[ind] = log;
atomic_inc(&todo_recv);
}
if (err != MPI_SUCCESS) error("Failed to activate send or recv");
ind_req++;
}
/* All done, thread exiting. */
if (verbose) {
message("%d injections completed, sends = %d, recvs = %d", ind_req,
nr_sends, nr_recvs);
message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
atomic_dec(&injecting);
return NULL;
}
/**
* @brief main loop to run over a queue of MPI requests and test for when they
* complete. Returns the total amount of time spent in calls to MPI_Test and
* the number of times it was called.
*
* @param logs the list of logs pointing to requests.
* @param nr_logs pointer to the variable containing the current number of
* logs.
* @param todos pointer to the variable containing the number of requests that
* are still active.
* @param sum the total number of ticks spent in calls to MPI_Test.
* @param ncalls the total number of calls to MPI_Test.
* @param mint the minimum ticks an MPI_Test call took.
* @param maxt the maximum ticks an MPI_Test call took.
*/
static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
int volatile *todos, double *sum, int *ncalls,
ticks *mint, ticks *maxt) {
/* Global MPI_Test statistics. */
int lncalls = 0;
double lsum = 0.0;
ticks lmint = INT_MAX;
ticks lmaxt = 0;
/* We loop while new requests are being injected and we still have requests
* to complete. */
while (injecting || (!injecting && *todos > 0)) {
int nlogs = *nr_logs;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = logs[k];
if (log != NULL && !log->done) {
ticks tics = getticks();
int res;
MPI_Status stat;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
/* Increment etc. of statistics about time in MPI_Test. */
ticks dt = getticks() - tics;
log->tsum += (double)dt;
lsum += (double)dt;
log->nr_tests++;
lncalls++;
if (dt < log->tmin) log->tmin = dt;
if (dt > log->tmax) log->tmax = dt;
if (dt < lmint) lmint = dt;
if (dt > lmaxt) lmaxt = dt;
if (res) {
/* Check data sent data is unchanged and received data is as
* expected. */
if (datacheck && !datacheck_test(log->size, log->data)) {
error("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
log->endtic = getticks();
free(log->data);
atomic_dec(todos);
}
}
}
}
/* All done. */
*sum = lsum;
*ncalls = lncalls;
*mint = lmint;
*maxt = lmaxt;
return;
}
/**
* @brief Send thread, checks if MPI_Isend requests have completed.
*/
static void *send_thread(void *arg) {
if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
ticks starttics = getticks();
int ncalls;
double sum;
ticks mint;
ticks maxt;
queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt);
message(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
"%.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
}
/**
* @brief Recv thread, checks if MPI_Irecv requests have completed.
*/
static void *recv_thread(void *arg) {
if (verbose) message("%d: recv thread starts", *((int *)arg));
ticks starttics = getticks();
int ncalls;
double sum;
ticks mint;
ticks maxt;
queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt);
message(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
"%.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
}
/**
* @brief Comparison function for logged times.
*/
static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
/* Large unsigned values, so take care. */
if (l1->tic > l2->tic) return 1;
if (l1->tic < l2->tic) return -1;
return 0;
}
/**
* @brief Pick out the relevant logging data for our rank, i.e. all
* activations of sends and recvs. We ignore the original completions.
* The final list is sorted into increasing time of activation if required,
* otherwise the order is randomized.
*
* @param random randomize injection order of sends or sends and recvs,
* otherwise use order of the original logs. Just sends
* are randomized when 1, 2 randomizes both, 0 neither.
*/
static void pick_logs(int random) {
size_t nlogs = mpiuse_nr_logs();
/* Duplicate of logs. */
reqs_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_reqs = 0;
sends_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_sends = 0;
recvs_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recvs = 0;
if (random == 0 || random == 2) {
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->rank == myrank && log->activation) {
log->data = NULL;
reqs_queue[nr_reqs] = log;
nr_reqs++;
}
}
if (random == 0) {
/* Sort into increasing time. */
qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
} else {
/* Randomize the order, so ranks do not all work in sequence. */
mpiuse_shuffle_logs(reqs_queue, nr_reqs);
}
/* Check. */
if (random == 0) {
for (int k = 0; k < nr_reqs - 1; k++) {
if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
reqs_queue[k + 1]->tic);
}
}
} else {
/* Randomizing the sends, but injecting the recvs first. */
/* Get recvs. */
int nrecv = 0;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->rank == myrank && log->activation && log->type == RECV_TYPE) {
log->data = NULL;
reqs_queue[nr_reqs] = log;
nr_reqs++;
nrecv++;
}
}
/* These are sorted into log time order. */
qsort(reqs_queue, nrecv, sizeof(struct mpiuse_log_entry *), cmp_logs);
/* Now the sends. */
int nsend = 0;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->rank == myrank && log->activation && log->type == SEND_TYPE) {
log->data = NULL;
reqs_queue[nr_reqs] = log;
nr_reqs++;
nsend++;
}
}
/* These are randomized. */
mpiuse_shuffle_logs(&reqs_queue[nrecv], nsend);
}
}
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [options] nr_messages logfile.dat\n", argv[0]);
fprintf(stderr,
" options: -v verbose, -d data check, -s size (bytes/scale), \n"
"\t -f <1|2> randomize injection order, 1 == just sends, "
"2 == sends and recvs\n"
"\t[-r uniform random from 1 to size, | \n"
"\t-r -g half gaussian random from 1 with 2.5 sigma size., | \n"
"\t-r -c <file> use cdf from file, size is a scale factor., |\n"
"\t-r -o <file> use occurence sample of values in a file, size is a "
"scale factor.,] \n"
"\t-x random seed\n");
fflush(stderr);
}
/**
* @brief main function.
*/
int main(int argc, char *argv[]) {
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
int nr_nodes = 0;
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Handle the command-line, we expect the number of messages to exchange per
* rank an output log and some options, the interesting ones are a size and
* whether to use a random selections of various kinds. */
int size = 1024;
int random = 0;
int randomorder = 0;
int uniform = 1;
char *cdf = NULL;
char *odata = NULL;
int opt;
unsigned int seed = default_seed;
while ((opt = getopt(argc, argv, "vds:rgx:c:o:f:")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
break;
case 'c':
cdf = optarg;
break;
case 'f':
randomorder = atoi(optarg);
break;
case 'g':
uniform = 0;
break;
case 's':
size = atoi(optarg);
break;
case 'r':
random = 1;
break;
case 'o':
odata = optarg;
break;
case 'v':
verbose = 1;
break;
case 'x':
seed = atol(optarg);
break;
default:
if (myrank == 0) usage(argv);
return 1;
}
}
if (optind >= argc - 1) {
if (myrank == 0) usage(argv);
return 1;
}
if (cdf != NULL && odata != NULL)
error("Cannot use -c and -o options together");
int nr_logs = atoi(argv[optind]);
if (nr_logs == 0)
error("Expected number of messages to exchange, got: %s", argv[optind]);
char *logfile = argv[optind + 1];
/* Generate the fake logs for the exchanges. */
if (myrank == 0) {
if (random) {
if (cdf != NULL) {
message(
"Generating %d fake logs for %d ranks with randoms"
" based on cdf %s scaled by factor %d",
nr_logs, nr_nodes, cdf, size);
} else if (odata != NULL) {
message(
"Generating %d fake logs for %d ranks with randoms"
" based on occurence data %s scaled by factor %d",
nr_logs, nr_nodes, cdf, size);
} else if (uniform) {
message(
"Generating %d fake logs for %d ranks with random distribution"
" using size %d",
nr_logs, nr_nodes, size);
} else {
message(
"Generating %d fake logs for %d ranks with gaussian random "
"distribution using size %d as 2.5 sigma",
nr_logs, nr_nodes, size);
}
} else {
message("Generating %d fake logs for %d ranks of size %d", nr_logs,
nr_nodes, size);
}
}
mpiuse_log_generate(nr_nodes, nr_logs, size, random, seed, uniform, cdf,
odata);
int nranks = mpiuse_nr_ranks();
/* Create communicators for each MPI rank. */
for (int i = 0; i < nr_nodes; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &node_comms[i]);
}
/* Each rank requires its own queue, so extract them. */
if (verbose && myrank == 0) {
if (randomorder == 0) message("Message order same as generated");
if (randomorder == 1) message("Message send order randomized");
if (randomorder == 2) message("Message send and recv order randomized");
}
pick_logs(randomorder);
/* Time to start time. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
clocks_set_cpufreq(0);
if (myrank == 0) {
message("Start of MPI tests");
message("==================");
if (verbose) {
if (datacheck)
message("checking data pattern on send and recv completion");
}
}
/* Make three threads, one for injecting tasks and two to check for
* completions of the sends and recv independently. */
pthread_t injectthread;
if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
error("Failed to create injection thread.");
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread, NULL);
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Dump the updated MPI logs. */
MPI_Barrier(MPI_COMM_WORLD);
fflush(stdout);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
if (myrank == 0) message("Bye");
return 0;
}
......@@ -42,13 +42,16 @@ static int usetics = 1;
* zero . */
static size_t messagesize = 0;
/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
static const int task_type_send = 25;
static const int task_type_recv = 26;
/* Global communicators for each of the subtypes. */
static const int task_subtype_count = 30; // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[30];
static const int task_subtype_count = 34; // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[task_subtype_count];
/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue;
......@@ -68,6 +71,35 @@ static int volatile todo_send = 0;
// XXX need to store this in the data file.
static double log_clocks_cpufreq = 2194844448.0;
/**
* @brief fill a data area with a pattern that can be checked for changes.
*
* @param size size of data in bytes.
* @param data the data to fill.
*/
static void datacheck_fill(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
p[i] = 170; /* 10101010 in bits. */
}
}
/**
* @brief test a filled data area for our pattern.
*
* @param size size of data in bytes.
* @param data the data to fill.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
if (p[i] != 170) return 0;
}
return 1;
}
/**
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
*
......@@ -132,6 +164,11 @@ static void *inject_thread(void *arg) {
int err = 0;
if (log->type == task_type_send) {
log->data = calloc(log->size, 1);
/* Fill data with pattern. */
if (datacheck) datacheck_fill(log->size, log->data);
/* And send. */
err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
subtypeMPI_comms[log->subtype], &log->req);
......@@ -141,6 +178,8 @@ static void *inject_thread(void *arg) {
atomic_inc(&todo_send);
} else {
/* Ready to receive. */
log->data = calloc(log->size, 1);
err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
subtypeMPI_comms[log->subtype], &log->req);
......@@ -233,6 +272,12 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
if (dt > lmaxt) lmaxt = dt;
if (res) {
/* Check data sent data is unchanged and received data is as
* expected. */
if (datacheck && !datacheck_test(log->size, log->data)) {
error("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
log->endtic = getticks();
......@@ -400,8 +445,11 @@ int main(int argc, char *argv[]) {
/* Handle the command-line, we expect a mpiuse data file to read and various
* options. */
int opt;
while ((opt = getopt(argc, argv, "vfs:")) != -1) {
while ((opt = getopt(argc, argv, "vfds:")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
break;
case 'f':
usetics = 0;
break;
......@@ -453,6 +501,11 @@ int main(int argc, char *argv[]) {
message(" ");
message(" Using fixed message size of %zd", messagesize);
}
if (verbose) {
if (!usetics) message("using fast untimed injections");
if (datacheck)
message("checking data pattern on send and recv completion");
}
}
/* Make three threads, one for injecting tasks and two to check for
......
No preview for this file type
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment