/**
 * mpistalls-timed.c
 *
* Attempt to reproduce the asynchronous stalling that we see for medium
* busy steps in SWIFT.
*
* Timed injection version.
*
 * So we need to set up a multithreaded MPI program that performs asynchronous
 * exchanges of various data sizes and continuously checks the requests for
 * completion. We also need timers to record the time taken by all of this.
*/
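/* A typical build and run might look something like the following sketch,
 * assuming companion sources such as mpiuse.c and clocks.c exist alongside
 * this file (names guessed from the headers included below) and that the
 * testdata directory is present:
 *
 *   mpicc -O2 -pthread mpistalls-timed.c mpiuse.c clocks.c -o mpistalls-timed
 *   mpirun -np <nranks> ./mpistalls-timed
 *
 * where <nranks> must match the number of ranks recorded in the log file. */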
#include <stdio.h>
#include <mpi.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Global: Our rank for all to see. */
int myrank = -1;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
/* Global communicators for each of the subtypes. */
static const int task_subtype_count = 30; // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[30];
/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue;
static int volatile ind_req = 0;
static int volatile nr_reqs = 0;
static int volatile injecting = 1;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0;
static int volatile ind_recv = 0;
static int volatile todo_recv = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends = 0;
static int volatile ind_send = 0;
static int volatile todo_send = 0;
/* Delta ticks that we do not wait for. XXX measure this. */
const ticks looptics = 10000;
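/* A rough calibration sketch (not done here): time a number of dry passes of
 * the injection loop with the nanosleep and MPI calls skipped and take the
 * mean, e.g. looptics = (getticks() - tic) / npasses for some pass count
 * npasses. */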
/* CPU frequency of the machine that created the MPI log. */
// XXX need to store this in the data file.
static double clocks_cpufreq = 2194844448.0;
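/* If the log format gains a header carrying this value it could be read back
 * here instead of being hard-wired, e.g. via a hypothetical accessor such as
 * mpiuse_cpufreq(). */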
/* Injection thread, initiates MPI_Isend and MPI_Irecv requests at various
* times. */
static void *inject_thread(void *arg) {
message("%d: injection thread starts", *((int *)arg));
ticks starttics = getticks();
/* Ticks of our last attempt. */
ticks basetic = reqs_queue[0]->tic;
while (ind_req < nr_reqs) {
struct mpiuse_log_entry *log = reqs_queue[ind_req];
    /* Expected time between this request and the previous one. */
ticks dt = log->tic - basetic;
    basetic = log->tic;
    /* If the gap to the next request is smaller than the loop overhead
     * (roughly how many ticks one pass of this loop takes without any
     * waiting) we would overrun by sleeping, so inject the next call
     * immediately. Otherwise sleep for the remaining time. */
if (dt > looptics) {
struct timespec sleep;
sleep.tv_sec = 0;
/* Remember to be fair and remove the looptics, then convert to
* nanoseconds. */
double ns = (double)(dt - looptics) / clocks_cpufreq * 1.0e9;
if (ns < 1.0e9) {
sleep.tv_nsec = (long) ns;
      } else {
        /* A wait of more than a second is almost certainly an error, so
         * complain and cap the sleep at one second. Note tv_nsec must be
         * less than 1.0e9, so use tv_sec for the cap. */
        sleep.tv_sec = 1;
        sleep.tv_nsec = 0;
        message("wait greater than one second");
      }
nanosleep(&sleep, NULL);
}
    // Difference to SWIFT: the int count passed with MPI_BYTE could overflow
    // for very large messages; a derived datatype would avoid that.
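    // A possible fix (sketch only, not used here): send the payload as a
    // contiguous derived datatype so the element count stays well inside the
    // int range, e.g.
    //
    //   MPI_Datatype chunk;
    //   MPI_Type_contiguous(1024, MPI_BYTE, &chunk);
    //   MPI_Type_commit(&chunk);
    //   /* then send log->size / 1024 elements of chunk plus any remainder. */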
int err = 0;
if (log->type == task_type_send) {
err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank,
log->tag, subtypeMPI_comms[log->subtype], &log->req);
/* Add a new send request. */
int ind = atomic_inc(&nr_sends);
sends_queue[ind] = log;
atomic_inc(&todo_send);
} else {
err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank,
log->tag, subtypeMPI_comms[log->subtype], &log->req);
/* Add a new recv request. */
int ind = atomic_inc(&nr_recvs);
recvs_queue[ind] = log;
atomic_inc(&todo_recv);
}
if (err != MPI_SUCCESS) error("Failed to activate send or recv");
ind_req++;
}
message("%d injections completed, sends = %d, recvs = %d", ind_req,
nr_sends, nr_recvs);
message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
atomic_dec(&injecting);
return NULL;
}
/* Send thread, checks if MPI_Isend requests have completed. */
static void *send_thread(void *arg) {
message("%d: send thread starts (%d)", *((int *)arg), injecting);
ticks starttics = getticks();
int res;
MPI_Status stat;
  /* Only exit once all requests have been injected and our queue has been
   * emptied. */
size_t attempts = 0;
  while (injecting || todo_send > 0) {
int nsends = nr_sends;
for (int k = 0; k < nsends; k++) {
struct mpiuse_log_entry *log = sends_queue[k];
if (log != NULL) {
attempts++;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
if (res) {
/* Done, clean up. */
//message("MPI_Test successful");
free(log->data);
sends_queue[k] = NULL;
atomic_dec(&todo_send);
}
}
}
}
message("sends completed, required %zd attempts (left: %d)", attempts,
todo_send);
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
return NULL;
}
/* Recv thread, checks if MPI_Irecv requests have completed. */
static void *recv_thread(void *arg) {
message("%d: recv thread starts", *((int *)arg));
ticks starttics = getticks();
int res;
MPI_Status stat;
size_t attempts = 0;
  while (injecting || todo_recv > 0) {
int nrecvs = nr_recvs;
for (int k = 0; k < nrecvs; k++) {
struct mpiuse_log_entry *log = recvs_queue[k];
if (log != NULL) {
attempts++;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
if (res) {
/* Done, clean up. */
//message("MPI_Test successful");
free(log->data);
recvs_queue[k] = NULL;
atomic_dec(&todo_recv);
}
}
}
}
message("recvs completed, required %zd attempts (left: %d)", attempts,
todo_recv);
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
return NULL;
}
/* Comparison function for logged times. */
static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
/* Large unsigned values, so take care. */
if (l1->tic > l2->tic)
return 1;
if (l1->tic < l2->tic)
return -1;
return 0;
}
/* Pick out the relevant logging data for our rank, i.e. all activations of
* sends and recvs. */
static void pick_logs(void) {
  size_t nlogs = mpiuse_nr_logs();
/* Duplicate of logs. XXX could loop twice to reduce memory use if needed. */
reqs_queue = (struct mpiuse_log_entry **)
malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
  nr_reqs = 0;
sends_queue = (struct mpiuse_log_entry **)
malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
  nr_sends = 0;
recvs_queue = (struct mpiuse_log_entry **)
malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
nr_recvs = 0;
  for (size_t k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->rank == myrank && log->activation) {
if (log->type == task_type_send || log->type == task_type_recv) {
/* Allocate space for data. */
log->data = calloc(log->size, 1);
/* And keep this log. */
reqs_queue[nr_reqs] = log;
nr_reqs++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
}
/* Sort into increasing time. */
qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
}
int main(int argc, char *argv[]) {
  /* First we read the SWIFT MPI logger output that defines the communications
   * we will undertake and the time differences between injections into the
   * queues. */
mpiuse_log_restore("testdata/mpiuse_report-step1.dat");
int nranks = mpiuse_nr_ranks();
/* Initiate MPI. */
  int prov = 0;
  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Init_thread failed with error %i.", res);
  if (prov != MPI_THREAD_MULTIPLE)
    error("MPI_THREAD_MULTIPLE is not supported, got level %i.", prov);
int nr_nodes = 0;
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
if (res != MPI_SUCCESS)
error("MPI_Comm_size failed with error %i.", res);
/* This should match the expected size. */
  if (nr_nodes != nranks)
    error("The number of MPI ranks %d does not match the expected value %d",
          nr_nodes, nranks);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Create communicators for each subtype of the tasks. */
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
}
/* Each rank requires its own queue, so extract them. */
pick_logs();
/* Start time. */
message("Start of MPI tests");
clocks_set_cpufreq(0);
/* Make three threads, one for injecting tasks and two to check for
* completions of the sends and recv independently. */
pthread_t injectthread;
if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
error("Failed to create injection thread.");
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread, NULL);
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
return 0;
}