diff --git a/Makefile b/Makefile index 64b311ffb9b4759f5c1553b3d7f20aca8702221b..8bb053a79fd9e45aebacce1a36a56069b72393bf 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,7 @@ #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined -CFLAGS = -g -O0 -Wall -Iinfinity/include +#CFLAGS = -g -O0 -Wall -Iinfinity/include +CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread + all: swiftmpistepsim swiftmpirdmastepsim diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c index bd91778013f17923564d7ba9e74cb0d863bcab08..cb355c3f380c22466c7d9354247a3dd8ffd141e2 100644 --- a/swiftmpirdmastepsim.c +++ b/swiftmpirdmastepsim.c @@ -48,7 +48,7 @@ int myrank = -1; static int nr_ranks; /* Base port no. Ranks use +rank. */ -static int BASE_PORT = 17771; +static int BASE_PORT = 27771; /* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this as * as we need to align in memory. */ @@ -128,14 +128,15 @@ static int toblocks(BLOCKTYPE nr_bytes) { static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); } /** - * @brief fill a data area with our rank. + * @brief fill a data area with given value. * * @param size size of data in bytes. * @param data the data to fill. + * @param value the value to fill. */ -static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) { +static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) { for (BLOCKTYPE i = 0; i < size; i++) { - data[i] = myrank; + data[i] = value; } } @@ -144,14 +145,15 @@ static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) { * * @param size size of data in bytes. * @param data the data to check. - * @param rank the value to, i.e. original rank. + * @param value the value expected. * * @result 1 on success, 0 otherwise. */ -static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) { +static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) { for (size_t i = 0; i < size; i++) { - if (data[i] != (BLOCKTYPE)rank) { - message("see %zd expected %d @ %zd (%zd to go)", data[i], rank, i, size); + if (data[i] != value) { + message("see %zd expected %zd @ %zd (%zd to go)", data[i], value, i, + size); return 0; } } @@ -184,10 +186,11 @@ static void *send_thread(void *arg) { // Get QP to the other rank. Note we cannot do this until the related // server is up and running, so make sure that is true.. - message("waiting for connection to remote server %s %d", server_ip, - BASE_PORT + myrank); + message("%d waiting for connection to remote server %s %d on %d", myrank, + server_ip, rank, BASE_PORT + myrank); auto qp = qpFactory->connectToRemoteHost(server_ip, BASE_PORT + myrank); - message("connected to remote server %s %d", server_ip, BASE_PORT + myrank); + message("%d connected to remote server %s %d on %d", myrank, server_ip, rank, + BASE_PORT + myrank); for (int k = 0; k < nr_send; k++) { struct mpiuse_log_entry *log = send_queue[k]; @@ -201,7 +204,8 @@ static void *send_thread(void *arg) { log->data = dataptr; /* Fill data with pattern. */ - if (datacheck) datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE]); + if (datacheck) + datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE], log->tag); /* First element has our rank, other elements replicate what we need to * define an MPI message. */ @@ -245,6 +249,8 @@ static void *recv_thread(void *arg) { context->postReceiveBuffer(receiveBuffer); // Port binding. + message("%d binding to %d on port %d", myrank, rank, BASE_PORT + rank); + fflush(stdout); qpFactory->bindToPort(BASE_PORT + rank); message("Blocking for first message on %d", BASE_PORT + rank); @@ -281,13 +287,13 @@ static void *recv_thread(void *arg) { found = 1; if (verbose) - message("receive message subtype %d from %d on %d", log->subtype, rank, - myrank); + message("receive message subtype %d from %d on %d", log->subtype, + rank, myrank); /* Check data sent data is unchanged and received data is as * expected. */ - if (datacheck && - !datacheck_test(toblocks(log->size), &dataptr[HEADER_SIZE], rank)) { + if (datacheck && !datacheck_test(toblocks(log->size), + &dataptr[HEADER_SIZE], log->tag)) { message("Data mismatch on completion"); } @@ -298,9 +304,11 @@ static void *recv_thread(void *arg) { } } if (!found) { - error("No matching receive on connections to %d (%d of %d todo:" - " rank = %d otherrank = %d subtype = %d size = %zd tag = %d)", - BASE_PORT + rank, todo_recv, nr_recvs[rank], myrank, rank, subtype, size, tag); + error( + "No matching receive on connections to %d (%d of %d todo:" + " rank = %d otherrank = %d subtype = %d size = %zd tag = %d)", + BASE_PORT + rank, todo_recv, nr_recvs[rank], myrank, rank, subtype, + size, tag); } // Ready for next use of buffer? @@ -310,6 +318,8 @@ static void *recv_thread(void *arg) { message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), clocks_getunit()); + message("context use_count = %zd", context.use_count()); + /* Thread exits. */ return NULL; } @@ -495,10 +505,9 @@ int main(int argc, char *argv[]) { /* Make a thread per rank, each one has a QP that connects between this rank * and that rank. We need to start all the server threads first. */ pthread_t recvthread[nr_ranks]; - int ranks[nr_ranks]; + int *ranks = (int *)malloc(nr_ranks * sizeof(int)); for (int k = 0; k < nr_ranks; k++) { if (k != myrank) { - starting[k] = 1; ranks[k] = k; if (pthread_create(&recvthread[k], NULL, &recv_thread, &ranks[k]) != 0) error("Failed to create recv thread."); @@ -516,13 +525,14 @@ int main(int argc, char *argv[]) { } } message("All servers are started"); + // And make sure all remotes are also ready. MPI_Barrier(MPI_COMM_WORLD); message("All synchronized"); /* Now we have a thread per rank to send the messages. */ pthread_t sendthread[nr_ranks]; - struct stuff stuff[nr_ranks]; + struct stuff *stuff = (struct stuff *)malloc(nr_ranks * sizeof(struct stuff)); for (int k = 0; k < nr_ranks; k++) { if (k != myrank) { strcpy(stuff[k].server_ip, &server_ips[k * MPI_MAX_PROCESSOR_NAME]); @@ -552,6 +562,11 @@ int main(int argc, char *argv[]) { if (res != MPI_SUCCESS) error("call to MPI_Finalize failed with error %i.", res); + /* Free resources. */ + free(server_ips); + free(stuff); + free(ranks); + if (myrank == 0) message("Bye"); return 0;