Commit 2f4364d6 authored by Peter W. Draper

Sort of runs a simple 2 rank test...

parent 9c25a2e1
1 merge request: !8 Draft: RDMA version with wrapped infinity calls
#CFLAGS = -g -O0 -Wall -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
CFLAGS = -g -O0 -Wall
CFLAGS = -g -O0 -Wall -Iinfinity/include
all: swiftmpistepsim swiftmpirdmastepsim
swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
mpicc $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c
mpicxx $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c
swiftmpirdmastepsim: swiftmpirdmastepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
mpicc $(CFLAGS) -o swiftmpirdmastepsim swiftmpirdmastepsim.c mpiuse.c clocks.c
mpicxx $(CFLAGS) -o swiftmpirdmastepsim swiftmpirdmastepsim.c mpiuse.c clocks.c -Linfinity -linfinity -libverbs
clean:
rm -f swiftmpistepsim
swiftmpirdmastepsim.c
@@ -17,25 +17,25 @@
*
******************************************************************************/
// Simple approach, use the window as a message board, capable of receiving a
// single message per rank at a time, so it needs to be larger than the largest
// message, and we need one of these per rank.
//
// So we poll all ranks waiting for a push update to unlock its board, then we
// check the tag and size, which need to match one of the expected
// messages, at which point we copy that away and release the board.
//
// On the send side we work synchronously, sending one message at a time and
// waiting for our board to be unlocked by the receiver.
// Pure RDMA version, we use MPI for process control and synchronization.
#include <arpa/inet.h>
#include <limits.h>
#include <mpi.h>
#include <netdb.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <infinity/core/Context.h>
#include <infinity/memory/Buffer.h>
#include <infinity/memory/RegionToken.h>
#include <infinity/queues/QueuePair.h>
#include <infinity/queues/QueuePairFactory.h>
#include <infinity/requests/RequestToken.h>
#include "atomic.h"
#include "clocks.h"
#include "error.h"
@@ -47,6 +47,9 @@ int myrank = -1;
/* Number of ranks. */
static int nr_ranks;
/* Base port number. Rank k listens on port BASE_PORT + k. */
static int BASE_PORT = 17771;
/* Flags for controlling access. */
static int LOCKED = -2;
static int UNLOCKED = -3;
@@ -57,8 +60,8 @@ static int UNLOCKED = -3;
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Size of message header in blocks. The flag, size and tag. */
static const size_t HEADER_SIZE = 3;
/* Size of message header in blocks. The rank, subtype, size and tag. */
static const size_t HEADER_SIZE = 4;
/* Size of a message board in blocks, we have one of these per rank per
* communicator (i.e. per window). */
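/* Illustrative sketch (not part of this commit) of how the header blocks are
 * used: the same four assignments appear inline in send_thread() and the
 * matching reads in recv_thread(). The helper name pack_header is
 * hypothetical. */
static void pack_header(BLOCKTYPE *dataptr, int rank, int subtype,
                        size_t size, int tag) {
  dataptr[0] = rank;    /* sending rank */
  dataptr[1] = subtype; /* task subtype, i.e. which communicator/queue */
  dataptr[2] = size;    /* payload size in bytes */
  dataptr[3] = tag;     /* tag used to match against the expected receive logs */
}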
@@ -98,6 +101,27 @@ static int volatile nr_active_ranks = 0;
static int *volatile active_subtypes = NULL;
static int volatile nr_active_subtypes = 0;
/* Set while the server (receive) end of this rank is starting up. */
static int volatile starting = 1;
/**
* @brief Find an IP address for the given hostname.
*
* @param hostname the hostname
*
* @result the IP address; this points at a static buffer, so copy it away to keep it.
*/
static char *toipaddr(char *hostname) {
struct hostent *hostent = gethostbyname(hostname);
if (hostent == NULL) {
error("Failed to convert hostname '%s' to an IP address", hostname);
}
struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list;
return inet_ntoa(*addr_list[0]);
}
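/* Usage sketch (not part of this commit): inet_ntoa() returns a pointer to a
 * static buffer, so the result of toipaddr() should be copied before the next
 * call overwrites it, as main() does below:
 *
 *   char name[MPI_MAX_PROCESSOR_NAME];
 *   int namelen = 0;
 *   MPI_Get_processor_name(name, &namelen);
 *   char ip[MPI_MAX_PROCESSOR_NAME];
 *   strncpy(ip, toipaddr(name), MPI_MAX_PROCESSOR_NAME);
 */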
/**
* @brief Convert a byte count into a number of blocks, rounds up.
*
@@ -150,92 +174,62 @@ static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
}
/**
* @brief Send thread, sends messages to other ranks one-by-one.
* @brief Send thread, sends RDMA messages to other ranks one-by-one.
*
* Messages are all considered in order, regardless of the subtype.
* Messages are all considered in order.
*/
static void *send_thread(void *arg) {
message("%d: send thread starts with %d messages", *((int *)arg), nr_send);
ticks starttics = getticks();
// The server IPs
char *server_ips = (char *)arg;
// Need a factory to create QPs.
auto context = std::make_shared<infinity::core::Context>();
auto qpFactory =
std::make_shared<infinity::queues::QueuePairFactory>(context);
// Get QPs to all the other ranks.
std::shared_ptr<infinity::queues::QueuePair> qps[nr_ranks];
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
message("attempting connection to rank %d from %d", k, myrank);
qps[k] = qpFactory->connectToRemoteHost(
&server_ips[k * MPI_MAX_PROCESSOR_NAME], BASE_PORT + k);
message("connected to rank %d from %d", k, myrank);
}
}
for (int k = 0; k < nr_send; k++) {
struct mpiuse_log_entry *log = send_queue[k];
/* Data has the actual data and room for the header. */
BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
BLOCKTYPE *dataptr = calloc(datasize, BYTESINBLOCK);
BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK);
log->data = dataptr;
/* Fill data with pattern. */
if (datacheck) datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE]);
/* First element is marked as LOCKED, so only we can update. */
dataptr[0] = LOCKED;
dataptr[1] = log->size;
dataptr[2] = log->tag;
/* And send data to other rank. */
int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE,
MPI_REPLACE, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
/* Now we change the last element to UNLOCKED so that the remote end can
* find out that the data has arrived. */
BLOCKTYPE newval[1];
BLOCKTYPE oldval[1];
newval[0] = UNLOCKED;
oldval[0] = 0;
ret = MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE,
log->otherrank, MESSAGE_SIZE * myrank,
mpi_window[log->subtype]);
if (ret != MPI_SUCCESS)
mpi_error_message(ret, "MPI_Compare_and_swap error");
ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
if (verbose) {
if (oldval[0] == dataptr[0]) {
message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)",
log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
MESSAGE_SIZE * myrank, k, nr_send);
} else {
message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd %d/%d",
log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
MESSAGE_SIZE * myrank, k, nr_send);
}
}
/* Wait for completion, this is when remote flips back to LOCKED. We poll
* on a get, as the local window is only used for receiving. Use an Rget
* so we can use MPI_Test to get some local progression. */
newval[0] = UNLOCKED;
while (newval[0] != LOCKED) {
MPI_Request request;
ret = MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
MESSAGE_SIZE * myrank, 1, MPI_BLOCKTYPE,
mpi_window[log->subtype], &request);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Rget failed");
/* First element has our rank, other elements replicate what we need to
* define an MPI message. */
dataptr[0] = myrank;
dataptr[1] = log->subtype;
dataptr[2] = log->size;
dataptr[3] = log->tag;
/* After the rget to make sure we get a chance at completion. */
ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
/* Need to assign to a buffer. */
auto sendBuffer =
infinity::memory::Buffer::createBuffer(context, dataptr, datasize);
int flag = 0;
while (flag == 0) {
ret = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Test failed");
}
}
if (verbose)
message("sent and acknowledged message to %d/%d %zd %d/%d)",
log->otherrank, log->subtype, dataptr[0], k, nr_send);
// And send... XXX The pattern is to only use a requestToken when completing
// the available no. of buffers, but we have a problem with multiple ranks as
// targets. We'd need to know this is the last message for that rank...
infinity::requests::RequestToken requestToken(context);
qps[log->otherrank]->send(sendBuffer, datasize, &requestToken);
requestToken.waitUntilCompleted();
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
@@ -245,48 +239,58 @@ static void *send_thread(void *arg) {
}
/**
* @brief Recv thread, checks for messages in the window from other ranks.
* @brief recv thread, listens for remote sends from all the other ranks...
*/
static void *recv_thread(void *arg) {
message(
"%d: recv thread starts, checking for %d messages %d "
"ranks %d communicators",
*((int *)arg), nr_recv, nr_ranks, task_subtype_count);
ticks starttics = getticks();
/* Global statistics. */
int lncalls = 0;
double lsum = 0.0;
ticks lmint = INT_MAX;
ticks lmaxt = 0;
// Each receive thread needs a factory to create QPs.
auto context = std::make_shared<infinity::core::Context>();
auto qpFactory =
std::make_shared<infinity::queues::QueuePairFactory>(context);
// Create a buffer to receive messages. Only sized for one. XXX can just add
// more...
auto receiveBuffer =
infinity::memory::Buffer::createBuffer(context, MESSAGE_SIZE);
context->postReceiveBuffer(receiveBuffer);
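// Sketch (not in this commit) of the "XXX can just add more" idea above: post
// a small pool of receive buffers so several incoming sends can land before
// any buffer is reposted. NRECVBUFFERS is a hypothetical tuning constant;
// only calls already used in this file are assumed.
//
//   const int NRECVBUFFERS = 16;
//   decltype(receiveBuffer) extraBuffers[NRECVBUFFERS];
//   for (int j = 0; j < NRECVBUFFERS; j++) {
//     extraBuffers[j] =
//         infinity::memory::Buffer::createBuffer(context, MESSAGE_SIZE);
//     context->postReceiveBuffer(extraBuffers[j]);
//   }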
// Bind to our port. XXX do we need more qps for each remote? How do they
// separate?
qpFactory->bindToPort(BASE_PORT + myrank);
message("Blocking for first message on %d", BASE_PORT + myrank);
starting = 0; // Still not quite correct, we need to do this in acceptIncomingConnection().
auto qp = qpFactory->acceptIncomingConnection(); // We block here for first message.
message("Accepting incoming connections on %d", BASE_PORT + myrank);
/* No. of receives to process. */
todo_recv = nr_recv;
/* We loop while new requests are being sent and we still have messages
* to receive. */
infinity::core::receive_element_t receiveElement;
while (todo_recv > 0) {
for (int n = 0; n < nr_active_ranks; n++) {
if (todo_recv <= 0) break;
int rank = active_ranks[n];
if (rank == myrank) continue;
// Listen for a message to arrive.
while (!context->receive(receiveElement))
;
context->postReceiveBuffer(receiveElement.buffer);
for (int j = 0; j < nr_active_subtypes; j++) {
if (todo_recv <= 0) break;
int subtype = active_subtypes[j];
// Unpack the header.
BLOCKTYPE *dataptr = (BLOCKTYPE *)receiveElement.buffer->getData();
int rank = dataptr[0];
int subtype = dataptr[1];
size_t size = dataptr[2];
int tag = dataptr[3];
BLOCKTYPE *dataptr = &mpi_ptr[subtype][rank * MESSAGE_SIZE];
if (dataptr[0] == UNLOCKED) {
/* We have a message waiting to be handled, find the log. */
/* Now find the associated log. */
int found = 0;
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done && log->otherrank == rank &&
log->subtype == subtype && log->size == dataptr[1] &&
log->tag == dataptr[2]) {
log->subtype == subtype && log->size == size &&
log->tag == tag) {
found = 1;
if (verbose)
@@ -296,20 +300,14 @@ static void *recv_thread(void *arg) {
/* Check the sent data is unchanged and the received data is as
* expected. */
if (datacheck && !datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], rank)) {
if (datacheck && !datacheck_test(toblocks(log->size), &dataptr[HEADER_SIZE], rank)) {
message("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
free(log->data);
//free(log->data); // XXX should really offload the data to be fair.
atomic_dec(&todo_recv);
/* Now ready for next message. */
dataptr[0] = LOCKED;
break;
}
}
if (!found) {
@@ -317,17 +315,6 @@ static void *recv_thread(void *arg) {
}
}
/* Need to allow for some MPI progression, since we make no MPI calls
* (by intent the receive is a passive target, so only the sender should
* make calls that move data). */
int flag = 0;
int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
MPI_STATUS_IGNORE);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Iprobe failed");
}
}
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
@@ -499,18 +486,26 @@ int main(int argc, char *argv[]) {
/* Size of a message board. Needs to align on size_t. */
MESSAGE_SIZE = toblocks(maxsize) + HEADER_SIZE;
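/* Worked example, assuming an 8-byte BLOCKTYPE: a largest message of 1001
 * bytes rounds up to 126 blocks, and adding the 4-block header gives a
 * MESSAGE_SIZE of 130 blocks (1040 bytes) per message board. */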
/* Now for the one-sided setup... We need a buffer with space for the largest
* message, plus one of these per rank. */
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
MPI_Win_allocate(tobytes(MESSAGE_SIZE * nr_ranks), BYTESINBLOCK,
MPI_INFO_NULL, subtypeMPI_comms[i], &mpi_ptr[i],
&mpi_window[i]);
/* Now for the RDMA setup. We need the IP addresses of all the ranks. */
/* Each rank can find its name and IP. */
char name[MPI_MAX_PROCESSOR_NAME];
int namelen = 0;
MPI_Get_processor_name(name, &namelen);
char ip[MPI_MAX_PROCESSOR_NAME];
strncpy(ip, toipaddr(name), MPI_MAX_PROCESSOR_NAME);
/* Assert a shared lock with all the other processes on this window. */
MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
/* And distribute, so we all know everyone's IPs. */
char *server_ips = (char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME);
MPI_Allgather(ip, MPI_MAX_PROCESSOR_NAME, MPI_BYTE, server_ips,
MPI_MAX_PROCESSOR_NAME, MPI_BYTE, MPI_COMM_WORLD);
if (myrank == 0) {
message("RDMA servers will listen on:");
for (int k = 0; k < nr_ranks; k++) {
message(" %s on port %d", &server_ips[k * MPI_MAX_PROCESSOR_NAME], BASE_PORT + k);
}
}
// message("Windows allocated");
/* Time to start the clock. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
@@ -524,25 +519,25 @@ int main(int argc, char *argv[]) {
}
}
/* Make two threads, one for send and one for receiving. */
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
pthread_t recvthread;
/* Make two threads, one for sending and one for receiving RDMA messages. */
pthread_t recvthread; // Start servers first..
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
// Wait for all servers to start listening... Then release the messages.
while (starting)
;
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, server_ips) != 0)
error("Failed to create send thread.");
/* Wait until all threads have exited and all message exchanges have
* completed. */
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Free the window locks. Once we all arrive. */
MPI_Barrier(MPI_COMM_WORLD);
for (int i = 0; i < task_subtype_count; i++) {
MPI_Win_unlock_all(mpi_window[i]);
MPI_Win_free(&mpi_window[i]);
}
/* Dump the updated MPI logs. */
fflush(stdout);