Skip to content
Snippets Groups Projects

Draft: Multi injectors many

Open Peter W. Draper requested to merge multi-injectors-many into master
+ 83
40
@@ -38,6 +38,9 @@ static int verbose = 0;
/* Attempt to keep original injection time differences. */
static int usetics = 1;
/* The wait between injections, nanosecs. */
static long long waitns = 0;
/* Scale to apply to the size of the messages we send. */
static float messagescale = 1.0;
@@ -57,14 +60,17 @@ static const int task_type_recv = 26;
static MPI_Comm subtypeMPI_comms[task_subtype_count];
/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue;
static int volatile ind_req = 0;
static int volatile nr_reqs = 0;
static int volatile injecting = 1;
#define NITHREADS 4
static struct mpiuse_log_entry **volatile reqs_queue[NITHREADS];
static int volatile ind_req[NITHREADS] = {0};
static int volatile nr_reqs[NITHREADS] = {0};
static int volatile injecting = NITHREADS;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0;
static int volatile ind_recv = 0;
static int volatile todo_recv = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends = 0;
static int volatile ind_send = 0;
@@ -103,29 +109,23 @@ static int datacheck_test(size_t size, void *data) {
return 1;
}
/**
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
*
* The requests are initiated in the time order of the original log and an
* attempt to start these with the same interval gap is made if usetics is
* set, otherwise we just do them as quickly as possible.
*/
static void *inject_thread(void *arg) {
static void injection_runner(int qid) {
if (verbose) message("%d: injection thread starts", *((int *)arg));
if (verbose) message("%d: injection thread starts", qid);
ticks starttics = getticks();
struct mpiuse_log_entry **reqs = reqs_queue[qid];
/* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
ticks basetic = reqs_queue[0]->tic;
ticks basetic = reqs[0]->tic;
ticks looptics = 0;
double deadtime = 0.0;
struct timespec sleep;
sleep.tv_sec = 0;
while (ind_req < nr_reqs) {
struct mpiuse_log_entry *log = reqs_queue[ind_req];
while (ind_req[qid] < nr_reqs[qid]) {
struct mpiuse_log_entry *log = reqs[ind_req[qid]];
if (usetics) {
if (usetics && waitns == 0) {
/* Expect time between this request and the previous one. */
ticks dt = log->tic - basetic;
basetic = log->tic;
@@ -135,7 +135,8 @@ static void *inject_thread(void *arg) {
* are below that (we time the ticks this loop takes without any waiting
* and use that). Otherwise we wait a while. Note we need to convert the
* ticks of the log file into nanoseconds, that requires the original
* CPU frequency. */
* CPU frequency. Note nanosleep is not very accurate and seems to have
* a base line at 50us using tests on Durham COSMA. */
if (dt > looptics) {
/* Remember to be fair and remove the looptics, then convert to
@@ -152,6 +153,10 @@ static void *inject_thread(void *arg) {
nanosleep(&sleep, NULL);
deadtime += sleep.tv_nsec;
}
} else if (waitns != 0) {
sleep.tv_nsec = waitns;
nanosleep(&sleep, NULL);
deadtime += sleep.tv_nsec;
}
/* Initialise new log elements. */
@@ -194,7 +199,7 @@ static void *inject_thread(void *arg) {
}
if (err != MPI_SUCCESS) error("Failed to activate send or recv");
ind_req++;
ind_req[qid]++;
/* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
* equally timed. Note we include a nanosleep, they are slow. */
@@ -210,14 +215,25 @@ static void *inject_thread(void *arg) {
/* All done, thread exiting. */
if (verbose) {
message("%d injections completed, sends = %d, recvs = %d", ind_req,
message("%d injections completed, sends = %d, recvs = %d", ind_req[qid],
nr_sends, nr_recvs);
message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
if (usetics) message("deadtime %.3f ms", deadtime / 1.0e6);
if (usetics || waitns > 0) message("deadtime %.3f ms", deadtime / 1.0e6);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
atomic_dec(&injecting);
}
/**
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
*
* The requests are initiated in the time order of the original log and an
* attempt to start these with the same interval gap is made if usetics is
* set, otherwise we just do them as quickly as possible.
*/
static void *inject_thread(void *arg) {
injection_runner(*(int *)arg);
return NULL;
}
@@ -304,7 +320,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
*/
static void *send_thread(void *arg) {
if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
if (verbose) message("%d: send thread starts", *((int *)arg));
ticks starttics = getticks();
int ncalls;
@@ -375,9 +391,9 @@ static void pick_logs(void) {
size_t nlogs = mpiuse_nr_logs();
/* Duplicate of logs. */
reqs_queue = (struct mpiuse_log_entry **)calloc(
struct mpiuse_log_entry **reqs = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_reqs = 0;
int nreqs = 0;
sends_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_sends = 0;
@@ -398,9 +414,8 @@ static void pick_logs(void) {
/* And keep this log. */
log->data = NULL;
reqs_queue[nr_reqs] = log;
nr_reqs++;
reqs[nreqs] = log;
nreqs++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
@@ -408,26 +423,46 @@ static void pick_logs(void) {
}
/* Sort into increasing time. */
qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(reqs, nreqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
/* Check. */
for (int k = 0; k < nr_reqs - 1; k++) {
if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
reqs_queue[k + 1]->tic);
for (int k = 0; k < nreqs - 1; k++) {
if (reqs[k]->tic > reqs[k + 1]->tic)
message("reqs_queue: %lld > %lld", reqs[k]->tic, reqs[k + 1]->tic);
}
/* And partition into queues for injection. Use interleave pick so that
* close in time injects are on different queues. */
for (int k = 0; k < NITHREADS; k++) {
reqs_queue[k] = (struct mpiuse_log_entry **)malloc(
sizeof(struct mpiuse_log_entry *) * nlogs);
nr_reqs[k] = 0;
ind_req[k] = 0;
}
for (int k = 0; k < nreqs; k++) {
int qid = k % NITHREADS;
reqs_queue[qid][nr_reqs[qid]] = reqs[k];
nr_reqs[qid]++;
}
for (int k = 0; k < NITHREADS; k++) {
message("nr_reqs[%d] = %d", k, nr_reqs[k]);
}
free(reqs);
}
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [-vfdc:s:] SWIFT_mpiuse-log-file.dat logfile.dat\n",
fprintf(stderr, "Usage: %s [-vfdc:s:n:] SWIFT_mpiuse-log-file.dat logfile.dat\n",
argv[0]);
fprintf(stderr, " options: -v verbose, -f fast injections, "
"-d fill messages and check values on receive, "
"-s <value> use fixed message of this size (bytes), "
"-c <value> scale factor for all messages\n");
"-c <value> scale factor for all messages, "
"-n nanoseconds to wait\n");
fflush(stderr);
}
@@ -453,7 +488,7 @@ int main(int argc, char *argv[]) {
/* Handle the command-line, we expect a mpiuse data file to read and various
* options. */
int opt;
while ((opt = getopt(argc, argv, "vfdc:s:")) != -1) {
while ((opt = getopt(argc, argv, "vfdn:s:n:")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
@@ -464,6 +499,8 @@ int main(int argc, char *argv[]) {
case 'v':
verbose = 1;
break;
case 'n':
waitns = atoll(optarg);
case 'c':
messagescale = atof(optarg);
break;
@@ -519,9 +556,14 @@ int main(int argc, char *argv[]) {
/* Make three threads, one for injecting tasks and two to check for
* completions of the sends and recv independently. */
pthread_t injectthread;
if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
error("Failed to create injection thread.");
static int ks[NITHREADS];
pthread_t injectthread[NITHREADS];
for (int k = 0; k < NITHREADS; k++) {
ks[k] = k;
if (pthread_create(&injectthread[k], NULL, &inject_thread, &ks[k]) != 0)
error("Failed to create injection thread.");
}
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
@@ -529,8 +571,9 @@ int main(int argc, char *argv[]) {
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread, NULL);
/* Wait until all threads have exited and all MPI requests have completed.
*/
for (int k = 0; k < NITHREADS; k++) pthread_join(injectthread[k], NULL);
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
Loading