Skip to content
Snippets Groups Projects

Draft: Fast one-sided MPI version

Open Peter W. Draper requested to merge asyncreallyonesided-fast into master
1 file
+ 28
25
Compare changes
  • Side-by-side
  • Inline
+ 28
25
@@ -189,22 +189,24 @@ static void *send_thread(void *arg) {
MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
// if (oldval[0] == dataptr[0]) {
// message("sent a message to %d/%d (%zd:%zd:%zd @ %zd)", log->otherrank,
// log->subtype, dataptr[0], oldval[0], newval[0],
// MESSAGE_SIZE * myrank);
//} else {
// message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd",
// log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
// MESSAGE_SIZE * myrank);
//}
if (verbose) {
if (oldval[0] == dataptr[0]) {
message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)", log->otherrank,
log->subtype, dataptr[0], oldval[0], newval[0],
MESSAGE_SIZE * myrank, k, nr_send);
} else {
message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd %d/%d",
log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
MESSAGE_SIZE * myrank, k, nr_send);
}
}
/* Wait for completion, this is when remote flips back to LOCKED. We poll
* on a get, as the local window is only used for receiving. Use an Rget
* so we can use MPI_Test to get some local progression. */
newval[0] = UNLOCKED;
while (newval[0] != LOCKED) {
// MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
//MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); // emergency
MPI_Request request;
MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
@@ -216,9 +218,9 @@ static void *send_thread(void *arg) {
}
}
//message("sent and received... %d/%d/%d", k, nr_send,
// ((char *)log->data)[0]);
/* Ready the next send. */
if (verbose)
message("sent and acknowledged message to %d/%d %zd %d/%d)",
log->otherrank, log->subtype, dataptr[0], k, nr_send);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
@@ -269,9 +271,10 @@ static void *recv_thread(void *arg) {
log->size == dataptr[1] && log->tag == dataptr[2]) {
found = 1;
//message("We have a ready message %d/%d at %zd: lockval %zd",
// log->rank, log->subtype, n * MESSAGE_SIZE,
// lockval);
if (verbose)
message("receive message %d/%d from %d @ %zd: lockval %zd",
log->rank, log->subtype, n, n * MESSAGE_SIZE,
lockval);
/* Check data sent data is unchanged and received data is as
* expected. */
@@ -295,13 +298,13 @@ static void *recv_thread(void *arg) {
if (!found) {
error("Failed to find a matching receive");
}
/* Need to allow for some MPI progession. Since we make no
* MPI calls. Should not be needed if using a progression thread? */
int flag = 0;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
MPI_STATUS_IGNORE);
}
/* Need to allow for some MPI progession. Since we make no
* MPI calls. Should not be needed if using a progression thread? */
int flag = 0;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
MPI_STATUS_IGNORE);
}
}
}
@@ -452,7 +455,7 @@ int main(int argc, char *argv[]) {
/* Assert a shared lock with all the other processes on this window. */
MPI_Win_lock_all(0, mpi_window[i]);
}
message("Windows allocated");
//message("Windows allocated");
/* Time to start time. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
@@ -479,14 +482,14 @@ int main(int argc, char *argv[]) {
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Free the window locks. */
/* Free the window locks. Once we all arrive. */
MPI_Barrier(MPI_COMM_WORLD);
for (int i = 0; i < task_subtype_count; i++) {
MPI_Win_unlock_all(mpi_window[i]);
MPI_Win_free(&mpi_window[i]);
}
/* Dump the updated MPI logs. */
MPI_Barrier(MPI_COMM_WORLD);
fflush(stdout);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);
Loading