diff --git a/Makefile b/Makefile
index 56aa0e1f7a3cb41f62ed7a1ab31de11808d5daa7..57a0353bb52f7f2bb21f7f5a9ee9d5a82fc5a051 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,5 @@
 #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
-CFLAGS = -g -O3 -Wall -Iinfinity/include
+CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_ASSERT_ON
 #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
 
 INCLUDES = mpiuse.h atomic.h cycle.h clocks.h error.h
@@ -9,7 +9,7 @@ DEPS = Makefile $(SOURCES) $(INCLUDES)
 
 INFINITY = -Linfinity -linfinity -libverbs
 
-all: swiftmpistepsim swiftmpirdmastepsim swiftmpirdmaonestepsim swiftmpirdmastepsim2
+all: swiftmpistepsim swiftmpirdmastepsim swiftmpirdmaonestepsim swiftmpirdmastepsim2 swiftmpirdmastepsim3
 
 swiftmpistepsim: swiftmpistepsim.c $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c $(SOURCES)
@@ -23,9 +23,14 @@ swiftmpirdmaonestepsim: swiftmpirdmaonestepsim.c $(DEPS)
 
 swiftmpirdmastepsim2: swiftmpirdmastepsim2.c $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpirdmastepsim2 swiftmpirdmastepsim2.c $(SOURCES) $(INFINITY)
 
+swiftmpirdmastepsim3: swiftmpirdmastepsim3.c $(DEPS)
+	mpicxx $(CFLAGS) -o swiftmpirdmastepsim3 swiftmpirdmastepsim3.c $(SOURCES) $(INFINITY)
+
 clean:
 	rm -f swiftmpistepsim
 	rm -f swiftmpirdmastepsim
 	rm -f swiftmpirdmaonestepsim
+	rm -f swiftmpirdmastepsim2
+	rm -f swiftmpirdmastepsim3
 
diff --git a/swiftmpirdmastepsim3.c b/swiftmpirdmastepsim3.c
new file mode 100644
index 0000000000000000000000000000000000000000..6e92e8e2e5be6cfff1ba3dfb0b56716387c04676
--- /dev/null
+++ b/swiftmpirdmastepsim3.c
@@ -0,0 +1,604 @@
+/*******************************************************************************
+ * This file is part of SWIFT.
+ * Copyright (c) 2020 Peter W. Draper
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ ******************************************************************************/
+
+// Pure RDMA version; we use MPI only for process control and synchronization.
+
+#include <arpa/inet.h>
+#include <limits.h>
+#include <mpi.h>
+#include <netdb.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <infinity/core/Context.h>
+#include <infinity/memory/Buffer.h>
+#include <infinity/memory/RegionToken.h>
+#include <infinity/queues/QueuePair.h>
+#include <infinity/queues/QueuePairFactory.h>
+#include <infinity/requests/RequestToken.h>
+
+#include "atomic.h"
+#include "clocks.h"
+#include "error.h"
+#include "mpiuse.h"
+
+/* Our rank for all to see. */
+int myrank = -1;
+
+/* CPU frequency estimate, shared so we do this once. */
+static long long cpufreq = 0;
+
+/* Number of ranks. */
+static int nr_ranks;
+
+/* Base port no. Ranks use +rank. */
+static int BASE_PORT = 27771;
+
+/* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this,
+ * as we need to align in memory.
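+ * Each message is laid out in these blocks: HEADER_SIZE header blocks
+ * carrying the rank, subtype, size and tag, followed by the data payload.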
+ */
+#define BLOCKTYPE size_t
+#define MPI_BLOCKTYPE MPI_AINT
+static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
+
+/* Size of message header in blocks. The rank, subtype, size and tag. */
+static const size_t HEADER_SIZE = 4;
+
+/* Size of a message board in blocks, we have one of these per rank per
+ * communicator (i.e. per window). */
+static size_t MESSAGE_SIZE = 0;
+
+/* Are we verbose? */
+static int verbose = 0;
+
+/* Scale to apply to the size of the messages we send. */
+static float messagescale = 1.0;
+
+/* Set a data pattern and check we get this back, slow... */
+static int datacheck = 0;
+
+/* Integer types of send and recv tasks, must match the log. */
+static const int task_type_send = 22;
+static const int task_type_recv = 23;
+
+/* Upper limit on the number of task subtypes. */
+#define task_subtype_count 22  // Just some upper limit on subtype.
+
+/* The local send queue. */
+static struct mpiuse_log_entry **volatile send_queue;
+static int volatile nr_send = 0;
+
+/* Local receive queue. */
+static int volatile nr_recv = 0;
+static struct mpiuse_log_entry **volatile recv_queue;
+
+/**
+ * @brief Find an IP address for the given hostname.
+ *
+ * @param hostname the hostname
+ *
+ * @result the IP address. Note this points to static storage, so take a
+ *         copy to keep it.
+ */
+static char *toipaddr(char *hostname) {
+
+  struct hostent *hostent = gethostbyname(hostname);
+  if (hostent == NULL) {
+    error("Failed to convert hostname '%s' to an IP address", hostname);
+  }
+  struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list;
+  return inet_ntoa(*addr_list[0]);
+}
+
+/**
+ * @brief Convert a byte count into a number of blocks, rounds up.
+ *
+ * @param nr_bytes the number of bytes.
+ *
+ * @result the number of blocks needed.
+ */
+static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
+  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
+}
+
+/**
+ * @brief Convert a block count into a number of bytes.
+ *
+ * @param nr_blocks the number of blocks.
+ *
+ * @result the number of bytes.
+ */
+static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
+  return (nr_blocks * BYTESINBLOCK);
+}
+
+/**
+ * @brief fill a data area with given value.
+ *
+ * @param size size of data in blocks.
+ * @param data the data to fill.
+ * @param value the value to fill.
+ */
+static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
+  for (BLOCKTYPE i = 0; i < size; i++) {
+    data[i] = value;
+  }
+}
+
+/**
+ * @brief test a filled data area for a value.
+ *
+ * @param size size of data in blocks.
+ * @param data the data to check.
+ * @param value the value expected.
+ *
+ * @result 1 on success, 0 otherwise.
+ */
+static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
+  for (size_t i = 0; i < size; i++) {
+    if (data[i] != value) {
+      message("see %zd expected %zd @ %zd (%zd to go)", data[i], value, i,
+              size - i);
+      return 0;
+    }
+  }
+  return 1;
+}
+
+/* Struct of server ip addresses as formatted strings. */
+struct servers {
+  char *ip;
+};
+
+/**
+ * @brief Send thread, sends RDMA messages to the other ranks.
+ *
+ * Messages are all sent in the order they appear in the log.
+ */
+static void *send_thread(void *arg) {
+
+  // Get the destination IPs and ranks.
+  struct servers *servers = (struct servers *)arg;
+
+  // Need a factory to create QPs.
+  infinity::core::Context *context = new infinity::core::Context();
+  infinity::queues::QueuePairFactory *qpFactory =
+      new infinity::queues::QueuePairFactory(context);
+
+  // Create the QPs connecting to all the other ranks.
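+  // One QP per peer rank, indexed by rank. Our own slot is left NULL by
+  // calloc() and is never used.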
+  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)
+      calloc(nr_ranks, sizeof(infinity::queues::QueuePair *));
+
+  // We need to listen for a message from each of the other ranks' servers
+  // before we can connect to them, as the servers need to be up first.
+  int buf[1];
+  MPI_Request reqs[nr_ranks];
+  reqs[myrank] = MPI_REQUEST_NULL;
+  for (int k = 0; k < nr_ranks; k++) {
+    if (k != myrank) {
+      MPI_Irecv(buf, 1, MPI_INT, k, k, MPI_COMM_WORLD, &reqs[k]);
+    }
+  }
+
+  /* Now we wait for any servers that are ready for us to connect. */
+  int index;
+  MPI_Status status;
+  while (1) {
+    MPI_Waitany(nr_ranks, reqs, &index, &status);
+
+    // All done when all requests have completed.
+    if (index == MPI_UNDEFINED) break;
+
+    // Got one, so connect.
+    char *ip = &servers->ip[index * MPI_MAX_PROCESSOR_NAME];
+    if (verbose)
+      message("%d connecting to remote server %s %d on %d", myrank,
+              ip, index, BASE_PORT + myrank);
+    qps[index] = qpFactory->connectToRemoteHost(ip, BASE_PORT + myrank);
+    if (verbose)
+      message("%d connected to remote server %s %d on %d", myrank, ip,
+              index, BASE_PORT + myrank);
+  }
+
+  // Startup complete, so start timing.
+  MPI_Barrier(MPI_COMM_WORLD);  // Vital for synchronization.
+  clocks_set_cpufreq(cpufreq);
+  message("All synchronized");
+  ticks starttics = getticks();
+
+  for (int k = 0; k < nr_send; k++) {
+    struct mpiuse_log_entry *log = send_queue[k];
+
+    /* Data has the actual data and room for the header. */
+    BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
+    BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK);
+    log->data = dataptr;
+    log->injtic = getticks();
+
+    /* Fill data with pattern. */
+    if (datacheck)
+      datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE], log->tag);
+
+    /* First element has our rank, other elements replicate what we need to
+     * define an MPI message. */
+    dataptr[0] = myrank;
+    dataptr[1] = log->subtype;
+    dataptr[2] = log->size;
+    dataptr[3] = log->tag;
+
+    /* Need to assign to a buffer to register the memory. */
+    auto *sendBuffer =
+        new infinity::memory::Buffer(context, dataptr, tobytes(datasize));
+
+    // And send.
+    infinity::requests::RequestToken requestToken(context);
+    qps[log->otherrank]->send(sendBuffer, &requestToken);
+    requestToken.waitUntilCompleted();
+    log->endtic = getticks();
+
+    delete sendBuffer;  // XXX can we reuse a registered buffer instead?
+  }
+
+  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());
+
+  for (int k = 0; k < nr_ranks; k++) delete qps[k];
+  free(qps);
+  delete qpFactory;
+  delete context;
+
+  return NULL;
+}
+
+/**
+ * @brief recv thread, listens for remote sends from the other ranks.
+ */
+static void *recv_thread(void *arg) {
+
+  struct servers *servers = (struct servers *)arg;
+  (void)servers;  // We accept connections rather than make them, so the
+                  // server IPs are not needed here.
+
+  // Need a factory to create QPs.
+  infinity::core::Context *context = new infinity::core::Context();
+  infinity::queues::QueuePairFactory *qpFactory =
+      new infinity::queues::QueuePairFactory(context);
+
+  // Create the QPs connecting to all the other ranks.
+  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)
+      calloc(nr_ranks, sizeof(infinity::queues::QueuePair *));
+
+  // Create buffers to receive messages. Each one is big enough for the
+  // largest message.
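+  // We pre-post one buffer per peer so a receive is always available, and
+  // repost each buffer as soon as its message has been matched to a log.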
+  infinity::memory::Buffer **receiveBuffer = (infinity::memory::Buffer **)
+      calloc(nr_ranks, sizeof(infinity::memory::Buffer *));
+  for (int k = 0; k < nr_ranks; k++) {
+    receiveBuffer[k] =
+        new infinity::memory::Buffer(context, tobytes(MESSAGE_SIZE));
+    context->postReceiveBuffer(receiveBuffer[k]);
+  }
+
+  // Do the port binding for each other rank.
+  int buf[1] = {0};
+  MPI_Request req;
+  for (int k = 0; k < nr_ranks; k++) {
+    if (k != myrank) {
+      if (verbose)
+        message("%d binding to %d on port %d", myrank, k, BASE_PORT + k);
+      qpFactory->bindToPort(BASE_PORT + k);
+
+      // Tell that rank this port is about to block for a connection.
+      if (verbose) message("Blocking for connection on %d", BASE_PORT + k);
+      MPI_Isend(buf, 1, MPI_INT, k, myrank, MPI_COMM_WORLD, &req);
+      qps[k] = qpFactory->acceptIncomingConnection();
+
+      // The remote rank has connected, so the message must have been
+      // received and we can release the request.
+      MPI_Wait(&req, MPI_STATUS_IGNORE);
+      if (verbose)
+        message("Accepted incoming connection on %d", BASE_PORT + k);
+    }
+  }
+
+  // Startup complete, so start timing.
+  ticks starttics = getticks();
+
+  /* No. of receives still to process. */
+  int todo_recv = nr_recv;
+
+  /* We loop while we still have messages to receive. */
+  infinity::core::receive_element_t receiveElement;
+  while (todo_recv > 0) {
+
+    while (!context->receive(&receiveElement))
+      ;
+
+    // Unpack the header.
+    BLOCKTYPE *dataptr = (BLOCKTYPE *)receiveElement.buffer->getData();
+    int rank = dataptr[0];
+    int subtype = dataptr[1];
+    size_t size = dataptr[2];
+    int tag = dataptr[3];
+
+    /* Now find the associated log. XXX speed this up, local queue. */
+    int found = 0;
+    for (int k = 0; k < nr_recv; k++) {
+      struct mpiuse_log_entry *log = recv_queue[k];
+      if (log != NULL && !log->done) {
+
+        /* On the first attempt we start listening for this receive. */
+        if (log->injtic == 0) log->injtic = getticks();
+
+        if (log->otherrank == rank && log->subtype == subtype &&
+            log->size == size && log->tag == tag) {
+          found = 1;
+
+          if (verbose)
+            message("receive message subtype %d from %d on %d", log->subtype,
+                    rank, myrank);
+
+          /* Check the received data is as expected. */
+          if (datacheck && !datacheck_test(toblocks(log->size),
+                                           &dataptr[HEADER_SIZE], log->tag)) {
+            message("Data mismatch on completion");
+          }
+
+          /* Done, clean up. */
+          log->done = 1;
+          // free(log->data);  // XXX should really offload the data to be fair.
+          log->endtic = getticks();
+          todo_recv--;
+        }
+      }
+    }
+    if (!found) {
+      error("No matching receive for message from rank %d (subtype %d, "
+            "size %zd, tag %d)", rank, subtype, size, tag);
+    }
+
+    // Repost the buffer ready for the next message.
+    context->postReceiveBuffer(receiveElement.buffer);
+  }
+
+  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());
+
+  for (int k = 0; k < nr_ranks; k++) delete receiveBuffer[k];
+  free(receiveBuffer);
+  for (int k = 0; k < nr_ranks; k++) delete qps[k];
+  free(qps);
+  delete qpFactory;
+  delete context;
+
+  /* Thread exits. */
+  return NULL;
+}
+
+/**
+ * @brief Comparison function for log tics.
+ */
+static int cmp_logs(const void *p1, const void *p2) {
+  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
+  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
+
+  if (l1->tic > l2->tic) return 1;
+  if (l1->tic < l2->tic) return -1;
+  return 0;
+}
+
+/**
+ * @brief Pick out the relevant logging data for our rank.
+ */
+static size_t pick_logs() {
+
+  size_t nlogs = mpiuse_nr_logs();
+  size_t maxsize = 0;
+
+  /* Queues of send and receive logs.
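+   * Each queue is sized for the worst case in which every log in the file
+   * belongs to it.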
+   */
+  send_queue =
+      (struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_send = 0;
+
+  recv_queue =
+      (struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_recv = 0;
+
+  for (size_t k = 0; k < nlogs; k++) {
+    struct mpiuse_log_entry *log = mpiuse_get_log(k);
+    if (log->activation) {
+      if (log->rank == myrank) {
+        log->done = 0;
+        log->injtic = 0;
+        log->endtic = 0;
+        log->data = NULL;
+
+        /* Scale size. */
+        log->size *= messagescale;
+
+        if (log->type == task_type_send) {
+          send_queue[nr_send] = log;
+          nr_send++;
+        } else if (log->type == task_type_recv) {
+          recv_queue[nr_recv] = log;
+          nr_recv++;
+        } else {
+          error("task type '%d' is not a known send or recv task", log->type);
+        }
+      }
+
+      /* Across all ranks. */
+      if (log->size > maxsize) maxsize = log->size;
+    }
+  }
+
+  /* Sort into increasing tic. */
+  qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
+  qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
+
+  if (verbose) {
+    message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
+            nr_recv);
+  }
+
+  return maxsize;
+}
+
+/**
+ * @brief usage help.
+ */
+static void usage(char *argv[]) {
+  fprintf(stderr,
+          "Usage: %s [-vd] [-s scale] SWIFT_mpiuse-log-file.dat logfile.dat\n",
+          argv[0]);
+  fprintf(stderr,
+          " options: -v verbose, -d check data, -s scale message sizes\n");
+  fflush(stderr);
+}
+
+/**
+ * @brief main function.
+ */
+int main(int argc, char *argv[]) {
+
+  /* Initialize MPI. */
+  int prov = 0;
+  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Init_thread failed with error %i.", res);
+
+  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
+  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
+
+  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Comm_rank failed with error %i.", res);
+
+  /* Handle the command-line, we expect a mpiuse data file to read and various
+   * options. */
+  int opt;
+  while ((opt = getopt(argc, argv, "vds:")) != -1) {
+    switch (opt) {
+      case 'd':
+        datacheck = 1;
+        break;
+      case 'v':
+        verbose = 1;
+        break;
+      case 's':
+        messagescale = atof(optarg);
+        break;
+      default:
+        if (myrank == 0) usage(argv);
+        return 1;
+    }
+  }
+  if (optind >= argc - 1) {
+    if (myrank == 0) usage(argv);
+    return 1;
+  }
+  char *infile = argv[optind];
+  char *logfile = argv[optind + 1];
+
+  /* Now we read the SWIFT MPI logger output that defines the communications
+   * we will undertake and the time differences between injections into the
+   * queues. Note this has all ranks for a single step, SWIFT outputs one MPI
+   * log per rank per step, so you need to combine all ranks from a step. */
+  mpiuse_log_restore(infile);
+  int nranks = mpiuse_nr_ranks();
+
+  /* This should match the number of ranks we are running with. */
+  if (nr_ranks != nranks)
+    error("The number of MPI ranks %d does not match the expected value %d",
+          nr_ranks, nranks);
+
+  /* Extract the send and recv messages for our rank. */
+  size_t maxsize = pick_logs();
+
+  /* Size of a message board. Needs to align on size_t. */
+  MESSAGE_SIZE = toblocks(maxsize) + HEADER_SIZE;
+
+  /* Now for the RDMA setup. We need the IP addresses of all the ranks. */
+
+  /* Each rank can find its name and IP. */
+  char name[MPI_MAX_PROCESSOR_NAME];
+  int namelen = 0;
+  MPI_Get_processor_name(name, &namelen);
+  char ip[MPI_MAX_PROCESSOR_NAME];
+  strncpy(ip, toipaddr(name), MPI_MAX_PROCESSOR_NAME);
+
+  /* And distribute, so we all know everyone's IPs.
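+   * The result is a table with one fixed-width entry of
+   * MPI_MAX_PROCESSOR_NAME characters per rank, indexed by rank.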
+   */
+  struct servers servers;
+  servers.ip = (char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME);
+  MPI_Allgather(ip, MPI_MAX_PROCESSOR_NAME, MPI_BYTE, servers.ip,
+                MPI_MAX_PROCESSOR_NAME, MPI_BYTE, MPI_COMM_WORLD);
+
+  if (myrank == 0) {
+    message("RDMA servers will listen on:");
+    for (int j = 0; j < nr_ranks; j++) {
+      for (int k = 0; k < nr_ranks; k++) {
+        if (k != j) {
+          message(" %d: %s on port %d", j,
+                  &servers.ip[j * MPI_MAX_PROCESSOR_NAME], BASE_PORT + k);
+        }
+      }
+    }
+  }
+
+  /* Time to start timing. Try to make it synchronous across the ranks.
+   * Note we reset this once more when the startup costs have been
+   * done. */
+  clocks_set_cpufreq(0);
+  cpufreq = clocks_get_cpufreq();
+  MPI_Barrier(MPI_COMM_WORLD);
+  clocks_set_cpufreq(cpufreq);
+  if (myrank == 0) {
+    message("Start of MPI tests");
+    message("==================");
+    if (verbose) {
+      if (datacheck)
+        message("checking data pattern on send and recv completion");
+    }
+  }
+
+  /* Make a server thread that listens for connections. */
+  pthread_t recvthread;
+  if (pthread_create(&recvthread, NULL, &recv_thread, &servers) != 0)
+    error("Failed to create recv thread.");
+
+  /* And a single thread to send the messages. */
+  pthread_t sendthread;
+  if (pthread_create(&sendthread, NULL, &send_thread, &servers) != 0)
+    error("Failed to create send thread.");
+
+  /* Wait until all threads have exited and all message exchanges have
+   * completed. */
+  pthread_join(sendthread, NULL);
+  pthread_join(recvthread, NULL);
+  MPI_Barrier(MPI_COMM_WORLD);
+
+  /* Dump the updated MPI logs. */
+  fflush(stdout);
+  if (myrank == 0) message("Dumping updated log");
+  mpiuse_dump_logs(nranks, logfile);
+
+  /* Shutdown MPI. */
+  res = MPI_Finalize();
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Finalize failed with error %i.", res);
+
+  /* Free resources. */
+  free(servers.ip);
+
+  if (myrank == 0) message("Bye");
+
+  return 0;
+}