diff --git a/swiftmpirdmaonestepsim2.c b/swiftmpirdmaonestepsim2.c
index cb590ea4e44490742f30fea65f269b66f9245692..5799992204f8eba05dfb60744cf6cefab08138d3 100644
--- a/swiftmpirdmaonestepsim2.c
+++ b/swiftmpirdmaonestepsim2.c
@@ -129,7 +129,8 @@ static struct mpiuse_log_entry **volatile recv_queue;
  */
 static size_t toranktag(int subtype, int sendrank, int recvrank, int tag) {
   size_t result = subtype | sendrank << subtype_shift |
-                  recvrank << (subtype_shift + rank_shift)| tag << (subtype_shift * 2 + rank_shift);
+                  recvrank << (subtype_shift + rank_shift) |
+                  tag << (subtype_shift * 2 + rank_shift);
   return result;
 }

@@ -227,12 +228,13 @@ static void *send_thread(void *arg) {
       new infinity::queues::QueuePairFactory(context);

   // Create the QPs connecting to all the other ranks.
-  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)
-    calloc(nr_ranks, sizeof(infinity::queues::QueuePair *));
+  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)calloc(
+      nr_ranks, sizeof(infinity::queues::QueuePair *));

   // And pointers to the remote memory.
-  infinity::memory::RegionToken **remoteBufferToken = (infinity::memory::RegionToken **)
-    calloc(nr_ranks, sizeof(infinity::memory::RegionToken *));
+  infinity::memory::RegionToken **remoteBufferToken =
+      (infinity::memory::RegionToken **)calloc(
+          nr_ranks, sizeof(infinity::memory::RegionToken *));

   // We need to listen for messages from the other rank servers that we can
   // connect to them as they need to be up first.
@@ -261,9 +263,10 @@ static void *send_thread(void *arg) {
               ip, index, BASE_PORT + myrank);
     qps[index] = qpFactory->connectToRemoteHost(ip, BASE_PORT + myrank);
     if (verbose)
-      message("%d connected to remote server %s %d on %d", myrank, ip,
-              index, BASE_PORT + myrank);
+      message("%d connected to remote server %s %d on %d", myrank, ip, index,
+              BASE_PORT + myrank);
-    remoteBufferToken[index] = (infinity::memory::RegionToken *)qps[index]->getUserData();
+    remoteBufferToken[index] =
+        (infinity::memory::RegionToken *)qps[index]->getUserData();
     if (remoteBufferToken[index] == NULL) {
       message("remoteBufferToken for %d is NULL", index);
     } else {
@@ -278,10 +281,11 @@ static void *send_thread(void *arg) {
   /* Extract the offset lists that we use. */
   int nr = 0;
   int size = (max_logs / 16 + 1);
-  size_t *ranktags = (size_t *)malloc(size * sizeof(size_t));
-  size_t *offsets = (size_t *)malloc(size * sizeof(size_t));
+  size_t *ranktags = (size_t *)malloc(size * sizeof(size_t));
+  size_t *offsets = (size_t *)malloc(size * sizeof(size_t));

-  /* A tag that will match any subtype or tag with our rank for all the otherranks. */
+  /* A tag that will match any subtype or tag with our rank for all the
+   * otherranks. */
   for (int k = 0; k < nr_ranks; k++) {
     size_t matchranktag = toranktag(0, myrank, k, 0);
     for (size_t j = 0; j < max_logs; j++) {
@@ -332,7 +336,8 @@ static void *send_thread(void *arg) {
       memcpy(sendBuffer->getData(), dataptr, tobytes(datasize));

       /* Need to find the offset for this data in the remotes window. */
-      size_t ranktag = toranktag(log->subtype, log->rank, log->otherrank, log->tag);
+      size_t ranktag =
+          toranktag(log->subtype, log->rank, log->otherrank, log->tag);
       log->offset = 0;
       int found = 0;

@@ -351,35 +356,41 @@ static void *send_thread(void *arg) {
       }
       if (verbose)
-        message("sending message subtype %d from %d to %d todo: %d/%d, offset %ld size %ld",
-                log->subtype, myrank, log->otherrank, nr_sends - k, nr_sends, log->offset, datasize);
-
+        message(
+            "sending message subtype %d from %d to %d todo: %d/%d, offset %ld "
+            "size %ld",
+            log->subtype, myrank, log->otherrank, nr_sends - k, nr_sends,
+            log->offset, datasize);
+
       // And send
       infinity::requests::RequestToken requestToken(context);
-      qps[log->otherrank]->write(sendBuffer,
-                                 0,  // localOffset
-                                 remoteBufferToken[log->otherrank],  // destination
-                                 tobytes(log->offset),  // remoteOffset
-                                 tobytes(datasize),  // sizeInBytes
-                                 infinity::queues::OperationFlags(), &requestToken);
+      qps[log->otherrank]->write(
+          sendBuffer,
+          0,                                  // localOffset
+          remoteBufferToken[log->otherrank],  // destination
+          tobytes(log->offset),               // remoteOffset
+          tobytes(datasize),                  // sizeInBytes
+          infinity::queues::OperationFlags(), &requestToken);
       requestToken.waitUntilCompleted();
       requestToken.reset();

       // Now we update the unlock field.
       ((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED;
-      qps[log->otherrank]->write(sendBuffer,
-                                 0,  // localOffset
-                                 remoteBufferToken[log->otherrank],  // destination
-                                 tobytes(log->offset),  // remoteOffset
-                                 BYTESINBLOCK,  // sizeInBytes
-                                 infinity::queues::OperationFlags(), &requestToken);
+      qps[log->otherrank]->write(
+          sendBuffer,
+          0,                                  // localOffset
+          remoteBufferToken[log->otherrank],  // destination
+          tobytes(log->offset),               // remoteOffset
+          BYTESINBLOCK,                       // sizeInBytes
+          infinity::queues::OperationFlags(), &requestToken);
       requestToken.waitUntilCompleted();

       // Since we reuse the sendBuffer.
       // requestToken.reset();

       log->endtic = getticks();
     }

-  message("took %.3f %s", clocks_from_ticks(getticks() - starttics), clocks_getunit());
+  message("took %.3f %s", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());
   for (int k = 0; k < nr_ranks; k++) delete qps[k];
   free(qps);
@@ -403,21 +414,25 @@ static void *recv_thread(void *arg) {
       new infinity::queues::QueuePairFactory(context);

   // Create the QPs connecting to all the other ranks.
-  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)
-    calloc(nr_ranks, sizeof(infinity::queues::QueuePair *));
+  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)calloc(
+      nr_ranks, sizeof(infinity::queues::QueuePair *));

   // Create buffers to receive all the remote data.
-  infinity::memory::Buffer **bufferToReadWrite = (infinity::memory::Buffer **)
-    calloc(nr_ranks, sizeof(infinity::memory::Buffer *));
-  infinity::memory::RegionToken **bufferToken = (infinity::memory::RegionToken **)
-    calloc(nr_ranks, sizeof(infinity::memory::RegionToken *));
+  infinity::memory::Buffer **bufferToReadWrite =
+      (infinity::memory::Buffer **)calloc(nr_ranks,
+                                          sizeof(infinity::memory::Buffer *));
+  infinity::memory::RegionToken **bufferToken =
+      (infinity::memory::RegionToken **)calloc(
+          nr_ranks, sizeof(infinity::memory::RegionToken *));

   for (int k = 0; k < nr_ranks; k++) {
     if (ranktag_sizes[k] > 0) {
-      bufferToReadWrite[k] = new infinity::memory::Buffer(context, tobytes(ranktag_sizes[k]));
+      bufferToReadWrite[k] =
+          new infinity::memory::Buffer(context, tobytes(ranktag_sizes[k]));
       bufferToken[k] = bufferToReadWrite[k]->createRegionToken();
     } else {  // Dummy.
-      bufferToReadWrite[k] = new infinity::memory::Buffer(context, BYTESINBLOCK);
+      bufferToReadWrite[k] =
+          new infinity::memory::Buffer(context, BYTESINBLOCK);
       bufferToken[k] = bufferToReadWrite[k]->createRegionToken();
     }
   }
@@ -434,8 +449,8 @@ static void *recv_thread(void *arg) {
     // Send message this port is about to block for a connection.
     if (verbose) message("Blocking for first message on %d", BASE_PORT + k);
     MPI_Isend(buf, 1, MPI_INT, k, myrank, MPI_COMM_WORLD, &req);
-    qps[k] = qpFactory->acceptIncomingConnection(bufferToken[k],
-                                                 sizeof(infinity::memory::RegionToken));
+    qps[k] = qpFactory->acceptIncomingConnection(
+        bufferToken[k], sizeof(infinity::memory::RegionToken));
     if (verbose) message("Accepting incoming connections on %d", BASE_PORT + k);
   }

@@ -444,9 +459,9 @@ static void *recv_thread(void *arg) {

   /* Now we wait for the remotes to connect before proceeding, otherwise the
    * timings will be incorrect. */
-  while(starting_up)
+  while (starting_up)
     ;
-
+
   /* Ignore the timing on previous part, which is fixed. */
   ticks starttics = getticks();

@@ -456,51 +471,59 @@ static void *recv_thread(void *arg) {
   while (todo_recv > 0) {

     /* Loop over remaining messages, checking if any have been unlocked. */
-    for (int k = 0; k < nr_recvs; k++) {
+    int k = 0;
+    while (k < todo_recv) {
       struct mpiuse_log_entry *log = recv_queue[k];
-      if (log != NULL && !log->done) {
-
-        /* On the first attempt we start listening for this receive. */
-        if (log->injtic == 0) log->injtic = getticks();
-
-        /* Get find the data for this message. */
-        BLOCKTYPE *dataptr =
-          &((BLOCKTYPE *)bufferToReadWrite[log->otherrank]->getData())[log->offset];
-
-        /* Check if this has been unlocked. */
-        BLOCKTYPE volatile lock = dataptr[0];
-
-        if (lock == UNLOCKED) {
-
-          // XXX should check these for correctness.
-          // int subtype = dataptr[1];
-          // size_t size = dataptr[2];
-          // int tag = dataptr[3];
-
-          if (verbose)
-            message(
-                "receive message subtype %d from %d to %d todo: %d/%d,"
-                " offset %ld size %ld",
-                log->subtype, myrank, log->otherrank, todo_recv, nr_recvs,
-                log->offset, toblocks(log->size) + HEADER_SIZE);
-
-          /* Check data sent data is unchanged and received data is as
-           * expected. */
-          if (datacheck && !datacheck_test(toblocks(log->size),
-                                           &dataptr[HEADER_SIZE], log->tag)) {
-            message("Data mismatch on completion");
-          }
-
-          /* Done, clean up. */
-          log->done = 1;
-          log->endtic = getticks();
-          todo_recv--;
+
+      /* On the first attempt we start listening for this receive. */
+      if (log->injtic == 0) log->injtic = getticks();
+
+      /* Get find the data for this message. */
+      BLOCKTYPE *dataptr = &((BLOCKTYPE *)bufferToReadWrite[log->otherrank]
+                                 ->getData())[log->offset];
+
+      /* Check if this has been unlocked. */
+      BLOCKTYPE volatile lock = dataptr[0];
+
+      if (lock == UNLOCKED) {
+        // XXX should check these for correctness.
+        // int subtype = dataptr[1];
+        // size_t size = dataptr[2];
+        // int tag = dataptr[3];
+
+        if (verbose)
+          message(
+              "receive message subtype %d from %d to %d todo: %d/%d,"
+              " offset %ld size %ld",
+              log->subtype, myrank, log->otherrank, todo_recv, nr_recvs,
+              log->offset, toblocks(log->size) + HEADER_SIZE);
+
+        /* Check data sent data is unchanged and received data is as
+         * expected. */
+        if (datacheck && !datacheck_test(toblocks(log->size),
+                                         &dataptr[HEADER_SIZE], log->tag)) {
+          message("Data mismatch on completion");
+        }
+
+        /* Done, clean up. */
+        log->done = 1;
+        log->endtic = getticks();
+        todo_recv--;
+
+        /* And swap with the last log if we can. */
+        if (k < todo_recv) {
+          recv_queue[k] = recv_queue[todo_recv];
+
+          /* Do this log again, we wanted to check it this loop. */
+          k--;
         }
       }
+      k++;
     }
   }

-  message("took %.3f %s", clocks_from_ticks(getticks() - starttics), clocks_getunit());
+  message("took %.3f %s", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());

   for (int k = 0; k < nr_ranks; k++) {
     delete qps[k];
@@ -536,9 +559,11 @@ static size_t pick_logs() {
   size_t maxsize = 0;

   /* Queues of send and receive logs. */
-  send_queue = (struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *));
+  send_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
   nr_sends = 0;
-  recv_queue = (struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *));
+  recv_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
   nr_recvs = 0;

   for (size_t k = 0; k < nlogs; k++) {
@@ -549,7 +574,8 @@ static size_t pick_logs() {
       log->injtic = 0;
       log->endtic = 0;
       log->data = NULL;
-      log->ranktag = toranktag(log->subtype, log->otherrank, log->rank, log->tag);
+      log->ranktag =
+          toranktag(log->subtype, log->otherrank, log->rank, log->tag);

       /* Scale size. */
       log->size *= messagescale;
@@ -586,10 +612,10 @@ static size_t pick_logs() {
      * of the windows. */
     for (int k = 0; k < nr_recvs; k++) {
       struct mpiuse_log_entry *log = recv_queue[k];
-      ranktag_lists[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank,
-                           k)] = log->ranktag;
-      ranktag_offsets[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank,
-                             k)] = ranktag_sizes[log->otherrank];
+      ranktag_lists[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] =
+          log->ranktag;
+      ranktag_offsets[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] =
+          ranktag_sizes[log->otherrank];
       log->offset = ranktag_sizes[log->otherrank];

       /* Need to use a multiple of blocks to keep alignment. */
@@ -694,9 +720,8 @@ int main(int argc, char *argv[]) {
   /* We need to share all the offsets for each communicator with all the other
    * ranks so they can push data into the correct parts of our receive
    * window. */
-  MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
-                nr_ranks * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
-                MPI_COMM_WORLD);
+  MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets, nr_ranks * nr_ranks * max_logs,
+                MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
   MPI_Allreduce(MPI_IN_PLACE, ranktag_lists, nr_ranks * nr_ranks * max_logs,
                 MPI_AINT, MPI_SUM, MPI_COMM_WORLD);

@@ -711,8 +736,7 @@ int main(int argc, char *argv[]) {

   /* And distribute, so we all know everyone's IPs. */
   struct servers servers;
-  servers.ip =
-      (char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME);
+  servers.ip = (char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME);
   MPI_Allgather(ip, MPI_MAX_PROCESSOR_NAME, MPI_BYTE, servers.ip,
                 MPI_MAX_PROCESSOR_NAME, MPI_BYTE, MPI_COMM_WORLD);
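Reviewer note on the first hunk: toranktag() is only reformatted here, but it is the key the whole scheme hangs on, packing subtype, sender, receiver and tag into one size_t so a send can later be matched to its slot in the receiver's window. Below is a minimal standalone sketch of that packing. The subtype_shift/rank_shift values are assumptions (the real ones are defined elsewhere in the file), and the size_t casts are an extra precaution not present in the simulator, which shifts the plain int arguments; those would overflow if the combined shifts ever reached bit 31.

/* Sketch only: assumed field widths, not the simulator's real values. */
#include <stddef.h>
#include <stdio.h>

enum { subtype_shift = 8, rank_shift = 8 };

static size_t toranktag_sketch(int subtype, int sendrank, int recvrank,
                               int tag) {
  /* Cast each field to size_t before shifting so the ORs are done in
   * size_t width, mirroring the packing in toranktag() above. */
  return (size_t)subtype | (size_t)sendrank << subtype_shift |
         (size_t)recvrank << (subtype_shift + rank_shift) |
         (size_t)tag << (subtype_shift * 2 + rank_shift);
}

int main(void) {
  /* subtype 2 from rank 1 to rank 3 with tag 42 -> 0x2a030102. */
  printf("%zx\n", toranktag_sketch(2, 1, 3, 42));
  return 0;
}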
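The one behavioural change in this diff is the recv_thread polling loop: instead of scanning all nr_recvs logs and testing log->done on every pass, the new loop keeps the still-pending logs in recv_queue[0..todo_recv) and removes a completed entry by swapping the last pending one into its slot. A standalone sketch of that compaction pattern follows, with hypothetical entry/poll_queue names standing in for the simulator's types:

#include <stdio.h>

struct entry {
  int id;
  int done; /* Stand-in for the UNLOCKED test on the RDMA buffer. */
};

/* Scan the active part of the queue, queue[0..todo), removing completed
 * entries by overwriting them with the last active entry. Returns the
 * new number of active entries. */
static int poll_queue(struct entry **queue, int todo) {
  int k = 0;
  while (k < todo) {
    if (queue[k]->done) {
      todo--;
      if (k < todo) {
        queue[k] = queue[todo]; /* Swap in the last active entry. */
        k--;                    /* Recheck this slot in the same pass. */
      }
    }
    k++;
  }
  return todo;
}

int main(void) {
  struct entry e[4] = {{0, 0}, {1, 1}, {2, 0}, {3, 1}};
  struct entry *queue[4] = {&e[0], &e[1], &e[2], &e[3]};
  int todo = poll_queue(queue, 4);
  /* Entries 1 and 3 were done, so ids 0 and 2 remain active. */
  for (int k = 0; k < todo; k++) printf("still active: id %d\n", queue[k]->id);
  return 0;
}

Swapping with the last element makes removal O(1) and keeps each pass over exactly the pending set, at the cost of reordering the queue; the k-- matches the hunk above, ensuring the swapped-in log is checked in the same pass rather than waiting for the next one.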