Commit 4de7da6b authored by Peter W. Draper

Add more framework for reading the log, filling queues and doing the MPI work

parent 8015bae9
Makefile
-mpistalls: mpistalls.c mpiuse.c atomic.h cycle.h
+mpistalls: mpistalls.c mpiuse.c mpiuse.h atomic.h cycle.h
	$(CC) -g -O0 -o mpistalls mpistalls.c mpiuse.c -I/usr/include/mpi -lmpi -lpthread
clean:
...

error.h
#include <mpi.h>
+extern int myrank;
/* Exit in error macro. */
#define error(s, ...) \
({ \
fflush(stdout); \
-fprintf(stderr, "%s:%s():%i: " s "\n", __FILE__, __FUNCTION__, \
+fprintf(stderr, "%d/ %s:%s():%i: " s "\n", myrank, __FILE__, __FUNCTION__, \
__LINE__, ##__VA_ARGS__); \
MPI_Abort(MPI_COMM_WORLD, -1); \
})
/* Print a message. */
#define message(s, ...) \
({ \
-printf("%s: " s "\n", __FUNCTION__, ##__VA_ARGS__); \
+printf("%d/ %s: " s "\n", myrank, __FUNCTION__, ##__VA_ARGS__); \
})
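For context, a minimal usage sketch of these macros (not part of the commit; the log-entry count and argument check below are purely illustrative). It assumes this header is the error.h included by mpistalls.c and that myrank is defined by the program, as the commit does with its new global.

#include <stdio.h>
#include <mpi.h>
#include "error.h"

int myrank = -1; /* Definition matching the extern declaration above. */

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
  message("read %d log entries", 42); /* Prints e.g. "0/ main: read 42 log entries". */
  if (argc > 2) error("unexpected extra arguments"); /* Prints and calls MPI_Abort(). */
  MPI_Finalize();
  return 0;
}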
...

mpistalls.c
@@ -9,42 +9,200 @@
#include <stdio.h>
#include <mpi.h>
#include <pthread.h>
#include <stdlib.h>
#include "atomic.h"
#include "error.h"
#include "mpiuse.h"
/* Integer types of send and recv tasks, must match log. */
static int task_type_send = 22;
static int task_type_recv = 23;
/* Our rank for all to see. */
int myrank = -1;
/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue;
static int volatile ind_req = 0;
static int volatile nr_reqs = 0;
static int volatile injecting = 1;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0;
static int volatile ind_recv = 0;
static int volatile todo_recv = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends = 0;
static int volatile ind_send = 0;
static int volatile todo_send = 0;
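A brief orienting comment, added editorially rather than taken from the commit, summarising how these queues are used by the code that follows:

/* Inferred data flow between the queues above (editorial summary):
 *   pick_logs()     fills reqs_queue with this rank's send/recv activations,
 *                   sorted into tic order;
 *   inject_thread() drains reqs_queue, calls MPI_Isend/MPI_Irecv, and pushes
 *                   each live request onto sends_queue or recvs_queue;
 *   send_thread() and recv_thread() poll those queues with MPI_Test until
 *                   injection has finished and todo_send/todo_recv reach zero. */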
/* Injection thread, initiates MPI_Isend and MPI_Irecv requests at various
* times. */
-void *inject_thread(void *arg) {
+static void *inject_thread(void *arg) {
message("%d: injection thread starts", *((int *)arg));
//atomic_inc(&injecting);
-while (1) {
-//inject_one();
while (ind_req < nr_reqs) {
struct mpiuse_log_entry *log = reqs_queue[ind_req];
// Differences here: MPI_COMM_WORLD is used, which will break as tags will
// not be unique; MPI_BYTE might overflow, should use MPI_Type(?).
// Need to use the tic information to time our injections.
int err = 0;
if (log->type == task_type_send) {
err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank,
log->tag, MPI_COMM_WORLD, &log->req);
/* Add a new send request. */
int ind = atomic_inc(&nr_sends);
sends_queue[ind] = log;
atomic_inc(&todo_send);
} else {
err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->rank,
log->tag, MPI_COMM_WORLD, &log->req);
/* Add a new recv request. */
int ind = atomic_inc(&nr_recvs);
recvs_queue[ind] = log;
atomic_inc(&todo_recv);
}
if (err != MPI_SUCCESS) error("Failed to activate send or recv");
ind_req++;
}
message("%d injections completed, sends = %d, recvs = %d", ind_req,
nr_sends, nr_recvs);
message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
atomic_dec(&injecting);
return NULL;
}
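The queue insertions above (int ind = atomic_inc(&nr_sends); sends_queue[ind] = log;) only behave correctly if atomic_inc is a fetch-and-add that returns the value held before the increment, so each call claims a unique slot. atomic.h is not part of this diff, so the following is only an assumed sketch of that contract, written against GCC's __sync builtins; it is not the file's actual contents.

/* Assumed shape of the atomic helpers used above (atomic.h is not shown in
 * this commit). atomic_inc must return the pre-increment value so that it
 * can be used directly as a queue index; atomic_dec mirrors it. */
#define atomic_add(v, i) __sync_fetch_and_add(v, i)
#define atomic_sub(v, i) __sync_fetch_and_sub(v, i)
#define atomic_inc(v) atomic_add(v, 1)
#define atomic_dec(v) atomic_sub(v, 1)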
/* Send thread, checks if MPI_Isend requests have completed. */
-void *send_thread(void *arg) {
-message("%d: send thread starts", *((int *)arg));
-while (1) {
-//send_test();
+static void *send_thread(void *arg) {
+message("%d: send thread starts (%d)", *((int *)arg), injecting);
int res;
MPI_Status stat;
/* Need a test that only exits when requests are all inserted and we have
 * emptied our queue. */
size_t attempts = 0;
while (injecting || (!injecting && todo_send > 0)) {
int nsends = nr_sends;
for (int k = 0; k < nsends; k++) {
struct mpiuse_log_entry *log = sends_queue[k];
if (log != NULL) {
attempts++;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
if (res) {
/* Done, clean up. */
message("MPI_Test successful");
free(log->data);
sends_queue[k] = NULL;
atomic_dec(&todo_send);
}
}
}
}
message("sends completed, required %zd attempts (left: %d)", attempts,
todo_send);
return NULL;
}
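The completion threads poll every outstanding request individually with MPI_Test. For reference only, here is a sketch of how a batch of requests could be polled in one MPI_Testsome call; poll_sends_once and its parameters are illustrative names, not part of the commit, and the sketch assumes the requests have been packed into a contiguous array.

#include <stdlib.h>
#include <mpi.h>
#include "error.h"

/* Illustrative sketch: test all of reqs[0..nreqs-1] in a single call.
 * MPI_Testsome sets completed entries to MPI_REQUEST_NULL. */
static void poll_sends_once(MPI_Request *reqs, int nreqs) {
  int outcount = 0;
  int *indices = (int *)malloc(sizeof(int) * nreqs);
  if (indices == NULL) error("Failed to allocate index array");
  int err = MPI_Testsome(nreqs, reqs, &outcount, indices, MPI_STATUSES_IGNORE);
  if (err != MPI_SUCCESS) error("MPI_Testsome call failed");
  for (int k = 0; k < outcount; k++) {
    /* reqs[indices[k]] has completed; free its data and decrement the
     * matching todo counter here. */
  }
  free(indices);
}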
/* Recv thread, checks if MPI_Irecv requests have completed. */
-void *recv_thread(void *arg) {
+static void *recv_thread(void *arg) {
message("%d: recv thread starts", *((int *)arg));
-while (1) {
-//recv_test();
int res;
MPI_Status stat;
size_t attempts = 0;
while (injecting || (!injecting && todo_recv > 0)) {
int nrecvs = nr_recvs;
for (int k = 0; k < nrecvs; k++) {
struct mpiuse_log_entry *log = recvs_queue[k];
if (log != NULL) {
attempts++;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
if (res) {
/* Done, clean up. */
message("MPI_Test successful");
free(log->data);
recvs_queue[k] = NULL;
atomic_dec(&todo_recv);
}
}
}
}
message("recvs completed, required %zd attempts (left: %d)", attempts,
todo_recv);
return NULL;
}
/* Comparison function for logged times. */
static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
return l1->tic - l2->tic;
}
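Note that cmp_logs narrows the tick difference to int. If ticks is an unsigned 64-bit type, as cycle.h typically makes it on x86, a large difference (or one that should be negative) can change sign when truncated and misorder the queue. Below is a hedged alternative that compares instead of subtracting, under that assumption; cmp_logs_safe is an illustrative name, not part of the commit.

#include "mpiuse.h"

/* Compare by ordering rather than subtraction, avoiding truncation of the
 * (possibly unsigned 64-bit) tick difference to int. */
static int cmp_logs_safe(const void *p1, const void *p2) {
  const struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry *const *)p1;
  const struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry *const *)p2;
  if (l1->tic < l2->tic) return -1;
  if (l1->tic > l2->tic) return 1;
  return 0;
}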
/* Pick out the relevant logging data for our rank, i.e. all activations of
* sends and recvs. */
static void pick_logs() {
size_t nlogs = mpiuse_nr_logs();
int nranks = mpiuse_nr_ranks();
/* Duplicate of logs. XXX could loop twice to reduce memory use if needed. */
reqs_queue = (struct mpiuse_log_entry **)
malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
nr_reqs = 0;
sends_queue = (struct mpiuse_log_entry **)
malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
nr_sends = 0;
recvs_queue = (struct mpiuse_log_entry **)
malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
nr_recvs = 0;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->rank == myrank && log->activation) {
if (log->type == task_type_send || log->type == task_type_recv) {
/* Allocate space for data. */
log->data = calloc(log->size, 1);
/* And keep this log. */
reqs_queue[nr_reqs] = log;
nr_reqs++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
}
/* Sort into increasing time. */
qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
}
int main(int argc, char *argv[]) {
/* First we read the SWIFT MPI logger output that defines the communications
 * we will undertake and the time differences between injections into the
 * queues. */
mpiuse_log_restore("testdata/mpiuse_report-step2.dat");
int nranks = mpiuse_nr_ranks();
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
@@ -56,12 +214,19 @@ int main(int argc, char *argv[]) {
if (res != MPI_SUCCESS)
error("MPI_Comm_size failed with error %i.", res);
-static int myrank = 0;
/* This should match the expected size. */
if (nr_nodes != nranks)
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_nodes);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
message("Hello from rank: %d", myrank);
message("Starts");
/* Each rank requires its own queue, so extract them. */
pick_logs(myrank);
/* Make three threads, one for injecting tasks and two to check for
 * completions of the sends and recvs independently. */
@@ -75,11 +240,6 @@ int main(int argc, char *argv[]) {
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
-/* Read the MPI logger output that defines the communcations we will
- * undertake and the time differences between injections into the queues. */
-mpiuse_log_restore("testdata/mpiuse_report-step2.dat");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread, NULL);
pthread_join(sendthread, NULL);
...

mpiuse.c
@@ -34,52 +34,18 @@
#include "cycle.h"
#include "error.h"
/* XXX threading support not needed, should remove. */
/* The initial size and increment of the log entries buffer. */
#define MPIUSE_INITLOG 1000000
-/* Entry for logger of MPI send and recv requests in a step. */
-struct mpiuse_log_entry {
-/* Rank of entry. */
-int rank;
-/* Type and subtype of MPI task. */
-int type;
-int subtype;
-/* Step of action. */
-int step;
-/* Whether an activation, send or recv, or if handoff completed. Not the
- * same as delivered, need to match across ranks to see that. */
-int activation;
-/* Memory of the request. */
-size_t size;
-/* Pointer to the request associated with the call. Needs to be
- * unique and match to the successful */
-union {
-void *ptr;
-uint8_t vptr[sizeof(uintptr_t)]; /* For rnode keys. */
-};
-/* Ticks at time of this action. */
-ticks tic;
-/* Rank of otherside of communication. */
-int otherrank;
-/* The tag. */
-int tag;
-};
/* The log of activations and handoffs. All volatile as accessed from threads
* that use the value to synchronise. */
static struct mpiuse_log_entry *volatile mpiuse_log = NULL;
static volatile size_t mpiuse_log_size = 0;
static volatile size_t mpiuse_log_count = 0;
static volatile size_t mpiuse_log_done = 0;
static volatile int mpiuse_max_rank = 0;
/**
* @brief reallocate the entries log if space is needed.
@@ -147,16 +113,20 @@ void mpiuse_log_allocation(int rank, int step, size_t tic, int type,
;
/* Record the log. */
mpiuse_log[ind].activation = activation;
mpiuse_log[ind].data = NULL;
mpiuse_log[ind].otherrank = otherrank;
mpiuse_log[ind].rank = rank;
mpiuse_log[ind].req = MPI_REQUEST_NULL;
mpiuse_log[ind].size = size;
mpiuse_log[ind].step = step;
-mpiuse_log[ind].type = type;
mpiuse_log[ind].subtype = subtype;
-mpiuse_log[ind].activation = activation;
-mpiuse_log[ind].size = size;
-mpiuse_log[ind].ptr = NULL;
-mpiuse_log[ind].otherrank = otherrank;
mpiuse_log[ind].tag = tag;
mpiuse_log[ind].tic = tic;
mpiuse_log[ind].type = type;
/* Keep number of ranks for convenience. */
if (rank > mpiuse_max_rank) mpiuse_max_rank = rank;
atomic_inc(&mpiuse_log_done);
}
@@ -176,7 +146,6 @@ void mpiuse_log_restore(const char *filename) {
return;
}
/* Read until the end of the file is reached. */
char line[132];
size_t stic, etic, dtic, size, sum;
@@ -192,10 +161,38 @@
mpiuse_log_allocation(rank, step, stic, itype, isubtype, activation,
size, otherrank, tag);
}
}
}
fclose(fd);
}
/**
* @brief return the number of log entries.
*
* @result the number of log entries.
*/
int mpiuse_nr_logs(void) {
return mpiuse_log_count;
}
/**
* @brief return the number of ranks in log.
*
* @result the number of ranks we've seen.
*/
int mpiuse_nr_ranks(void) {
return mpiuse_max_rank + 1;
}
/**
* @brief get a log entry.
*
* @param ind the index of the entry required.
* @result NULL if not available.
*/
struct mpiuse_log_entry *mpiuse_get_log(int ind) {
if (ind < mpiuse_log_count && ind >= 0) return &mpiuse_log[ind];
return NULL;
}
...

mpiuse.h
@@ -19,10 +19,55 @@
#ifndef SWIFT_MPIUSE_H
#define SWIFT_MPIUSE_H
/* Local includes. */
#include "atomic.h"
#include "cycle.h"
#include "error.h"
/* Entry for logger of MPI send and recv requests in a step. */
struct mpiuse_log_entry {
/* Rank of entry. */
int rank;
/* Type and subtype of MPI task. */
int type;
int subtype;
/* Step of action. */
int step;
/* Whether this entry is an activation of a send or recv, or a completed
 * handoff. Not the same as delivered; we need to match across ranks to see
 * that. */
int activation;
/* Ticks at time of this action. */
ticks tic;
/* Rank of the other side of the communication. */
int otherrank;
/* The MPI tag. */
int tag;
/* Memory of the request. */
size_t size;
/* Pointer to the data. */
void *data;
/* The request associated with the call. */
MPI_Request req;
};
/* API. */
void mpiuse_log_allocation(int rank, int step, size_t tic, int type,
int subtype, int activation, size_t size,
int otherrank, int tag);
struct mpiuse_log_entry *mpiuse_get_log(int ind);
void mpiuse_log_restore(const char *filename);
int mpiuse_nr_logs(void);
int mpiuse_nr_ranks(void);
#endif /* SWIFT_MPIUSE_H */
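To round off the header, a short usage sketch for this API (not part of the commit). It mirrors what pick_logs() in mpistalls.c does; the function name and the send task type parameter are illustrative only, although the data file path is the one used by main().

#include <stdio.h>
#include "mpiuse.h"

/* Count this rank's send activations in the restored log (illustrative). */
static void count_my_sends(int myrank, int task_type_send) {
  mpiuse_log_restore("testdata/mpiuse_report-step2.dat");
  int nsend = 0;
  int nlogs = mpiuse_nr_logs();
  for (int k = 0; k < nlogs; k++) {
    struct mpiuse_log_entry *log = mpiuse_get_log(k);
    if (log->rank == myrank && log->activation && log->type == task_type_send)
      nsend++;
  }
  printf("rank %d of %d has %d send activations\n", myrank,
         mpiuse_nr_ranks(), nsend);
}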