diff --git a/Makefile b/Makefile index bfaf303b3db56fd2119b3b42c65f4d464adcfa1d..de4baeb7d220b451dd2e147edfcd5e897a2d8502 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -mpistalls: mpistalls.c mpiuse.c atomic.h cycle.h +mpistalls: mpistalls.c mpiuse.c mpiuse.h atomic.h cycle.h $(CC) -g -O0 -o mpistalls mpistalls.c mpiuse.c -I/usr/include/mpi -lmpi -lpthread clean: diff --git a/error.h b/error.h index e1433b663aa6e6c0d08dcf3d10c600b08576e611..05bc9fa80b6bc74acf571b6054fc84f0013091e3 100644 --- a/error.h +++ b/error.h @@ -1,16 +1,18 @@ #include <mpi.h> +extern int myrank; + /* Exit in error macro. */ -#define error(s, ...) \ +#define error(s, ...) \ ({ \ fflush(stdout); \ - fprintf(stderr, "%s:%s():%i: " s "\n", __FILE__, __FUNCTION__, \ + fprintf(stderr, "%d/ %s:%s():%i: " s "\n", myrank, __FILE__, __FUNCTION__, \ __LINE__, ##__VA_ARGS__); \ MPI_Abort(MPI_COMM_WORLD, -1); \ }) /* Print a message */ -#define message(s, ...) \ +#define message(s, ...) \ ({ \ - printf("%s: " s "\n", __FUNCTION__, ##__VA_ARGS__); \ + printf("%d/ %s: " s "\n", myrank, __FUNCTION__, ##__VA_ARGS__); \ }) diff --git a/mpistalls.c b/mpistalls.c index 95312a8d8069981e831d71e1e5266e5727c7c28f..75eec48a61b23fbd1e2de1c342c70282806db294 100644 --- a/mpistalls.c +++ b/mpistalls.c @@ -9,42 +9,200 @@ #include <stdio.h> #include <mpi.h> #include <pthread.h> +#include <stdlib.h> +#include "atomic.h" #include "error.h" #include "mpiuse.h" +/* Integer types of send and recv tasks, must match log. */ +static int task_type_send = 22; +static int task_type_recv = 23; + +/* Our rank for all to see. */ +int myrank = -1; + +/* The local queues. */ +static struct mpiuse_log_entry **volatile reqs_queue; +static int volatile ind_req = 0; +static int volatile nr_reqs = 0; +static int volatile injecting = 1; +static struct mpiuse_log_entry **volatile recvs_queue; +static int volatile nr_recvs = 0; +static int volatile ind_recv = 0; +static int volatile todo_recv = 0; +static struct mpiuse_log_entry **volatile sends_queue; +static int volatile nr_sends = 0; +static int volatile ind_send = 0; +static int volatile todo_send = 0; /* Injection thread, initiates MPI_Isend and MPI_Irecv requests at various * times. */ -void *inject_thread(void *arg) { +static void *inject_thread(void *arg) { message("%d: injection thread starts", *((int *)arg)); + //atomic_inc(&injecting); - while (1) { - //inject_one(); + while (ind_req < nr_reqs) { + struct mpiuse_log_entry *log = reqs_queue[ind_req]; + + // Differences here, MPI_COMM_WORLD, which will break as tags will not + // be unique, MPI_BYTE might overflow, should use MPI_Type(?). + + // Need to use the tic information to time our injections. + + int err = 0; + if (log->type == task_type_send) { + err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, + log->tag, MPI_COMM_WORLD, &log->req); + + /* Add a new send request. */ + int ind = atomic_inc(&nr_sends); + sends_queue[ind] = log; + atomic_inc(&todo_send); + + } else { + err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->rank, + log->tag, MPI_COMM_WORLD, &log->req); + + /* Add a new recv request. */ + int ind = atomic_inc(&nr_recvs); + recvs_queue[ind] = log; + atomic_inc(&todo_recv); + } + if (err != MPI_SUCCESS) error("Failed to activate send or recv"); + + ind_req++; } + message("%d injections completed, sends = %d, recvs = %d", ind_req, + nr_sends, nr_recvs); + message("remaining sends = %d, recvs = %d", todo_send, todo_recv); + atomic_dec(&injecting); + return NULL; } /* Send thread, checks if MPI_Isend requests have completed. */ -void *send_thread(void *arg) { - message("%d: send thread starts", *((int *)arg)); - - while (1) { - //send_test(); +static void *send_thread(void *arg) { + message("%d: send thread starts (%d)", *((int *)arg), injecting); + + int res; + MPI_Status stat; + + // Need a test that only exits when requests are all inserted and we have + // emptied our queue. */ + size_t attempts = 0; + while (injecting || (!injecting && todo_send > 0)) { + int nsends = nr_sends; + for (int k = 0; k < nsends; k++) { + struct mpiuse_log_entry *log = sends_queue[k]; + if (log != NULL) { + attempts++; + int err = MPI_Test(&log->req, &res, &stat); + if (err != MPI_SUCCESS) { + error("MPI_Test call failed"); + } + if (res) { + /* Done, clean up. */ + message("MPI_Test successful"); + free(log->data); + sends_queue[k] = NULL; + atomic_dec(&todo_send); + } + } + } } + message("sends completed, required %zd attempts (left: %d)", attempts, + todo_send); + return NULL; } /* Recv thread, checks if MPI_Irecv requests have completed. */ -void *recv_thread(void *arg) { +static void *recv_thread(void *arg) { message("%d: recv thread starts", *((int *)arg)); - while (1) { - //recv_test(); + int res; + MPI_Status stat; + + size_t attempts = 0; + while (injecting || (!injecting && todo_recv > 0)) { + int nrecvs = nr_recvs; + for (int k = 0; k < nrecvs; k++) { + struct mpiuse_log_entry *log = recvs_queue[k]; + if (log != NULL) { + attempts++; + int err = MPI_Test(&log->req, &res, &stat); + if (err != MPI_SUCCESS) { + error("MPI_Test call failed"); + } + if (res) { + /* Done, clean up. */ + message("MPI_Test successful"); + free(log->data); + recvs_queue[k] = NULL; + atomic_dec(&todo_recv); + } + } + } } + + message("recvs completed, required %zd attempts (left: %d)", attempts, + todo_recv); + return NULL; } +/* Comparison function for logged times. */ +static int cmp_logs(const void *p1, const void *p2) { + struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1; + struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2; + return l1->tic - l2->tic; +} + +/* Pick out the relevant logging data for our rank, i.e. all activations of + * sends and recvs. */ +static void pick_logs() { + size_t nlogs = mpiuse_nr_logs(); + int nranks = mpiuse_nr_ranks(); + + /* Duplicate of logs. XXX could loop twice to reduce memory use if needed. */ + reqs_queue = (struct mpiuse_log_entry **) + malloc(sizeof(struct mpiuse_log_entry *) * nlogs); + nr_reqs= 0; + sends_queue = (struct mpiuse_log_entry **) + malloc(sizeof(struct mpiuse_log_entry *) * nlogs); + nr_sends= 0; + recvs_queue = (struct mpiuse_log_entry **) + malloc(sizeof(struct mpiuse_log_entry *) * nlogs); + nr_recvs = 0; + + for (int k = 0; k < nlogs; k++) { + struct mpiuse_log_entry *log = mpiuse_get_log(k); + if (log->rank == myrank && log->activation) { + if (log->type == task_type_send || log->type == task_type_recv) { + + /* Allocate space for data. */ + log->data = calloc(log->size, 1); + + /* And keep this log. */ + reqs_queue[nr_reqs] = log; + nr_reqs++; + + } else { + error("task type '%d' is not a known send or recv task", log->type); + } + } + } + + /* Sort into increasing time. */ + qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs); +} int main(int argc, char *argv[]) { + /* First we read the SWIFT MPI logger output that defines the communcations + * we will undertake and the time differences between injections into the + * queues. */ + mpiuse_log_restore("testdata/mpiuse_report-step2.dat"); + int nranks = mpiuse_nr_ranks(); + /* Initiate MPI. */ int prov = 0; int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov); @@ -56,12 +214,19 @@ int main(int argc, char *argv[]) { if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res); - static int myrank = 0; + /* This should match the expected size. */ + if (nr_nodes != nranks) + error("The number of MPI ranks %d does not match the expected value %d", + nranks, nr_nodes); + res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank); if (res != MPI_SUCCESS) error("Call to MPI_Comm_rank failed with error %i.", res); - message("Hello from rank: %d", myrank); + message("Starts"); + + /* Each rank requires its own queue, so extract them. */ + pick_logs(myrank); /* Make three threads, one for injecting tasks and two to check for * completions of the sends and recv independently. */ @@ -75,11 +240,6 @@ int main(int argc, char *argv[]) { if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0) error("Failed to create recv thread."); - - /* Read the MPI logger output that defines the communcations we will - * undertake and the time differences between injections into the queues. */ - mpiuse_log_restore("testdata/mpiuse_report-step2.dat"); - /* Wait until all threads have exited and all MPI requests have completed. */ pthread_join(injectthread, NULL); pthread_join(sendthread, NULL); diff --git a/mpiuse.c b/mpiuse.c index eb6aef116f53c8ebbc75208a60991196675222b3..3f614bf3da6f942ff2490db23fc8db8028b9b4b5 100644 --- a/mpiuse.c +++ b/mpiuse.c @@ -34,52 +34,18 @@ #include "cycle.h" #include "error.h" +/* XXX threading support not needed, should remove. */ + /* The initial size and increment of the log entries buffer. */ #define MPIUSE_INITLOG 1000000 -/* Entry for logger of MPI send and recv requests in a step. */ -struct mpiuse_log_entry { - - /* Rank of entry. */ - int rank; - - /* Type and subtype of MPI task. */ - int type; - int subtype; - - /* Step of action. */ - int step; - - /* Whether an activation, send or recv, or if handoff completed. Not the - * same as delivered, need to match across ranks to see that. */ - int activation; - - /* Memory of the request. */ - size_t size; - - /* Pointer to the request associated with the call. Needs to be - * unique and match to the successful */ - union { - void *ptr; - uint8_t vptr[sizeof(uintptr_t)]; /* For rnode keys. */ - }; - - /* Ticks at time of this action. */ - ticks tic; - - /* Rank of otherside of communication. */ - int otherrank; - - /* The tag. */ - int tag; -}; - /* The log of activations and handoffs. All volatile as accessed from threads * that use the value to synchronise. */ static struct mpiuse_log_entry *volatile mpiuse_log = NULL; static volatile size_t mpiuse_log_size = 0; static volatile size_t mpiuse_log_count = 0; static volatile size_t mpiuse_log_done = 0; +static volatile int mpiuse_max_rank = 0; /** * @brief reallocate the entries log if space is needed. @@ -147,16 +113,20 @@ void mpiuse_log_allocation(int rank, int step, size_t tic, int type, ; /* Record the log. */ + mpiuse_log[ind].activation = activation; + mpiuse_log[ind].data = NULL; + mpiuse_log[ind].otherrank = otherrank; mpiuse_log[ind].rank = rank; + mpiuse_log[ind].req = MPI_REQUEST_NULL; + mpiuse_log[ind].size = size; mpiuse_log[ind].step = step; - mpiuse_log[ind].type = type; mpiuse_log[ind].subtype = subtype; - mpiuse_log[ind].activation = activation; - mpiuse_log[ind].size = size; - mpiuse_log[ind].ptr = NULL; - mpiuse_log[ind].otherrank = otherrank; mpiuse_log[ind].tag = tag; mpiuse_log[ind].tic = tic; + mpiuse_log[ind].type = type; + + /* Keep number of ranks for convenience. */ + if (rank > mpiuse_max_rank) mpiuse_max_rank = rank; atomic_inc(&mpiuse_log_done); } @@ -176,7 +146,6 @@ void mpiuse_log_restore(const char *filename) { return; } - /* Read until the end of the file is reached.*/ char line[132]; size_t stic, etic, dtic, size, sum; @@ -192,10 +161,38 @@ void mpiuse_log_restore(const char *filename) { mpiuse_log_allocation(rank, step, stic, itype, isubtype, activation, size, otherrank, tag); - } } } fclose(fd); } +/** + * @brief return the number of log entries. + * + * @result the number of log entries. + */ +int mpiuse_nr_logs(void) { + return mpiuse_log_count; +} + +/** + * @brief return the number of ranks in log. + * + * @result the number of ranks we've seen. + */ +int mpiuse_nr_ranks(void) { + return mpiuse_max_rank + 1; +} + +/** + * @brief get a log entry. + * + * @param ind the index of the entry required. + * @result NULL if not available. + */ +struct mpiuse_log_entry *mpiuse_get_log(int ind) { + + if (ind < mpiuse_log_count && ind >= 0) return &mpiuse_log[ind]; + return NULL; +} diff --git a/mpiuse.h b/mpiuse.h index cd5fff043a7f5c513ebafb08292d5a92a050bbd9..0bd8df6af377ae6cabfa5abe6eb021b699715d01 100644 --- a/mpiuse.h +++ b/mpiuse.h @@ -19,10 +19,55 @@ #ifndef SWIFT_MPIUSE_H #define SWIFT_MPIUSE_H +/* Local includes. */ +#include "atomic.h" +#include "cycle.h" +#include "error.h" + +/* Entry for logger of MPI send and recv requests in a step. */ +struct mpiuse_log_entry { + + /* Rank of entry. */ + int rank; + + /* Type and subtype of MPI task. */ + int type; + int subtype; + + /* Step of action. */ + int step; + + /* Whether an activation, send or recv, or if handoff completed. Not the + * same as delivered, need to match across ranks to see that. */ + int activation; + + /* Ticks at time of this action. */ + ticks tic; + + /* Rank of otherside of communication. */ + int otherrank; + + /* The MPI tag. */ + int tag; + + /* Memory of the request. */ + size_t size; + + /* Pointer to the data. */ + void *data; + + /* The request associated with the call. */ + MPI_Request req; + +}; + /* API. */ void mpiuse_log_allocation(int rank, int step, size_t tic, int type, int subtype, int activation, size_t size, int otherrank, int tag); - +struct mpiuse_log_entry *mpiuse_get_log(int ind); void mpiuse_log_restore(const char *filename); +int mpiuse_nr_logs(void); +int mpiuse_nr_ranks(void); + #endif /* SWIFT_MPIUSE_H */