Skip to content
Snippets Groups Projects
Commit e95cd7c1 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Increase no. of injection threads to 2

parent 3f28aa34
No related branches found
No related tags found
3 merge requests!9Draft: Multiple threads for inject, send and recv.,!3Draft: Multi injectors many,!1Increase no. of injection threads to 2
...@@ -47,14 +47,16 @@ static const int task_subtype_count = 30; // Just some upper limit on subtype. ...@@ -47,14 +47,16 @@ static const int task_subtype_count = 30; // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[30]; static MPI_Comm subtypeMPI_comms[30];
/* The local queues. */ /* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue; static struct mpiuse_log_entry **volatile reqs_queue[2];
static int volatile ind_req = 0; static int volatile ind_req[2] = {0,0};
static int volatile nr_reqs = 0; static int volatile nr_reqs[2] = {0,0};
static int volatile injecting = 1; static int volatile injecting[2] = {1,1};
static struct mpiuse_log_entry **volatile recvs_queue; static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0; static int volatile nr_recvs = 0;
static int volatile ind_recv = 0; static int volatile ind_recv = 0;
static int volatile todo_recv = 0; static int volatile todo_recv = 0;
static struct mpiuse_log_entry **volatile sends_queue; static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends = 0; static int volatile nr_sends = 0;
static int volatile ind_send = 0; static int volatile ind_send = 0;
...@@ -64,25 +66,18 @@ static int volatile todo_send = 0; ...@@ -64,25 +66,18 @@ static int volatile todo_send = 0;
// XXX need to store this in the data file. // XXX need to store this in the data file.
static double log_clocks_cpufreq = 2194844448.0; static double log_clocks_cpufreq = 2194844448.0;
/** static void injection_runner(int qid) {
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests. if (verbose) message("%d: injection thread starts", qid);
*
* The requests are initiated in the time order of the original log and an
* attempt to start these with the same interval gap is made if usetics is
* set, otherwise we just do them as quickly as possible.
*/
static void *inject_thread(void *arg) {
if (verbose) message("%d: injection thread starts", *((int *)arg));
ticks starttics = getticks(); ticks starttics = getticks();
struct mpiuse_log_entry **reqs = reqs_queue[qid];
/* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */ /* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
ticks basetic = reqs_queue[0]->tic; ticks basetic = reqs[0]->tic;
ticks looptics = 0; ticks looptics = 0;
double deadtime = 0.0; double deadtime = 0.0;
while (ind_req < nr_reqs) { while (ind_req[qid] < nr_reqs[qid]) {
struct mpiuse_log_entry *log = reqs_queue[ind_req]; struct mpiuse_log_entry *log = reqs[ind_req[qid]];
if (usetics) { if (usetics) {
/* Expect time between this request and the previous one. */ /* Expect time between this request and the previous one. */
...@@ -101,7 +96,8 @@ static void *inject_thread(void *arg) { ...@@ -101,7 +96,8 @@ static void *inject_thread(void *arg) {
/* Remember to be fair and remove the looptics, then convert to /* Remember to be fair and remove the looptics, then convert to
* nanoseconds. */ * nanoseconds. */
double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9; //double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
double ns = (double)(looptics) / log_clocks_cpufreq * 1.0e9;
if (ns < 1.0e9) { if (ns < 1.0e9) {
sleep.tv_nsec = (long)ns; sleep.tv_nsec = (long)ns;
} else { } else {
...@@ -146,12 +142,12 @@ static void *inject_thread(void *arg) { ...@@ -146,12 +142,12 @@ static void *inject_thread(void *arg) {
} }
if (err != MPI_SUCCESS) error("Failed to activate send or recv"); if (err != MPI_SUCCESS) error("Failed to activate send or recv");
ind_req++; ind_req[qid]++;
/* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are /* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
* equally timed. */ * equally timed. */
if (looptics == 0 && usetics) { if (looptics == 0 && usetics) {
looptics = getticks() - starttics; looptics = getticks() - starttics;
if (verbose) if (verbose)
message("injection loop took %.3f %s.", clocks_from_ticks(looptics), message("injection loop took %.3f %s.", clocks_from_ticks(looptics),
clocks_getunit()); clocks_getunit());
...@@ -160,16 +156,39 @@ static void *inject_thread(void *arg) { ...@@ -160,16 +156,39 @@ static void *inject_thread(void *arg) {
/* All done, thread exiting. */ /* All done, thread exiting. */
if (verbose) { if (verbose) {
message("%d injections completed, sends = %d, recvs = %d", ind_req, message("%d injections completed, sends = %d, recvs = %d", ind_req[qid],
nr_sends, nr_recvs); nr_sends, nr_recvs);
message("remaining sends = %d, recvs = %d", todo_send, todo_recv); message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
if (usetics) message("deadtime %.3f ms", deadtime / 1.0e6); if (usetics) message("deadtime %.3f ms", deadtime / 1.0e6);
} }
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit()); clocks_getunit());
atomic_dec(&injecting); atomic_dec(&injecting[qid]);
}
/**
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
*
* The requests are initiated in the time order of the original log and an
* attempt to start these with the same interval gap is made if usetics is
* set, otherwise we just do them as quickly as possible.
*/
static void *inject_thread1(void *arg) {
injection_runner(0);
return NULL; return NULL;
} }
/**
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
*
* The requests are initiated in the time order of the original log and an
* attempt to start these with the same interval gap is made if usetics is
* set, otherwise we just do them as quickly as possible.
*/
static void *inject_thread2(void *arg) {
injection_runner(1);
return NULL;
}
/** /**
* @brief main loop to run over a queue of MPI requests and test for when they * @brief main loop to run over a queue of MPI requests and test for when they
...@@ -198,7 +217,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs, ...@@ -198,7 +217,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
/* We loop while new requests are being injected and we still have requests /* We loop while new requests are being injected and we still have requests
* to complete. */ * to complete. */
while (injecting || (!injecting && *todos > 0)) { while ((injecting[0] || injecting[1]) || (!injecting[0] && !injecting[1] && *todos > 0)) {
int nlogs = *nr_logs; int nlogs = *nr_logs;
for (int k = 0; k < nlogs; k++) { for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = logs[k]; struct mpiuse_log_entry *log = logs[k];
...@@ -248,7 +267,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs, ...@@ -248,7 +267,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
*/ */
static void *send_thread(void *arg) { static void *send_thread(void *arg) {
if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting); if (verbose) message("%d: send thread starts", *((int *)arg));
ticks starttics = getticks(); ticks starttics = getticks();
int ncalls; int ncalls;
...@@ -319,9 +338,8 @@ static void pick_logs(void) { ...@@ -319,9 +338,8 @@ static void pick_logs(void) {
size_t nlogs = mpiuse_nr_logs(); size_t nlogs = mpiuse_nr_logs();
/* Duplicate of logs. */ /* Duplicate of logs. */
reqs_queue = (struct mpiuse_log_entry **)malloc( struct mpiuse_log_entry **reqs = (struct mpiuse_log_entry **)malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
sizeof(struct mpiuse_log_entry *) * nlogs); int nreqs = 0;
nr_reqs = 0;
sends_queue = (struct mpiuse_log_entry **)malloc( sends_queue = (struct mpiuse_log_entry **)malloc(
sizeof(struct mpiuse_log_entry *) * nlogs); sizeof(struct mpiuse_log_entry *) * nlogs);
nr_sends = 0; nr_sends = 0;
...@@ -338,8 +356,8 @@ static void pick_logs(void) { ...@@ -338,8 +356,8 @@ static void pick_logs(void) {
log->data = calloc(log->size, 1); log->data = calloc(log->size, 1);
/* And keep this log. */ /* And keep this log. */
reqs_queue[nr_reqs] = log; reqs[nreqs] = log;
nr_reqs++; nreqs++;
} else { } else {
error("task type '%d' is not a known send or recv task", log->type); error("task type '%d' is not a known send or recv task", log->type);
...@@ -348,7 +366,26 @@ static void pick_logs(void) { ...@@ -348,7 +366,26 @@ static void pick_logs(void) {
} }
/* Sort into increasing time. */ /* Sort into increasing time. */
qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs); qsort(reqs, nreqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
/* And partition into queues for injection. Use interleave pick so that
* close in time injects are on different queues. */
reqs_queue[0] = (struct mpiuse_log_entry **)malloc(
sizeof(struct mpiuse_log_entry *) * nlogs);
nr_reqs[0] = 0;
reqs_queue[1] = (struct mpiuse_log_entry **)malloc(
sizeof(struct mpiuse_log_entry *) * nlogs);
nr_reqs[1] = 0;
for (int k = 0; k < nreqs; k++) {
int qid = k % 2;
reqs_queue[qid][nr_reqs[qid]] = reqs[k];
nr_reqs[qid]++;
}
message("nr_reqs = %d, %d", nr_reqs[0], nr_reqs[1]);
free(reqs);
} }
/** /**
...@@ -432,8 +469,11 @@ int main(int argc, char *argv[]) { ...@@ -432,8 +469,11 @@ int main(int argc, char *argv[]) {
/* Make three threads, one for injecting tasks and two to check for /* Make three threads, one for injecting tasks and two to check for
* completions of the sends and recv independently. */ * completions of the sends and recv independently. */
pthread_t injectthread; pthread_t injectthread1;
if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0) if (pthread_create(&injectthread1, NULL, &inject_thread1, &myrank) != 0)
error("Failed to create injection thread.");
pthread_t injectthread2;
if (pthread_create(&injectthread2, NULL, &inject_thread2, &myrank) != 0)
error("Failed to create injection thread."); error("Failed to create injection thread.");
pthread_t sendthread; pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0) if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
...@@ -443,7 +483,8 @@ int main(int argc, char *argv[]) { ...@@ -443,7 +483,8 @@ int main(int argc, char *argv[]) {
error("Failed to create recv thread."); error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */ /* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread, NULL); pthread_join(injectthread1, NULL);
pthread_join(injectthread2, NULL);
pthread_join(sendthread, NULL); pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL); pthread_join(recvthread, NULL);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment