Skip to content
Snippets Groups Projects
Commit 6c5205e6 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Add usetics to avoid two programs, get basic stats of calls to MPI_Test

parent b0fbeb0a
No related branches found
No related tags found
No related merge requests found
......@@ -24,6 +24,9 @@ int myrank = -1;
/* Are we verbose. */
static int verbose = 0;
/* Attempt to keep original injection time differences. */
static int usetics = 1;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
......@@ -48,13 +51,14 @@ static int volatile todo_send = 0;
/* CPU frequency of the machine that created the MPI log. */
// XXX need to store this in the data file.
static double clocks_cpufreq = 2194844448.0;
static double log_clocks_cpufreq = 2194844448.0;
/**
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
*
* The requests are initiated in the time order of the original log and an
* attempt to start these with the same interval gap is made.
* attempt to start these with the same interval gap is made if usetics is
* set, otherwise we just do them as quickly as possible.
*/
static void *inject_thread(void *arg) {
......@@ -62,40 +66,41 @@ static void *inject_thread(void *arg) {
message("%d: injection thread starts", *((int *)arg));
ticks starttics = getticks();
/* Ticks of our last attempt. */
/* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
ticks basetic = reqs_queue[0]->tic;
/* The time that once around this loop takes. We do not wait for injections
* that are closer in time than this value. */
ticks looptics = 0;
while (ind_req < nr_reqs) {
struct mpiuse_log_entry *log = reqs_queue[ind_req];
/* Expect time between this request and the previous one. */
ticks dt = log->tic - basetic;
basetic = log->tic;
/* Guess some time below which we should not attempt to wait, otherwise
* we'll start to overrun, and just inject the next call if we are below
* that (basically how many ticks this loop takes without any
* waiting). Otherwise wait a while. */
if (dt > looptics) {
struct timespec sleep;
sleep.tv_sec = 0;
/* Remember to be fair and remove the looptics, then convert to
* nanoseconds. */
double ns = (double)(dt - looptics) / clocks_cpufreq * 1.0e9;
if (ns < 1.0e9) {
sleep.tv_nsec = (long) ns;
} else {
/* Wait more than one second. Must be an error, but complain and
* continue. */
sleep.tv_nsec = (long) 1.0e9;
message("wait greater than one second");
if (usetics) {
/* Expect time between this request and the previous one. */
ticks dt = log->tic - basetic;
basetic = log->tic;
/* We guess some time below which we should not attempt to wait,
* otherwise we'll start to overrun, and just inject the next call if we
* are below that (we time the ticks this loop takes without any waiting
* and use that). Otherwise we wait a while. Note we need to convert the
* ticks of the log file into nanoseconds, that requires the original
* CPU frequency. */
if (dt > looptics) {
struct timespec sleep;
sleep.tv_sec = 0;
/* Remember to be fair and remove the looptics, then convert to
* nanoseconds. */
double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
if (ns < 1.0e9) {
sleep.tv_nsec = (long) ns;
} else {
/* Wait more than one second. Must be an error, but complain and
* continue. */
sleep.tv_nsec = (long) 1.0e9;
message("wait greater than one second");
}
nanosleep(&sleep, NULL);
}
nanosleep(&sleep, NULL);
}
// Differences to SWIFT: MPI_BYTE might overflow, should use MPI_Type(?).
......@@ -124,7 +129,7 @@ static void *inject_thread(void *arg) {
/* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
* equally timed. */
if (looptics == 0) {
if (looptics == 0 && usetics) {
looptics = starttics - getticks();
if (verbose)
message("injection loop took %.3f %s.", clocks_from_ticks(looptics),
......@@ -145,47 +150,90 @@ static void *inject_thread(void *arg) {
}
/**
* @brief Send thread, checks if MPI_Isend requests have completed.
* @brief main loop to run over a queue of MPI requests and test for when they
* complete. Returns the total amount of time spent in calls to MPI_Test and
* the number of times it was called.
*
* @param logs the list of logs pointing to requests.
* @param nr_logs pointer to the variable containing the current number of
* logs.
* @param todos pointer to the variable containing the number of requests that
* are still active.
* @param sum the total number of ticks spent in calls to MPI_Test.
* @param ncalls the total number of calls to MPI_Test.
* @param mint the minimum ticks an MPI_Test call took.
* @param maxt the maximum ticks an MPI_Test call took.
*/
static void *send_thread(void *arg) {
if (verbose)
message("%d: send thread starts (%d)", *((int *)arg), injecting);
ticks starttics = getticks();
/* We loop while new requests are being injected and then when we still have
* sends to complete. */
size_t attempts = 0;
while (injecting || (!injecting && todo_send > 0)) {
int nsends = nr_sends;
for (int k = 0; k < nsends; k++) {
struct mpiuse_log_entry *log = sends_queue[k];
static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
int volatile *todos, double *sum, int *ncalls,
ticks *mint, ticks *maxt ) {
/* MPI_Test statistics. */
int lncalls = 0;
double lsum = 0.0;
ticks lmint = log_clocks_cpufreq;
ticks lmaxt = 0;
/* We loop while new requests are being injected and we still have requests
* to complete. */
while (injecting || (!injecting && *todos > 0)) {
int nlogs = *nr_logs;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = logs[k];
if (log != NULL) {
attempts++;
ticks tics = getticks();
int res;
MPI_Status stat;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
ticks dt = getticks() - tics;
lsum += (double)dt;
lncalls++;
if (dt < lmint) lmint = dt;
if (dt > lmaxt) lmaxt = dt;
if (res) {
/* Done, clean up. */
free(log->data);
sends_queue[k] = NULL;
atomic_dec(&todo_send);
logs[k] = NULL;
atomic_dec(todos);
}
}
}
}
/* All done, thread exiting. */
/* All done. */
*sum = lsum;
*ncalls = lncalls;
*mint = lmint;
*maxt = lmaxt;
return;
}
/**
* @brief Send thread, checks if MPI_Isend requests have completed.
*/
static void *send_thread(void *arg) {
if (verbose)
message("sends completed, required %zd attempts (left: %d)", attempts,
todo_send);
message("%d: send thread starts (%d)", *((int *)arg), injecting);
ticks starttics = getticks();
int ncalls;
double sum;
ticks mint;
ticks maxt;
queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt);
message("%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time %.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum/ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
}
......@@ -198,38 +246,19 @@ static void *recv_thread(void *arg) {
message("%d: recv thread starts", *((int *)arg));
ticks starttics = getticks();
/* We loop while new requests are being injected and then when we still have
* recvs to complete. */
size_t attempts = 0;
while (injecting || (!injecting && todo_recv > 0)) {
int nrecvs = nr_recvs;
for (int k = 0; k < nrecvs; k++) {
struct mpiuse_log_entry *log = recvs_queue[k];
if (log != NULL) {
attempts++;
int res;
MPI_Status stat;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
if (res) {
/* Done, clean up. */
free(log->data);
recvs_queue[k] = NULL;
atomic_dec(&todo_recv);
}
}
}
}
int ncalls;
double sum;
ticks mint;
ticks maxt;
queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt);
/* All done, thread exiting. */
if (verbose)
message("recvs completed, required %zd attempts (left: %d)", attempts,
todo_recv);
message("%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time %.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum/ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
}
......@@ -298,7 +327,7 @@ int main(int argc, char *argv[]) {
/* First we read the SWIFT MPI logger output that defines the communcations
* we will undertake and the time differences between injections into the
* queues. */
mpiuse_log_restore("testdata/mpiuse_report-step1062.dat");
mpiuse_log_restore("testdata/mpiuse_report-step1.dat");
int nranks = mpiuse_nr_ranks();
/* Initiate MPI. */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment