Commit 59cef982 authored by Peter W. Draper

Works with 2 ranks, including -d

parent 2f4364d6
1 merge request: !8 Draft: RDMA version with wrapped infinity calls
@@ -50,10 +50,6 @@ static int nr_ranks;
/* Base port no. Ranks use +rank. */
static int BASE_PORT = 17771;
/* Flags for controlling access. */
static int LOCKED = -2;
static int UNLOCKED = -3;
/* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this
 * as we need to align in memory. */
#define BLOCKTYPE size_t
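
Note for readers of the diff: the block/byte conversion helpers (BYTESINBLOCK, toblocks(), tobytes()) used in the hunks below are defined outside the shown context. A minimal sketch of what they presumably look like, assuming BYTESINBLOCK is sizeof(BLOCKTYPE) and toblocks() rounds up to whole blocks:

/* Sketch only: assumed definitions of the block/byte conversion helpers
 * referenced throughout this diff; the names come from the diff, the
 * bodies are assumptions. */
#define BYTESINBLOCK sizeof(BLOCKTYPE)

/* Convert bytes to blocks, rounding up so a partial block still gets storage. */
static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}

/* Convert blocks back to bytes. */
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
  return nr_blocks * BYTESINBLOCK;
}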
@@ -79,11 +75,6 @@ static const int task_type_recv = 23;
/* Global communicators for each of the subtypes. */
#define task_subtype_count 22 // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[task_subtype_count];
/* And the windows for one-sided communications. */
static MPI_Win mpi_window[task_subtype_count];
static BLOCKTYPE *mpi_ptr[task_subtype_count];
/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
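
The struct mpiuse_log_entry behind these queues is defined elsewhere in the mpiuse code; the fields this diff relies on, reconstructed from how they are used here and therefore only indicative, are roughly:

/* Indicative sketch of the mpiuse_log_entry fields used in this diff;
 * the real definition lives with the mpiuse logging code and may differ. */
struct mpiuse_log_entry {
  int type;       /* task_type_send or task_type_recv */
  int subtype;    /* subtype/communicator index */
  int rank;       /* rank that logged the event */
  int otherrank;  /* peer rank for the exchange */
  int tag;        /* message tag */
  size_t size;    /* payload size in bytes */
  int activation; /* set for the initiating log entry */
  int done;       /* set once the exchange has completed */
  void *data;     /* payload storage */
};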
@@ -92,17 +83,10 @@ static int volatile todo_send = 0;
/* The local receive queue. */
static struct mpiuse_log_entry **volatile recv_queue;
static int volatile nr_recv = 0;
static int volatile todo_recv = 0;
/* Flags of active ranks and subtypes. */
static int *volatile active_ranks = NULL;
static int volatile nr_active_ranks = 0;
static int *volatile active_subtypes = NULL;
static int volatile nr_active_subtypes = 0;
static int volatile nr_recv[16] = {0}; // XXX needs to be dynamic
/* Starting up the server ends. */
static int volatile starting = 1;
static int volatile starting[16] = {1}; // XXX needs to be dynamic...
/**
* @brief Find an IP address for the given hostname.
@@ -165,16 +149,21 @@ static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
*/
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
for (size_t i = 0; i < size; i++) {
if (data[i] != rank) {
message("see %zd expected %d @ %zd", data[i], rank, i);
if (data[i] != (BLOCKTYPE)rank) {
message("see %zd expected %d @ %zd (%zd to go)", data[i], rank, i, size);
return 0;
}
}
return 1;
}
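
The datacheck_fill() counterpart named in the hunk header above is not shown in this diff. Assuming it simply stamps every block with the sending rank, which is what datacheck_test() verifies on the receiving side, it would look something like:

/* Sketch only: assumed body of datacheck_fill(), the counterpart of
 * datacheck_test() above. Taken to write the global myrank into every
 * block so the receiver can verify the payload. */
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
  for (size_t i = 0; i < size; i++) data[i] = (BLOCKTYPE)myrank;
}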
struct stuff {
char server_ip[32];
int rank;
};
/**
* @brief Send thread, sends RDMA messages to other ranks one-by-one.
* @brief Send thread, sends RDMA messages to another rank.
*
* Messages are all considered in order.
*/
@@ -182,28 +171,29 @@ static void *send_thread(void *arg) {
ticks starttics = getticks();
// The server IPs
char *server_ips = (char *)arg;
// Get the destination IP and rank.
struct stuff *stuff = (struct stuff *)arg;
char *server_ip = stuff->server_ip;
int rank = stuff->rank;
// Need a factory to create QPs.
// Need a factory to create QP.
auto context = std::make_shared<infinity::core::Context>();
auto qpFactory =
std::make_shared<infinity::queues::QueuePairFactory>(context);
// Get QPs to all the other ranks.
std::shared_ptr<infinity::queues::QueuePair> qps[nr_ranks];
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
message("attempting connection to rank %d from %d", k, myrank);
qps[k] = qpFactory->connectToRemoteHost(
&server_ips[k * MPI_MAX_PROCESSOR_NAME], BASE_PORT + k);
message("connected to rank %d from %d", k, myrank);
}
}
// Get QP to the other rank. Note we cannot do this until the related
// server is up and running, so make sure that is true.
message("waiting for connection to remote server %s %d", server_ip,
BASE_PORT + myrank);
auto qp = qpFactory->connectToRemoteHost(server_ip, BASE_PORT + myrank);
message("connected to remote server %s %d", server_ip, BASE_PORT + myrank);
for (int k = 0; k < nr_send; k++) {
struct mpiuse_log_entry *log = send_queue[k];
// Only send messages to the expected rank.
if (log->otherrank != rank) continue;
/* Data has the actual data and room for the header. */
BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK);
@@ -220,15 +210,12 @@ static void *send_thread(void *arg) {
dataptr[3] = log->tag;
/* Need to assign to a buffer. */
auto sendBuffer =
infinity::memory::Buffer::createBuffer(context, dataptr, datasize);
auto sendBuffer = infinity::memory::Buffer::createBuffer(context, dataptr,
tobytes(datasize));
// And send... XXX The pattern is to only use a requestToken when
// completing the available no. of buffers, we have a problem with the
// multiple ranks as targets. We'd need to know this is the last message for
// that rank...
// And send
infinity::requests::RequestToken requestToken(context);
qps[log->otherrank]->send(sendBuffer, datasize, &requestToken);
qp->send(sendBuffer, &requestToken);
requestToken.waitUntilCompleted();
}
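
The XXX comment removed above refers to the batched-completion pattern from the infinity examples: attach a RequestToken only to the last send of a batch and wait once. With a single destination rank per thread that would now be straightforward. A rough sketch, not part of this commit, assuming the wrapped QueuePair::send() accepts a null token as the unwrapped infinity call does, and with last_send_index and sendBuffers[] as hypothetical bookkeeping:

// Sketch only: one RequestToken for a whole batch of sends to this rank.
// Assumes send() tolerates a null token; last_send_index and sendBuffers[]
// are hypothetical stand-ins not present in this commit.
infinity::requests::RequestToken lastToken(context);
for (int k = 0; k < nr_send; k++) {
  struct mpiuse_log_entry *log = send_queue[k];
  if (log->otherrank != rank) continue;
  bool is_last = (k == last_send_index);
  qp->send(sendBuffers[k], is_last ? &lastToken : nullptr);
}
lastToken.waitUntilCompleted();  // a single wait covers every send above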
@@ -239,43 +226,43 @@ static void *send_thread(void *arg) {
}
/**
* @brief recv thread, listens for remote sends from all the other ranks...
* @brief recv thread, listens for remote sends from another rank.
*/
static void *recv_thread(void *arg) {
int rank = *(int *)arg;
ticks starttics = getticks();
// Each receive thread needs a factory to create QPs.
// Each receive port needs a factory to create QPs.
auto context = std::make_shared<infinity::core::Context>();
auto qpFactory =
std::make_shared<infinity::queues::QueuePairFactory>(context);
// Create buffer to receive messages. Only size for one. XXX can just add
// more...
auto receiveBuffer =
infinity::memory::Buffer::createBuffer(context, MESSAGE_SIZE);
// Create buffer to receive messages. Only size for one, or not...
auto receiveBuffer = infinity::memory::Buffer::createBuffer(
context, 16 * tobytes(MESSAGE_SIZE));
context->postReceiveBuffer(receiveBuffer);
// Bind to our port. XXX do we need more qps for each remote? How do they
// separate?
qpFactory->bindToPort(BASE_PORT + myrank);
message("Blocking for first message on %d", BASE_PORT + myrank);
starting = 0; // Still not quite correct, we need to do this in acceptIncomingConnection().
auto qp = qpFactory->acceptIncomingConnection(); // We block here for first message.
message("Accepting incoming connections on %d", BASE_PORT + myrank);
// Port binding.
qpFactory->bindToPort(BASE_PORT + rank);
message("Blocking for first message on %d", BASE_PORT + rank);
starting[rank] = 0; // really need to do this in acceptIncomingConnection().
auto qp =
qpFactory
->acceptIncomingConnection(); // We block here for first message.
message("Accepting incoming connections on %d", BASE_PORT + rank);
/* No. of receives to process. */
todo_recv = nr_recv;
int todo_recv = nr_recv[rank];
/* We loop while new requests are being sent and we still have messages
* to receive. */
infinity::core::receive_element_t receiveElement;
while (todo_recv > 0) {
// Listen for a message to arrive.
while (!context->receive(receiveElement))
;
context->postReceiveBuffer(receiveElement.buffer);
// Unpack the header.
BLOCKTYPE *dataptr = (BLOCKTYPE *)receiveElement.buffer->getData();
@@ -286,33 +273,36 @@ static void *recv_thread(void *arg) {
/* Now find the associated log. */
int found = 0;
for (int k = 0; k < nr_recv; k++) {
for (int k = 0; k < nr_recv[rank]; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done && log->otherrank == rank &&
log->subtype == subtype && log->size == size &&
log->tag == tag) {
log->subtype == subtype && log->size == size && log->tag == tag) {
found = 1;
if (verbose)
message("receive message %d/%d from %d @ %zd: dataptr[0] %zd",
log->rank, log->subtype, rank, rank * MESSAGE_SIZE,
dataptr[0]);
// if (verbose)
message("receive message subtype %d from %d on %d", log->subtype, rank,
myrank);
/* Check the sent data is unchanged and the received data is as
* expected. */
if (datacheck && !datacheck_test(toblocks(log->size), &dataptr[HEADER_SIZE], rank)) {
if (datacheck &&
!datacheck_test(toblocks(log->size), &dataptr[HEADER_SIZE], rank)) {
message("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
//free(log->data); // XXX should really offload the data to be fair.
atomic_dec(&todo_recv);
// free(log->data); // XXX should really offload the data to be fair.
todo_recv--;
}
}
if (!found) {
error("Failed to find a matching receive");
error("No matching receive on connections to %d (%d of %d todo)",
BASE_PORT + rank, todo_recv, nr_recv[rank]);
}
// Ready for next use of buffer?
context->postReceiveBuffer(receiveElement.buffer);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
@@ -348,15 +338,9 @@ static size_t pick_logs() {
nr_send = 0;
recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recv = 0;
int all_nr_recv = 0;
/* Flags for active elements. */
active_ranks = (int *)calloc(nr_ranks, sizeof(int));
nr_active_ranks = 0;
active_subtypes = (int *)calloc(task_subtype_count, sizeof(int));
nr_active_subtypes = 0;
for (int k = 0; k < nlogs; k++) {
for (size_t k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->activation) {
if (log->rank == myrank) {
......@@ -366,15 +350,11 @@ static size_t pick_logs() {
send_queue[nr_send] = log;
nr_send++;
} else if (log->type == task_type_recv) {
recv_queue[nr_recv] = log;
nr_recv++;
recv_queue[all_nr_recv] = log;
all_nr_recv++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
/* Activative rank and subtype. */
active_ranks[log->otherrank] = 1;
active_subtypes[log->subtype] = 1;
}
/* Across all ranks. */
@@ -382,34 +362,20 @@ static size_t pick_logs() {
}
}
/* Active ranks and subtypes into indices. */
nr_active_ranks = 0;
for (int k = 0; k < nr_ranks; k++) {
if (active_ranks[k]) {
active_ranks[nr_active_ranks] = k;
message("active rank: %d", k);
nr_active_ranks++;
}
}
nr_active_subtypes = 0;
for (int k = 0; k < task_subtype_count; k++) {
if (active_subtypes[k]) {
active_subtypes[nr_active_subtypes] = k;
message("active subtype: %d", k);
nr_active_subtypes++;
}
/* Now we need to count the numbers of messages to send per rank. */
for (int k = 0; k < nr_ranks; k++) nr_recv[k] = 0;
for (int k = 0; k < all_nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
nr_recv[log->otherrank]++;
}
/* Sort into increasing tag. */
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(recv_queue, all_nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
if (verbose) {
message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
nr_recv);
message("active ranks = %d, active subtypes = %d", nr_active_ranks,
nr_active_subtypes);
message("maxsize = %zd, nr_send = %d, all_nr_recv = %d", maxsize, nr_send,
all_nr_recv);
}
return maxsize;
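
cmp_logs() is outside the shown hunks; given the "Sort into increasing tag" comment it is presumably a qsort() comparator on the tag field, along these lines:

/* Sketch only: assumed comparator behind the qsort() calls above, ordering
 * log entries by increasing tag as the comment states. */
static int cmp_logs(const void *p1, const void *p2) {
  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
  if (l1->tag < l2->tag) return -1;
  if (l1->tag > l2->tag) return 1;
  return 0;
}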
@@ -480,8 +446,6 @@ int main(int argc, char *argv[]) {
/* Extract the send and recv messages for our rank. */
size_t maxsize = pick_logs();
message("active ranks = %d, active subtypes = %d", nr_active_ranks,
nr_active_subtypes);
/* Size of a message board. Needs to align on size_t. */
MESSAGE_SIZE = toblocks(maxsize) + HEADER_SIZE;
@@ -496,16 +460,23 @@ int main(int argc, char *argv[]) {
strncpy(ip, toipaddr(name), MPI_MAX_PROCESSOR_NAME);
/* And distribute, so we all know everyone's IPs. */
char *server_ips = (char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME);
char *server_ips =
(char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME);
MPI_Allgather(ip, MPI_MAX_PROCESSOR_NAME, MPI_BYTE, server_ips,
MPI_MAX_PROCESSOR_NAME, MPI_BYTE, MPI_COMM_WORLD);
if (myrank == 0) {
message("RDMA servers will listen on:");
for (int k = 0; k < nr_ranks; k++) {
message(" %s on port %d", &server_ips[k * MPI_MAX_PROCESSOR_NAME], BASE_PORT + k);
for (int j = 0; j < nr_ranks; j++) {
for (int k = 0; k < nr_ranks; k++) {
if (k != j) {
message(" %d: %s on port %d", j,
&server_ips[j * MPI_MAX_PROCESSOR_NAME], BASE_PORT + k);
}
}
}
}
for (int k = 0; k < nr_ranks; k++) starting[k] = 1;
/* Time to start time. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
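
toipaddr(), the "Find an IP address for the given hostname" helper declared earlier in the file, is not shown in these hunks. A typical implementation, assumed here to be a simple gethostbyname() lookup returning a dotted-quad string, would be along the lines of:

/* Sketch only: assumed shape of toipaddr(), resolving a hostname to a
 * dotted-quad string so the ranks can exchange connectable addresses.
 * The real implementation is outside the shown hunks. */
#include <netdb.h>
#include <arpa/inet.h>

static char *toipaddr(char *hostname) {
  struct hostent *host = gethostbyname(hostname);
  if (host == NULL)
    error("Failed to convert hostname '%s' to an IP address", hostname);
  return inet_ntoa(*(struct in_addr *)host->h_addr_list[0]);
}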
@@ -519,24 +490,54 @@ int main(int argc, char *argv[]) {
}
}
/* Make two threads, one for sending and one for receiving RDMA messages. */
/* Make a thread per rank, each one has a QP that connects between this rank
* and that rank. We need to start all the server threads first. */
pthread_t recvthread[nr_ranks];
int ranks[nr_ranks];
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
starting[k] = 1;
ranks[k] = k;
if (pthread_create(&recvthread[k], NULL, &recv_thread, &ranks[k]) != 0)
error("Failed to create recv thread.");
}
}
pthread_t recvthread; // Start servers first..
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
// Wait on all the local servers to start.
int ready = 0;
while (ready != nr_ranks - 1) {
ready = 0;
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
if (!starting[k]) ready++;
}
}
}
message("All servers are started");
// And make sure all remotes are also ready.
MPI_Barrier(MPI_COMM_WORLD);
message("All synchronized");
// Wait for all servers to start listening... Then release the messages.
while (starting)
;
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, server_ips) != 0)
error("Failed to create send thread.");
/* Now we have a thread per rank to send the messages. */
pthread_t sendthread[nr_ranks];
struct stuff stuff[nr_ranks];
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
strcpy(stuff[k].server_ip, &server_ips[k * MPI_MAX_PROCESSOR_NAME]);
stuff[k].rank = k;
if (pthread_create(&sendthread[k], NULL, &send_thread, &stuff[k]) != 0)
error("Failed to create send thread.");
}
}
/* Wait until all threads have exited and all message exchanges have
* completed. */
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
pthread_join(sendthread[k], NULL);
pthread_join(recvthread[k], NULL);
}
}
MPI_Barrier(MPI_COMM_WORLD);
/* Dump the updated MPI logs. */
......