Commit ace196f8 authored by Peter W. Draper

Formatting

parent 1c2f92dd
1 merge request: !8 Draft: RDMA version with wrapped infinity calls
@@ -99,7 +99,7 @@ static size_t max_logs = 0;
/* Offsets of the ranktag regions within the receive windows and lists of the
 * associated tags. */
static size_t ranktag_sizes[MAX_NR_RANKS] = {0};
//static size_t *ranktag_counts;
// static size_t *ranktag_counts;
static size_t *ranktag_offsets;
static size_t *ranktag_lists;
@@ -167,7 +167,9 @@ static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
*
* @result the number of bytes.
*/
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
return (nr_blocks * BYTESINBLOCK);
}
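For reference, the counterpart toblocks() named in the hunk header above converts in the other direction. A minimal sketch, assuming it rounds a byte count up to whole BYTESINBLOCK-sized blocks so a partial block still gets storage; the rounding rule is an assumption, its body is not part of this diff:

/* Hypothetical sketch of the bytes-to-blocks helper; not from this commit. */
static BLOCKTYPE toblocks_sketch(BLOCKTYPE nr_bytes) {
  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}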
/**
* @brief fill a data area with given value.
@@ -223,15 +225,14 @@ static void *send_thread(void *arg) {
/* Short circuit. */
if (nr_sends[rank] == 0) {
if (verbose)
message("took 0.0 %s.", clocks_getunit());
if (verbose) message("took 0.0 %s.", clocks_getunit());
return NULL;
}
// Need a factory to create QP.
infinity::core::Context *context = new infinity::core::Context();
infinity::queues::QueuePairFactory *qpFactory =
new infinity::queues::QueuePairFactory(context);
new infinity::queues::QueuePairFactory(context);
// Get QP to the other rank. Note we cannot do this until the related
// server is up and running, so make sure that is true..
@@ -240,14 +241,13 @@ static void *send_thread(void *arg) {
server_ip, rank, BASE_PORT + myrank);
auto *qp = qpFactory->connectToRemoteHost(server_ip, BASE_PORT + myrank);
if (verbose)
message("%d connected to remote server %s %d on %d", myrank, server_ip, rank,
BASE_PORT + myrank);
auto *remoteBufferToken = (infinity::memory::RegionToken *) qp->getUserData();
message("%d connected to remote server %s %d on %d", myrank, server_ip,
rank, BASE_PORT + myrank);
auto *remoteBufferToken = (infinity::memory::RegionToken *)qp->getUserData();
/* Register some memory for use by RDMA, make it large enough for our
* biggest message. */
auto *sendBuffer = new infinity::memory::Buffer(context,
tobytes(max_size));
auto *sendBuffer = new infinity::memory::Buffer(context, tobytes(max_size));
/* Queue of our sends. */
struct mpiuse_log_entry **send_queue = sends_queue[rank];
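For context, the connect step above follows the usual infinity client pattern: a Context owns the RDMA resources, a QueuePairFactory dials the remote server, and the server's RegionToken arrives as connection user data so the sender can address the remote window directly. A minimal sketch under those assumptions; the include paths and the helper name are assumptions, not part of this commit:

#include <infinity/core/Context.h>
#include <infinity/memory/Buffer.h>
#include <infinity/memory/RegionToken.h>
#include <infinity/queues/QueuePair.h>
#include <infinity/queues/QueuePairFactory.h>

/* Hypothetical helper: connect to a remote rank's server and fetch the
 * RegionToken it registered for its receive window. */
static infinity::memory::RegionToken *connect_sketch(
    infinity::core::Context *context, const char *server_ip, int port,
    infinity::queues::QueuePair **qp_out) {
  /* Leaked here for brevity; the real code deletes the factory after use. */
  auto *qpFactory = new infinity::queues::QueuePairFactory(context);
  /* Blocks until the remote side is listening and accepts. */
  *qp_out = qpFactory->connectToRemoteHost(server_ip, port);
  /* The server passed its window's RegionToken as connection user data. */
  return (infinity::memory::RegionToken *)(*qp_out)->getUserData();
}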
@@ -278,17 +278,17 @@ static void *send_thread(void *arg) {
* subtype, tag and rank. Need to search the ranktag_lists for our ranktag
* value. XXX bisection search if sorted? XXX */
size_t ranktag = toranktag(log->subtype, log->otherrank, log->tag);
//size_t counts = ranktag_counts[INDEX2(MAX_NR_RANKS, myrank,
// size_t counts = ranktag_counts[INDEX2(MAX_NR_RANKS, myrank,
// log->otherrank)];
log->offset = 0;
int found = 0;
size_t counts = max_logs; // We search all of ranktag_lists...
size_t counts = max_logs; // We search all of ranktag_lists...
for (size_t j = 0; j < counts; j++) {
if (ranktag_lists[INDEX3(MAX_NR_RANKS, nr_ranks, log->rank,
log->otherrank, j)] == ranktag) {
log->offset = ranktag_offsets[INDEX3(MAX_NR_RANKS, nr_ranks,
log->rank, log->otherrank, j)];
log->offset = ranktag_offsets[INDEX3(MAX_NR_RANKS, nr_ranks, log->rank,
log->otherrank, j)];
found = 1;
break;
}
@@ -297,14 +297,15 @@ static void *send_thread(void *arg) {
error(
"Failed sending a message of size %zd to %d/%d "
"@ %zd\n, no offset found for ranktag %zd, counts = %zd",
datasize, log->otherrank, log->subtype, log->offset, ranktag,
counts);
datasize, log->otherrank, log->subtype, log->offset, ranktag, counts);
}
if (verbose)
message("sending message subtype %d from %d to %d todo: %d/%d, offset %ld size %ld",
log->subtype, myrank, log->otherrank, nr_sends[rank] - k,
nr_sends[rank], log->offset, datasize);
message(
"sending message subtype %d from %d to %d todo: %d/%d, offset %ld "
"size %ld",
log->subtype, myrank, log->otherrank, nr_sends[rank] - k,
nr_sends[rank], log->offset, datasize);
// And send
infinity::requests::RequestToken requestToken(context);
@@ -313,32 +314,29 @@ static void *send_thread(void *arg) {
remoteBufferToken, // destination
tobytes(log->offset), // remoteOffset
tobytes(datasize), // sizeInBytes
infinity::queues::OperationFlags(),
&requestToken);
infinity::queues::OperationFlags(), &requestToken);
requestToken.waitUntilCompleted();
requestToken.reset();
// Now we update the unlock field.
((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED;
qp->write(sendBuffer,
0, // localOffset
remoteBufferToken, // destination
tobytes(log->offset),// remoteOffset
BYTESINBLOCK, // sizeInBytes
infinity::queues::OperationFlags(),
&requestToken);
0, // localOffset
remoteBufferToken, // destination
tobytes(log->offset), // remoteOffset
BYTESINBLOCK, // sizeInBytes
infinity::queues::OperationFlags(), &requestToken);
requestToken.waitUntilCompleted(); // Since we reuse the sendBuffer.
//requestToken.reset();
// requestToken.reset();
log->endtic = getticks();
}
delete sendBuffer;
if (verbose)
message("sent %d to %d", nr_sends[rank], rank);
message("took %.3f %s sending to rank %d", clocks_from_ticks(getticks() - starttics),
clocks_getunit(), rank);
if (verbose) message("sent %d to %d", nr_sends[rank], rank);
message("took %.3f %s sending to rank %d",
clocks_from_ticks(getticks() - starttics), clocks_getunit(), rank);
delete sendBuffer;
delete qp;
delete qpFactory;
delete context;
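The hunk above is the heart of the one-sided send: the payload, together with its header blocks (lock word, subtype, size, tag), is written into the receiver's window at the pre-agreed block offset, the sender waits for that write to complete, and only then flips block 0 to UNLOCKED with a second single-block write so the receiver's polling loop picks the message up. A minimal sketch of that write-then-unlock pair, reusing the identifiers above; error handling and the initial locked header value are omitted as they are not shown in this diff:

/* Sketch of the write-then-unlock protocol used above; names as in this
 * file, requestToken created by the caller. */
static void write_then_unlock_sketch(
    infinity::queues::QueuePair *qp, infinity::memory::Buffer *sendBuffer,
    infinity::memory::RegionToken *remoteBufferToken,
    infinity::requests::RequestToken *requestToken, size_t offset_blocks,
    size_t size_blocks) {
  /* One-sided write of the header + payload into the remote window. */
  qp->write(sendBuffer, 0, remoteBufferToken, tobytes(offset_blocks),
            tobytes(size_blocks), infinity::queues::OperationFlags(),
            requestToken);
  /* Wait so the payload is in place before the flag is raised. */
  requestToken->waitUntilCompleted();
  requestToken->reset();

  /* Flip block 0 of the header to UNLOCKED with a single-block write; the
   * receiver polls exactly this word. */
  ((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED;
  qp->write(sendBuffer, 0, remoteBufferToken, tobytes(offset_blocks),
            BYTESINBLOCK, infinity::queues::OperationFlags(), requestToken);
  requestToken->waitUntilCompleted(); /* sendBuffer is reused by the caller. */
}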
@@ -357,8 +355,7 @@ static void *recv_thread(void *arg) {
int todo_recv = nr_recvs[rank];
int nr_recv = nr_recvs[rank];
if (todo_recv == 0) {
if (verbose)
message("took 0.0 %s.", clocks_getunit());
if (verbose) message("took 0.0 %s.", clocks_getunit());
starting[rank] = 0; // Also fudge.
return NULL;
}
@@ -369,9 +366,9 @@ static void *recv_thread(void *arg) {
/* Buffer to receive all the remote data.*/
auto *bufferToReadWrite =
new infinity::memory::Buffer(context, tobytes(ranktag_sizes[rank]));
new infinity::memory::Buffer(context, tobytes(ranktag_sizes[rank]));
infinity::memory::RegionToken *bufferToken =
bufferToReadWrite->createRegionToken();
bufferToReadWrite->createRegionToken();
// Port binding.
if (verbose) {
@@ -380,11 +377,10 @@ static void *recv_thread(void *arg) {
}
qpFactory->bindToPort(BASE_PORT + rank);
if (verbose)
message("Blocking for first message on %d", BASE_PORT + rank);
if (verbose) message("Blocking for first message on %d", BASE_PORT + rank);
starting[rank] = 0; // really need to do this in acceptIncomingConnection().
auto qp = qpFactory->acceptIncomingConnection
(bufferToken, sizeof(infinity::memory::RegionToken));
auto qp = qpFactory->acceptIncomingConnection(
bufferToken, sizeof(infinity::memory::RegionToken));
if (verbose)
message("Accepting incoming connections on %d", BASE_PORT + rank);
@@ -407,8 +403,8 @@ static void *recv_thread(void *arg) {
if (log->injtic == 0) log->injtic = getticks();
/* Find the data for this message. */
BLOCKTYPE * dataptr =
&((BLOCKTYPE *)bufferToReadWrite->getData())[log->offset];
BLOCKTYPE *dataptr =
&((BLOCKTYPE *)bufferToReadWrite->getData())[log->offset];
/* Check if this has been unlocked. */
BLOCKTYPE volatile lock = dataptr[0];
@@ -416,15 +412,16 @@ static void *recv_thread(void *arg) {
if (lock == UNLOCKED) {
// XXX should check these for correctness.
//int subtype = dataptr[1];
//size_t size = dataptr[2];
//int tag = dataptr[3];
// int subtype = dataptr[1];
// size_t size = dataptr[2];
// int tag = dataptr[3];
if (verbose)
message("receive message subtype %d from %d to %d todo: %d/%d,"
" offset %ld size %ld",
log->subtype, myrank, log->otherrank, todo_recv, nr_recv,
log->offset, toblocks(log->size) + HEADER_SIZE);
message(
"receive message subtype %d from %d to %d todo: %d/%d,"
" offset %ld size %ld",
log->subtype, myrank, log->otherrank, todo_recv, nr_recv,
log->offset, toblocks(log->size) + HEADER_SIZE);
/* Check the sent data is unchanged and received data is as
* expected. */
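On the receive side nothing is posted: the thread simply polls block 0 of each expected message region until it reads UNLOCKED, at which point blocks 1-3 carry the subtype, size and tag written by the sender. A minimal sketch of that check, with names as in this file and a hypothetical helper:

/* Sketch of the lock-word poll used above. Returns 1 when a message has
 * arrived at the given block offset of the receive window. */
static int poll_message_sketch(infinity::memory::Buffer *bufferToReadWrite,
                               size_t offset_blocks) {
  BLOCKTYPE *dataptr =
      &((BLOCKTYPE *)bufferToReadWrite->getData())[offset_blocks];
  /* Volatile read: the value is changed remotely by an RDMA write. */
  BLOCKTYPE volatile lock = dataptr[0];
  if (lock != UNLOCKED) return 0;
  /* Header layout written by the sender: subtype, size, tag. */
  int subtype = (int)dataptr[1];
  size_t size = (size_t)dataptr[2];
  int tag = (int)dataptr[3];
  (void)subtype;
  (void)size;
  (void)tag; /* The code above notes these should be checked for correctness. */
  return 1;
}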
@@ -442,13 +439,14 @@ static void *recv_thread(void *arg) {
}
}
message("took %.3f %s listening for rank %d", clocks_from_ticks(getticks() - starttics),
clocks_getunit(), rank);
message("took %.3f %s listening for rank %d",
clocks_from_ticks(getticks() - starttics), clocks_getunit(), rank);
delete qp;
// XXX Let this leak, tearing it down can be premature for the remote.
// delete qp;
qp->getLocalDeviceId(); // Stop compiler complaining.
delete qpFactory;
/* Thread exits. */
return NULL;
}
@@ -474,11 +472,11 @@ static size_t pick_logs() {
size_t maxsize = 0;
/* Queues of send and receive logs. */
struct mpiuse_log_entry **send_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
struct mpiuse_log_entry **send_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
int nr_send = 0;
struct mpiuse_log_entry **recv_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
struct mpiuse_log_entry **recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
int nr_recv = 0;
for (size_t k = 0; k < nlogs; k++) {
@@ -533,8 +531,8 @@ static size_t pick_logs() {
/* Recvs */
if (nr_recvs[k] > 0) {
recvs_queue[k] = (struct mpiuse_log_entry **)
calloc(nr_recvs[k], sizeof(struct mpiuse_log_entry *));
recvs_queue[k] = (struct mpiuse_log_entry **)calloc(
nr_recvs[k], sizeof(struct mpiuse_log_entry *));
int i = 0;
for (int j = 0; j < nr_recv; j++) {
struct mpiuse_log_entry *log = recv_queue[j];
@@ -543,15 +541,14 @@ static size_t pick_logs() {
i++;
}
}
}
else {
} else {
recvs_queue[k] = NULL;
}
/* Sends. */
if (nr_sends[k] > 0) {
sends_queue[k] = (struct mpiuse_log_entry **)
calloc(nr_sends[k], sizeof(struct mpiuse_log_entry *));
sends_queue[k] = (struct mpiuse_log_entry **)calloc(
nr_sends[k], sizeof(struct mpiuse_log_entry *));
int i = 0;
for (int j = 0; j < nr_send; j++) {
struct mpiuse_log_entry *log = send_queue[j];
@@ -560,18 +557,17 @@ static size_t pick_logs() {
i++;
}
}
}
else {
} else {
sends_queue[k] = NULL;
}
}
/* Offsets and ranktags. */
ranktag_offsets = (size_t *)
calloc(MAX_NR_RANKS * nr_ranks * max_logs, sizeof(size_t));
ranktag_lists = (size_t *)
calloc(MAX_NR_RANKS * nr_ranks * max_logs, sizeof(size_t));
//ranktag_counts = (size_t *)calloc(MAX_NR_RANKS * nr_ranks, sizeof(size_t));
ranktag_offsets =
(size_t *)calloc(MAX_NR_RANKS * nr_ranks * max_logs, sizeof(size_t));
ranktag_lists =
(size_t *)calloc(MAX_NR_RANKS * nr_ranks * max_logs, sizeof(size_t));
// ranktag_counts = (size_t *)calloc(MAX_NR_RANKS * nr_ranks, sizeof(size_t));
/* Setup the ranktag offsets for our receive windows. Also define the sizes
* of the windows. XXX note these are over the complete lists not the
@@ -588,7 +584,7 @@ static size_t pick_logs() {
size_t size = toblocks(log->size) + HEADER_SIZE;
ranktag_sizes[log->otherrank] += size;
//ranktag_counts[INDEX2(MAX_NR_RANKS, log->otherrank, myrank)]++;
// ranktag_counts[INDEX2(MAX_NR_RANKS, log->otherrank, myrank)]++;
}
free(recv_queue);
@@ -690,11 +686,10 @@ int main(int argc, char *argv[]) {
MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
MAX_NR_RANKS * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
MPI_COMM_WORLD);
//MPI_Allreduce(MPI_IN_PLACE, ranktag_counts, MAX_NR_RANKS * nr_ranks,
// MPI_Allreduce(MPI_IN_PLACE, ranktag_counts, MAX_NR_RANKS * nr_ranks,
// MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, ranktag_lists,
MAX_NR_RANKS * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, ranktag_lists, MAX_NR_RANKS * nr_ranks * max_logs,
MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
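These allreduces rely on each rank filling only its own entries of the calloc'd ranktag_offsets and ranktag_lists tables (the receive-window offsets it owns), so an in-place MPI_SUM merges the per-rank contributions into complete tables on every rank. A minimal sketch of that exchange, assuming the extents used in this file; the helper name is hypothetical:

#include <mpi.h>

/* Sketch: merge the per-rank offset/ranktag tables; each slot is filled on
 * one rank and zero elsewhere, so MPI_SUM acts as a gather. */
static void share_ranktag_tables_sketch(size_t *ranktag_offsets,
                                        size_t *ranktag_lists, int nr_ranks,
                                        size_t max_logs) {
  int n = (int)(MAX_NR_RANKS * nr_ranks * max_logs);
  MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets, n, MPI_AINT, MPI_SUM,
                MPI_COMM_WORLD);
  MPI_Allreduce(MPI_IN_PLACE, ranktag_lists, n, MPI_AINT, MPI_SUM,
                MPI_COMM_WORLD);
}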
/* Now for the RDMA setup. We need the IP addresses of all the ranks. */
@@ -761,9 +756,8 @@ int main(int argc, char *argv[]) {
if (myrank == 0) message("All RDMA servers are started");
/* Reset time as previous can be thought of as setup costs? */
MPI_Barrier(MPI_COMM_WORLD); // Vital...
if (myrank == 0)
message("All synchronized, restarting time.");
MPI_Barrier(MPI_COMM_WORLD); // Vital...
if (myrank == 0) message("All synchronized, restarting time.");
clocks_set_cpufreq(0);
/* Now we have a thread per rank to send the messages. */
......
@@ -127,7 +127,9 @@ static int toblocks(BLOCKTYPE nr_bytes) {
*
* @result the number of bytes.
*/
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
return (nr_blocks * BYTESINBLOCK);
}
/**
* @brief fill a data area with given value.
@@ -184,14 +186,15 @@ static void *send_thread(void *arg) {
// Need a factory to create QP.
infinity::core::Context *context = new infinity::core::Context();
infinity::queues::QueuePairFactory *qpFactory =
new infinity::queues::QueuePairFactory(context);
new infinity::queues::QueuePairFactory(context);
// Get QP to the other rank. Note we cannot do this until the related
// server is up and running, so make sure that is true..
//message("%d waiting for connection to remote server %s %d on %d", myrank,
// message("%d waiting for connection to remote server %s %d on %d", myrank,
// server_ip, rank, BASE_PORT + myrank);
auto *qp = qpFactory->connectToRemoteHost(server_ip, BASE_PORT + myrank);
//message("%d connected to remote server %s %d on %d", myrank, server_ip, rank,
// message("%d connected to remote server %s %d on %d", myrank, server_ip,
// rank,
// BASE_PORT + myrank);
for (int k = 0; k < nr_send; k++) {
@@ -218,8 +221,8 @@ static void *send_thread(void *arg) {
dataptr[3] = log->tag;
/* Need to assign to a buffer to register memory. */
auto *sendBuffer = new infinity::memory::Buffer(context, dataptr,
tobytes(datasize));
auto *sendBuffer =
new infinity::memory::Buffer(context, dataptr, tobytes(datasize));
// And send
infinity::requests::RequestToken requestToken(context);
@@ -227,7 +230,7 @@ static void *send_thread(void *arg) {
requestToken.waitUntilCompleted();
log->endtic = getticks();
delete sendBuffer;// XXX Can we reuse ?
delete sendBuffer; // XXX Can we reuse ?
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
@@ -255,7 +258,7 @@ static void *recv_thread(void *arg) {
// Create buffer to receive messages. This is big enough for the largest
// message.
auto *receiveBuffer =
new infinity::memory::Buffer(context, tobytes(MESSAGE_SIZE));
new infinity::memory::Buffer(context, tobytes(MESSAGE_SIZE));
context->postReceiveBuffer(receiveBuffer);
// Port binding.
@@ -263,8 +266,7 @@ static void *recv_thread(void *arg) {
message("%d binding to %d on port %d", myrank, rank, BASE_PORT + rank);
qpFactory->bindToPort(BASE_PORT + rank);
if (verbose)
message("Blocking for first message on %d", BASE_PORT + rank);
if (verbose) message("Blocking for first message on %d", BASE_PORT + rank);
starting[rank] = 0; // really need to do this in acceptIncomingConnection().
auto qp = qpFactory->acceptIncomingConnection();
if (verbose)
@@ -365,11 +367,11 @@ static size_t pick_logs() {
size_t maxsize = 0;
/* Queues of send and receive logs. */
send_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
send_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_send = 0;
struct mpiuse_log_entry **recv_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
struct mpiuse_log_entry **recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
int nr_recv = 0;
for (size_t k = 0; k < nlogs; k++) {
@@ -413,8 +415,8 @@ static size_t pick_logs() {
}
for (int k = 0; k < nr_ranks; k++) {
if (nr_recvs[k] > 0) {
recvs_queue[k] = (struct mpiuse_log_entry **)
calloc(nr_recvs[k], sizeof(struct mpiuse_log_entry *));
recvs_queue[k] = (struct mpiuse_log_entry **)calloc(
nr_recvs[k], sizeof(struct mpiuse_log_entry *));
int i = 0;
for (int j = 0; j < nr_recv; j++) {
struct mpiuse_log_entry *log = recv_queue[j];
@@ -423,14 +425,12 @@ static size_t pick_logs() {
i++;
}
}
}
else {
} else {
recvs_queue[k] = NULL;
}
}
free(recv_queue);
if (verbose) {
message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
nr_recv);
@@ -576,7 +576,7 @@ int main(int argc, char *argv[]) {
message("All servers are started");
/* Reset time as previous can be thought of as setup costs? */
MPI_Barrier(MPI_COMM_WORLD); // Vital...
MPI_Barrier(MPI_COMM_WORLD); // Vital...
message("All synchronized");
clocks_set_cpufreq(0);