Report MPI failures in the RDMA step simulator

error.h gains mpi_error_message(), which flushes stdout, prints a located
message to stderr and follows it with the text from MPI_Error_string().
swiftmpirdmastepsim.c uses it to check MPI_Accumulate, MPI_Compare_and_swap,
MPI_Win_flush, MPI_Rget, MPI_Test and MPI_Iprobe. The sender also flushes the
window after every MPI_Rget and logs "stuck in test" each time its MPI_Test
count passes 1000000; windows are locked with MPI_MODE_NOCHECK; the receiver
reads dataptr[0] directly instead of a lockval copy. The remaining hunks only
reformat.

NOTE(review): mpi_error_message() only prints. The MPI_Accumulate failure
path called error() before this change; confirm that carrying on after a
failed call is intended.

NOTE(review): this patch was rebuilt from a copy whose whitespace had been
collapsed. Indentation, the position of blank context lines and the
whitespace-only changes are reconstructed from the hunk counts and the
surrounding formatting, so apply with whitespace checks relaxed (for example
git apply --ignore-whitespace) and compare against the target tree. The
error-string buffer is sized with MPI_MAX_ERROR_STRING and one comment typo
in an added line is corrected, so the post-image hashes on the index lines
predate those two touch-ups.

diff --git a/error.h b/error.h
index d5e331486620f77cbf16b15ad673a9c43ad7876b..26b99fb0fa76adcf601c35717b3e1bc19bdb826a 100644
--- a/error.h
+++ b/error.h
@@ -19,3 +19,16 @@ extern int myrank;
     printf("[%04i] %s %s: " s "\n", myrank, clocks_get_timesincestart(), \
            __FUNCTION__, ##__VA_ARGS__); \
   })
+
+/* Print a message to stderr followed by the MPI error string for res. */
+#define mpi_error_message(res, s, ...) \
+  ({ \
+    fflush(stdout); \
+    fprintf(stderr, "[%04i] %s %s:%s():%i: " s "\n", myrank, \
+            clocks_get_timesincestart(), __FILE__, __FUNCTION__, __LINE__, \
+            ##__VA_ARGS__); \
+    int len = 0; \
+    char buf[MPI_MAX_ERROR_STRING]; \
+    MPI_Error_string((res), buf, &len); \
+    fprintf(stderr, "%s\n\n", buf); \
+  })
diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c
index cd73d1c939c38ca2d3f04c504a255d6d65068e9f..f2e21e3fff18b6e887d083e08319c7e8315bd7cc 100644
--- a/swiftmpirdmastepsim.c
+++ b/swiftmpirdmastepsim.c
@@ -120,7 +120,7 @@ static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
  */
 static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
   for (BLOCKTYPE i = 0; i < size; i++) {
-      data[i] = myrank;
+    data[i] = myrank;
   }
 }
 
@@ -162,8 +162,7 @@ static void *send_thread(void *arg) {
     log->data = dataptr;
 
     /* Fill data with pattern. */
-    if (datacheck) datacheck_fill(toblocks(log->size),
-                                  &dataptr[HEADER_SIZE]);
+    if (datacheck) datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE]);
 
     /* First element is marked as LOCKED, so only we can update. */
     dataptr[0] = LOCKED;
@@ -174,25 +173,28 @@ static void *send_thread(void *arg) {
     int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
                              MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE,
                              MPI_REPLACE, mpi_window[log->subtype]);
-    if (ret != MPI_SUCCESS)
-      error("Failed to accumulate data: %d", ret);
-    
+    if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
+
     /* Now we change the last element to UNLOCKED so that the remote end can
      * find out that the data has arrived. */
     BLOCKTYPE newval[1];
     BLOCKTYPE oldval[1];
     newval[0] = UNLOCKED;
     oldval[0] = 0;
-    MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE,
-                         log->otherrank, MESSAGE_SIZE * myrank,
-                         mpi_window[log->subtype]);
+    ret = MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE,
+                               log->otherrank, MESSAGE_SIZE * myrank,
+                               mpi_window[log->subtype]);
 
