Commit 9b5b4d58 authored by Peter W. Draper

Add timed variant that attempts to mimic the original injection time

parent 22d98b76
all: mpistalls mpistalls-timed

mpistalls: mpistalls.c mpiuse.c mpiuse.h atomic.h cycle.h
	$(CC) -g -O0 -o mpistalls mpistalls.c mpiuse.c -I/usr/include/mpi -lmpi -lpthread

mpistalls-timed: mpistalls-timed.c mpiuse.c mpiuse.h atomic.h cycle.h
	$(CC) -g -O0 -o mpistalls-timed mpistalls-timed.c mpiuse.c -I/usr/include/mpi -lmpi -lpthread

clean:
	rm mpistalls mpistalls-timed
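
# Example usage (hypothetical invocation; the number of ranks passed to mpirun
# must match the rank count recorded in the replayed log, see mpistalls-timed.c):
#   make all
#   mpirun -np <nranks> ./mpistalls-timed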
/**
* Attempt to reproduce the asynchronous stalling that we see for medium
* busy steps in SWIFT.
*
* Timed injection version.
*
 * So we need to set up a multithreaded MPI program that performs asynchronous
 * exchanges of various data sizes and continuously checks the requests for
 * completion. We also need timers to record the time taken by all of this...
*/
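/* Rough structure, as implemented below: an injection thread replays the
 * logged MPI_Isend/MPI_Irecv calls, sleeping so that the gaps between
 * injections mimic the logged tick deltas, while two further threads poll
 * the pending send and recv requests with MPI_Test until all complete. */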
#include <stdio.h>
#include <mpi.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h> /* nanosleep() */
#include "atomic.h"
#include "error.h"
#include "mpiuse.h"
/* Global: Our rank for all to see. */
int myrank = -1;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
/* Global communicators for each of the subtypes. */
static const int task_subtype_count = 30; // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[30];
/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue;
static int volatile ind_req = 0;
static int volatile nr_reqs = 0;
static int volatile injecting = 1;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0;
static int volatile ind_recv = 0;
static int volatile todo_recv = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends = 0;
static int volatile ind_send = 0;
static int volatile todo_send = 0;
/* Delta tics that we do not wait for. XXX measure this. */
const ticks looptics = 10000;
/* CPU frequency of the machine that created the MPI log. */
// XXX need to store this in the data file.
static double clocks_cpufreq = 2194844448.0;
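/* Logged times are in CPU ticks, so a tick delta dt corresponds to a
 * wall-clock interval of dt / clocks_cpufreq seconds; the injection thread
 * below sleeps for (dt - looptics) / clocks_cpufreq * 1e9 nanoseconds. */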
/* Injection thread, initiates MPI_Isend and MPI_Irecv requests at various
* times. */
static void *inject_thread(void *arg) {
message("%d: injection thread starts", *((int *)arg));
/* Ticks of our last attempt. */
ticks basetic = reqs_queue[0]->tic;
while (ind_req < nr_reqs) {
struct mpiuse_log_entry *log = reqs_queue[ind_req];
    /* Expected time between this request and the previous one. */
ticks dt = log->tic - basetic;
double ddt = (double)log->tic - (double)basetic;
ticks oldbasetic = basetic;
basetic = log->tic;
    /* If the gap is no larger than the time one pass of this loop takes
     * without any waiting (looptics), sleeping would make us overrun, so just
     * inject the next call immediately. Otherwise sleep for the remaining
     * time. */
if (dt > looptics) {
struct timespec sleep;
sleep.tv_sec = 0;
/* Remember to be fair and remove the looptics, then convert to
* nanoseconds. */
double ns = (double)(dt - looptics) / clocks_cpufreq * 1.0e9;
if (ns < 1.0e9) {
sleep.tv_nsec = (long) ns;
      } else {
        /* A wait of more than one second is probably an error, so complain
         * and continue with the sleep capped at one second (tv_nsec must
         * stay below 1e9 for nanosleep to accept it). */
        sleep.tv_sec = 1;
        sleep.tv_nsec = 0;
        message("wait greater than one second");
      }
//message("injecting wait of: %ld ns", sleep.tv_nsec);
nanosleep(&sleep, NULL);
}
// Differences to SWIFT: MPI_BYTE might overflow, should use MPI_Type(?).
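    /* (The count argument of MPI_Isend/MPI_Irecv is an int, so a log->size
     * above INT_MAX bytes would overflow here; a wider element type or a
     * derived datatype would be needed to be safe.) */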
int err = 0;
if (log->type == task_type_send) {
err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank,
log->tag, subtypeMPI_comms[log->subtype], &log->req);
/* Add a new send request. */
int ind = atomic_inc(&nr_sends);
sends_queue[ind] = log;
atomic_inc(&todo_send);
} else {
err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank,
log->tag, subtypeMPI_comms[log->subtype], &log->req);
/* Add a new recv request. */
int ind = atomic_inc(&nr_recvs);
recvs_queue[ind] = log;
atomic_inc(&todo_recv);
}
if (err != MPI_SUCCESS) error("Failed to activate send or recv");
ind_req++;
}
message("%d injections completed, sends = %d, recvs = %d", ind_req,
nr_sends, nr_recvs);
message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
atomic_dec(&injecting);
return NULL;
}
/* Send thread, checks if MPI_Isend requests have completed. */
static void *send_thread(void *arg) {
message("%d: send thread starts (%d)", *((int *)arg), injecting);
int res;
MPI_Status stat;
  // Need a test that only exits when all requests have been inserted and we
  // have emptied our queue.
size_t attempts = 0;
while (injecting || (!injecting && todo_send > 0)) {
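    /* Snapshot the current queue length: the injection thread only ever
     * appends, so entries added after this point are picked up on a later
     * pass. Completed entries are set to NULL and skipped. */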
int nsends = nr_sends;
for (int k = 0; k < nsends; k++) {
struct mpiuse_log_entry *log = sends_queue[k];
if (log != NULL) {
attempts++;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
if (res) {
/* Done, clean up. */
//message("MPI_Test successful");
free(log->data);
sends_queue[k] = NULL;
atomic_dec(&todo_send);
}
}
}
}
message("sends completed, required %zd attempts (left: %d)", attempts,
todo_send);
return NULL;
}
/* Recv thread, checks if MPI_Irecv requests have completed. */
static void *recv_thread(void *arg) {
message("%d: recv thread starts", *((int *)arg));
int res;
MPI_Status stat;
size_t attempts = 0;
while (injecting || (!injecting && todo_recv > 0)) {
int nrecvs = nr_recvs;
for (int k = 0; k < nrecvs; k++) {
struct mpiuse_log_entry *log = recvs_queue[k];
if (log != NULL) {
attempts++;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
if (res) {
/* Done, clean up. */
//message("MPI_Test successful");
free(log->data);
recvs_queue[k] = NULL;
atomic_dec(&todo_recv);
}
}
}
}
message("recvs completed, required %zd attempts (left: %d)", attempts,
todo_recv);
return NULL;
}
/* Comparison function for logged times. */
static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
/* Large unsigned values, so take care. */
if (l1->tic > l2->tic)
return 1;
if (l1->tic < l2->tic)
return -1;
return 0;
}
/* Pick out the relevant logging data for our rank, i.e. all activations of
* sends and recvs. */
static void pick_logs(void) {
size_t nlogs = mpiuse_nr_logs();
int nranks = mpiuse_nr_ranks();
/* Duplicate of logs. XXX could loop twice to reduce memory use if needed. */
reqs_queue = (struct mpiuse_log_entry **)
malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
  nr_reqs = 0;
sends_queue = (struct mpiuse_log_entry **)
malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
  nr_sends = 0;
recvs_queue = (struct mpiuse_log_entry **)
malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
nr_recvs = 0;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->rank == myrank && log->activation) {
if (log->type == task_type_send || log->type == task_type_recv) {
/* Allocate space for data. */
log->data = calloc(log->size, 1);
/* And keep this log. */
reqs_queue[nr_reqs] = log;
nr_reqs++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
}
/* Sort into increasing time. */
qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
}
int main(int argc, char *argv[]) {
  /* First we read the SWIFT MPI logger output that defines the communications
* we will undertake and the time differences between injections into the
* queues. */
mpiuse_log_restore("testdata/mpiuse_report-step1.dat");
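  /* Each replayed log entry carries the fields used below: the originating
   * rank, activation flag, task type and subtype, tic, data size, other rank
   * and tag. */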
int nranks = mpiuse_nr_ranks();
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
int nr_nodes = 0;
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
if (res != MPI_SUCCESS)
error("MPI_Comm_size failed with error %i.", res);
/* This should match the expected size. */
  if (nr_nodes != nranks)
    error("The number of MPI ranks %d does not match the expected value %d",
          nr_nodes, nranks);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Create communicators for each subtype of the tasks. */
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
}
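  /* Presumably this mirrors SWIFT, where each task subtype gets its own
   * communicator so message tags only need to be unique within a subtype. */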
/* Each rank requires its own queue, so extract them. */
pick_logs();
message("Start of MPI tests");
  /* Make three threads, one for injecting tasks and two to check for
   * completions of the sends and recvs independently. */
pthread_t injectthread;
if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
error("Failed to create injection thread.");
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread, NULL);
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
return 0;
}