Commit 5bc5307b authored by Peter W. Draper

Faster recv loop, still no gains

parent 8cb761fe
Branch: simplerdma
No related tags found
Merge request: !8 Draft: RDMA version with wrapped infinity calls
@@ -129,7 +129,8 @@ static struct mpiuse_log_entry **volatile recv_queue;
  */
 static size_t toranktag(int subtype, int sendrank, int recvrank, int tag) {
   size_t result = subtype | sendrank << subtype_shift |
-                  recvrank << (subtype_shift + rank_shift)| tag << (subtype_shift * 2 + rank_shift);
+                  recvrank << (subtype_shift + rank_shift) |
+                  tag << (subtype_shift * 2 + rank_shift);
   return result;
 }
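For reference, a standalone sketch of the bit packing that toranktag() performs, with hypothetical widths (subtype_shift and rank_shift are defined near the top of the file; the values below are illustrative only, and the casts to size_t, which the file's own code does not do, avoid int overflow for wide tags):

#include <stddef.h>

static const int subtype_shift = 6;  /* hypothetical width */
static const int rank_shift = 10;    /* hypothetical width */

/* Pack subtype, sending rank, receiving rank and tag into one size_t,
 * mirroring the field layout used by toranktag() above. */
static size_t toranktag_sketch(int subtype, int sendrank, int recvrank,
                               int tag) {
  return (size_t)subtype | (size_t)sendrank << subtype_shift |
         (size_t)recvrank << (subtype_shift + rank_shift) |
         (size_t)tag << (subtype_shift * 2 + rank_shift);
}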
@@ -227,12 +228,13 @@ static void *send_thread(void *arg) {
       new infinity::queues::QueuePairFactory(context);

   // Create the QPs connecting to all the other ranks.
-  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)
-    calloc(nr_ranks, sizeof(infinity::queues::QueuePair *));
+  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)calloc(
+      nr_ranks, sizeof(infinity::queues::QueuePair *));

   // And pointers to the remote memory.
-  infinity::memory::RegionToken **remoteBufferToken = (infinity::memory::RegionToken **)
-    calloc(nr_ranks, sizeof(infinity::memory::RegionToken *));
+  infinity::memory::RegionToken **remoteBufferToken =
+      (infinity::memory::RegionToken **)calloc(
+          nr_ranks, sizeof(infinity::memory::RegionToken *));

   // We need to listen for messages from the other rank servers that we can
   // connect to them as they need to be up first.
@@ -261,9 +263,10 @@ static void *send_thread(void *arg) {
               ip, index, BASE_PORT + myrank);
     qps[index] = qpFactory->connectToRemoteHost(ip, BASE_PORT + myrank);
     if (verbose)
-      message("%d connected to remote server %s %d on %d", myrank, ip,
-              index, BASE_PORT + myrank);
-    remoteBufferToken[index] = (infinity::memory::RegionToken *)qps[index]->getUserData();
+      message("%d connected to remote server %s %d on %d", myrank, ip, index,
+              BASE_PORT + myrank);
+    remoteBufferToken[index] =
+        (infinity::memory::RegionToken *)qps[index]->getUserData();
     if (remoteBufferToken[index] == NULL) {
       message("remoteBufferToken for %d is NULL", index);
     } else {
@@ -278,10 +281,11 @@ static void *send_thread(void *arg) {
     /* Extract the offset lists that we use. */
     int nr = 0;
     int size = (max_logs / 16 + 1);
-    size_t *ranktags = (size_t *)malloc(size * sizeof(size_t));
-    size_t *offsets = (size_t *)malloc(size * sizeof(size_t));
+    size_t *ranktags = (size_t *)malloc(size * sizeof(size_t));
+    size_t *offsets = (size_t *)malloc(size * sizeof(size_t));

-    /* A tag that will match any subtype or tag with our rank for all the otherranks. */
+    /* A tag that will match any subtype or tag with our rank for all the
+     * otherranks. */
     for (int k = 0; k < nr_ranks; k++) {
       size_t matchranktag = toranktag(0, myrank, k, 0);
       for (size_t j = 0; j < max_logs; j++) {
@@ -332,7 +336,8 @@ static void *send_thread(void *arg) {
       memcpy(sendBuffer->getData(), dataptr, tobytes(datasize));

       /* Need to find the offset for this data in the remotes window. */
-      size_t ranktag = toranktag(log->subtype, log->rank, log->otherrank, log->tag);
+      size_t ranktag =
+          toranktag(log->subtype, log->rank, log->otherrank, log->tag);
       log->offset = 0;
       int found = 0;
@@ -351,35 +356,41 @@ static void *send_thread(void *arg) {
       }

       if (verbose)
-        message("sending message subtype %d from %d to %d todo: %d/%d, offset %ld size %ld",
-                log->subtype, myrank, log->otherrank, nr_sends - k, nr_sends, log->offset, datasize);
+        message(
+            "sending message subtype %d from %d to %d todo: %d/%d, offset %ld "
+            "size %ld",
+            log->subtype, myrank, log->otherrank, nr_sends - k, nr_sends,
+            log->offset, datasize);

       // And send
       infinity::requests::RequestToken requestToken(context);
-      qps[log->otherrank]->write(sendBuffer,
-                                 0,                                  // localOffset
-                                 remoteBufferToken[log->otherrank],  // destination
-                                 tobytes(log->offset),               // remoteOffset
-                                 tobytes(datasize),                  // sizeInBytes
-                                 infinity::queues::OperationFlags(), &requestToken);
+      qps[log->otherrank]->write(
+          sendBuffer,
+          0,                                  // localOffset
+          remoteBufferToken[log->otherrank],  // destination
+          tobytes(log->offset),               // remoteOffset
+          tobytes(datasize),                  // sizeInBytes
+          infinity::queues::OperationFlags(), &requestToken);
       requestToken.waitUntilCompleted();
       requestToken.reset();

       // Now we update the unlock field.
       ((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED;
-      qps[log->otherrank]->write(sendBuffer,
-                                 0,                                  // localOffset
-                                 remoteBufferToken[log->otherrank],  // destination
-                                 tobytes(log->offset),               // remoteOffset
-                                 BYTESINBLOCK,                       // sizeInBytes
-                                 infinity::queues::OperationFlags(), &requestToken);
+      qps[log->otherrank]->write(
+          sendBuffer,
+          0,                                  // localOffset
+          remoteBufferToken[log->otherrank],  // destination
+          tobytes(log->offset),               // remoteOffset
+          BYTESINBLOCK,                       // sizeInBytes
+          infinity::queues::OperationFlags(), &requestToken);
       requestToken.waitUntilCompleted();  // Since we reuse the sendBuffer.
       // requestToken.reset();

       log->endtic = getticks();
     }

-  message("took %.3f %s", clocks_from_ticks(getticks() - starttics), clocks_getunit());
+  message("took %.3f %s", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());

   for (int k = 0; k < nr_ranks; k++) delete qps[k];
   free(qps);
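The two write() calls in the hunk above implement a simple locked-block protocol: the payload lands first while the header's lock word still holds its locked value, and only after that write completes is a single block rewritten to flip the word to UNLOCKED, so a polling receiver can never see an unlocked but partially written message. Condensed into one helper it reads as below; this is a sketch only, assuming the library's infinity::core::Context type and reusing tobytes, BLOCKTYPE, UNLOCKED and BYTESINBLOCK as defined elsewhere in this file:

static void send_one(infinity::core::Context *context,
                     infinity::queues::QueuePair *qp,
                     infinity::memory::Buffer *sendBuffer,
                     infinity::memory::RegionToken *remote, size_t offset,
                     size_t datasize) {
  infinity::requests::RequestToken requestToken(context);

  // Phase 1: header + payload, with the lock word still holding the
  // locked value.
  qp->write(sendBuffer, 0, remote, tobytes(offset), tobytes(datasize),
            infinity::queues::OperationFlags(), &requestToken);
  requestToken.waitUntilCompleted();
  requestToken.reset();

  // Phase 2: only after the payload has landed, rewrite the first block
  // with the lock word set to UNLOCKED.
  ((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED;
  qp->write(sendBuffer, 0, remote, tobytes(offset), BYTESINBLOCK,
            infinity::queues::OperationFlags(), &requestToken);
  requestToken.waitUntilCompleted();  // sendBuffer is reused next iteration.
}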
@@ -403,21 +414,25 @@ static void *recv_thread(void *arg) {
       new infinity::queues::QueuePairFactory(context);

   // Create the QPs connecting to all the other ranks.
-  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)
-    calloc(nr_ranks, sizeof(infinity::queues::QueuePair *));
+  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)calloc(
+      nr_ranks, sizeof(infinity::queues::QueuePair *));

   // Create buffers to receive all the remote data.
-  infinity::memory::Buffer **bufferToReadWrite = (infinity::memory::Buffer **)
-    calloc(nr_ranks, sizeof(infinity::memory::Buffer *));
-  infinity::memory::RegionToken **bufferToken = (infinity::memory::RegionToken **)
-    calloc(nr_ranks, sizeof(infinity::memory::RegionToken *));
+  infinity::memory::Buffer **bufferToReadWrite =
+      (infinity::memory::Buffer **)calloc(nr_ranks,
+                                          sizeof(infinity::memory::Buffer *));
+  infinity::memory::RegionToken **bufferToken =
+      (infinity::memory::RegionToken **)calloc(
+          nr_ranks, sizeof(infinity::memory::RegionToken *));
   for (int k = 0; k < nr_ranks; k++) {
     if (ranktag_sizes[k] > 0) {
-      bufferToReadWrite[k] = new infinity::memory::Buffer(context, tobytes(ranktag_sizes[k]));
+      bufferToReadWrite[k] =
+          new infinity::memory::Buffer(context, tobytes(ranktag_sizes[k]));
       bufferToken[k] = bufferToReadWrite[k]->createRegionToken();
     } else {
       // Dummy.
-      bufferToReadWrite[k] = new infinity::memory::Buffer(context, BYTESINBLOCK);
+      bufferToReadWrite[k] =
+          new infinity::memory::Buffer(context, BYTESINBLOCK);
       bufferToken[k] = bufferToReadWrite[k]->createRegionToken();
     }
   }
@@ -434,8 +449,8 @@ static void *recv_thread(void *arg) {
     // Send message this port is about to block for a connection.
     if (verbose) message("Blocking for first message on %d", BASE_PORT + k);
     MPI_Isend(buf, 1, MPI_INT, k, myrank, MPI_COMM_WORLD, &req);
-    qps[k] = qpFactory->acceptIncomingConnection(bufferToken[k],
-                                                 sizeof(infinity::memory::RegionToken));
+    qps[k] = qpFactory->acceptIncomingConnection(
+        bufferToken[k], sizeof(infinity::memory::RegionToken));
     if (verbose)
       message("Accepting incoming connections on %d", BASE_PORT + k);
   }
@@ -444,9 +459,9 @@ static void *recv_thread(void *arg) {
   /* Now we wait for the remotes to connect before proceeding, otherwise the
    * timings will be incorrect. */
-  while(starting_up)
+  while (starting_up)
     ;

   /* Ignore the timing on previous part, which is fixed. */
   ticks starttics = getticks();
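The empty spin above only works if starting_up is re-read on every iteration, which requires it to be declared volatile (as recv_queue is at the top of this diff) or atomic; with a plain int the compiler may hoist the load and the loop never exits. A C++11 sketch of an equivalent, assuming the flag is cleared by the main thread once all remotes are connected:

#include <atomic>

static std::atomic<int> starting_up{1};  // cleared elsewhere when ready

// ...
while (starting_up.load(std::memory_order_acquire)) {
  /* spin until all remotes are connected */
}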
@@ -456,51 +471,59 @@ static void *recv_thread(void *arg) {
   while (todo_recv > 0) {

     /* Loop over remaining messages, checking if any have been unlocked. */
-    for (int k = 0; k < nr_recvs; k++) {
+    int k = 0;
+    while (k < todo_recv) {
       struct mpiuse_log_entry *log = recv_queue[k];
-      if (log != NULL && !log->done) {
-        /* On the first attempt we start listening for this receive. */
-        if (log->injtic == 0) log->injtic = getticks();
-
-        /* Get find the data for this message. */
-        BLOCKTYPE *dataptr =
-            &((BLOCKTYPE *)bufferToReadWrite[log->otherrank]->getData())[log->offset];
-
-        /* Check if this has been unlocked. */
-        BLOCKTYPE volatile lock = dataptr[0];
-        if (lock == UNLOCKED) {
-          // XXX should check these for correctness.
-          // int subtype = dataptr[1];
-          // size_t size = dataptr[2];
-          // int tag = dataptr[3];
-          if (verbose)
-            message(
-                "receive message subtype %d from %d to %d todo: %d/%d,"
-                " offset %ld size %ld",
-                log->subtype, myrank, log->otherrank, todo_recv, nr_recvs,
-                log->offset, toblocks(log->size) + HEADER_SIZE);
-
-          /* Check data sent data is unchanged and received data is as
-           * expected. */
-          if (datacheck && !datacheck_test(toblocks(log->size),
-                                           &dataptr[HEADER_SIZE], log->tag)) {
-            message("Data mismatch on completion");
-          }
-
-          /* Done, clean up. */
-          log->done = 1;
-          log->endtic = getticks();
-          todo_recv--;
-        }
-      }
+      /* On the first attempt we start listening for this receive. */
+      if (log->injtic == 0) log->injtic = getticks();
+
+      /* Get find the data for this message. */
+      BLOCKTYPE *dataptr = &((BLOCKTYPE *)bufferToReadWrite[log->otherrank]
+                                 ->getData())[log->offset];
+
+      /* Check if this has been unlocked. */
+      BLOCKTYPE volatile lock = dataptr[0];
+      if (lock == UNLOCKED) {
+        // XXX should check these for correctness.
+        // int subtype = dataptr[1];
+        // size_t size = dataptr[2];
+        // int tag = dataptr[3];
+        if (verbose)
+          message(
+              "receive message subtype %d from %d to %d todo: %d/%d,"
+              " offset %ld size %ld",
+              log->subtype, myrank, log->otherrank, todo_recv, nr_recvs,
+              log->offset, toblocks(log->size) + HEADER_SIZE);
+
+        /* Check data sent data is unchanged and received data is as
+         * expected. */
+        if (datacheck && !datacheck_test(toblocks(log->size),
+                                         &dataptr[HEADER_SIZE], log->tag)) {
+          message("Data mismatch on completion");
+        }
+
+        /* Done, clean up. */
+        log->done = 1;
+        log->endtic = getticks();
+        todo_recv--;
+
+        /* And swap with the last log if we can. */
+        if (k < todo_recv) {
+          recv_queue[k] = recv_queue[todo_recv];
+          /* Do this log again, we wanted to check it this loop. */
+          k--;
+        }
+      }
+      k++;
     }
   }

-  message("took %.3f %s", clocks_from_ticks(getticks() - starttics), clocks_getunit());
+  message("took %.3f %s", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());

   for (int k = 0; k < nr_ranks; k++) {
     delete qps[k];
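This hunk is the commit's main change: completed receives are now removed by swapping in the last pending entry, so each polling pass scans only the todo_recv live logs instead of all nr_recvs, and the NULL/done guard becomes unnecessary. In isolation the idiom looks like the sketch below, where try_complete is a hypothetical stand-in for the unlock check above:

struct mpiuse_log_entry;                                 /* as in this file */
extern int try_complete(struct mpiuse_log_entry *log);   /* hypothetical */

/* One polling pass over a dense queue of *n pending entries. A completed
 * entry is overwritten by the last pending one, which is then re-examined
 * in the same pass (hence k--). The caller repeats passes until *n is 0. */
static void poll_compact(struct mpiuse_log_entry **queue, int *n) {
  int k = 0;
  while (k < *n) {
    if (try_complete(queue[k])) {
      (*n)--;
      if (k < *n) {
        queue[k] = queue[*n];
        k--; /* recheck the swapped-in entry this pass */
      }
    }
    k++;
  }
}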
@@ -536,9 +559,11 @@ static size_t pick_logs() {
   size_t maxsize = 0;

   /* Queues of send and receive logs. */
-  send_queue = (struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *));
+  send_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
   nr_sends = 0;
-  recv_queue = (struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *));
+  recv_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
   nr_recvs = 0;

   for (size_t k = 0; k < nlogs; k++) {
@@ -549,7 +574,8 @@ static size_t pick_logs() {
     log->injtic = 0;
     log->endtic = 0;
     log->data = NULL;
-    log->ranktag = toranktag(log->subtype, log->otherrank, log->rank, log->tag);
+    log->ranktag =
+        toranktag(log->subtype, log->otherrank, log->rank, log->tag);

     /* Scale size. */
     log->size *= messagescale;
@@ -586,10 +612,10 @@ static size_t pick_logs() {
    * of the windows. */
   for (int k = 0; k < nr_recvs; k++) {
     struct mpiuse_log_entry *log = recv_queue[k];
-    ranktag_lists[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank,
-                         k)] = log->ranktag;
-    ranktag_offsets[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank,
-                           k)] = ranktag_sizes[log->otherrank];
+    ranktag_lists[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] =
+        log->ranktag;
+    ranktag_offsets[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] =
+        ranktag_sizes[log->otherrank];
     log->offset = ranktag_sizes[log->otherrank];

     /* Need to use a multiple of blocks to keep alignment. */
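The INDEX3 call sites above address a flattened nr_ranks x nr_ranks x max_logs table by (otherrank, rank, k). The real macro lives elsewhere in this repository; a hypothetical definition consistent with these call sites, shown only to make the layout explicit, would be:

/* Hypothetical flattening macro: x varies fastest, then y, then z,
 * for an nx * ny * nz array indexed as [z][y][x]. */
#define INDEX3(nx, ny, x, y, z) ((x) + (nx) * ((y) + (size_t)(ny) * (z)))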
@@ -694,9 +720,8 @@ int main(int argc, char *argv[]) {
   /* We need to share all the offsets for each communicator with all the other
    * ranks so they can push data into the correct parts of our receive
    * window. */
-  MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
-                nr_ranks * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
-                MPI_COMM_WORLD);
+  MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets, nr_ranks * nr_ranks * max_logs,
+                MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
   MPI_Allreduce(MPI_IN_PLACE, ranktag_lists, nr_ranks * nr_ranks * max_logs,
                 MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
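These reductions merge the per-rank tables because each rank leaves every entry it does not own at zero, so a SUM scatters all values to all ranks. A minimal self-contained example of that MPI_IN_PLACE pattern (the function name and table size are illustrative):

#include <mpi.h>

/* Each rank fills only its own slots of a shared table, leaving the rest
 * zeroed; after the in-place SUM every rank holds the merged table. */
void share_table(MPI_Aint *table, int nentries) {
  MPI_Allreduce(MPI_IN_PLACE, table, nentries, MPI_AINT, MPI_SUM,
                MPI_COMM_WORLD);
}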
@@ -711,8 +736,7 @@ int main(int argc, char *argv[]) {
   /* And distribute, so we all know everyone's IPs. */
   struct servers servers;
-  servers.ip =
-      (char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME);
+  servers.ip = (char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME);
   MPI_Allgather(ip, MPI_MAX_PROCESSOR_NAME, MPI_BYTE, servers.ip,
                 MPI_MAX_PROCESSOR_NAME, MPI_BYTE, MPI_COMM_WORLD);