Commit 02f01712 authored by Peter W. Draper

Variation on RDMA send, use only one thread to send messages, still using n threads to receive

parent 4c01bd13
Merge request !8 (Draft): RDMA version with wrapped infinity calls
Makefile
@@ -2,18 +2,26 @@
 CFLAGS = -g -O3 -Wall -Iinfinity/include
 #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
+INCLUDES = mpiuse.h atomic.h cycle.h clocks.h error.h
+SOURCES = mpiuse.c clocks.c
+DEPS = Makefile $(SOURCES) $(INCLUDES)
+INFINITY = -Linfinity -linfinity -libverbs
-all: swiftmpistepsim swiftmpirdmastepsim swiftmpirdmaonestepsim
+all: swiftmpistepsim swiftmpirdmastepsim swiftmpirdmaonestepsim swiftmpirdmastepsim2
-swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
-	mpicxx $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c
+swiftmpistepsim: swiftmpistepsim.c $(DEPS)
+	mpicxx $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c $(SOURCES)
-swiftmpirdmastepsim: swiftmpirdmastepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
-	mpicxx $(CFLAGS) -o swiftmpirdmastepsim swiftmpirdmastepsim.c mpiuse.c clocks.c -Linfinity -linfinity -libverbs
+swiftmpirdmastepsim: swiftmpirdmastepsim.c $(DEPS)
+	mpicxx $(CFLAGS) -o swiftmpirdmastepsim swiftmpirdmastepsim.c $(SOURCES) $(INFINITY)
-swiftmpirdmaonestepsim: swiftmpirdmaonestepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
-	mpicxx $(CFLAGS) -o swiftmpirdmaonestepsim swiftmpirdmaonestepsim.c mpiuse.c clocks.c -Linfinity -linfinity -libverbs
+swiftmpirdmaonestepsim: swiftmpirdmaonestepsim.c $(DEPS)
+	mpicxx $(CFLAGS) -o swiftmpirdmaonestepsim swiftmpirdmaonestepsim.c $(SOURCES) $(INFINITY)
+swiftmpirdmastepsim2: swiftmpirdmastepsim2.c $(DEPS)
+	mpicxx $(CFLAGS) -o swiftmpirdmastepsim2 swiftmpirdmastepsim2.c $(SOURCES) $(INFINITY)
 clean:
 	rm -f swiftmpistepsim

swiftmpirdmastepsim2.c (new file)
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
// Pure RDMA version, we use MPI for process control and synchronization.
// This variant uses a single thread to send all messages, while still using
// one receive thread per remote rank.
#include <arpa/inet.h>
#include <limits.h>
#include <mpi.h>
#include <netdb.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <infinity/core/Context.h>
#include <infinity/memory/Buffer.h>
#include <infinity/memory/RegionToken.h>
#include <infinity/queues/QueuePair.h>
#include <infinity/queues/QueuePairFactory.h>
#include <infinity/requests/RequestToken.h>
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Our rank for all to see. */
int myrank = -1;
/* Number of ranks. */
static int nr_ranks;
/* Base port no. Ranks use +rank. */
static int BASE_PORT = 27771;
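/* Each rank's send thread connects to port BASE_PORT + myrank on every other
* host, and each rank runs one receive thread per remote rank r, bound to
* BASE_PORT + r, so each connection carries messages from exactly one rank. */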
/* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this
* as we need to align in memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Size of message header in blocks. The rank, subtype, size and tag. */
static const size_t HEADER_SIZE = 4;
/* Size of a message board in blocks; we have one of these per rank per
* communicator (i.e. per window). */
static size_t MESSAGE_SIZE = 0;
/* Are we verbose. */
static int verbose = 0;
/* Scale to apply to the size of the messages we send. */
static float messagescale = 1.0;
/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
/* Upper limit on the number of task subtypes. */
#define task_subtype_count 22 // Just some upper limit on subtype.
/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_send = 0;
static int volatile todo_send = 0;
/* Local receive queues, one per remote rank. */
static int volatile nr_recvs[16] = {0};
static struct mpiuse_log_entry **volatile recvs_queue[16];
/* Flags set while each receive thread (server) is still starting up. */
static int volatile starting[16] = {1};
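/* Note these fixed-size arrays (nr_recvs, recvs_queue and starting) limit a
* run to at most 16 ranks; there is no runtime check for this. */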
/**
* @brief Find an IP address for the given hostname.
*
* @param hostname the hostname
*
* @result the IP address; note copy it away if you need to keep it.
*/
static char *toipaddr(char *hostname) {
struct hostent *hostent = gethostbyname(hostname);
if (hostent == NULL) {
error("Failed to convert hostname '%s' to an IP address", hostname);
}
struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list;
return inet_ntoa(*addr_list[0]);
}
/**
* @brief Convert a byte count into a number of blocks, rounds up.
*
* @param nr_bytes the number of bytes.
*
* @result the number of blocks needed.
*/
static int toblocks(BLOCKTYPE nr_bytes) {
return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
/**
* @brief Convert a block count into a number of bytes.
*
* @param nr_blocks the number of blocks.
*
* @result the number of bytes.
*/
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
return (nr_blocks * BYTESINBLOCK);
}
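/* Worked example, assuming 8-byte blocks (BLOCKTYPE == size_t on a 64-bit
* system): toblocks(20) rounds up to 3 blocks and tobytes(3) == 24, so a
* 20-byte payload occupies 24 bytes of registered memory. */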
/**
* @brief fill a data area with given value.
*
* @param size size of data in blocks.
* @param data the data to fill.
* @param value the value to fill.
*/
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
for (BLOCKTYPE i = 0; i < size; i++) {
data[i] = value;
}
}
/**
* @brief test a filled data area for a value.
*
* @param size size of data in blocks.
* @param data the data to check.
* @param value the value expected.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
for (size_t i = 0; i < size; i++) {
if (data[i] != value) {
message("see %zd expected %zd @ %zd (%zd to go)", data[i], value, i,
size);
return 0;
}
}
return 1;
}
/* Struct of server ip addresses as formatted strings.*/
struct servers {
char *ip;
};
/**
* @brief Send thread, sends RDMA messages to the other ranks.
*
* Messages are all sent in log order, one at a time.
*/
static void *send_thread(void *arg) {
ticks starttics = getticks();
// Get the destination IPs and ranks.
struct servers *servers = (struct servers *)arg;
// Need a factory to create QPs.
infinity::core::Context *context = new infinity::core::Context();
infinity::queues::QueuePairFactory *qpFactory =
new infinity::queues::QueuePairFactory(context);
// Create the QPs connecting to all the other ranks. Note we cannot do this
// until the related servers are up and running, so make sure that is true.
infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)
calloc(nr_ranks, sizeof(infinity::queues::QueuePair *));
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
char *ip = &servers->ip[k * MPI_MAX_PROCESSOR_NAME];
if (verbose)
message("%d waiting for connection to remote server %s %d on %d", myrank,
ip, k, BASE_PORT + myrank);
qps[k] = qpFactory->connectToRemoteHost(ip, BASE_PORT + myrank);
if (verbose)
message("%d connected to remote server %s %d on %d", myrank, ip,
k, BASE_PORT + myrank);
}
}
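/* All connections are now established; QP qps[k] carries only messages
* from this rank to rank k. */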
for (int k = 0; k < nr_send; k++) {
struct mpiuse_log_entry *log = send_queue[k];
/* Data has the actual data and room for the header. */
BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK);
log->data = dataptr;
log->injtic = getticks();
/* Fill data with pattern. */
if (datacheck)
datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE], log->tag);
/* First element has our rank, other elements replicate what we need to
* define an MPI message. */
dataptr[0] = myrank;
dataptr[1] = log->subtype;
dataptr[2] = log->size;
dataptr[3] = log->tag;
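/* The outgoing buffer layout in BLOCKTYPE units is thus: [0] sending rank,
* [1] subtype, [2] payload size in bytes, [3] tag, [4...] payload. */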
/* Need to assign to a buffer to register memory. */
auto *sendBuffer =
new infinity::memory::Buffer(context, dataptr, tobytes(datasize));
// And send
infinity::requests::RequestToken requestToken(context);
qps[log->otherrank]->send(sendBuffer, &requestToken);
requestToken.waitUntilCompleted();
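/* Blocking here serializes the sends: the single send thread does not post
* the next message until this one has completed. */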
log->endtic = getticks();
delete sendBuffer; // XXX Can we reuse ?
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
for (int k = 0; k < nr_ranks; k++) delete qps[k];
free(qps);
delete qpFactory;
delete context;
return NULL;
}
/**
* @brief recv thread, listens for remote sends from another rank.
*/
static void *recv_thread(void *arg) {
int rank = *(int *)arg;
ticks starttics = getticks();
// Each receive port needs a factory to create QPs.
auto *context = new infinity::core::Context();
auto *qpFactory = new infinity::queues::QueuePairFactory(context);
// Create buffer to receive messages. This is big enough for the largest
// message.
auto *receiveBuffer =
new infinity::memory::Buffer(context, tobytes(MESSAGE_SIZE));
context->postReceiveBuffer(receiveBuffer);
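/* A single receive buffer, big enough for the largest message, is posted up
* front and re-posted after each message has been processed below. */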
// Port binding.
if (verbose)
message("%d binding to %d on port %d", myrank, rank, BASE_PORT + rank);
qpFactory->bindToPort(BASE_PORT + rank);
if (verbose) message("Blocking for first message on %d", BASE_PORT + rank);
starting[rank] = 0; // really need to do this in acceptIncomingConnection().
auto qp = qpFactory->acceptIncomingConnection();
if (verbose)
message("Accepting incoming connections on %d", BASE_PORT + rank);
/* No. of receives to process and associated queue. */
int todo_recv = nr_recvs[rank];
struct mpiuse_log_entry **recv_queue = recvs_queue[rank];
/* We loop while new requests are being sent and we still have messages
* to receive. */
infinity::core::receive_element_t receiveElement;
while (todo_recv > 0) {
while (!context->receive(&receiveElement))
;
// Unpack the header.
BLOCKTYPE *dataptr = (BLOCKTYPE *)receiveElement.buffer->getData();
int rank = dataptr[0];
int subtype = dataptr[1];
size_t size = dataptr[2];
int tag = dataptr[3];
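/* Note the rank just read from the header shadows the thread's rank
* argument; the two should agree, since this connection only accepts the
* one remote rank. */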
/* Now find the associated log. XXX speed this up, local queue. */
int found = 0;
for (int k = 0; k < nr_recvs[rank]; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done) {
/* On the first attempt we start listening for this receive. */
if (log->injtic == 0) log->injtic = getticks();
if (log->otherrank == rank && log->subtype == subtype &&
log->size == size && log->tag == tag) {
found = 1;
if (verbose)
message("receive message subtype %d from %d on %d", log->subtype,
rank, myrank);
/* Check that the received data matches the pattern written by the
* sender. */
if (datacheck && !datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], log->tag)) {
message("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
// free(log->data); // XXX should really offload the data to be fair.
log->endtic = getticks();
todo_recv--;
}
}
}
if (!found) {
error(
"No matching receive on connections to %d (%d of %d todo:"
" rank = %d otherrank = %d subtype = %d size = %zd tag = %d)",
BASE_PORT + rank, todo_recv, nr_recvs[rank], myrank, rank, subtype,
size, tag);
}
// Ready for next use of buffer?
context->postReceiveBuffer(receiveElement.buffer);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
delete receiveBuffer;
delete context;
delete qp;
delete qpFactory;
/* Thread exits. */
return NULL;
}
/**
* @brief Comparison function for log entry tics.
*/
static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
if (l1->tic > l2->tic) return 1;
if (l1->tic < l2->tic) return -1;
return 0;
}
/**
* @brief Pick out the relevant logging data for our rank.
*
* @result the maximum message size in bytes seen across all ranks.
*/
static size_t pick_logs() {
size_t nlogs = mpiuse_nr_logs();
size_t maxsize = 0;
/* Queues of send and receive logs. */
send_queue =
(struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_send = 0;
struct mpiuse_log_entry **recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
int nr_recv = 0;
for (size_t k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->activation) {
if (log->rank == myrank) {
log->done = 0;
log->injtic = 0;
log->endtic = 0;
log->data = NULL;
/* Scale size. */
log->size *= messagescale;
if (log->type == task_type_send) {
send_queue[nr_send] = log;
nr_send++;
} else if (log->type == task_type_recv) {
recv_queue[nr_recv] = log;
nr_recv++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
/* Across all ranks. */
if (log->size > maxsize) maxsize = log->size;
}
}
/* Sort into increasing tic. */
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
/* Now we need to count the number of messages to receive from each remote
* rank and create sub-queues for these. */
for (int k = 0; k < nr_ranks; k++) nr_recvs[k] = 0;
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
nr_recvs[log->otherrank]++;
}
for (int k = 0; k < nr_ranks; k++) {
if (nr_recvs[k] > 0) {
recvs_queue[k] = (struct mpiuse_log_entry **)calloc(
nr_recvs[k], sizeof(struct mpiuse_log_entry *));
int i = 0;
for (int j = 0; j < nr_recv; j++) {
struct mpiuse_log_entry *log = recv_queue[j];
if (log->otherrank == k) {
recvs_queue[k][i] = recv_queue[j];
i++;
}
}
} else {
recvs_queue[k] = NULL;
}
}
free(recv_queue);
if (verbose) {
message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
nr_recv);
}
return maxsize;
}
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [-vd] [-s scale] SWIFT_mpiuse-log-file.dat logfile.dat\n",
argv[0]);
fprintf(stderr,
" options: -v verbose, -d check data, -s scale message sizes\n");
fflush(stderr);
}
/**
* @brief main function.
*/
int main(int argc, char *argv[]) {
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Handle the command-line, we expect a mpiuse data file to read and various
* options. */
int opt;
while ((opt = getopt(argc, argv, "vds:")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
break;
case 'v':
verbose = 1;
break;
case 's':
messagescale = atof(optarg);
break;
default:
if (myrank == 0) usage(argv);
return 1;
}
}
if (optind >= argc - 1) {
if (myrank == 0) usage(argv);
return 1;
}
char *infile = argv[optind];
char *logfile = argv[optind + 1];
/* Now we read the SWIFT MPI logger output that defines the communications
* we will undertake and the time differences between injections into the
* queues. Note this has all ranks for a single step; SWIFT outputs one MPI
* log per rank per step, so you need to combine all ranks from a step. */
mpiuse_log_restore(infile);
int nranks = mpiuse_nr_ranks();
/* This should match the expected size. */
if (nr_ranks != nranks)
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_ranks);
/* Extract the send and recv messages for our rank. */
size_t maxsize = pick_logs();
/* Size of a message board. Needs to align on size_t. */
MESSAGE_SIZE = toblocks(maxsize) + HEADER_SIZE;
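/* For example, with 8-byte blocks a 1MB maximum payload gives a message
* board of 131072 + 4 blocks. */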
/* Now for the RDMA setup. We need the IP addresses of all the ranks. */
/* Each rank can find its name and IP. */
char name[MPI_MAX_PROCESSOR_NAME];
int namelen = 0;
MPI_Get_processor_name(name, &namelen);
char ip[MPI_MAX_PROCESSOR_NAME];
strncpy(ip, toipaddr(name), MPI_MAX_PROCESSOR_NAME);
/* And distribute, so we all know everyone's IPs. */
struct servers servers;
servers.ip = (char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME);
MPI_Allgather(ip, MPI_MAX_PROCESSOR_NAME, MPI_BYTE, servers.ip,
MPI_MAX_PROCESSOR_NAME, MPI_BYTE, MPI_COMM_WORLD);
if (myrank == 0) {
message("RDMA servers will listen on:");
for (int j = 0; j < nr_ranks; j++) {
for (int k = 0; k < nr_ranks; k++) {
if (k != j) {
message(" %d: %s on port %d", j,
&servers.ip[j * MPI_MAX_PROCESSOR_NAME], BASE_PORT + k);
}
}
}
}
for (int k = 0; k < nr_ranks; k++) starting[k] = 1;
/* Time to start the clock. Try to make it synchronous across the ranks. */
clocks_set_cpufreq(0);
long long freq = clocks_get_cpufreq();
MPI_Barrier(MPI_COMM_WORLD);
clocks_set_cpufreq(freq);
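/* Assumption: clocks_set_cpufreq() also resets the clock zero point, so
* re-applying the measured frequency just after the barrier should give all
* ranks an approximately common time origin. */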
if (myrank == 0) {
message("Start of MPI tests");
message("==================");
if (verbose) {
if (datacheck)
message("checking data pattern on send and recv completion");
}
}
/* Make a server thread per sending rank. Note we need to start all the
* server threads first. */
pthread_t recvthread[nr_ranks];
int *ranks = (int *)malloc(nr_ranks * sizeof(int));
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
ranks[k] = k;
if (pthread_create(&recvthread[k], NULL, &recv_thread, &ranks[k]) != 0)
error("Failed to create recv thread.");
}
}
// Wait on all the local servers to start.
int ready = 0;
while (ready != nr_ranks - 1) {
ready = 0;
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
if (!starting[k]) ready++;
}
}
}
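/* The busy-wait above spins on the volatile starting[] flags until every
* local receive thread has bound its port, so once the barrier below
* completes it is safe for remote send threads to connect. */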
message("All servers are started");
/* Reset time as previous can be thought of as setup costs? */
MPI_Barrier(MPI_COMM_WORLD); // Vital...
clocks_set_cpufreq(freq);
message("All synchronized");
/* Now we have a single thread to send the messages. */
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &servers) != 0)
error("Failed to create send thread.");
/* Wait until all threads have exited and all message exchanges have
* completed. */
pthread_join(sendthread, NULL);
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
pthread_join(recvthread[k], NULL);
}
}
MPI_Barrier(MPI_COMM_WORLD);
/* Dump the updated MPI logs. */
fflush(stdout);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
/* Free resources. */
free(servers.ip);
free(ranks);
if (myrank == 0) message("Bye");
return 0;
}