From edf07c532a5d29f960afa3216fa83923741db566 Mon Sep 17 00:00:00 2001 From: "Peter W. Draper" <p.w.draper@durham.ac.uk> Date: Thu, 7 May 2020 18:16:45 +0100 Subject: [PATCH] Works on a fabric with the EAGLE/25 16 rank test now. The important change is moving the flush to immediately after MPI_Rget; when the flush came before it, the test loop could not see the update. Also keep to the passive contract: only flush on the push side, even though we get from the destination, so it knows when to flush. --- error.h | 13 +++++++ swiftmpirdmastepsim.c | 91 ++++++++++++++++++++++++++----------------- 2 files changed, 68 insertions(+), 36 deletions(-) diff --git a/error.h b/error.h index d5e3314..26b99fb 100644 --- a/error.h +++ b/error.h @@ -19,3 +19,16 @@ extern int myrank; printf("[%04i] %s %s: " s "\n", myrank, clocks_get_timesincestart(), \ __FUNCTION__, ##__VA_ARGS__); \ }) + +/* Print MPI error string. */ +#define mpi_error_message(res, s, ...) \ + ({ \ + fflush(stdout); \ + fprintf(stderr, "[%04i] %s %s:%s():%i: " s "\n", myrank, \ + clocks_get_timesincestart(), __FILE__, __FUNCTION__, __LINE__, \ + ##__VA_ARGS__); \ + int len = 1024; \ + char buf[len]; \ + MPI_Error_string(res, buf, &len); \ + fprintf(stderr, "%s\n\n", buf); \ + }) diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c index cd73d1c..f2e21e3 100644 --- a/swiftmpirdmastepsim.c +++ b/swiftmpirdmastepsim.c @@ -120,7 +120,7 @@ static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); } */ static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) { for (BLOCKTYPE i = 0; i < size; i++) { - data[i] = myrank; + data[i] = myrank; } } @@ -162,8 +162,7 @@ static void *send_thread(void *arg) { log->data = dataptr; /* Fill data with pattern. */ - if (datacheck) datacheck_fill(toblocks(log->size), - &dataptr[HEADER_SIZE]); + if (datacheck) datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE]); /* First element is marked as LOCKED, so only we can update. 
*/ dataptr[0] = LOCKED; @@ -174,25 +173,28 @@ static void *send_thread(void *arg) { int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank, MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE, MPI_REPLACE, mpi_window[log->subtype]); - if (ret != MPI_SUCCESS) - error("Failed to accumulate data: %d", ret); - + if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data"); + /* Now we change the last element to UNLOCKED so that the remote end can * find out that the data has arrived. */ BLOCKTYPE newval[1]; BLOCKTYPE oldval[1]; newval[0] = UNLOCKED; oldval[0] = 0; - MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE, - log->otherrank, MESSAGE_SIZE * myrank, - mpi_window[log->subtype]); + ret = MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE, + log->otherrank, MESSAGE_SIZE * myrank, + mpi_window[log->subtype]); - MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); + if (ret != MPI_SUCCESS) + mpi_error_message(ret, "MPI_Compare_and_swap error"); + + ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); + if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed"); if (verbose) { if (oldval[0] == dataptr[0]) { - message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)", log->otherrank, - log->subtype, dataptr[0], oldval[0], newval[0], + message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)", + log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0], MESSAGE_SIZE * myrank, k, nr_send); } else { message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd %d/%d", @@ -205,22 +207,38 @@ static void *send_thread(void *arg) { * on a get, as the local window is only used for receiving. Use an Rget * so we can use MPI_Test to get some local progression. 
*/ newval[0] = UNLOCKED; + size_t counter = 0; while (newval[0] != LOCKED) { - //MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); // emergency MPI_Request request; - MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank, - MESSAGE_SIZE * myrank, 1, MPI_BLOCKTYPE, - mpi_window[log->subtype], &request); + ret = MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank, + MESSAGE_SIZE * myrank, 1, MPI_BLOCKTYPE, + mpi_window[log->subtype], &request); + + if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Rget failed"); + + /* After the rget to make sure we get a chance at completion. */ + ret = + MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); // emergency + if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed"); + int flag = 0; while (flag == 0) { - MPI_Test(&request, &flag, MPI_STATUS_IGNORE); + ret = MPI_Test(&request, &flag, MPI_STATUS_IGNORE); + if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Test failed"); + + counter++; + if (counter > 1000000) { + message("message sent to %d/%d %zd %d/%d stuck in test", + log->otherrank, log->subtype, newval[0], k, nr_send); + counter = 0; + } } } if (verbose) message("sent and acknowledged message to %d/%d %zd %d/%d)", - log->otherrank, log->subtype, dataptr[0], k, nr_send); + log->otherrank, log->subtype, dataptr[0], k, nr_send); } message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), @@ -234,7 +252,10 @@ static void *send_thread(void *arg) { */ static void *recv_thread(void *arg) { - message("%d: recv thread starts", *((int *)arg)); + message( + "%d: recv thread starts, checking for %d messages %d ranks " + " %d communicators", + *((int *)arg), nr_recv, nr_ranks, task_subtype_count); ticks starttics = getticks(); /* Global statistics. 
*/ @@ -256,31 +277,27 @@ static void *recv_thread(void *arg) { for (int j = 0; j < task_subtype_count; j++) { if (todo_recv <= 0) break; - //MPI_Win_flush(n, mpi_window[j]); // XXX emergency measure BLOCKTYPE *dataptr = &mpi_ptr[j][n * MESSAGE_SIZE]; - BLOCKTYPE lockval = dataptr[0]; - - if (lockval == UNLOCKED) { + if (dataptr[0] == UNLOCKED) { /* We have a message waiting to be handled, find the log. */ int found = 0; for (int k = 0; k < nr_recv; k++) { struct mpiuse_log_entry *log = recv_queue[k]; if (log != NULL && !log->done && log->otherrank == n && - log->subtype == j && - log->size == dataptr[1] && log->tag == dataptr[2]) { + log->subtype == j && log->size == dataptr[1] && + log->tag == dataptr[2]) { found = 1; if (verbose) - message("receive message %d/%d from %d @ %zd: lockval %zd", + message("receive message %d/%d from %d @ %zd: dataptr[0] %zd", log->rank, log->subtype, n, n * MESSAGE_SIZE, - lockval); + dataptr[0]); /* Check data sent data is unchanged and received data is as * expected. */ - if (datacheck && - !datacheck_test(toblocks(log->size), - &dataptr[HEADER_SIZE], n)) { + if (datacheck && !datacheck_test(toblocks(log->size), + &dataptr[HEADER_SIZE], n)) { message("Data mismatch on completion"); } @@ -300,11 +317,13 @@ static void *recv_thread(void *arg) { } } - /* Need to allow for some MPI progession. Since we make no - * MPI calls. Should not be needed if using a progression thread? */ + /* Need to allow for some MPI progression. Since we make no MPI calls + * (by intent receive is a passive target so only the sender should + * make calls that move data). 
*/ int flag = 0; - MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, - MPI_STATUS_IGNORE); + int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, + MPI_STATUS_IGNORE); + if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Iprobe failed"); } } } @@ -453,9 +472,9 @@ int main(int argc, char *argv[]) { &mpi_window[i]); /* Assert a shared lock with all the other processes on this window. */ - MPI_Win_lock_all(0, mpi_window[i]); + MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]); } - //message("Windows allocated"); + // message("Windows allocated"); /* Time to start time. Try to make it synchronous across the ranks. */ MPI_Barrier(MPI_COMM_WORLD); -- GitLab