diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c index 605125f983ffcc094e7ac947d9d9f776c4e8c7ad..c81a1e47da84c0232f3acf1b46afd35f7a32c0e5 100644 --- a/swiftmpirdmastepsim.c +++ b/swiftmpirdmastepsim.c @@ -127,9 +127,6 @@ static void *send_thread(void *arg) { ((char *)log->data)[log->size] = READY; /* And send data to other rank. */ - MPI_Win_lock(MPI_LOCK_EXCLUSIVE, log->otherrank, 0, - mpi_window[log->subtype]); - MPI_Accumulate(log->data, log->size + 1, MPI_BYTE, log->otherrank, 0, log->size + 1, MPI_BYTE, MPI_REPLACE, mpi_window[log->subtype]); @@ -143,12 +140,12 @@ static void *send_thread(void *arg) { MPI_Compare_and_swap(&newval[0], &((char *)log->data)[log->size], &oldval[0], MPI_BYTE, log->otherrank, log->size, mpi_window[log->subtype]); - MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); + //MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); + MPI_Win_flush_all(mpi_window[log->subtype]); // XXX need to make sure the data area is free to overwrite once more XXX // flip the remote atomic. - MPI_Win_unlock(log->otherrank, mpi_window[log->subtype]); } message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), @@ -183,16 +180,17 @@ static void *recv_thread(void *arg) { if (log != NULL && !log->done) { ticks tics = getticks(); - MPI_Win_lock(MPI_LOCK_EXCLUSIVE, log->otherrank, 0, - mpi_window[log->subtype]); - int arrived = 0; - //message("Checking at %zd", log->offset); + //message("Checking at %zd", log->size); if (mpi_ptr[log->subtype][log->size] == DONE) arrived = 1; - // XXX match this to the expected receive... + // Need to allow for some MPI progession... Since we make no + // MPI calls. Should not be needed if using a progression thread? + int flag; + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, + &flag, MPI_STATUS_IGNORE); - MPI_Win_unlock(log->otherrank, mpi_window[log->subtype]); + // XXX match this to the expected receive... /* Increment etc. of statistics about time spent waiting. */ ticks dt = getticks() - tics; @@ -371,6 +369,9 @@ int main(int argc, char *argv[]) { MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]); MPI_Win_allocate(maxsize + sizeof(size_t), sizeof(int), MPI_INFO_NULL, subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]); + + /* Assert a shared lock with all the other processes on this window. */ + MPI_Win_lock_all(0, mpi_window[i]); } message("Windows allocated"); @@ -399,6 +400,11 @@ int main(int argc, char *argv[]) { pthread_join(sendthread, NULL); pthread_join(recvthread, NULL); + /* Free the window locks. */ + for (int i = 0; i < task_subtype_count; i++) { + MPI_Win_unlock_all(mpi_window[i]); + } + /* Dump the updated MPI logs. */ MPI_Barrier(MPI_COMM_WORLD); fflush(stdout);