Commit ec77e6f5 authored by Peter W. Draper

Also make fake command multi-threaded

parent 172d580b
1 merge request: !9 Draft: Multiple threads for inject, send and recv.
@@ -45,18 +45,20 @@ static long int default_seed = 1987654321;
 static MPI_Comm node_comms[512];
 
 /* The local queues. */
-static struct mpiuse_log_entry **volatile reqs_queue;
-static int volatile ind_req = 0;
-static int volatile nr_reqs = 0;
-static int volatile injecting = 1;
-static struct mpiuse_log_entry **volatile recvs_queue;
-static int volatile nr_recvs = 0;
-static int volatile ind_recv = 0;
-static int volatile todo_recv = 0;
-static struct mpiuse_log_entry **volatile sends_queue;
-static int volatile nr_sends = 0;
-static int volatile ind_send = 0;
-static int volatile todo_send = 0;
+#define NITHREADS 4
+static struct mpiuse_log_entry **volatile reqs_queue[NITHREADS];
+static int volatile ind_req[NITHREADS] = {0};
+static int volatile nr_reqs[NITHREADS] = {0};
+static int volatile injecting = NITHREADS;
+
+static struct mpiuse_log_entry **volatile recvs_queue[NITHREADS];
+static int volatile nr_recvs[NITHREADS] = {0};
+static int volatile todo_recv[NITHREADS] = {0};
+
+static struct mpiuse_log_entry **volatile sends_queue[NITHREADS];
+static int volatile nr_sends[NITHREADS] = {0};
+static int volatile todo_send[NITHREADS] = {0};
 
 /**
  * @brief fill a data area with a pattern that can be checked for changes.
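The bodies of datacheck_fill and datacheck_test are collapsed in this view; only their documentation and closing brace appear in the surrounding hunks. As a rough sketch of the byte-pattern idea the docstrings describe (an illustration only, not the code from this commit; the helper names are hypothetical):

static void sketch_datacheck_fill(size_t size, void *data) {
  /* Fill every byte with a fixed pattern that can be verified later. */
  unsigned char *p = (unsigned char *)data;
  for (size_t i = 0; i < size; i++) p[i] = 0xAA;
}

static int sketch_datacheck_test(size_t size, void *data) {
  /* Return 1 if the pattern is intact, 0 if any byte has changed. */
  unsigned char *p = (unsigned char *)data;
  for (size_t i = 0; i < size; i++)
    if (p[i] != 0xAA) return 0;
  return 1;
}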
@@ -88,22 +90,26 @@ static int datacheck_test(size_t size, void *data) {
 }
 
 /**
- * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
+ * @brief Runner for an injection thread, initiates MPI_Isend and MPI_Irecv
+ * requests.
  */
-static void *inject_thread(void *arg) {
+static void injection_runner(int tid) {
 
-  if (verbose) message("%d: injection thread starts", *((int *)arg));
+  if (verbose) message("%d: injection thread starts", tid);
   ticks starttics = getticks();
 
+  struct mpiuse_log_entry **reqs = reqs_queue[tid];
+  struct mpiuse_log_entry **sends = sends_queue[tid];
+  struct mpiuse_log_entry **recvs = recvs_queue[tid];
+
-  while (ind_req < nr_reqs) {
-    struct mpiuse_log_entry *log = reqs_queue[ind_req];
+  while (ind_req[tid] < nr_reqs[tid]) {
+    struct mpiuse_log_entry *log = reqs[ind_req[tid]];
 
     /* Initialise new log elements. */
     log->done = 0;
     log->nr_tests = 0;
     log->tsum = 0.0;
     log->tmax = 0;
-    log->tmin = INT_MAX;
+    log->tmin = LONG_MAX;
     log->endtic = 0;
     log->injtic = getticks();
@@ -120,9 +126,9 @@ static void *inject_thread(void *arg) {
                      node_comms[log->rank], &log->req);
 
       /* Add a new send request. */
-      int ind = atomic_inc(&nr_sends);
-      sends_queue[ind] = log;
-      atomic_inc(&todo_send);
+      int ind = atomic_inc(&nr_sends[tid]);
+      sends[ind] = log;
+      atomic_inc(&todo_send[tid]);
 
     } else {
@@ -132,24 +138,35 @@ static void *inject_thread(void *arg) {
                       log->tag, node_comms[log->otherrank], &log->req);
 
       /* Add a new recv request. */
-      int ind = atomic_inc(&nr_recvs);
-      recvs_queue[ind] = log;
-      atomic_inc(&todo_recv);
+      int ind = atomic_inc(&nr_recvs[tid]);
+      recvs[ind] = log;
+      atomic_inc(&todo_recv[tid]);
     }
 
     if (err != MPI_SUCCESS) error("Failed to activate send or recv");
 
-    ind_req++;
+    ind_req[tid]++;
   }
 
   /* All done, thread exiting. */
   if (verbose) {
-    message("%d injections completed, sends = %d, recvs = %d", ind_req,
-            nr_sends, nr_recvs);
-    message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
+    message("%d injections completed, sends = %d, recvs = %d", ind_req[tid],
+            nr_sends[tid], nr_recvs[tid]);
+    message("remaining sends = %d, recvs = %d", todo_send[tid], todo_recv[tid]);
   }
-  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+  message("%d took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
           clocks_getunit());
 
   atomic_dec(&injecting);
+}
+
+/**
+ * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
+ *
+ * The requests are initiated in the time order of the original log and an
+ * attempt to start these with the same interval gap is made if usetics is
+ * set, otherwise we just do them as quickly as possible.
+ */
+static void *inject_thread(void *arg) {
+
+  injection_runner(*(int *)arg);
 
   return NULL;
 }
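The new docstring above says injections follow the time order of the original log and, when usetics is set, try to keep the same interval gaps; that part of injection_runner is collapsed in this diff. A minimal sketch of the pacing idea, assuming the ticks/getticks utilities already used in this file (the function name and arguments are hypothetical, not the commit's actual code):

static void sketch_pace_injection(ticks basetic, ticks logtic, ticks starttics) {
  /* Wait until the same interval that separated this request from the first
   * one in the log has elapsed since the thread started. */
  ticks gap = logtic - basetic;
  while (getticks() - starttics < gap) {
    /* Busy wait; the real code may sleep or test outstanding requests here. */
  }
}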
@@ -175,7 +192,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
 
   /* Global MPI_Test statistics. */
   int lncalls = 0;
   double lsum = 0.0;
-  ticks lmint = INT_MAX;
+  ticks lmint = LONG_MAX;
   ticks lmaxt = 0;
 
   /* We loop while new requests are being injected and we still have requests
@@ -236,22 +253,24 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
  */
 static void *send_thread(void *arg) {
 
-  if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
+  int tid = *((int *)arg);
+  if (verbose) message("%d: send thread starts", tid);
   ticks starttics = getticks();
 
   int ncalls;
   double sum;
   ticks mint;
   ticks maxt;
-  queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt);
+  queue_runner(sends_queue[tid], &nr_sends[tid], &todo_send[tid], &sum, &ncalls,
+               &mint, &maxt);
 
   message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
+      "%d %d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
       "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
+      tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
 
   if (verbose)
-    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+    message("%d took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
             clocks_getunit());
 
   /* Thread exits. */
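The body of queue_runner is collapsed in this diff; only its signature and the statistics it returns are visible. A rough sketch of the MPI_Test polling loop such a runner could implement, looping while injection threads are still active (the injecting counter of this commit) or queued requests remain; this is an assumption, not the commit's actual implementation:

static void sketch_queue_runner(struct mpiuse_log_entry **logs,
                                int volatile *nr_logs, int volatile *todo,
                                double *tsum, int *ncalls, ticks *tmin,
                                ticks *tmax) {
  *tsum = 0.0;
  *ncalls = 0;
  *tmin = LONG_MAX;
  *tmax = 0;

  /* Keep polling while injections are in progress or requests are pending. */
  while (injecting > 0 || *todo > 0) {
    int nlogs = *nr_logs;
    for (int k = 0; k < nlogs; k++) {
      struct mpiuse_log_entry *log = logs[k];
      if (log != NULL && !log->done) {
        ticks tics = getticks();
        int flag = 0;
        MPI_Test(&log->req, &flag, MPI_STATUS_IGNORE);
        ticks dt = getticks() - tics;
        (*ncalls)++;
        *tsum += (double)dt;
        if (dt < *tmin) *tmin = dt;
        if (dt > *tmax) *tmax = dt;
        if (flag) {
          log->done = 1;
          atomic_dec(todo);
        }
      }
    }
  }
}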
@@ -263,22 +282,24 @@ static void *send_thread(void *arg) {
  */
 static void *recv_thread(void *arg) {
 
-  if (verbose) message("%d: recv thread starts", *((int *)arg));
+  int tid = *((int *)arg);
+  if (verbose) message("%d: recv thread starts", tid);
   ticks starttics = getticks();
 
   int ncalls;
   double sum;
   ticks mint;
   ticks maxt;
-  queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt);
+  queue_runner(recvs_queue[tid], &nr_recvs[tid], &todo_recv[tid], &sum, &ncalls,
+               &mint, &maxt);
 
   message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
+      "%d %d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
      "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
+      tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
 
   if (verbose)
-    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+    message("%d took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
             clocks_getunit());
 
   /* Thread exits. */
@@ -312,40 +333,33 @@ static void pick_logs(int random) {
   size_t nlogs = mpiuse_nr_logs();
 
   /* Duplicate of logs. */
-  reqs_queue = (struct mpiuse_log_entry **)calloc(
+  struct mpiuse_log_entry **reqs = (struct mpiuse_log_entry **)calloc(
       nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_reqs = 0;
-  sends_queue = (struct mpiuse_log_entry **)calloc(
-      nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_sends = 0;
-  recvs_queue = (struct mpiuse_log_entry **)calloc(
-      nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_recvs = 0;
+  int nreqs = 0;
 
   if (random == 0 || random == 2) {
     for (int k = 0; k < nlogs; k++) {
       struct mpiuse_log_entry *log = mpiuse_get_log(k);
       if (log->rank == myrank && log->activation) {
         log->data = NULL;
-        reqs_queue[nr_reqs] = log;
-        nr_reqs++;
+        reqs[nreqs] = log;
+        nreqs++;
       }
     }
 
     if (random == 0) {
       /* Sort into increasing time. */
-      qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
+      qsort(reqs, nreqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
     } else {
       /* Randomize the order, so ranks do not all work in sequence. */
-      mpiuse_shuffle_logs(reqs_queue, nr_reqs);
+      mpiuse_shuffle_logs(reqs, nreqs);
     }
 
     /* Check. */
     if (random == 0) {
-      for (int k = 0; k < nr_reqs - 1; k++) {
-        if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
-          message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
-                  reqs_queue[k + 1]->tic);
+      for (int k = 0; k < nreqs - 1; k++) {
+        if (reqs[k]->tic > reqs[k + 1]->tic)
+          message("reqs_queue: %lld > %lld", reqs[k]->tic, reqs[k + 1]->tic);
       }
     }
 
   } else {
@@ -359,14 +373,14 @@ static void pick_logs(int random) {
       if (log->rank == myrank && log->activation == 1 &&
           log->type == RECV_TYPE) {
         log->data = NULL;
-        reqs_queue[nr_reqs] = log;
-        nr_reqs++;
+        reqs[nreqs] = log;
+        nreqs++;
         nrecv++;
       }
     }
 
     /* These are sorted into log time order. */
-    qsort(reqs_queue, nrecv, sizeof(struct mpiuse_log_entry *), cmp_logs);
+    qsort(reqs, nrecv, sizeof(struct mpiuse_log_entry *), cmp_logs);
 
     /* Now the sends. */
     int nsend = 0;
@@ -375,15 +389,46 @@ static void pick_logs(int random) {
       if (log->rank == myrank && log->activation == 1 &&
           log->type == SEND_TYPE) {
         log->data = NULL;
-        reqs_queue[nr_reqs] = log;
-        nr_reqs++;
+        reqs[nreqs] = log;
+        nreqs++;
         nsend++;
       }
     }
 
     /* These are randomized. */
-    mpiuse_shuffle_logs(&reqs_queue[nrecv], nsend);
+    mpiuse_shuffle_logs(&reqs[nrecv], nsend);
   }
+
+  /* And partition into queues for injection. Use interleave pick so that
+   * close in time injects are on different queues. */
+  for (int k = 0; k < NITHREADS; k++) {
+    reqs_queue[k] = (struct mpiuse_log_entry **)malloc(
+        sizeof(struct mpiuse_log_entry *) * nlogs);
+    nr_reqs[k] = 0;
+
+    sends_queue[k] = (struct mpiuse_log_entry **)calloc(
+        nlogs, sizeof(struct mpiuse_log_entry *));
+    nr_sends[k] = 0;
+
+    recvs_queue[k] = (struct mpiuse_log_entry **)calloc(
+        nlogs, sizeof(struct mpiuse_log_entry *));
+    nr_recvs[k] = 0;
+
+    ind_req[k] = 0;
+  }
+
+  for (int k = 0; k < nreqs; k++) {
+    int tid = k % NITHREADS;
+    reqs_queue[tid][nr_reqs[tid]] = reqs[k];
+    nr_reqs[tid]++;
+  }
+
+  if (verbose) {
+    for (int k = 0; k < NITHREADS; k++) {
+      message("nr_reqs[%d] = %d", k, nr_reqs[k]);
+    }
+  }
+
+  free(reqs);
 }
 
 /**
@@ -539,28 +584,33 @@ int main(int argc, char *argv[]) {
   if (myrank == 0) {
     message("Start of MPI tests");
     message("==================");
-    if (verbose) {
-      if (datacheck)
-        message("checking data pattern on send and recv completion");
-    }
+    message("Using %d threads each for injection, send and receive", NITHREADS);
+    if (datacheck) message("Checking data pattern on send and recv completion");
   }
 
-  /* Make three threads, one for injecting tasks and two to check for
+  /* Make three types of thread, one for injecting tasks and two to check for
    * completions of the sends and recv independently. */
-  pthread_t injectthread;
-  if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
-    error("Failed to create injection thread.");
-  pthread_t sendthread;
-  if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
-    error("Failed to create send thread.");
-  pthread_t recvthread;
-  if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
-    error("Failed to create recv thread.");
+  static int ks[NITHREADS];
+  pthread_t injectthreads[NITHREADS];
+  pthread_t sendthreads[NITHREADS];
+  pthread_t recvthreads[NITHREADS];
+  for (int k = 0; k < NITHREADS; k++) {
+    ks[k] = k;
+    if (pthread_create(&injectthreads[k], NULL, &inject_thread, &ks[k]) != 0)
+      error("Failed to create injection thread.");
+    if (pthread_create(&sendthreads[k], NULL, &send_thread, &ks[k]) != 0)
+      error("Failed to create send thread.");
+    if (pthread_create(&recvthreads[k], NULL, &recv_thread, &ks[k]) != 0)
+      error("Failed to create recv thread.");
+  }
 
-  /* Wait until all threads have exited and all MPI requests have completed. */
-  pthread_join(injectthread, NULL);
-  pthread_join(sendthread, NULL);
-  pthread_join(recvthread, NULL);
+  /* Wait until all threads have exited and all MPI requests have completed.
+   */
+  for (int k = 0; k < NITHREADS; k++) {
+    pthread_join(injectthreads[k], NULL);
+    pthread_join(sendthreads[k], NULL);
+    pthread_join(recvthreads[k], NULL);
+  }
 
   /* Dump the updated MPI logs. */
   MPI_Barrier(MPI_COMM_WORLD);