Skip to content
Snippets Groups Projects
Commit edf07c53 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Works on a fabric with the EAGLE/25 16 rank test now

The important change is moving the flush to immediately after MPI_Rget; if it came before, the test loop could not see the update. Also keep to the passive contract: only flush on the push side, even though we get from the destination, so it knows when to flush.
parent aa13a613
No related branches found
No related tags found
2 merge requests!11Draft: Fast one-sided MPI version,!8Draft: RDMA version with wrapped infinity calls
......@@ -19,3 +19,16 @@ extern int myrank;
printf("[%04i] %s %s: " s "\n", myrank, clocks_get_timesincestart(), \
__FUNCTION__, ##__VA_ARGS__); \
})
/* Print a formatted message to stderr followed by the MPI error string.
 *
 * res: return code of an MPI call; it is translated to text with
 *      MPI_Error_string().
 * s:   printf-style format. It must be a string literal, as it is pasted
 *      into a larger format, and is followed by any arguments it needs.
 *
 * stdout is flushed before anything is written to stderr. The text buffer
 * is sized with MPI_MAX_ERROR_STRING, the size MPI_Error_string() requires
 * of its caller, and starts out empty so that a defined string is printed
 * even if the lookup itself fails. The locals carry a trailing underscore
 * so they are unlikely to capture identifiers used in the res argument. */
#define mpi_error_message(res, s, ...)                                     \
  ({                                                                       \
    fflush(stdout);                                                        \
    fprintf(stderr, "[%04i] %s %s:%s():%i: " s "\n", myrank,               \
            clocks_get_timesincestart(), __FILE__, __FUNCTION__, __LINE__, \
            ##__VA_ARGS__);                                                \
    char mpi_errbuf_[MPI_MAX_ERROR_STRING] = "";                           \
    int mpi_errlen_ = 0;                                                   \
    MPI_Error_string((res), mpi_errbuf_, &mpi_errlen_);                    \
    fprintf(stderr, "%s\n\n", mpi_errbuf_);                                \
  })
......@@ -120,7 +120,7 @@ static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
*/
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
  /* Stamp each of the first `size` elements with this rank's id (myrank).
   * A single store per element is sufficient. */
  for (BLOCKTYPE i = 0; i < size; i++) {
    data[i] = myrank;
  }
}
......@@ -162,8 +162,7 @@ static void *send_thread(void *arg) {
log->data = dataptr;
/* Fill data with pattern. */
if (datacheck) datacheck_fill(toblocks(log->size),
&dataptr[HEADER_SIZE]);
if (datacheck) datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE]);
/* First element is marked as LOCKED, so only we can update. */
dataptr[0] = LOCKED;
......@@ -174,25 +173,28 @@ static void *send_thread(void *arg) {
int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE,
MPI_REPLACE, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS)
error("Failed to accumulate data: %d", ret);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
/* Now we change the first element to UNLOCKED so that the remote end can
* find out that the data has arrived. */
BLOCKTYPE newval[1];
BLOCKTYPE oldval[1];
newval[0] = UNLOCKED;
oldval[0] = 0;
MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE,
log->otherrank, MESSAGE_SIZE * myrank,
mpi_window[log->subtype]);
ret = MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE,
log->otherrank, MESSAGE_SIZE * myrank,
mpi_window[log->subtype]);
MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS)
mpi_error_message(ret, "MPI_Compare_and_swap error");
ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
if (verbose) {
if (oldval[0] == dataptr[0]) {
message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)", log->otherrank,
log->subtype, dataptr[0], oldval[0], newval[0],
message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)",
log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
MESSAGE_SIZE * myrank, k, nr_send);
} else {
message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd %d/%d",
......@@ -205,22 +207,38 @@ static void *send_thread(void *arg) {
* on a get, as the local window is only used for receiving. Use an Rget
* so we can use MPI_Test to get some local progression. */
newval[0] = UNLOCKED;
size_t counter = 0;
while (newval[0] != LOCKED) {
//MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); // emergency
MPI_Request request;
MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
MESSAGE_SIZE * myrank, 1, MPI_BLOCKTYPE,
mpi_window[log->subtype], &request);
ret = MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
MESSAGE_SIZE * myrank, 1, MPI_BLOCKTYPE,
mpi_window[log->subtype], &request);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Rget failed");
/* After the rget to make sure we get a chance at completion. */
ret =
MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); // emergency
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
int flag = 0;
while (flag == 0) {
MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
ret = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Test failed");
counter++;
if (counter > 1000000) {
message("message sent to %d/%d %zd %d/%d stuck in test",
log->otherrank, log->subtype, newval[0], k, nr_send);
counter = 0;
}
}
}
if (verbose)
message("sent and acknowledged message to %d/%d %zd %d/%d)",
log->otherrank, log->subtype, dataptr[0], k, nr_send);
log->otherrank, log->subtype, dataptr[0], k, nr_send);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
......@@ -234,7 +252,10 @@ static void *send_thread(void *arg) {
*/
static void *recv_thread(void *arg) {
message("%d: recv thread starts", *((int *)arg));
message(
"%d: recv thread starts, checking for %d messages %d ranks "
" %d communicators",
*((int *)arg), nr_recv, nr_ranks, task_subtype_count);
ticks starttics = getticks();
/* Global statistics. */
......@@ -256,31 +277,27 @@ static void *recv_thread(void *arg) {
for (int j = 0; j < task_subtype_count; j++) {
if (todo_recv <= 0) break;
//MPI_Win_flush(n, mpi_window[j]); // XXX emergency measure
BLOCKTYPE *dataptr = &mpi_ptr[j][n * MESSAGE_SIZE];
BLOCKTYPE lockval = dataptr[0];
if (lockval == UNLOCKED) {
if (dataptr[0] == UNLOCKED) {
/* We have a message waiting to be handled, find the log. */
int found = 0;
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done && log->otherrank == n &&
log->subtype == j &&
log->size == dataptr[1] && log->tag == dataptr[2]) {
log->subtype == j && log->size == dataptr[1] &&
log->tag == dataptr[2]) {
found = 1;
if (verbose)
message("receive message %d/%d from %d @ %zd: lockval %zd",
message("receive message %d/%d from %d @ %zd: dataptr[0] %zd",
log->rank, log->subtype, n, n * MESSAGE_SIZE,
lockval);
dataptr[0]);
/* Check that the sent data is unchanged and the received data is as
* expected. */
if (datacheck &&
!datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], n)) {
if (datacheck && !datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], n)) {
message("Data mismatch on completion");
}
......@@ -300,11 +317,13 @@ static void *recv_thread(void *arg) {
}
}
/* Need to allow for some MPI progession. Since we make no
* MPI calls. Should not be needed if using a progression thread? */
/* Need to allow for some MPI progression, since we make no MPI calls
* (by intent receive is a passive target so only the sender should
* make calls that move data). */
int flag = 0;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
MPI_STATUS_IGNORE);
int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
MPI_STATUS_IGNORE);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Iprobe failed");
}
}
}
......@@ -453,9 +472,9 @@ int main(int argc, char *argv[]) {
&mpi_window[i]);
/* Assert a shared lock with all the other processes on this window. */
MPI_Win_lock_all(0, mpi_window[i]);
MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
}
//message("Windows allocated");
// message("Windows allocated");
/* Time to start time. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment