Skip to content
Snippets Groups Projects
Commit 00d2b44a authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Capture progress, runs sort of, but suspect that the razeh fork is not thread...

Capture progress, runs sort of, but suspect that the razeh fork is not thread safe so we have issue...
parent 16ff100c
No related branches found
No related tags found
1 merge request!8Draft: RDMA version with wrapped infinity calls
#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
CFLAGS = -g -O0 -Wall -Iinfinity/include
#CFLAGS = -g -O0 -Wall -Iinfinity/include
CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
all: swiftmpistepsim swiftmpirdmastepsim
......
......@@ -48,7 +48,7 @@ int myrank = -1;
static int nr_ranks;
/* Base port no. Ranks use +rank. */
static int BASE_PORT = 17771;
static int BASE_PORT = 27771;
/* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this as
* as we need to align in memory. */
......@@ -128,14 +128,15 @@ static int toblocks(BLOCKTYPE nr_bytes) {
static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
/**
* @brief fill a data area with our rank.
* @brief fill a data area with given value.
*
* @param size size of data in bytes.
* @param data the data to fill.
* @param value the value to fill.
*/
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
for (BLOCKTYPE i = 0; i < size; i++) {
data[i] = myrank;
data[i] = value;
}
}
......@@ -144,14 +145,15 @@ static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
*
* @param size size of data in bytes.
* @param data the data to check.
* @param rank the value to, i.e. original rank.
* @param value the value expected.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
for (size_t i = 0; i < size; i++) {
if (data[i] != (BLOCKTYPE)rank) {
message("see %zd expected %d @ %zd (%zd to go)", data[i], rank, i, size);
if (data[i] != value) {
message("see %zd expected %zd @ %zd (%zd to go)", data[i], value, i,
size);
return 0;
}
}
......@@ -184,10 +186,11 @@ static void *send_thread(void *arg) {
// Get QP to the other rank. Note we cannot do this until the related
// server is up and running, so make sure that is true..
message("waiting for connection to remote server %s %d", server_ip,
BASE_PORT + myrank);
message("%d waiting for connection to remote server %s %d on %d", myrank,
server_ip, rank, BASE_PORT + myrank);
auto qp = qpFactory->connectToRemoteHost(server_ip, BASE_PORT + myrank);
message("connected to remote server %s %d", server_ip, BASE_PORT + myrank);
message("%d connected to remote server %s %d on %d", myrank, server_ip, rank,
BASE_PORT + myrank);
for (int k = 0; k < nr_send; k++) {
struct mpiuse_log_entry *log = send_queue[k];
......@@ -201,7 +204,8 @@ static void *send_thread(void *arg) {
log->data = dataptr;
/* Fill data with pattern. */
if (datacheck) datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE]);
if (datacheck)
datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE], log->tag);
/* First element has our rank, other elements replicate what we need to
* define an MPI message. */
......@@ -245,6 +249,8 @@ static void *recv_thread(void *arg) {
context->postReceiveBuffer(receiveBuffer);
// Port binding.
message("%d binding to %d on port %d", myrank, rank, BASE_PORT + rank);
fflush(stdout);
qpFactory->bindToPort(BASE_PORT + rank);
message("Blocking for first message on %d", BASE_PORT + rank);
......@@ -281,13 +287,13 @@ static void *recv_thread(void *arg) {
found = 1;
if (verbose)
message("receive message subtype %d from %d on %d", log->subtype, rank,
myrank);
message("receive message subtype %d from %d on %d", log->subtype,
rank, myrank);
/* Check data sent data is unchanged and received data is as
* expected. */
if (datacheck &&
!datacheck_test(toblocks(log->size), &dataptr[HEADER_SIZE], rank)) {
if (datacheck && !datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], log->tag)) {
message("Data mismatch on completion");
}
......@@ -298,9 +304,11 @@ static void *recv_thread(void *arg) {
}
}
if (!found) {
error("No matching receive on connections to %d (%d of %d todo:"
" rank = %d otherrank = %d subtype = %d size = %zd tag = %d)",
BASE_PORT + rank, todo_recv, nr_recvs[rank], myrank, rank, subtype, size, tag);
error(
"No matching receive on connections to %d (%d of %d todo:"
" rank = %d otherrank = %d subtype = %d size = %zd tag = %d)",
BASE_PORT + rank, todo_recv, nr_recvs[rank], myrank, rank, subtype,
size, tag);
}
// Ready for next use of buffer?
......@@ -310,6 +318,8 @@ static void *recv_thread(void *arg) {
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
message("context use_count = %zd", context.use_count());
/* Thread exits. */
return NULL;
}
......@@ -495,10 +505,9 @@ int main(int argc, char *argv[]) {
/* Make a thread per rank, each one has a QP that connects between this rank
* and that rank. We need to start all the server threads first. */
pthread_t recvthread[nr_ranks];
int ranks[nr_ranks];
int *ranks = (int *)malloc(nr_ranks * sizeof(int));
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
starting[k] = 1;
ranks[k] = k;
if (pthread_create(&recvthread[k], NULL, &recv_thread, &ranks[k]) != 0)
error("Failed to create recv thread.");
......@@ -516,13 +525,14 @@ int main(int argc, char *argv[]) {
}
}
message("All servers are started");
// And make sure all remotes are also ready.
MPI_Barrier(MPI_COMM_WORLD);
message("All synchronized");
/* Now we have a thread per rank to send the messages. */
pthread_t sendthread[nr_ranks];
struct stuff stuff[nr_ranks];
struct stuff *stuff = (struct stuff *)malloc(nr_ranks * sizeof(struct stuff));
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
strcpy(stuff[k].server_ip, &server_ips[k * MPI_MAX_PROCESSOR_NAME]);
......@@ -552,6 +562,11 @@ int main(int argc, char *argv[]) {
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
/* Free resources. */
free(server_ips);
free(stuff);
free(ranks);
if (myrank == 0) message("Bye");
return 0;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment