/**
 * Attempt to reproduce the asynchronous stalling that we see for medium
 * busy steps in SWIFT.
 *
 * So we need to set up a multithreaded MPI program that performs
 * asynchronous exchanges of various data sizes and continuously checks the
 * requests for completion. Also need timers to record the time taken by all
 * this...
 */
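
/* Structure: an injection thread issues the logged MPI_Isend/MPI_Irecv
 * calls in time order, while two polling threads repeatedly MPI_Test the
 * outstanding send and recv requests until all of them have completed. */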
#include <stdio.h>
#include <mpi.h>
#include <pthread.h>
#include <stdlib.h>
#include "atomic.h"
#include "error.h"
#include "mpiuse.h"

/* Global: Our rank for all to see. */
int myrank = -1;

/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;

/* Global communicators for each of the subtypes. */
#define task_subtype_count 30 /* Just some upper limit on subtype. */
static MPI_Comm subtypeMPI_comms[task_subtype_count];
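
/* Each duplicated communicator provides an independent message-matching
 * context, so the same tag can be reused safely across subtypes. */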

/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue;
static int volatile ind_req = 0;
static int volatile nr_reqs = 0;
static int volatile injecting = 1;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0;
static int volatile ind_recv = 0;
static int volatile todo_recv = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends = 0;
static int volatile ind_send = 0;
static int volatile todo_send = 0;
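
/* reqs_queue holds the time-ordered logs to inject; the send and recv queues
 * are filled by the injection thread and drained by the polling threads. The
 * ind_* variables index the next entry to use, the nr_* variables count the
 * entries added, the todo_* variables count the requests still in flight and
 * injecting flags that the injection thread is still running. */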

/* Injection thread, initiates MPI_Isend and MPI_Irecv requests at various
 * times. */
static void *inject_thread(void *arg) {
message("%d: injection thread starts", *((int *)arg));
while (ind_req < nr_reqs) {
struct mpiuse_log_entry *log = reqs_queue[ind_req];
    // Differences to SWIFT: the MPI_BYTE count could overflow for very large
    // messages (should a larger MPI datatype be used?), and injections are
    // not timed.
int err = 0;
if (log->type == task_type_send) {
err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank,
log->tag, subtypeMPI_comms[log->subtype], &log->req);
/* Add a new send request. */
int ind = atomic_inc(&nr_sends);
sends_queue[ind] = log;
atomic_inc(&todo_send);
} else {
err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank,
log->tag, subtypeMPI_comms[log->subtype], &log->req);
/* Add a new recv request. */
int ind = atomic_inc(&nr_recvs);
recvs_queue[ind] = log;
atomic_inc(&todo_recv);
}
if (err != MPI_SUCCESS) error("Failed to activate send or recv");
ind_req++;
}
message("%d injections completed, sends = %d, recvs = %d", ind_req,
nr_sends, nr_recvs);
message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
atomic_dec(&injecting);
return NULL;
}

/* Send thread, checks if MPI_Isend requests have completed. */
static void *send_thread(void *arg) {
message("%d: send thread starts (%d)", *((int *)arg), injecting);
int res;
MPI_Status stat;
  /* Loop until all requests have been injected and we have emptied our
   * queue. */
size_t attempts = 0;
  while (injecting || todo_send > 0) {
int nsends = nr_sends;
for (int k = 0; k < nsends; k++) {
struct mpiuse_log_entry *log = sends_queue[k];
if (log != NULL) {
attempts++;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
if (res) {
/* Done, clean up. */
//message("MPI_Test successful");
free(log->data);
sends_queue[k] = NULL;
atomic_dec(&todo_send);
}
}
}
}
message("sends completed, required %zd attempts (left: %d)", attempts,
todo_send);
return NULL;
}

/* Recv thread, checks if MPI_Irecv requests have completed. */
static void *recv_thread(void *arg) {
message("%d: recv thread starts", *((int *)arg));
int res;
MPI_Status stat;
size_t attempts = 0;
  while (injecting || todo_recv > 0) {
int nrecvs = nr_recvs;
for (int k = 0; k < nrecvs; k++) {
struct mpiuse_log_entry *log = recvs_queue[k];
if (log != NULL) {
attempts++;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
if (res) {
/* Done, clean up. */
//message("MPI_Test successful");
free(log->data);
recvs_queue[k] = NULL;
atomic_dec(&todo_recv);
}
}
}
}
message("recvs completed, required %zd attempts (left: %d)", attempts,
todo_recv);
return NULL;
}

/* Comparison function for logged times. */
static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
/* Large unsigned values, so take care. */
if (l1->tic > l2->tic)
return 1;
if (l1->tic < l2->tic)
return -1;
return 0;
}
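
/* (Simply returning l1->tic - l2->tic would be wrong: the unsigned
 * difference truncated to an int can come out with the wrong sign.) */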

/* Pick out the relevant logging data for our rank, i.e. all activations of
 * sends and recvs. */
static void pick_logs(void) {
size_t nlogs = mpiuse_nr_logs();
/* Duplicate of logs. XXX could loop twice to reduce memory use if needed. */
  reqs_queue = (struct mpiuse_log_entry **)
      malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
  nr_reqs = 0;
  sends_queue = (struct mpiuse_log_entry **)
      malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
  nr_sends = 0;
  recvs_queue = (struct mpiuse_log_entry **)
      malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
  nr_recvs = 0;
  for (int k = 0; k < (int)nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->rank == myrank && log->activation) {
if (log->type == task_type_send || log->type == task_type_recv) {
/* Allocate space for data. */
log->data = calloc(log->size, 1);
/* And keep this log. */
reqs_queue[nr_reqs] = log;
nr_reqs++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
}
/* Sort into increasing time. */
qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
}

int main(int argc, char *argv[]) {
  /* First we read the SWIFT MPI logger output that defines the
   * communications we will undertake and the time differences between
   * injections into the queues. */
  mpiuse_log_restore("testdata/mpiuse_report-step1.dat");
int nranks = mpiuse_nr_ranks();
/* Initiate MPI. */
  int prov = 0;
  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Init_thread failed with error %i.", res);
  if (prov != MPI_THREAD_MULTIPLE)
    error("MPI_THREAD_MULTIPLE is not supported (provided level %i).", prov);
int nr_nodes = 0;
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
if (res != MPI_SUCCESS)
error("MPI_Comm_size failed with error %i.", res);
/* This should match the expected size. */
  if (nr_nodes != nranks)
    error("The number of MPI ranks %d does not match the expected value %d",
          nr_nodes, nranks);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Create communicators for each subtype of the tasks. */
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
}
message("Starts");
/* Each rank requires its own queue, so extract them. */
pick_logs();
  /* Make three threads, one for injecting tasks and two to check for
   * completions of the sends and recvs independently. */
pthread_t injectthread;
if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
error("Failed to create injection thread.");
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread, NULL);
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
return 0;
}
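
/* A possible invocation (the executable name is assumed from this file, and
 * the rank count must match the value recorded in the log file):
 *
 *   mpirun -np <nranks> ./mpistalls
 */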