Commit 2d26e28a authored by Peter W. Draper

One-sided works, but doesn't run to completion; still need to control the synchronization

parent 280f4142
1 merge request: !8 Draft: RDMA version with wrapped infinity calls
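The commit message above points at the remaining problem: qp->write() followed by requestToken.waitUntilCompleted() only confirms that the RDMA write has completed locally, not that the receiver has finished with the previous contents of the remote buffer, so back-to-back writes can overwrite unread data. Below is a minimal sketch of one possible flow-control scheme, not part of this commit: it reuses the objects that appear in the diff (context, qp, sendBuffer, remoteBufferToken, bufferToReadWrite, BLOCKTYPE, tobytes) and the same wrapped infinity calls, while the small acknowledgement buffers (ackRecvBuffer, ackBuffer) are illustrative assumptions.

/* Sender side: post a small receive buffer up front so the receiver's
 * acknowledgement has somewhere to land. */
auto *ackRecvBuffer = new infinity::memory::Buffer(context, sizeof(int));
context->postReceiveBuffer(ackRecvBuffer);

/* One-sided write of the message, as in the diff below. */
infinity::requests::RequestToken requestToken(context);
qp->write(sendBuffer, remoteBufferToken, tobytes(datasize), &requestToken);
requestToken.waitUntilCompleted();  /* Local completion only. */

/* Block until the receiver says the remote buffer is free again. */
infinity::core::receive_element_t ackElement;
while (!context->receive(&ackElement))
  ;
context->postReceiveBuffer(ackElement.buffer);  /* Re-arm for the next ack. */

/* Receiver side: once a message has been unpacked and matched, clear the
 * header word and send a tiny acknowledgement back to the writer
 * (ackBuffer would be allocated once, outside the receive loop). */
auto *ackBuffer = new infinity::memory::Buffer(context, sizeof(int));
((BLOCKTYPE *)bufferToReadWrite->getData())[0] = -1;
infinity::requests::RequestToken ackToken(context);
qp->send(ackBuffer, &ackToken);
ackToken.waitUntilCompleted();

An alternative would be for the sender to poll the remote header word with an RDMA read, but the send/receive acknowledgement keeps the change closest to the calls already used in this branch.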
@@ -66,6 +66,9 @@ static size_t MESSAGE_SIZE = 0;
/* Are we verbose. */
static int verbose = 0;
/* Scale to apply to the size of the messages we send. */
static float messagescale = 1.0;
/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
@@ -77,8 +80,8 @@ static const int task_type_recv = 23;
#define task_subtype_count 22 // Just some upper limit on subtype.
/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_send = 0;
static struct mpiuse_log_entry **volatile sends_queue[16];
static int volatile nr_sends[16] = {0};
static int volatile todo_send = 0;
/* Local receive queues separated by rank. XXX needs to be dynamic. */
@@ -178,6 +181,13 @@ static void *send_thread(void *arg) {
char *server_ip = stuff->server_ip;
int rank = stuff->rank;
/* Short circuit. */
if (nr_sends[rank] == 0) {
if (verbose)
message("took 0.0 %s.", clocks_getunit());
return NULL;
}
// Need a factory to create QP.
infinity::core::Context *context = new infinity::core::Context();
infinity::queues::QueuePairFactory *qpFactory =
@@ -190,8 +200,18 @@ static void *send_thread(void *arg) {
auto *qp = qpFactory->connectToRemoteHost(server_ip, BASE_PORT + myrank);
//message("%d connected to remote server %s %d on %d", myrank, server_ip, rank,
// BASE_PORT + myrank);
auto *remoteBufferToken =
(infinity::memory::RegionToken *) qp->getUserData();
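/* The receiver passes its RegionToken as QP user data when accepting the
 * connection (see acceptIncomingConnection() below), so this token names the
 * remote buffer that the one-sided writes target. */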
for (int k = 0; k < nr_send; k++) {
/* Register some memory for use by RDMA, make it large enough for our
* biggest message. */
auto *sendBuffer = new infinity::memory::Buffer(context,
tobytes(MESSAGE_SIZE));
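/* Registered once up front and reused for every send (the payload is
 * memcpy'd in below), rather than registering a fresh Buffer per message. */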
/* Queue of our sends. */
struct mpiuse_log_entry **send_queue = sends_queue[rank];
for (int k = 0; k < nr_sends[rank]; k++) {
struct mpiuse_log_entry *log = send_queue[k];
// Only send messages to the expected rank.
@@ -213,19 +233,25 @@ static void *send_thread(void *arg) {
dataptr[1] = log->subtype;
dataptr[2] = log->size;
dataptr[3] = log->tag;
message("send message subtype %d from %d to %d (todo: %d)", log->subtype,
myrank, log->otherrank, nr_sends[rank] -k);
/* Need to assign to a buffer to register memory. */
auto *sendBuffer = new infinity::memory::Buffer(context, dataptr,
tobytes(datasize));
/* Copy this to the registered memory. */
memcpy(sendBuffer->getData(), dataptr, tobytes(datasize));
// And send
infinity::requests::RequestToken requestToken(context);
qp->send(sendBuffer, &requestToken);
//qp->send(sendBuffer, tobytes(datasize), &requestToken);
qp->write(sendBuffer, remoteBufferToken, tobytes(datasize),
&requestToken);
requestToken.waitUntilCompleted();
log->endtic = getticks();
delete sendBuffer;// XXX Can we reuse ?
// Not sufficient for the local handoff to succeed, we also need to know
// that the remote buffer is available...
log->endtic = getticks();
}
delete sendBuffer;
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
@@ -243,44 +269,61 @@ static void *send_thread(void *arg) {
static void *recv_thread(void *arg) {
int rank = *(int *)arg;
ticks starttics = getticks();
/* Short circuit. */
int todo_recv = nr_recvs[rank];
if (todo_recv == 0) {
if (verbose)
message("took 0.0 %s.", clocks_getunit());
starting[rank] = 0; // Also fudge.
return NULL;
}
// Each receive port needs a factory to create QPs.
auto *context = new infinity::core::Context();
auto *qpFactory = new infinity::queues::QueuePairFactory(context);
// Create buffer to receive messages. Only size for one, or not...
auto *receiveBuffer = new infinity::memory::Buffer(
// Buffer for remote exchanges.
auto *bufferToReadWrite = new infinity::memory::Buffer(
context, 16 * tobytes(MESSAGE_SIZE));
context->postReceiveBuffer(receiveBuffer);
infinity::memory::RegionToken *bufferToken =
bufferToReadWrite->createRegionToken();
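/* Use -1 in the first header word (the rank) as an "empty" marker: the
 * receive loop below only unpacks a message once a remote write has set it
 * to a real rank, and resets it to -1 when the message has been matched. */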
((BLOCKTYPE *)bufferToReadWrite->getData())[0] = -1;
// Port binding.
//message("%d binding to %d on port %d", myrank, rank, BASE_PORT + rank);
message("%d binding to %d on port %d", myrank, rank, BASE_PORT + rank);
fflush(stdout);
qpFactory->bindToPort(BASE_PORT + rank);
//message("Blocking for first message on %d", BASE_PORT + rank);
message("Blocking for first message on %d", BASE_PORT + rank);
starting[rank] = 0; // really need to do this in acceptIncomingConnection().
auto qp =
qpFactory
->acceptIncomingConnection(); // We block here for first message.
//message("Accepting incoming connections on %d", BASE_PORT + rank);
auto qp = qpFactory->acceptIncomingConnection
(bufferToken, sizeof(infinity::memory::RegionToken)); // We block here for first message.
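/* Handing the region token over as user data is what allows the connecting
 * sender to retrieve it with getUserData() and target bufferToReadWrite with
 * its one-sided writes. */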
message("Accepting incoming connections on %d", BASE_PORT + rank);
/* No. of receives to process and associated queue. */
int todo_recv = nr_recvs[rank];
/* Ignore the timing on previous part, which is fixed. */
ticks starttics = getticks();
/* Our local queue. */
struct mpiuse_log_entry **recv_queue = recvs_queue[rank];
/* We loop while new requests are being sent and we still have messages
* to receive. */
infinity::core::receive_element_t receiveElement;
ticks findsum = 0;
while (todo_recv > 0) {
while (!context->receive(&receiveElement))
;
ticks findtics = getticks();
if (findsum == 0) {
// Ignore overheads of first exchange?
starttics = getticks();
findtics = starttics;
}
// Unpack the header.
BLOCKTYPE *dataptr = (BLOCKTYPE *)receiveElement.buffer->getData();
// Unpack the header; it will have been updated if the rank word is set.
BLOCKTYPE *dataptr = (BLOCKTYPE *)bufferToReadWrite->getData();
int rank = dataptr[0];
if (rank > 0) {
int subtype = dataptr[1];
size_t size = dataptr[2];
int tag = dataptr[3];
@@ -298,9 +341,9 @@ static void *recv_thread(void *arg) {
log->size == size && log->tag == tag) {
found = 1;
if (verbose)
message("receive message subtype %d from %d on %d", log->subtype,
rank, myrank);
//if (verbose)
message("receive message subtype %d from %d on %d (todo: %d)", log->subtype,
rank, myrank, todo_recv);
/* Check sent data is unchanged and received data is as
* expected. */
@@ -317,6 +360,7 @@ static void *recv_thread(void *arg) {
}
}
}
findsum += (getticks() - findtics);
if (!found) {
error(
"No matching receive on connections to %d (%d of %d todo:"
@@ -324,15 +368,18 @@
BASE_PORT + rank, todo_recv, nr_recvs[rank], myrank, rank, subtype,
size, tag);
}
}
// Ready for next use of buffer?
context->postReceiveBuffer(receiveElement.buffer);
((BLOCKTYPE *)bufferToReadWrite->getData())[0] = -1;
}
message("recv locate took %.3f %s.",
clocks_from_ticks(findsum),
clocks_getunit());
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
delete receiveBuffer;
delete qp;
delete qpFactory;
@@ -362,9 +409,9 @@ static size_t pick_logs() {
size_t maxsize = 0;
/* Queues of send and receive logs. */
send_queue = (struct mpiuse_log_entry **)
struct mpiuse_log_entry **send_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_send = 0;
int nr_send = 0;
struct mpiuse_log_entry **recv_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
int nr_recv = 0;
@@ -377,6 +424,10 @@ static size_t pick_logs() {
log->injtic = 0;
log->endtic = 0;
log->data = NULL;
/* Scale size. */
log->size *= messagescale;
if (log->type == task_type_send) {
send_queue[nr_send] = log;
nr_send++;
@@ -399,12 +450,22 @@ static size_t pick_logs() {
/* Now we need to count the number of messages to send and receive per rank
 * and create sub-queues for these. */
for (int k = 0; k < nr_ranks; k++) nr_recvs[k] = 0;
for (int k = 0; k < nr_ranks; k++) {
nr_recvs[k] = 0;
nr_sends[k] = 0;
}
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
nr_recvs[log->otherrank]++;
}
for (int k = 0; k < nr_send; k++) {
struct mpiuse_log_entry *log = send_queue[k];
nr_sends[log->otherrank]++;
}
for (int k = 0; k < nr_ranks; k++) {
/* Recvs */
if (nr_recvs[k] > 0) {
recvs_queue[k] = (struct mpiuse_log_entry **)
calloc(nr_recvs[k], sizeof(struct mpiuse_log_entry *));
@@ -420,9 +481,26 @@ static size_t pick_logs() {
else {
recvs_queue[k] = NULL;
}
/* Sends. */
if (nr_sends[k] > 0) {
sends_queue[k] = (struct mpiuse_log_entry **)
calloc(nr_sends[k], sizeof(struct mpiuse_log_entry *));
int i = 0;
for (int j = 0; j < nr_send; j++) {
struct mpiuse_log_entry *log = send_queue[j];
if (log->otherrank == k) {
sends_queue[k][i] = send_queue[j];
i++;
}
}
}
else {
sends_queue[k] = NULL;
}
}
free(recv_queue);
free(send_queue);
if (verbose) {
message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
@@ -463,7 +541,7 @@ int main(int argc, char *argv[]) {
/* Handle the command-line, we expect a mpiuse data file to read and various
* options. */
int opt;
while ((opt = getopt(argc, argv, "vd")) != -1) {
while ((opt = getopt(argc, argv, "vds:")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
@@ -471,6 +549,9 @@ int main(int argc, char *argv[]) {
case 'v':
verbose = 1;
break;
case 's':
messagescale = atof(optarg);
break;
default:
if (myrank == 0) usage(argv);
return 1;
@@ -565,9 +646,10 @@ int main(int argc, char *argv[]) {
}
message("All servers are started");
// And make sure all remotes are also ready.
/* Reset time as previous can be thought of as setup costs? */
MPI_Barrier(MPI_COMM_WORLD); // Vital...
message("All synchronized");
clocks_set_cpufreq(0);
/* Now we have a thread per rank to send the messages. */
pthread_t sendthread[nr_ranks];