Commit e10ad14e authored by Peter W. Draper's avatar Peter W. Draper

Capture RDMA write version, runs the tests, but same speed as before. Issue with unique tags to sort out, now using subtype instead of rank, is that always correct?
parent 2d26e28a
Merge request !8: Draft: RDMA version with wrapped infinity calls
@@ -83,6 +83,9 @@ struct mpiuse_log_entry {
/* Minimum time in a test. */
ticks tmin;
/* Rank and tag as single index. */
size_t ranktag;
/* Insertion index. */
size_t offset;
};
@@ -18,6 +18,7 @@
******************************************************************************/
// Pure RDMA version, we use MPI for process control and synchronization.
// Write variant, attempting to use eager-like one-sided sends.
#include <arpa/inet.h>
#include <limits.h>
@@ -41,27 +42,31 @@
#include "error.h"
#include "mpiuse.h"
/* Maximum no. of ranks. XXX dynamic please XXX */
#define MAX_NR_RANKS 16
/* Our rank for all to see. */
int myrank = -1;
/* Number of ranks. */
static int nr_ranks;
/* Base port no. Ranks use +rank. */
/* Base port no. Ranks use +rank. XXX argument, autodiscovery? XXX */
static int BASE_PORT = 27771;
/* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this
 * as we need to align in memory. */
/* Size of a block of memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Size of message header in blocks. The rank, subtype, size and tag. */
/* Lock flag for controlling access: all bits set, the high end of size_t,
 * which can never be a valid rank. */
static const size_t UNLOCKED = (size_t)-1;
/* Size of message header in blocks. The lock/rank, subtype, size and tag. */
static const size_t HEADER_SIZE = 4;
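
The four header blocks double as the handoff protocol: block 0 is the lock word (any value other than UNLOCKED, in practice the sender's rank, means the slot is still being written), and blocks 1 to 3 carry the subtype, size and tag that would normally identify an MPI message. A minimal standalone sketch of that layout; pack_header is a hypothetical helper, not part of this repository:

/* Sketch: pack and inspect the 4-block header described above. The
 * constants mirror this file's definitions; the rest is illustrative. */
#include <cstdio>

typedef size_t BLOCKTYPE;
static const size_t HEADER_SIZE = 4;
static const BLOCKTYPE UNLOCKED = (BLOCKTYPE)-1;

/* Block 0 carries the sender rank, which is always a small non-negative
 * value and so can never be mistaken for UNLOCKED. */
static void pack_header(BLOCKTYPE *buf, int rank, int subtype, size_t size,
                        int tag) {
  buf[0] = (BLOCKTYPE)rank;
  buf[1] = (BLOCKTYPE)subtype;
  buf[2] = (BLOCKTYPE)size;
  buf[3] = (BLOCKTYPE)tag;
}

int main() {
  BLOCKTYPE header[HEADER_SIZE];
  pack_header(header, 3, 7, 1024, 42);

  /* A receiver only trusts blocks 1-3 once block 0 reads UNLOCKED. */
  if (header[0] != UNLOCKED)
    printf("locked by rank %zu: subtype %zu, size %zu, tag %zu\n",
           header[0], header[1], header[2], header[3]);
  return 0;
}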
/* Size of a message board in blocks, we have one of these per rank per
* communicator (i.e. per window). */
static size_t MESSAGE_SIZE = 0;
/* Maximum size of a message in blocks. */
static size_t max_size = 0;
/* Are we verbose. */
static int verbose = 0;
@@ -76,20 +81,52 @@ static int datacheck = 0;
static const int task_type_send = 22;
static const int task_type_recv = 23;
/* Global communicators for each of the subtypes. */
#define task_subtype_count 22 // Just some upper limit on subtype.
/* 3D index of array. */
#define INDEX3(nx, ny, x, y, z) ((nx) * (ny) * (z) + (nx) * (y) + (x))
/* 2D index of array. */
#define INDEX2(nx, x, y) ((nx) * (y) + (x))
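
INDEX2 and INDEX3 are the usual row-major flattenings (x fastest, then y, then z). A small self-checking sketch of the layout they assume, illustrative only:

/* Sketch: for a conceptual array a[nz][ny][nx], element (x, y, z) lives
 * at nx*ny*z + nx*y + x in the flat allocation. */
#include <cassert>
#define INDEX3(nx, ny, x, y, z) ((nx) * (ny) * (z) + (nx) * (y) + (x))
#define INDEX2(nx, x, y) ((nx) * (y) + (x))

int main() {
  const int nx = 4, ny = 3;
  /* Walking z, then y, then x visits the flat array contiguously. */
  int flat = 0;
  for (int z = 0; z < 2; z++)
    for (int y = 0; y < ny; y++)
      for (int x = 0; x < nx; x++)
        assert(INDEX3(nx, ny, x, y, z) == flat++);
  assert(INDEX2(nx, 1, 2) == 9); /* 4*2 + 1 */
  return 0;
}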
/* Bit shift to accommodate all the bits of the maximum rank id. */
static int rank_shift = 0;
/* Maximum no. of messages (logs). */
static size_t max_logs = 0;
/* Offsets of the ranktag regions within the receive windows and lists of the
 * associated tags. */
static size_t ranktag_sizes[MAX_NR_RANKS] = {0};
//static size_t *ranktag_counts;
static size_t *ranktag_offsets;
static size_t *ranktag_lists;
/* The local send queue. */
static struct mpiuse_log_entry **volatile sends_queue[16];
static int volatile nr_sends[16] = {0};
static struct mpiuse_log_entry **volatile sends_queue[MAX_NR_RANKS];
static int volatile nr_sends[MAX_NR_RANKS] = {0};
static int volatile todo_send = 0;
/* Local receive queues separated by rank. XXX needs to be dynamic. */
static int volatile nr_recvs[16] = {0};
static struct mpiuse_log_entry **volatile recvs_queue[16];
/* Local receive queues separated by rank. */
static int volatile nr_recvs[MAX_NR_RANKS] = {0};
static struct mpiuse_log_entry **volatile recvs_queue[MAX_NR_RANKS];
/* Flags set while the server ends are starting up. */
static int volatile starting[16] = {1}; // XXX needs to be dynamic...
static int volatile starting[MAX_NR_RANKS] = {1};
/**
* @brief Convert two ranks and tag into a single unique value.
*
* Assumes there is enough space in a size_t for these values.
*
* @param sendrank the send rank XXX currently the subtype... XXX
* @param recvrank the receive rank
* @param tag the tag
*
 * @result a unique value based on all three values
*/
static size_t toranktag(int sendrank, int recvrank, int tag) {
  size_t result = (size_t)sendrank | ((size_t)recvrank << rank_shift) |
                  ((size_t)tag << (rank_shift * 2));
return result;
}
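
toranktag() only stays unique while both rank fields fit in rank_shift bits and the tag fits in the remaining 64 - 2*rank_shift bits. A hedged sketch of that bit-budget check, reusing the same GCC intrinsic the file uses to compute rank_shift; values illustrative only:

/* Sketch: the bit budget toranktag() relies on. */
#include <climits>
#include <cstdio>

int main() {
  int nr_ranks = 16;
  /* Same expression used later in main() for rank_shift. */
  int rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks);
  int tag_bits = 64 - 2 * rank_shift;
  printf("rank fields: %d bits each, tag: up to %d bits\n", rank_shift,
         tag_bits);
  /* Tags needing more than tag_bits bits would collide after packing. */
  return 0;
}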
/**
* @brief Find an IP address for the given hostname.
@@ -116,7 +153,7 @@ static char *toipaddr(char *hostname) {
*
* @result the number of blocks needed.
*/
static int toblocks(BLOCKTYPE nr_bytes) {
static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
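
toblocks() is ceiling division: it rounds a byte count up to whole blocks so messages stay size_t aligned. A tiny self-checking sketch, assuming an LP64 platform where sizeof(size_t) is 8:

/* Sketch: round-up division to blocks. With 8-byte blocks, 17 bytes
 * becomes (17 + 7) / 8 = 3 blocks. */
#include <cassert>
#include <cstddef>
static const int BYTESINBLOCK = sizeof(size_t); /* 8 on LP64. */
static size_t toblocks(size_t nr_bytes) {
  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
int main() {
  assert(toblocks(0) == 0);
  assert(toblocks(1) == 1);
  assert(toblocks(17) == 3);
  return 0;
}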
@@ -195,28 +232,25 @@ static void *send_thread(void *arg) {
  // Get QP to the other rank. Note we cannot do this until the related
  // server is up and running, so make sure that is true.
//message("%d waiting for connection to remote server %s %d on %d", myrank,
// server_ip, rank, BASE_PORT + myrank);
if (verbose)
message("%d waiting for connection to remote server %s %d on %d", myrank,
server_ip, rank, BASE_PORT + myrank);
auto *qp = qpFactory->connectToRemoteHost(server_ip, BASE_PORT + myrank);
//message("%d connected to remote server %s %d on %d", myrank, server_ip, rank,
// BASE_PORT + myrank);
auto *remoteBufferToken =
(infinity::memory::RegionToken *) qp->getUserData();
if (verbose)
message("%d connected to remote server %s %d on %d", myrank, server_ip, rank,
BASE_PORT + myrank);
auto *remoteBufferToken = (infinity::memory::RegionToken *) qp->getUserData();
/* Register some memory for use by RDMA, make it large enough for our
* biggest message. */
auto *sendBuffer = new infinity::memory::Buffer(context,
tobytes(MESSAGE_SIZE));
tobytes(max_size));
/* Queue of our sends. */
struct mpiuse_log_entry **send_queue = sends_queue[rank];
for (int k = 0; k < nr_sends[rank]; k++) {
struct mpiuse_log_entry *log = send_queue[k];
// Only send messages to the expected rank.
if (log->otherrank != rank) continue;
/* Data has the actual data and room for the header. */
BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK);
@@ -227,34 +261,80 @@ static void *send_thread(void *arg) {
if (datacheck)
datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE], log->tag);
/* First element has our rank, other elements replicate what we need to
* define an MPI message. */
/* First element is the lock element, which can have any value other than
* UNLOCKED, the other elements define an MPI message. */
dataptr[0] = myrank;
dataptr[1] = log->subtype;
dataptr[2] = log->size;
dataptr[3] = log->tag;
message("send message subtype %d from %d to %d (todo: %d)", log->subtype,
myrank, log->otherrank, nr_sends[rank] -k);
/* Copy this to the registered memory. */
memcpy(sendBuffer->getData(), dataptr, tobytes(datasize));
    /* Need to find the offset for this data in the remote's window. We match
     * subtype, tag and rank. Need to search the ranktag_lists for our ranktag
     * value. XXX bisection search if sorted? XXX */
size_t ranktag = toranktag(log->subtype, log->otherrank, log->tag);
//size_t counts = ranktag_counts[INDEX2(MAX_NR_RANKS, myrank,
// log->otherrank)];
log->offset = 0;
int found = 0;
size_t counts = max_logs; // We search all of ranktag_lists...
for (size_t j = 0; j < counts; j++) {
if (ranktag_lists[INDEX3(MAX_NR_RANKS, nr_ranks, log->rank,
log->otherrank, j)] == ranktag) {
log->offset = ranktag_offsets[INDEX3(MAX_NR_RANKS, nr_ranks,
log->rank, log->otherrank, j)];
found = 1;
break;
}
}
    if (!found) {
      error(
          "Failed sending a message of size %zd to %d/%d "
          "@ %zd, no offset found for ranktag %zd, counts = %zd",
          datasize, log->otherrank, log->subtype, log->offset, ranktag,
          counts);
    }
if (verbose)
message("sending message subtype %d from %d to %d todo: %d/%d, offset %ld size %ld",
log->subtype, myrank, log->otherrank, nr_sends[rank] - k,
nr_sends[rank], log->offset, datasize);
// And send
infinity::requests::RequestToken requestToken(context);
//qp->send(sendBuffer, tobytes(datasize), &requestToken);
qp->write(sendBuffer, remoteBufferToken, tobytes(datasize),
qp->write(sendBuffer,
0, // localOffset
remoteBufferToken, // destination
tobytes(log->offset), // remoteOffset
tobytes(datasize), // sizeInBytes
infinity::queues::OperationFlags(),
&requestToken);
requestToken.waitUntilCompleted();
    // Not sufficient for the local handoff to succeed, we also need to know
    // that the remote buffer is available...
requestToken.reset();
// Now we update the unlock field.
((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED;
qp->write(sendBuffer,
0, // localOffset
remoteBufferToken, // destination
tobytes(log->offset),// remoteOffset
BYTESINBLOCK, // sizeInBytes
infinity::queues::OperationFlags(),
&requestToken);
requestToken.waitUntilCompleted(); // Since we reuse the sendBuffer.
//requestToken.reset();
log->endtic = getticks();
}
delete sendBuffer;
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
if (verbose)
message("sent %d to %d", nr_sends[rank], rank);
message("took %.3f %s sending to rank %d", clocks_from_ticks(getticks() - starttics),
clocks_getunit(), rank);
delete qp;
delete qpFactory;
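
The send path above amounts to a two-write handoff: one RDMA write carries the whole message with the header still locked (block 0 holds the sender rank), then a second one-block write flips block 0 to UNLOCKED, so the receiver can never observe a partial payload. A condensed sketch of that pattern using the same infinity calls as the diff; rdma_eager_send is a hypothetical helper, all buffer and token setup is elided, and UNLOCKED is the file's constant:

/* Sketch of the two-write handoff in send_thread() above. Assumes the
 * infinity library headers used elsewhere in this project. */
void rdma_eager_send(infinity::core::Context *context,
                     infinity::queues::QueuePair *qp,
                     infinity::memory::Buffer *sendBuffer,
                     infinity::memory::RegionToken *remoteToken,
                     size_t remoteOffsetBytes, size_t payloadBytes) {
  infinity::requests::RequestToken requestToken(context);

  /* Write 1: the full message. Block 0 still holds the sender rank, so
   * the receiver treats the slot as locked while the data lands. */
  qp->write(sendBuffer, 0, remoteToken, remoteOffsetBytes, payloadBytes,
            infinity::queues::OperationFlags(), &requestToken);
  requestToken.waitUntilCompleted();
  requestToken.reset();

  /* Write 2: release the lock. We waited for write 1 to complete, so the
   * payload is already visible when block 0 changes to UNLOCKED. */
  ((size_t *)sendBuffer->getData())[0] = UNLOCKED;
  qp->write(sendBuffer, 0, remoteToken, remoteOffsetBytes, sizeof(size_t),
            infinity::queues::OperationFlags(), &requestToken);
  requestToken.waitUntilCompleted();
}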
@@ -272,6 +352,7 @@ static void *recv_thread(void *arg) {
/* Short circuit. */
int todo_recv = nr_recvs[rank];
int nr_recv = nr_recvs[rank];
if (todo_recv == 0) {
if (verbose)
message("took 0.0 %s.", clocks_getunit());
@@ -283,23 +364,28 @@ static void *recv_thread(void *arg) {
auto *context = new infinity::core::Context();
auto *qpFactory = new infinity::queues::QueuePairFactory(context);
// Buffer for remote exchanges.
auto *bufferToReadWrite = new infinity::memory::Buffer(
context, 16 * tobytes(MESSAGE_SIZE));
/* Buffer to receive all the remote data. XXX mystery factor of 2.*/
auto *bufferToReadWrite =
new infinity::memory::Buffer(context, 2 * tobytes(ranktag_sizes[rank]));
infinity::memory::RegionToken *bufferToken =
bufferToReadWrite->createRegionToken();
((BLOCKTYPE *)bufferToReadWrite->getData())[0] = -1;
// XXX need to make sure it is initialised. XXX
// Port binding.
message("%d binding to %d on port %d", myrank, rank, BASE_PORT + rank);
fflush(stdout);
if (verbose) {
message("%d binding to %d on port %d", myrank, rank, BASE_PORT + rank);
fflush(stdout);
}
qpFactory->bindToPort(BASE_PORT + rank);
message("Blocking for first message on %d", BASE_PORT + rank);
if (verbose)
message("Blocking for first message on %d", BASE_PORT + rank);
starting[rank] = 0; // really need to do this in acceptIncomingConnection().
auto qp = qpFactory->acceptIncomingConnection
(bufferToken, sizeof(infinity::memory::RegionToken)); // We block here for first message.
message("Accepting incoming connections on %d", BASE_PORT + rank);
(bufferToken, sizeof(infinity::memory::RegionToken));
if (verbose)
message("Accepting incoming connections on %d", BASE_PORT + rank);
/* Ignore the timing on previous part, which is fixed. */
ticks starttics = getticks();
@@ -309,76 +395,54 @@ static void *recv_thread(void *arg) {
  /* We loop while new requests are being sent and we still have messages
   * to receive. */
ticks findsum = 0;
while (todo_recv > 0) {
ticks findtics = getticks();
/* Loop over remaining messages, checking if any have been unlocked. */
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done) {
if (findsum == 0) {
// Ignore overheads of first exchange?
starttics = getticks();
findtics = starttics;
}
/* On the first attempt we start listening for this receive. */
if (log->injtic == 0) log->injtic = getticks();
        /* Find the data for this message. */
        BLOCKTYPE *dataptr =
            &((BLOCKTYPE *)bufferToReadWrite->getData())[log->offset];
/* Check if this has been unlocked. */
BLOCKTYPE volatile lock = dataptr[0];
if (lock == UNLOCKED) {
// Unpack the header, has been updated if rank header is set.
BLOCKTYPE *dataptr = (BLOCKTYPE *)bufferToReadWrite->getData();
int rank = dataptr[0];
if (rank > 0) {
int subtype = dataptr[1];
size_t size = dataptr[2];
int tag = dataptr[3];
/* Now find the associated log. XXX speed this up, local queue. */
int found = 0;
for (int k = 0; k < nr_recvs[rank]; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done) {
/* On the first attempt we start listening for this receive. */
if (log->injtic == 0) log->injtic = getticks();
if (log->otherrank == rank && log->subtype == subtype &&
log->size == size && log->tag == tag) {
found = 1;
//if (verbose)
message("receive message subtype %d from %d on %d (todo: %d)", log->subtype,
rank, myrank, todo_recv);
          /* Check the sent data is unchanged and the received data is as
           * expected. */
if (datacheck && !datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], log->tag)) {
message("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
// free(log->data); // XXX should really offload the data to be fair.
log->endtic = getticks();
todo_recv--;
// XXX should check these for correctness.
//int subtype = dataptr[1];
//size_t size = dataptr[2];
//int tag = dataptr[3];
if (verbose)
message("receive message subtype %d from %d to %d todo: %d/%d,"
" offset %ld size %ld",
log->subtype, myrank, log->otherrank, todo_recv, nr_recv,
log->offset, toblocks(log->size) + HEADER_SIZE);
          /* Check the sent data is unchanged and the received data is as
           * expected. */
if (datacheck && !datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], log->tag)) {
message("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
log->endtic = getticks();
todo_recv--;
}
}
findsum += (getticks() - findtics);
if (!found) {
error(
"No matching receive on connections to %d (%d of %d todo:"
" rank = %d otherrank = %d subtype = %d size = %zd tag = %d)",
BASE_PORT + rank, todo_recv, nr_recvs[rank], myrank, rank, subtype,
size, tag);
}
}
// Ready for next use of buffer?
((BLOCKTYPE *)bufferToReadWrite->getData())[0] = -1;
}
message("recv locate took %.3f %s.",
clocks_from_ticks(findsum),
clocks_getunit());
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
message("took %.3f %s listening for rank %d", clocks_from_ticks(getticks() - starttics),
clocks_getunit(), rank);
delete qp;
delete qpFactory;
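
The receive side never posts a receive at all: every expected message has a precomputed offset in the window, so the thread just spins over the outstanding logs and tests the lock block of each slot. A stripped-down sketch of that poll, with slot a hypothetical stand-in for mpiuse_log_entry; illustrative only:

/* Sketch of the lock-word poll in recv_thread() above. */
#include <cstddef>

struct slot {
  size_t offset; /* Block offset of the message in the window. */
  int done;
};

/* One pass over the outstanding slots; returns how many completed. */
static int poll_window(volatile size_t *window, struct slot *slots,
                       int nr_slots, size_t unlocked) {
  int completed = 0;
  for (int k = 0; k < nr_slots; k++) {
    if (slots[k].done) continue;
    /* Block 0 of each slot is the lock word the sender flips last. */
    if (window[slots[k].offset] == unlocked) {
      slots[k].done = 1;
      completed++;
    }
  }
  return completed;
}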
@@ -424,6 +488,7 @@ static size_t pick_logs() {
log->injtic = 0;
log->endtic = 0;
log->data = NULL;
log->ranktag = toranktag(log->subtype, log->rank, log->tag);
/* Scale size. */
log->size *= messagescale;
@@ -464,7 +529,7 @@ static size_t pick_logs() {
}
for (int k = 0; k < nr_ranks; k++) {
/* Recvs */
if (nr_recvs[k] > 0) {
recvs_queue[k] = (struct mpiuse_log_entry **)
@@ -499,6 +564,33 @@ static size_t pick_logs() {
sends_queue[k] = NULL;
}
}
/* Offsets and ranktags. */
ranktag_offsets = (size_t *)
calloc(MAX_NR_RANKS * nr_ranks * max_logs, sizeof(size_t));
ranktag_lists = (size_t *)
calloc(MAX_NR_RANKS * nr_ranks * max_logs, sizeof(size_t));
//ranktag_counts = (size_t *)calloc(MAX_NR_RANKS * nr_ranks, sizeof(size_t));
  /* Set up the ranktag offsets for our receive windows. Also define the sizes
   * of the windows. XXX note these are over the complete lists, not the
   * separated ones, k != index of sub queues.
   * XXX need to also include subtype in ranktag XXX */
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
ranktag_lists[INDEX3(MAX_NR_RANKS, nr_ranks, log->otherrank, log->rank,
k)] = log->ranktag;
ranktag_offsets[INDEX3(MAX_NR_RANKS, nr_ranks, log->otherrank, log->rank,
k)] = ranktag_sizes[log->otherrank];
log->offset = ranktag_sizes[log->otherrank];
/* Need to use a multiple of blocks to keep alignment. */
size_t size = toblocks(log->size) + HEADER_SIZE;
ranktag_sizes[log->otherrank] += size;
//ranktag_counts[INDEX2(MAX_NR_RANKS, log->otherrank, myrank)]++;
}
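
Each receive is appended to the window region of its sending rank, so offsets accumulate per sender and the ranktag table records where each message will land. A worked sketch of that accumulation with 8-byte blocks and the 4-block header; values illustrative only:

/* Sketch: how the per-sender receive-window offsets above accumulate. */
#include <cassert>
#include <cstddef>
static const size_t HEADER_SIZE = 4;
static size_t toblocks(size_t nr_bytes) { return (nr_bytes + 7) / 8; }

int main() {
  size_t window_size = 0; /* Plays the role of ranktag_sizes[sender]. */

  /* First message from this sender: 100 bytes -> 13 data blocks. */
  size_t offset0 = window_size;
  window_size += toblocks(100) + HEADER_SIZE;

  /* Second message: 50 bytes -> 7 data blocks. */
  size_t offset1 = window_size;
  window_size += toblocks(50) + HEADER_SIZE;

  assert(offset0 == 0 && offset1 == 17 && window_size == 28);
  return 0;
}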
free(recv_queue);
free(send_queue);
@@ -576,11 +668,32 @@ int main(int argc, char *argv[]) {
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_ranks);
  /* Number of bits needed to hold the maximum rank id. Assumes the GCC
   * count-leading-zeros intrinsic. */
  rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks);
/* We all need to agree on a maximum count of logs, so we can exchange the
* offset arrays (would be ragged otherwise and difficult to exchange). */
max_logs = mpiuse_nr_logs() / 2 + 1;
MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
/* Extract the send and recv messages for our rank. */
size_t maxsize = pick_logs();
/* Size of a message board. Needs to align on size_t. */
MESSAGE_SIZE = toblocks(maxsize) + HEADER_SIZE;
  /* Largest size of a message. Needs to align on size_t. */
max_size = toblocks(maxsize) + HEADER_SIZE;
  /* We need to share all the offsets for each communicator with all the other
   * ranks so they can push data into the correct parts of our receive
   * window. The arrays are zero except for our own entries, so a SUM
   * reduction effectively gathers the disjoint contributions. */
MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
MAX_NR_RANKS * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
MPI_COMM_WORLD);
//MPI_Allreduce(MPI_IN_PLACE, ranktag_counts, MAX_NR_RANKS * nr_ranks,
// MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, ranktag_lists,
MAX_NR_RANKS * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
MPI_COMM_WORLD);
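
Both reductions rely on the arrays being zero everywhere except each rank's own slice, so MPI_SUM behaves like an allgather over disjoint contributions. A minimal sketch of the trick; the MPI_AINT-for-size_t pairing is the same width assumption this file makes:

/* Sketch: MPI_Allreduce with MPI_SUM as a poor-man's allgather. Each
 * rank writes only its own slot of a zero-initialized array, so the sum
 * merges disjoint contributions. */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  size_t *table = (size_t *)calloc(size, sizeof(size_t));
  table[rank] = (size_t)(rank + 1) * 100; /* Only our own slot. */

  /* Zeros elsewhere mean the sum equals a concatenation. */
  MPI_Allreduce(MPI_IN_PLACE, table, size, MPI_AINT, MPI_SUM,
                MPI_COMM_WORLD);

  if (rank == 0)
    for (int k = 0; k < size; k++) printf("slot %d = %zu\n", k, table[k]);
  free(table);
  MPI_Finalize();
  return 0;
}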
/* Now for the RDMA setup. We need the IP addresses of all the ranks. */
@@ -597,7 +710,7 @@ int main(int argc, char *argv[]) {
MPI_Allgather(ip, MPI_MAX_PROCESSOR_NAME, MPI_BYTE, server_ips,
MPI_MAX_PROCESSOR_NAME, MPI_BYTE, MPI_COMM_WORLD);
if (myrank == 0) {
if (myrank == 0 && verbose) {
message("RDMA servers will listen on:");
for (int j = 0; j < nr_ranks; j++) {
for (int k = 0; k < nr_ranks; k++) {
@@ -644,11 +757,12 @@ int main(int argc, char *argv[]) {
}
}
}
message("All servers are started");
if (myrank == 0) message("All RDMA servers are started");
/* Reset time as previous can be thought of as setup costs? */
MPI_Barrier(MPI_COMM_WORLD); // Vital...
message("All synchronized");
if (myrank == 0)
message("All synchronized, restarting time.");
clocks_set_cpufreq(0);
/* Now we have a thread per rank to send the messages. */
......@@ -671,10 +785,9 @@ int main(int argc, char *argv[]) {
pthread_join(recvthread[k], NULL);
}
}
MPI_Barrier(MPI_COMM_WORLD);
/* Dump the updated MPI logs. */
fflush(stdout);
MPI_Barrier(MPI_COMM_WORLD);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);