-    MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
+    if (ret != MPI_SUCCESS)
+      mpi_error_message(ret, "MPI_Compare_and_swap error");
+
+    ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
+    if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
 
     if (verbose) {
       if (oldval[0] == dataptr[0]) {
-        message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)", log->otherrank,
-                log->subtype, dataptr[0], oldval[0], newval[0],
+        message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)",
+                log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
                 MESSAGE_SIZE * myrank, k, nr_send);
       } else {
         message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd %d/%d",
@@ -205,22 +207,38 @@ static void *send_thread(void *arg) {
      * on a get, as the local window is only used for receiving. Use an Rget
      * so we can use MPI_Test to get some local progression. */
     newval[0] = UNLOCKED;
+    size_t counter = 0;
     while (newval[0] != LOCKED) {
-      //MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); // emergency
       MPI_Request request;
 
-      MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
-               MESSAGE_SIZE * myrank, 1, MPI_BLOCKTYPE,
-               mpi_window[log->subtype], &request);
+      ret = MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
+                     MESSAGE_SIZE * myrank, 1, MPI_BLOCKTYPE,
+                     mpi_window[log->subtype], &request);
+
+      if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Rget failed");
+
+      /* After the rget to make sure we get a chance at completion. */
+      ret =
+          MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);  // emergency
+      if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
+
       int flag = 0;
       while (flag == 0) {
-        MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
+        ret = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
+        if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Test failed");
+
+        counter++;
+        if (counter > 1000000) {
+          message("message sent to %d/%d %zd %d/%d stuck in test",
+                  log->otherrank, log->subtype, newval[0], k, nr_send);
+          counter = 0;
+        }
       }
     }
 
     if (verbose)
       message("sent and acknowledged message to %d/%d %zd %d/%d)",
-          log->otherrank, log->subtype, dataptr[0], k, nr_send);
+              log->otherrank, log->subtype, dataptr[0], k, nr_send);
   }
 
   message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
@@ -234,7 +252,10 @@ static void *send_thread(void *arg) {
  */
 static void *recv_thread(void *arg) {
 
-  message("%d: recv thread starts", *((int *)arg));
+  message(
+      "%d: recv thread starts, checking for %d messages %d ranks "
+      " %d communicators",
+      *((int *)arg), nr_recv, nr_ranks, task_subtype_count);
   ticks starttics = getticks();
 
   /* Global statistics. */
@@ -256,31 +277,27 @@ static void *recv_thread(void *arg) {
       for (int j = 0; j < task_subtype_count; j++) {
         if (todo_recv <= 0) break;
 
-        //MPI_Win_flush(n, mpi_window[j]); // XXX emergency measure
         BLOCKTYPE *dataptr = &mpi_ptr[j][n * MESSAGE_SIZE];
-        BLOCKTYPE lockval = dataptr[0];
-
-        if (lockval == UNLOCKED) {
+        if (dataptr[0] == UNLOCKED) {
 
           /* We have a message waiting to be handled, find the log. */
           int found = 0;
           for (int k = 0; k < nr_recv; k++) {
             struct mpiuse_log_entry *log = recv_queue[k];
             if (log != NULL && !log->done && log->otherrank == n &&
-                log->subtype == j &&
-                log->size == dataptr[1] && log->tag == dataptr[2]) {
+                log->subtype == j && log->size == dataptr[1] &&
+                log->tag == dataptr[2]) {
               found = 1;
 
               if (verbose)
-                message("receive message %d/%d from %d @ %zd: lockval %zd",
+                message("receive message %d/%d from %d @ %zd: dataptr[0] %zd",
                         log->rank, log->subtype, n, n * MESSAGE_SIZE,
-                        lockval);
+                        dataptr[0]);
 
               /* Check data sent data is unchanged and received data is as
                * expected. */
-              if (datacheck &&
-                  !datacheck_test(toblocks(log->size),
-                                  &dataptr[HEADER_SIZE], n)) {
+              if (datacheck && !datacheck_test(toblocks(log->size),
+                                               &dataptr[HEADER_SIZE], n)) {
                 message("Data mismatch on completion");
               }
 
@@ -300,11 +317,13 @@ static void *recv_thread(void *arg) {
           }
         }
 
-        /* Need to allow for some MPI progession. Since we make no
-         * MPI calls. Should not be needed if using a progression thread? */
+        /* Need to allow for some MPI progression. Since we make no MPI calls
+         * (by intent receive is a passive target so only the sender should
+         * make calls that move data). */
         int flag = 0;
-        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
-                   MPI_STATUS_IGNORE);
+        int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
+                             MPI_STATUS_IGNORE);
+        if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Iprobe failed");
       }
     }
   }
@@ -453,9 +472,9 @@ int main(int argc, char *argv[]) {
                      &mpi_window[i]);
 
     /* Assert a shared lock with all the other processes on this window. */
-    MPI_Win_lock_all(0, mpi_window[i]);
+    MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
   }
-  //message("Windows allocated");
+  // message("Windows allocated");
 
   /* Time to start time. Try to make it synchronous across the ranks. */
   MPI_Barrier(MPI_COMM_WORLD);