Skip to content
Snippets Groups Projects
Commit f860ad09 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Cannot use locks across threads in the same rank, so need a global shared lock...

Cannot use locks across threads in the same rank, so need a global shared lock and use flushes to update the data, still not working we as need to synchronize on the send side...
parent 85e75e83
Branches
No related tags found
2 merge requests!11Draft: Fast one-sided MPI version,!8Draft: RDMA version with wrapped infinity calls
...@@ -127,9 +127,6 @@ static void *send_thread(void *arg) { ...@@ -127,9 +127,6 @@ static void *send_thread(void *arg) {
((char *)log->data)[log->size] = READY; ((char *)log->data)[log->size] = READY;
/* And send data to other rank. */ /* And send data to other rank. */
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, log->otherrank, 0,
mpi_window[log->subtype]);
MPI_Accumulate(log->data, log->size + 1, MPI_BYTE, log->otherrank, 0, MPI_Accumulate(log->data, log->size + 1, MPI_BYTE, log->otherrank, 0,
log->size + 1, MPI_BYTE, MPI_REPLACE, log->size + 1, MPI_BYTE, MPI_REPLACE,
mpi_window[log->subtype]); mpi_window[log->subtype]);
...@@ -143,12 +140,12 @@ static void *send_thread(void *arg) { ...@@ -143,12 +140,12 @@ static void *send_thread(void *arg) {
MPI_Compare_and_swap(&newval[0], &((char *)log->data)[log->size], MPI_Compare_and_swap(&newval[0], &((char *)log->data)[log->size],
&oldval[0], MPI_BYTE, log->otherrank, log->size, &oldval[0], MPI_BYTE, log->otherrank, log->size,
mpi_window[log->subtype]); mpi_window[log->subtype]);
MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); //MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
MPI_Win_flush_all(mpi_window[log->subtype]);
// XXX need to make sure the data area is free to overwrite once more XXX // XXX need to make sure the data area is free to overwrite once more XXX
// flip the remote atomic. // flip the remote atomic.
MPI_Win_unlock(log->otherrank, mpi_window[log->subtype]);
} }
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
...@@ -183,16 +180,17 @@ static void *recv_thread(void *arg) { ...@@ -183,16 +180,17 @@ static void *recv_thread(void *arg) {
if (log != NULL && !log->done) { if (log != NULL && !log->done) {
ticks tics = getticks(); ticks tics = getticks();
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, log->otherrank, 0,
mpi_window[log->subtype]);
int arrived = 0; int arrived = 0;
//message("Checking at %zd", log->offset); //message("Checking at %zd", log->size);
if (mpi_ptr[log->subtype][log->size] == DONE) arrived = 1; if (mpi_ptr[log->subtype][log->size] == DONE) arrived = 1;
// XXX match this to the expected receive... // Need to allow for some MPI progession... Since we make no
// MPI calls. Should not be needed if using a progression thread?
int flag;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
&flag, MPI_STATUS_IGNORE);
MPI_Win_unlock(log->otherrank, mpi_window[log->subtype]); // XXX match this to the expected receive...
/* Increment etc. of statistics about time spent waiting. */ /* Increment etc. of statistics about time spent waiting. */
ticks dt = getticks() - tics; ticks dt = getticks() - tics;
...@@ -371,6 +369,9 @@ int main(int argc, char *argv[]) { ...@@ -371,6 +369,9 @@ int main(int argc, char *argv[]) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]); MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
MPI_Win_allocate(maxsize + sizeof(size_t), sizeof(int), MPI_INFO_NULL, MPI_Win_allocate(maxsize + sizeof(size_t), sizeof(int), MPI_INFO_NULL,
subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]); subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]);
/* Assert a shared lock with all the other processes on this window. */
MPI_Win_lock_all(0, mpi_window[i]);
} }
message("Windows allocated"); message("Windows allocated");
...@@ -399,6 +400,11 @@ int main(int argc, char *argv[]) { ...@@ -399,6 +400,11 @@ int main(int argc, char *argv[]) {
pthread_join(sendthread, NULL); pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL); pthread_join(recvthread, NULL);
/* Free the window locks. */
for (int i = 0; i < task_subtype_count; i++) {
MPI_Win_unlock_all(mpi_window[i]);
}
/* Dump the updated MPI logs. */ /* Dump the updated MPI logs. */
MPI_Barrier(MPI_COMM_WORLD); MPI_Barrier(MPI_COMM_WORLD);
fflush(stdout); fflush(stdout);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment