Commit 589dbc34 authored by Peter W. Draper

Crashes, but could have bones

parent d1bf5081
2 merge requests: !11 Draft: Fast one-sided MPI version, !8 Draft: RDMA version with wrapped infinity calls
@@ -82,6 +82,9 @@ struct mpiuse_log_entry {
   /* Minimum time in a test. */
   ticks tmin;
+
+  /* Insertion index. */
+  size_t offset;
 };
 
 /* API. */
...
@@ -30,15 +30,18 @@
 #include "error.h"
 #include "mpiuse.h"
 
-/* Global: Our rank for all to see. */
+/* Our rank for all to see. */
 int myrank = -1;
 
+/* Number of ranks. */
+static int nr_ranks;
+
+#define READY -2
+#define DONE -10
+
 /* Are we verbose. */
 static int verbose = 0;
 
-/* Attempt to keep original injection time differences. */
-static int usetics = 1;
-
 /* Set a data pattern and check we get this back, slow... */
 static int datacheck = 0;
@@ -51,8 +54,9 @@ static const int task_type_recv = 23;
 static MPI_Comm subtypeMPI_comms[task_subtype_count];
 
 /* And the windows for one-sided communications. */
-static MPI_Win mpi_window[task_subtype_count];
-static void *mpi_ptr[task_subtype_count];
+static MPI_Win mpi_recv_window[task_subtype_count];
+static MPI_Win mpi_send_window[task_subtype_count];
+static char *mpi_ptr[task_subtype_count];
 static size_t *mpi_offsets;
 
 // The plan, each rank has a window per communicator, this is to receive
@@ -74,25 +78,17 @@ static size_t *mpi_offsets;
 // need to associate a buffer with it, just sending the memory from each
 // message with the appropriate offset.
 
-/* The local queues, these are rank specific. */
-static struct mpiuse_log_entry **volatile reqs_queue;
-static int volatile ind_req = 0;
-static int volatile nr_reqs = 0;
-static int volatile injecting = 1;
+/* The local receive queue. */
 static struct mpiuse_log_entry **volatile recvs_queue;
 static int volatile nr_recvs = 0;
 static int volatile ind_recv = 0;
 static int volatile todo_recv = 0;
 
-static struct mpiuse_log_entry **volatile sends_queue;
-static int volatile nr_sends = 0;
-static int volatile ind_send = 0;
-static int volatile todo_send = 0;
-
-/* CPU frequency of the machine that created the MPI log. */
-// XXX need to store this in the data file.
-static double log_clocks_cpufreq = 2194844448.0;
+/* Requests from the messages log that we need to process on this rank. */
+static struct mpiuse_log_entry **volatile reqs_queue;
+static int volatile ind_req = 0;
+static int volatile nr_reqs = 0;
+static int volatile injecting = 1;
 
 /**
  * @brief fill a data area with a pattern that can be checked for changes.
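The plan comment above boils down to a simple framing rule: each message occupies size + 1 bytes of the receive window, payload first, then a status byte that goes from READY to DONE. A minimal standalone sketch of that framing follows; all names are invented here and only the READY/DONE values mirror the patch.

#include <stdio.h>
#include <string.h>

#define READY -2
#define DONE -10

/* Copy a payload into a window buffer at a given offset and mark the
 * trailing status byte READY; the sender later flips it to DONE. */
static void frame_message(signed char *window, size_t offset,
                          const void *payload, size_t size) {
  memcpy(window + offset, payload, size);
  window[offset + size] = READY;
}

/* Non-zero once the trailing byte has been flipped to DONE. */
static int frame_arrived(const signed char *window, size_t offset,
                         size_t size) {
  return window[offset + size] == DONE;
}

int main(void) {
  signed char window[64] = {0};
  const char msg[] = "hello";

  frame_message(window, 0, msg, sizeof(msg));
  printf("arrived after frame: %d\n", frame_arrived(window, 0, sizeof(msg)));

  window[sizeof(msg)] = DONE; /* what the remote flag update would do */
  printf("arrived after DONE:  %d\n", frame_arrived(window, 0, sizeof(msg)));
  return 0;
}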
@@ -124,56 +120,17 @@ static int datacheck_test(size_t size, void *data) {
 }
 
 /**
- * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
- *
- * The requests are initiated in the time order of the original log and an
- * attempt to start these with the same interval gap is made if usetics is
- * set, otherwise we just do them as quickly as possible.
+ * @brief Injection thread, sends messages to other ranks.
  */
 static void *inject_thread(void *arg) {
 
-  if (verbose) message("%d: injection thread starts", *((int *)arg));
+  message("%d: injection thread starts", *((int *)arg));
   ticks starttics = getticks();
 
-  /* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
-  ticks basetic = reqs_queue[0]->tic;
-  ticks looptics = 0;
-  double deadtime = 0.0;
-  struct timespec sleep;
-  sleep.tv_sec = 0;
+  int offset = 0;
 
   while (ind_req < nr_reqs) {
     struct mpiuse_log_entry *log = reqs_queue[ind_req];
 
-    if (usetics) {
-      /* Expect time between this request and the previous one. */
-      ticks dt = log->tic - basetic;
-      basetic = log->tic;
-
-      /* We guess some time below which we should not attempt to wait,
-       * otherwise we'll start to overrun, and just inject the next call if we
-       * are below that (we time the ticks this loop takes without any waiting
-       * and use that). Otherwise we wait a while. Note we need to convert the
-       * ticks of the log file into nanoseconds, that requires the original
-       * CPU frequency. */
-      if (dt > looptics) {
-
-        /* Remember to be fair and remove the looptics, then convert to
-         * nanoseconds. */
-        double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
-        if (ns < 1.0e9) {
-          sleep.tv_nsec = (long)ns;
-        } else {
-          /* Wait more than one second. Must be an error, but complain and
-           * continue. */
-          sleep.tv_nsec = (long)1.0e9;
-          message("wait greater than one second");
-        }
-        nanosleep(&sleep, NULL);
-        deadtime += sleep.tv_nsec;
-      }
-    }
-
     /* Initialise new log elements. */
     log->done = 0;
     log->nr_tests = 0;
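The pacing logic removed in this hunk converted tick deltas from the log into sleep intervals using the CPU frequency of the machine that wrote the log. A tiny sketch of that arithmetic, with made-up numbers for the deltas:

#include <stdio.h>

int main(void) {
  double log_clocks_cpufreq = 2194844448.0; /* ticks per second, from the old code */
  long long dt = 5000000;                   /* ticks between two logged requests (illustrative) */
  long long looptics = 120000;              /* measured cost of one injection loop (illustrative) */

  /* Subtract the loop cost, then convert ticks to nanoseconds. */
  double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
  printf("sleep for %.0f ns between injections\n", ns);
  return 0;
}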
@@ -186,102 +143,96 @@ static void *inject_thread(void *arg) {
     /* Differences to SWIFT: MPI_BYTE not the MPI_Type. */
     int err = 0;
     if (log->type == task_type_send) {
-      log->data = calloc(log->size, 1);
+      log->data = calloc(log->size + sizeof(int), 1);
 
       /* Fill data with pattern. */
       if (datacheck) datacheck_fill(log->size, log->data);
 
-      /* And send. */
-      err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
-                      subtypeMPI_comms[log->subtype], &log->req);
-      /* XXX And send data to other rank at the appropriate offset. */
-
-      /* Add a new send request. */
-      int ind = atomic_inc(&nr_sends);
-      sends_queue[ind] = log;
-      atomic_inc(&todo_send);
+      /* Last char is marked as READY (to receive) */
+      ((char *)log->data)[log->size] = READY;
+
+      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, log->otherrank, 0,
+                   mpi_send_window[log->subtype]);
+      size_t index = (log->otherrank * nr_ranks * nr_ranks) +
+                     (log->rank * nr_ranks) + ind_req;
+      MPI_Raccumulate(log->data, log->size, MPI_BYTE, log->otherrank,
+                      mpi_offsets[index], log->size, MPI_BYTE, MPI_REPLACE,
+                      mpi_send_window[log->subtype], &log->req);
+
+      /* Wait for local completion? */
+      int flag = 0;
+      while (flag == 0) MPI_Test(&log->req, &flag, MPI_STATUS_IGNORE);
+
+      /* Now we change the last element to DONE so that the remote end can
+       * find out that the data has arrived. */
+      char newval[1];
+      char oldval[1];
+      newval[0] = DONE;
+      oldval[0] = DONE;
+      MPI_Compare_and_swap(&newval[0], &((char *)log->data)[log->size],
+                           &oldval[0], MPI_BYTE, 0, log->size,
+                           mpi_send_window[log->subtype]);
+
+      /* Proceed locally. */
+      MPI_Win_flush_local(0, mpi_send_window[log->subtype]);
+      MPI_Win_unlock(log->otherrank, mpi_send_window[log->subtype]);
 
     } else {
 
-      /* Ready to receive. */
-      log->data = calloc(log->size, 1);
-      err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
-                      subtypeMPI_comms[log->subtype], &log->req);
-
-      /* Add a new recv request. */
+      /* Add entry so we start checking for the remote send. */
       int ind = atomic_inc(&nr_recvs);
       recvs_queue[ind] = log;
+      log->offset = offset;
+      offset += log->size + 1;
       atomic_inc(&todo_recv);
     }
     if (err != MPI_SUCCESS) error("Failed to activate send or recv");
     ind_req++;
-
-    /* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
-     * equally timed. Note we include a nanosleep, they are slow. */
-    if (looptics == 0 && usetics) {
-      sleep.tv_nsec = 1;
-      nanosleep(&sleep, NULL);
-      looptics = (getticks() - starttics);
-      if (verbose)
-        message("injection loop took %.3f %s.", clocks_from_ticks(looptics),
-                clocks_getunit());
-    }
   }
 
-  /* All done, thread exiting. */
-  if (verbose) {
-    message("%d injections completed, sends = %d, recvs = %d", ind_req,
-            nr_sends, nr_recvs);
-    message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
-    if (usetics) message("deadtime %.3f ms", deadtime / 1.0e6);
-  }
   message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
           clocks_getunit());
+
+  /* All injected, so stop checking for new receives. */
   atomic_dec(&injecting);
   return NULL;
 }
 
 /**
- * @brief main loop to run over a queue of MPI requests and test for when they
- * complete. Returns the total amount of time spent in calls to MPI_Test and
- * the number of times it was called.
- *
- * @param logs the list of logs pointing to requests.
- * @param nr_logs pointer to the variable containing the current number of
- * logs.
- * @param todos pointer to the variable containing the number of requests that
- * are still active.
- * @param sum the total number of ticks spent in calls to MPI_Test.
- * @param ncalls the total number of calls to MPI_Test.
- * @param mint the minimum ticks an MPI_Test call took.
- * @param maxt the maximum ticks an MPI_Test call took.
+ * @brief Recv thread, checks if MPI_Irecv requests have completed.
  */
-static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
-                         int volatile *todos, double *sum, int *ncalls,
-                         ticks *mint, ticks *maxt) {
+static void *recv_thread(void *arg) {
 
-  /* Global MPI_Test statistics. */
+  message("%d: recv thread starts", *((int *)arg));
+  ticks starttics = getticks();
+
+  /* Global statistics. */
   int lncalls = 0;
   double lsum = 0.0;
   ticks lmint = INT_MAX;
   ticks lmaxt = 0;
 
   /* We loop while new requests are being injected and we still have requests
-   * to complete. */
-  while (injecting || (!injecting && *todos > 0)) {
-    int nlogs = *nr_logs;
+   * to receive. */
+  while (injecting || (!injecting && todo_recv > 0)) {
+    int nlogs = nr_recvs;
     for (int k = 0; k < nlogs; k++) {
-      struct mpiuse_log_entry *log = logs[k];
+      struct mpiuse_log_entry *log = recvs_queue[k];
      if (log != NULL && !log->done) {
         ticks tics = getticks();
-        int res;
-        MPI_Status stat;
-        int err = MPI_Test(&log->req, &res, &stat);
-        if (err != MPI_SUCCESS) {
-          error("MPI_Test call failed");
-        }
 
-        /* Increment etc. of statistics about time in MPI_Test. */
+        MPI_Win_lock(MPI_LOCK_SHARED, log->otherrank, 0,
+                     mpi_recv_window[log->subtype]);
+        int arrived = 0;
+        message("Checking at %zd", log->offset);
+        if (mpi_ptr[log->subtype][log->offset+log->size] == DONE) arrived = 1;
+        MPI_Win_unlock(log->otherrank, mpi_recv_window[log->subtype]);
+
+        /* Increment etc. of statistics about time spent waiting. */
         ticks dt = getticks() - tics;
         log->tsum += (double)dt;
         lsum += (double)dt;
@@ -294,7 +245,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
         if (dt < lmint) lmint = dt;
         if (dt > lmaxt) lmaxt = dt;
 
-        if (res) {
+        if (arrived) {
 
           /* Check data sent data is unchanged and received data is as
            * expected. */
           if (datacheck && !datacheck_test(log->size, log->data)) {
@@ -305,66 +256,18 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
           log->done = 1;
           log->endtic = getticks();
           free(log->data);
-          atomic_dec(todos);
+          atomic_dec(&todo_recv);
         }
       }
     }
   }
 
-  /* All done. */
-  *sum = lsum;
-  *ncalls = lncalls;
-  *mint = lmint;
-  *maxt = lmaxt;
-  return;
-}
-
-/**
- * @brief Send thread, checks if MPI_Isend requests have completed.
- */
-static void *send_thread(void *arg) {
-
-  if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
-  ticks starttics = getticks();
-
-  int ncalls;
-  double sum;
-  ticks mint;
-  ticks maxt;
-  queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt);
-
   message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
+      "%d MPI waiting calls, which took: %.3f, mean time %.3f, min time %.3f, "
+      "max time "
       "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
-      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
-
-  if (verbose)
-    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
-            clocks_getunit());
-
-  /* Thread exits. */
-  return NULL;
-}
-
-/**
- * @brief Recv thread, checks if MPI_Irecv requests have completed.
- */
-static void *recv_thread(void *arg) {
-
-  if (verbose) message("%d: recv thread starts", *((int *)arg));
-  ticks starttics = getticks();
-
-  int ncalls;
-  double sum;
-  ticks mint;
-  ticks maxt;
-  queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt);
-
-  message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
-      "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
-      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
+      lncalls, clocks_from_ticks(lsum), clocks_from_ticks(lsum / lncalls),
+      clocks_from_ticks(lmint), clocks_from_ticks(lmaxt), clocks_getunit());
 
   if (verbose)
     message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
             clocks_getunit());
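The send and receive paths above implement a write-then-flag handshake over MPI RMA. Below is a minimal two-rank sketch of the same pattern; it is not the patch's code (it uses MPI_Put plus an atomic flag update and an atomic poll rather than MPI_Raccumulate and MPI_Compare_and_swap, and all names and sizes are invented), but it shows the intended ordering: deliver the payload, flush, flip the trailing byte to DONE, and only read the payload once DONE is observed.

#include <mpi.h>
#include <stdio.h>

#define READY -2
#define DONE -10
#define PAYLOAD 32 /* payload bytes per message; the flag byte follows it */

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank, nranks;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nranks);
  if (nranks < 2) {
    if (rank == 0) printf("run with at least 2 ranks\n");
    MPI_Finalize();
    return 0;
  }

  /* Every rank exposes one message slot: PAYLOAD bytes plus a flag byte. */
  signed char *base = NULL;
  MPI_Win win;
  MPI_Win_allocate(PAYLOAD + 1, 1, MPI_INFO_NULL, MPI_COMM_WORLD, &base, &win);

  /* Initialise our own flag byte inside a lock epoch so the value is visible
   * to remote atomics, then make sure every rank has done so. */
  MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, win);
  base[PAYLOAD] = READY;
  MPI_Win_unlock(rank, win);
  MPI_Barrier(MPI_COMM_WORLD);

  if (rank == 0) {
    char msg[PAYLOAD] = "hello from rank 0";
    signed char done = DONE;
    MPI_Win_lock(MPI_LOCK_SHARED, 1, 0, win);
    /* Deliver the payload, flush so it is complete at the target, then
     * atomically flip the trailing byte to DONE. */
    MPI_Put(msg, PAYLOAD, MPI_BYTE, 1, 0, PAYLOAD, MPI_BYTE, win);
    MPI_Win_flush(1, win);
    MPI_Accumulate(&done, 1, MPI_BYTE, 1, PAYLOAD, 1, MPI_BYTE, MPI_REPLACE,
                   win);
    MPI_Win_unlock(1, win);
  } else if (rank == 1) {
    signed char flag = READY, dummy = 0;
    MPI_Win_lock(MPI_LOCK_SHARED, 1, 0, win);
    /* Poll the trailing byte atomically until the sender marks it DONE. */
    while (flag != DONE) {
      MPI_Fetch_and_op(&dummy, &flag, MPI_BYTE, 1, PAYLOAD, MPI_NO_OP, win);
      MPI_Win_flush(1, win);
    }
    MPI_Win_unlock(1, win); /* synchronise the window; payload now readable */
    printf("rank 1 received: %s\n", (char *)base);
  }

  MPI_Win_free(&win);
  MPI_Finalize();
  return 0;
}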
@@ -391,14 +294,12 @@ static int cmp_logs(const void *p1, const void *p2) {
  * The final list is sorted into increasing tag so we should have the
  * neccesary order for sending.
  *
- * @param nr_ranks the number of MPI ranks.
  * @param count_persubtype count of messages per subtype.
  * @param total_persubtype total memory needed per subtype.
  * @param count_persubtype_perrank message counts per rank per subtype.
  * @param total_persubtype_perrank total memory per rank per subtype.
  */
-static void pick_logs(int nr_ranks, size_t *count_persubtype,
-                      size_t *total_persubtype,
+static void pick_logs(size_t *count_persubtype, size_t *total_persubtype,
                       size_t *count_persubtype_perrank,
                       size_t *total_persubtype_perrank) {
   size_t nlogs = mpiuse_nr_logs();
@@ -407,9 +308,6 @@ static void pick_logs(size_t *count_persubtype, size_t *total_persubtype,
   reqs_queue = (struct mpiuse_log_entry **)calloc(
       nlogs, sizeof(struct mpiuse_log_entry *));
   nr_reqs = 0;
-  sends_queue = (struct mpiuse_log_entry **)calloc(
-      nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_sends = 0;
   recvs_queue = (struct mpiuse_log_entry **)calloc(
       nlogs, sizeof(struct mpiuse_log_entry *));
   nr_recvs = 0;
@@ -424,18 +322,15 @@ static void pick_logs(size_t *count_persubtype, size_t *total_persubtype,
       reqs_queue[nr_reqs] = log;
       nr_reqs++;
 
-      /* Totals of memory use for receives, + sizeof(int) for atomics, plus
-       * we round to sizeof(int) boundaries. */
+      /* Totals of memory use for receives, + char for atomic flags */
       if (log->type == task_type_recv) {
-        size_t sint = sizeof(int);
-        size_t size = (log->size + (sint - 1)) / sint * sint + sint;
-
         count_persubtype[log->subtype]++;
-        total_persubtype[log->subtype] += size;
+        total_persubtype[log->subtype] += log->size + 1;
 
         size_t index = log->otherrank * nr_ranks + log->subtype;
         count_persubtype_perrank[index]++;
-        total_persubtype_perrank[index] += size;
+        total_persubtype_perrank[index] += log->size + 1;
       }
     } else {
@@ -449,9 +344,9 @@ static void pick_logs(size_t *count_persubtype, size_t *total_persubtype,
 
   /* Check. */
   for (int k = 0; k < nr_reqs - 1; k++) {
-    if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
-      message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
-              reqs_queue[k + 1]->tic);
+    if (reqs_queue[k]->tag > reqs_queue[k + 1]->tag)
+      message("reqs_queue: %d > %d", reqs_queue[k]->tag,
+              reqs_queue[k + 1]->tag);
   }
 }
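pick_logs now budgets log->size + 1 bytes per expected receive, and inject_thread packs those regions back to back via its running offset. A small standalone sketch of that accounting, with invented message sizes:

#include <stdio.h>

int main(void) {
  size_t sizes[] = {100, 32, 4096};       /* payload sizes of expected receives */
  size_t nmsg = sizeof(sizes) / sizeof(sizes[0]);

  size_t total = 0;
  for (size_t k = 0; k < nmsg; k++) {
    /* Each message gets its payload plus one trailing flag byte. */
    printf("message %zu: offset %zu, flag byte at %zu\n", k, total,
           total + sizes[k]);
    total += sizes[k] + 1;
  }
  printf("window size for this subtype: %zu bytes\n", total);
  return 0;
}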
@@ -476,7 +371,6 @@ int main(int argc, char *argv[]) {
   if (res != MPI_SUCCESS)
     error("Call to MPI_Init_thread failed with error %i.", res);
 
-  int nr_ranks = 0;
   res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
   if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
@@ -487,14 +381,11 @@ int main(int argc, char *argv[]) {
   /* Handle the command-line, we expect a mpiuse data file to read and various
    * options. */
   int opt;
-  while ((opt = getopt(argc, argv, "vfd")) != -1) {
+  while ((opt = getopt(argc, argv, "vd")) != -1) {
     switch (opt) {
       case 'd':
         datacheck = 1;
         break;
-      case 'f':
-        usetics = 0;
-        break;
       case 'v':
         verbose = 1;
         break;
@@ -529,28 +420,45 @@ int main(int argc, char *argv[]) {
       calloc(task_subtype_count * nr_ranks, sizeof(size_t));
   size_t *total_persubtype_perrank =
       calloc(task_subtype_count * nr_ranks, sizeof(size_t));
-  pick_logs(nr_ranks, count_persubtype, total_persubtype,
-            count_persubtype_perrank, total_persubtype_perrank);
+  pick_logs(count_persubtype, total_persubtype, count_persubtype_perrank,
+            total_persubtype_perrank);
 
   /* Now for the one-sided setup... We need a buffer with space for receives
    * from all ranks in a communicator, plus an additional nr_ranks counts per
    * communicator. */
   for (int i = 0; i < task_subtype_count; i++) {
     MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
-    MPI_Win_allocate(total_persubtype[i] + nr_ranks * sizeof(size_t),
-                     sizeof(int), MPI_INFO_NULL, subtypeMPI_comms[i],
-                     mpi_ptr[i], &mpi_window[i]);
+
+    //if (total_persubtype[i] > 0) {
+    message("allocate window for communicator %d", i);
+    MPI_Win_allocate(total_persubtype[i] + nr_ranks * sizeof(size_t),
+                     sizeof(int), MPI_INFO_NULL, subtypeMPI_comms[i],
+                     &mpi_ptr[i], &mpi_recv_window[i]);
+    //} else {
+    //mpi_ptr[i] = NULL;
+    //mpi_recv_window[i] = MPI_WIN_NULL;
+    //}
+
+    /* Send window, just needs to attach to communicator, not data, since it
+     * does not receive. */
+    message("create window for communicator %d", i);
+    MPI_Win_create(NULL, 0, sizeof(int), MPI_INFO_NULL, subtypeMPI_comms[i],
+                   &mpi_send_window[i]);
   }
+  message("Windows created");
 
   /* Each rank needs to be informed what the offsets of its sends to this rank
    * should be. */
-  mpi_offsets = calloc(sizeof(size_t), nr_ranks * task_subtype_count);
+  mpi_offsets =
+      calloc(sizeof(size_t), nr_ranks * nr_ranks * task_subtype_count);
   for (int k = 0; k < task_subtype_count; k++) {
     for (int i = 0; i < nr_ranks; i++) {
-      size_t index = i * nr_ranks + k;
+      size_t index = (myrank * nr_ranks * nr_ranks) + (i * nr_ranks) + k;
       mpi_offsets[index] += total_persubtype_perrank[index];
     }
   }
+  MPI_Allreduce(MPI_IN_PLACE, mpi_offsets,
+                nr_ranks * nr_ranks * task_subtype_count, MPI_AINT, MPI_SUM,
+                MPI_COMM_WORLD);
 
   /* Time to start time. Try to make it synchronous across the ranks. */
   MPI_Barrier(MPI_COMM_WORLD);
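The setup above allocates one receive window per communicator and shares a table of write offsets between the ranks. A hypothetical, much-reduced sketch of that shape for a single communicator follows; the layout of the offsets table here (each rank fills its own target row, then an in-place allreduce shares the table) is illustrative only and does not reproduce the patch's indexing:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank, nranks;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nranks);

  /* Pretend each rank expects (rank + 1) * 1024 bytes of receives. */
  MPI_Aint local_bytes = (MPI_Aint)(rank + 1) * 1024;

  /* Collective allocation of the receive window; sizes may differ per rank. */
  char *recvbuf = NULL;
  MPI_Win recv_win;
  MPI_Win_allocate(local_bytes, 1, MPI_INFO_NULL, MPI_COMM_WORLD, &recvbuf,
                   &recv_win);

  /* offsets[target * nranks + sender] = where sender writes on target.
   * Each rank only knows its own row; the allreduce shares the table. */
  MPI_Aint *offsets = calloc((size_t)nranks * nranks, sizeof(MPI_Aint));
  MPI_Aint running = 0;
  for (int sender = 0; sender < nranks; sender++) {
    offsets[rank * nranks + sender] = running;
    running += local_bytes / nranks; /* give each sender an equal slice */
  }
  MPI_Allreduce(MPI_IN_PLACE, offsets, nranks * nranks, MPI_AINT, MPI_SUM,
                MPI_COMM_WORLD);

  if (rank == 0 && nranks > 1)
    printf("rank 1 writes to rank 0 at offset %ld\n",
           (long)offsets[0 * nranks + 1]);

  free(offsets);
  MPI_Win_free(&recv_win);
  MPI_Finalize();
  return 0;
}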
@@ -559,27 +467,22 @@ int main(int argc, char *argv[]) {
     message("Start of MPI tests");
     message("==================");
     if (verbose) {
-      if (!usetics) message("using fast untimed injections");
       if (datacheck)
         message("checking data pattern on send and recv completion");
     }
   }
 
-  /* Make three threads, one for injecting tasks and two to check for
-   * completions of the sends and recv independently. */
+  /* Make two threads, one for injecting tasks and one to check for
+   * completions of the remote sends. */
   pthread_t injectthread;
   if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
     error("Failed to create injection thread.");
-  pthread_t sendthread;
-  if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
-    error("Failed to create send thread.");
   pthread_t recvthread;
   if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
     error("Failed to create recv thread.");
 
   /* Wait until all threads have exited and all MPI requests have completed. */
   pthread_join(injectthread, NULL);
-  pthread_join(sendthread, NULL);
   pthread_join(recvthread, NULL);
 
   /* Dump the updated MPI logs. */
...