Commit 9d877988 authored by Peter W. Draper

Stupid implementation of partition, communicators are all wrong, runs with 1 thread

parent 768a12d9
 /*******************************************************************************
  * This file is part of SWIFT.
- * Copyright (c) 2019 Peter W. Draper
+ * Copyright (c) 2020 Peter W. Draper
  *
  * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published
@@ -29,6 +29,9 @@
 #include "error.h"
 #include "mpiuse.h"

+/* Number of threads used to partition the requests. */
+#define NTHREADS 2
+
 /* Global: Our rank for all to see. */
 int myrank = -1;
@@ -47,21 +50,23 @@ static const int task_type_recv = 23;
 /* Global communicators for each of the subtypes. */
 static const int task_subtype_count = 30;  // Just some upper limit on subtype.
-static MPI_Comm subtypeMPI_comms[30];
+static MPI_Comm subtypeMPI_comms[NTHREADS][30];

-/* The local queues. */
-static struct mpiuse_log_entry **volatile reqs_queue;
-static int volatile ind_req = 0;
-static int volatile nr_reqs = 0;
-static int volatile injecting = 1;
-static struct mpiuse_log_entry **volatile recvs_queue;
-static int volatile nr_recvs = 0;
-static int volatile ind_recv = 0;
-static int volatile todo_recv = 0;
-static struct mpiuse_log_entry **volatile sends_queue;
-static int volatile nr_sends = 0;
-static int volatile ind_send = 0;
-static int volatile todo_send = 0;
+/* The local queues. We need to partition these to keep the MPI communications
+ * separate so we can use the MPI_THREAD_SPLIT option of the Intel 2020 MPI
+ * library. */
+static struct mpiuse_log_entry **volatile reqs_queue[NTHREADS];
+static int volatile ind_req[NTHREADS] = {0};
+static int volatile nr_reqs[NTHREADS] = {0};
+static int volatile injecting[NTHREADS] = {0};
+static struct mpiuse_log_entry **volatile recvs_queue[NTHREADS];
+static int volatile nr_recvs[NTHREADS] = {0};
+static int volatile ind_recv[NTHREADS] = {0};
+static int volatile todo_recv[NTHREADS] = {0};
+static struct mpiuse_log_entry **volatile sends_queue[NTHREADS];
+static int volatile nr_sends[NTHREADS] = {0};
+static int volatile ind_send[NTHREADS] = {0};
+static int volatile todo_send[NTHREADS] = {0};
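
Note: the switch to subtypeMPI_comms[NTHREADS][30] follows the rule of Intel MPI's thread-split model (enabled at run time with I_MPI_THREAD_SPLIT=1 in the 2019/2020 releases): threads may only drive MPI concurrently if no communicator is shared between them, so each (thread, subtype) pair gets its own duplicate. The per-thread queues above are indexed with atomic counters further down; a minimal sketch of the helpers this assumes, given that SWIFT's atomic.h wraps the GCC __sync builtins (the exact macros may differ):

    /* atomic_inc must return the PRE-increment value, since inject_thread
     * uses the result directly as the slot index for a new queue entry. */
    #define atomic_inc(v) __sync_fetch_and_add((v), 1)
    #define atomic_dec(v) __sync_fetch_and_sub((v), 1)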
 /* CPU frequency of the machine that created the MPI log. */
 // XXX need to store this in the data file.

@@ -76,7 +81,7 @@ static double log_clocks_cpufreq = 2194844448.0;
 static void datacheck_fill(size_t size, void *data) {
   unsigned char *p = (unsigned char *)data;
   for (size_t i = 0; i < size; i++) {
     p[i] = 170; /* 10101010 in bits. */
   }
 }
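
Note: the body of the matching check is elided from this view; a minimal sketch consistent with the signature in the next hunk header, assuming it simply verifies the fill pattern (the return convention is a guess):

    /* Returns 1 while every byte still holds the 170 (0b10101010) pattern
     * written by datacheck_fill(). */
    static int datacheck_test(size_t size, void *data) {
      unsigned char *p = (unsigned char *)data;
      for (size_t i = 0; i < size; i++)
        if (p[i] != 170) return 0;
      return 1;
    }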
@@ -104,19 +109,20 @@ static int datacheck_test(size_t size, void *data) {
  * set, otherwise we just do them as quickly as possible.
  */
 static void *inject_thread(void *arg) {

+  int tid = *((int *)arg);

-  if (verbose) message("%d: injection thread starts", *((int *)arg));
+  if (verbose) message("%d: injection thread starts", tid);
   ticks starttics = getticks();

   /* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
-  ticks basetic = reqs_queue[0]->tic;
+  ticks basetic = reqs_queue[tid][0]->tic;
   ticks looptics = 0;
   double deadtime = 0.0;
   struct timespec sleep;
   sleep.tv_sec = 0;

-  while (ind_req < nr_reqs) {
-    struct mpiuse_log_entry *log = reqs_queue[ind_req];
+  while (ind_req[tid] < nr_reqs[tid]) {
+    struct mpiuse_log_entry *log = reqs_queue[tid][ind_req[tid]];

     if (usetics) {
       /* Expect time between this request and the previous one. */
...@@ -166,28 +172,28 @@ static void *inject_thread(void *arg) { ...@@ -166,28 +172,28 @@ static void *inject_thread(void *arg) {
       /* And send. */
       err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
-                      subtypeMPI_comms[log->subtype], &log->req);
+                      subtypeMPI_comms[tid][log->subtype], &log->req);

       /* Add a new send request. */
-      int ind = atomic_inc(&nr_sends);
-      sends_queue[ind] = log;
-      atomic_inc(&todo_send);
+      int ind = atomic_inc(&nr_sends[tid]);
+      sends_queue[tid][ind] = log;
+      atomic_inc(&todo_send[tid]);

     } else {

       /* Ready to receive. */
       log->data = calloc(log->size, 1);
       err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
-                      subtypeMPI_comms[log->subtype], &log->req);
+                      subtypeMPI_comms[tid][log->subtype], &log->req);

       /* Add a new recv request. */
-      int ind = atomic_inc(&nr_recvs);
-      recvs_queue[ind] = log;
-      atomic_inc(&todo_recv);
+      int ind = atomic_inc(&nr_recvs[tid]);
+      recvs_queue[tid][ind] = log;
+      atomic_inc(&todo_recv[tid]);
     }
     if (err != MPI_SUCCESS) error("Failed to activate send or recv");
-    ind_req++;
+    ind_req[tid]++;

     /* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
      * equally timed. Note we include a nanosleep, they are slow. */
@@ -196,42 +202,45 @@ static void *inject_thread(void *arg) {
       nanosleep(&sleep, NULL);
       looptics = (getticks() - starttics);
       if (verbose)
-        message("injection loop took %.3f %s.", clocks_from_ticks(looptics),
-                clocks_getunit());
+        message("%d: injection loop took %.3f %s.", tid,
+                clocks_from_ticks(looptics), clocks_getunit());
     }
   }

   /* All done, thread exiting. */
   if (verbose) {
-    message("%d injections completed, sends = %d, recvs = %d", ind_req,
-            nr_sends, nr_recvs);
-    message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
-    if (usetics) message("deadtime %.3f ms", deadtime / 1.0e6);
+    message("%d: %d injections completed, sends = %d, recvs = %d", tid,
+            ind_req[tid], nr_sends[tid], nr_recvs[tid]);
+    message("%d: remaining sends = %d, recvs = %d", tid, todo_send[tid],
+            todo_recv[tid]);
+    if (usetics) message("%d: deadtime %.3f ms", tid, deadtime / 1.0e6);
   }
-  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
-          clocks_getunit());
+  message("%d: took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());

-  atomic_dec(&injecting);
+  atomic_dec(&injecting[tid]);
   return NULL;
 }
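
Note: the elided usetics branch paces each injection to the gap recorded in the trace. A hedged sketch of the arithmetic it needs, using only names visible in the diff (log->tic, basetic, log_clocks_cpufreq, sleep, deadtime); the bookkeeping details are assumptions:

    /* Convert the logged CPU-cycle gap to nanoseconds and sleep it off.
     * A real version must split whole seconds out of tv_nsec, and the
     * interpretation of deadtime as accumulated wait time is a guess. */
    double dt_ns = (double)(log->tic - basetic) / log_clocks_cpufreq * 1.0e9;
    basetic = log->tic;
    if (dt_ns > 0.0) {
      sleep.tv_nsec = (long)dt_ns;
      nanosleep(&sleep, NULL);
      deadtime += dt_ns;
    }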
 /**
  * @brief main loop to run over a queue of MPI requests and test for when they
- * complete. Returns the total amount of time spent in calls to MPI_Test and
- * the number of times it was called.
+ * complete, for one thread. Returns the total amount of time spent in calls
+ * to MPI_Test and the number of times it was called.
  *
  * @param logs the list of logs pointing to requests.
  * @param nr_logs pointer to the variable containing the current number of
  * logs.
  * @param todos pointer to the variable containing the number of requests that
  * are still active.
+ * @param injecting pointer to the variable indicating if there are still some
+ * injections.
  * @param sum the total number of ticks spent in calls to MPI_Test.
  * @param ncalls the total number of calls to MPI_Test.
  * @param mint the minimum ticks an MPI_Test call took.
  * @param maxt the maximum ticks an MPI_Test call took.
  */
 static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
-                         int volatile *todos, double *sum, int *ncalls,
-                         ticks *mint, ticks *maxt) {
+                         int volatile *todos, int volatile *injecting,
+                         double *sum, int *ncalls, ticks *mint, ticks *maxt) {

   /* Global MPI_Test statistics. */
   int lncalls = 0;
@@ -241,7 +250,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
   /* We loop while new requests are being injected and we still have requests
    * to complete. */
-  while (injecting || (!injecting && *todos > 0)) {
+  while (*injecting || (!*injecting && *todos > 0)) {
     int nlogs = *nr_logs;
     for (int k = 0; k < nlogs; k++) {
       struct mpiuse_log_entry *log = logs[k];
@@ -293,26 +302,28 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
 }
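
Note: the polling body of queue_runner is elided above. A hedged sketch of one sweep, written as a hypothetical helper (poll_once is not in the source; the done flag on a log entry and the stats locals are assumptions modelled on lncalls):

    /* One MPI_Test per still-active entry, timing each call. */
    static void poll_once(struct mpiuse_log_entry **logs, int nlogs,
                          int volatile *todos, double *lsum, int *lncalls,
                          ticks *lmint, ticks *lmaxt) {
      for (int k = 0; k < nlogs; k++) {
        struct mpiuse_log_entry *log = logs[k];
        if (log == NULL || log->done) continue;
        ticks tics = getticks();
        int res = 0;
        if (MPI_Test(&log->req, &res, MPI_STATUS_IGNORE) != MPI_SUCCESS)
          error("MPI_Test call failed");
        ticks dt = getticks() - tics;
        (*lncalls)++;
        *lsum += (double)dt;
        if (dt < *lmint) *lmint = dt;
        if (dt > *lmaxt) *lmaxt = dt;
        if (res) {
          log->done = 1;      /* request completed */
          atomic_dec(todos);
        }
      }
    }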
 /**
- * @brief Send thread, checks if MPI_Isend requests have completed.
+ * @brief A send thread, checks if MPI_Isend requests have completed.
  */
 static void *send_thread(void *arg) {

+  int tid = *((int *)arg);

-  if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
+  if (verbose) message("%d: send thread starts (%d)", tid, injecting[tid]);
   ticks starttics = getticks();

   int ncalls;
   double sum;
   ticks mint;
   ticks maxt;
-  queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt);
+  queue_runner(sends_queue[tid], &nr_sends[tid], &todo_send[tid],
+               &injecting[tid], &sum, &ncalls, &mint, &maxt);

   message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
-      "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
+      "%d: %d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, "
+      "max time %.3f (%s)",
+      tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
       clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
   if (verbose)
-    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
-            clocks_getunit());
+    message("%d: took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
+            clocks_getunit());

   /* Thread exits. */
@@ -320,26 +331,28 @@ static void *send_thread(void *arg) {
 }
 /**
- * @brief Recv thread, checks if MPI_Irecv requests have completed.
+ * @brief A recv thread, checks if MPI_Irecv requests have completed.
  */
 static void *recv_thread(void *arg) {

+  int tid = *((int *)arg);

-  if (verbose) message("%d: recv thread starts", *((int *)arg));
+  if (verbose) message("%d: recv thread starts (%d)", tid, injecting[tid]);
   ticks starttics = getticks();

   int ncalls;
   double sum;
   ticks mint;
   ticks maxt;
-  queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt);
+  queue_runner(recvs_queue[tid], &nr_recvs[tid], &todo_recv[tid],
+               &injecting[tid], &sum, &ncalls, &mint, &maxt);

   message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
-      "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
+      "%d: %d MPI_Test calls took: %.3f, mean time %.3f, "
+      "min time %.3f, max time %.3f (%s)",
+      tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
       clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
   if (verbose)
-    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
-            clocks_getunit());
+    message("%d: took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
+            clocks_getunit());

   /* Thread exits. */
@@ -361,23 +374,30 @@ static int cmp_logs(const void *p1, const void *p2) {
 /**
  * @brief Pick out the relevant logging data for our rank, i.e. all
- * activations of sends and recvs. We ignore the original completions.
- * The final list is sorted into increasing time of activation.
+ * activations of sends and recvs, split into NTHREADS parts.
+ * We ignore the original completions. The final lists are sorted into
+ * increasing time of activation.
  */
 static void pick_logs(void) {
   size_t nlogs = mpiuse_nr_logs();

-  /* Duplicate of logs. */
-  reqs_queue = (struct mpiuse_log_entry **)calloc(
-      nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_reqs = 0;
-  sends_queue = (struct mpiuse_log_entry **)calloc(
-      nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_sends = 0;
-  recvs_queue = (struct mpiuse_log_entry **)calloc(
-      nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_recvs = 0;
+  /* Duplicates of logs. */
+  int size = nlogs / NTHREADS + 1;
+  for (int k = 0; k < NTHREADS; k++) {
+    reqs_queue[k] = (struct mpiuse_log_entry **)calloc(
+        size, sizeof(struct mpiuse_log_entry *));
+    nr_reqs[k] = 0;
+    sends_queue[k] = (struct mpiuse_log_entry **)calloc(
+        size, sizeof(struct mpiuse_log_entry *));
+    nr_sends[k] = 0;
+    recvs_queue[k] = (struct mpiuse_log_entry **)calloc(
+        size, sizeof(struct mpiuse_log_entry *));
+    nr_recvs[k] = 0;
+  }

+  /* Index of the thread logs to update. */
+  int nt = 0;
+  int nr_logs = 0;
   for (int k = 0; k < nlogs; k++) {
     struct mpiuse_log_entry *log = mpiuse_get_log(k);
     if (log->rank == myrank && log->activation) {
@@ -385,23 +405,31 @@ static void pick_logs(void) {
       /* And keep this log. */
       log->data = NULL;
-      reqs_queue[nr_reqs] = log;
-      nr_reqs++;
+      reqs_queue[nt][nr_reqs[nt]] = log;
+      nr_reqs[nt]++;
+      nt++;
+      if (nt == NTHREADS) nt = 0;
+      nr_logs++;

     } else {
       error("task type '%d' is not a known send or recv task", log->type);
     }
   }
+  message("Read %d active send and recv logs from input file", nr_logs);

   /* Sort into increasing time. */
-  qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
-
-  /* Check. */
-  for (int k = 0; k < nr_reqs - 1; k++) {
-    if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
-      message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
-              reqs_queue[k + 1]->tic);
-  }
+  for (int j = 0; j < NTHREADS; j++) {
+    qsort(reqs_queue[j], nr_reqs[j], sizeof(struct mpiuse_log_entry *),
+          cmp_logs);
+
+    /* Check. */
+    for (int k = 0; k < nr_reqs[j] - 1; k++) {
+      if (reqs_queue[j][k]->tic > reqs_queue[j][k + 1]->tic)
+        message("reqs_queue: %lld > %lld", reqs_queue[j][k]->tic,
+                reqs_queue[j][k + 1]->tic);
+    }
+  }
 }
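
Note: pick_logs deals the logs out round-robin in file order, so each thread receives every NTHREADS-th activation and the per-thread lists are then sorted independently. cmp_logs itself is elided; a minimal sketch consistent with sorting on increasing tic (comparing rather than subtracting, since the tick difference could overflow an int):

    /* qsort comparator: ascending order of activation time. */
    static int cmp_logs(const void *p1, const void *p2) {
      struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
      struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
      if (l1->tic > l2->tic) return 1;
      if (l1->tic < l2->tic) return -1;
      return 0;
    }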
@@ -472,9 +500,11 @@ int main(int argc, char *argv[]) {
error("The number of MPI ranks %d does not match the expected value %d", error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_nodes); nranks, nr_nodes);
/* Create communicators for each subtype of the tasks. */ /* Create communicators for each subtype of the tasks and each thread. */
for (int i = 0; i < task_subtype_count; i++) { for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]); for (int j = 0; j < NTHREADS; j++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[j][i]);
}
} }
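
Note: MPI_Comm_dup is collective over MPI_COMM_WORLD, so every rank must run this double loop in the same order; with NTHREADS = 2 and 30 subtypes it creates 60 communicators per rank. A matching teardown is worth keeping in mind (a sketch, not part of this commit):

    /* Release the duplicated communicators before MPI_Finalize(). */
    for (int i = 0; i < task_subtype_count; i++)
      for (int j = 0; j < NTHREADS; j++)
        MPI_Comm_free(&subtypeMPI_comms[j][i]);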
   /* Each rank requires its own queue, so extract them. */

@@ -488,26 +518,35 @@ int main(int argc, char *argv[]) {
message("=================="); message("==================");
if (verbose) { if (verbose) {
if (!usetics) message("using fast untimed injections"); if (!usetics) message("using fast untimed injections");
if (datacheck) message("checking data pattern on send and recv completion"); if (datacheck)
message("checking data pattern on send and recv completion");
} }
} }
/* Make three threads, one for injecting tasks and two to check for /* Make three batches of NTHREADS threads, one batch for injecting tasks and
* completions of the sends and recv independently. */ * two batches to check for completions of the sends and recv
pthread_t injectthread; * independently. */
if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0) pthread_t injectthread[NTHREADS];
error("Failed to create injection thread."); pthread_t sendthread[NTHREADS];
pthread_t sendthread; pthread_t recvthread[NTHREADS];
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0) static int tids[NTHREADS];
error("Failed to create send thread."); for (int j = 0; j < NTHREADS; j++) {
pthread_t recvthread; tids[j] = j;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0) injecting[j] = 1;
error("Failed to create recv thread."); if (pthread_create(&injectthread[j], NULL, &inject_thread, &tids[j]) != 0)
error("Failed to create injection thread.");
if (pthread_create(&sendthread[j], NULL, &send_thread, &tids[j]) != 0)
error("Failed to create send thread.");
if (pthread_create(&recvthread[j], NULL, &recv_thread, &tids[j]) != 0)
error("Failed to create recv thread.");
}
/* Wait until all threads have exited and all MPI requests have completed. */ /* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread, NULL); for (int j = 0; j < NTHREADS; j++) {
pthread_join(sendthread, NULL); pthread_join(injectthread[j], NULL);
pthread_join(recvthread, NULL); pthread_join(sendthread[j], NULL);
pthread_join(recvthread[j], NULL);
}
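
Note: with three batches of NTHREADS threads calling MPI concurrently, the initialization (outside this diff) must request full thread support; a sketch of the usual guard, assuming MPI is initialized at the top of main():

    /* Request and verify MPI_THREAD_MULTIPLE support. */
    int provided = 0;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided != MPI_THREAD_MULTIPLE)
      error("MPI library does not provide MPI_THREAD_MULTIPLE");

The commit message's caveat that the communicators are "all wrong" is consistent with the thread-split rule that a given communicator must be driven by the same thread on every rank: here the inject, send and recv threads for a given tid all touch requests posted on subtypeMPI_comms[tid][*].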
   /* Dump the updated MPI logs. */
   MPI_Barrier(MPI_COMM_WORLD);
...