From 990efdfde9612100019a9f7e44be60cfeb162311 Mon Sep 17 00:00:00 2001
From: "Peter W. Draper" <p.w.draper@durham.ac.uk>
Date: Thu, 10 Dec 2020 14:44:46 +0000
Subject: [PATCH] Extract infinity into wrapper functions

---
 Makefile                         |  13 +-
 infinity_wrapper.c               | 346 ++++++++++++++++
 infinity_wrapper.h               |  46 +++
 swiftmpirdmaonestepsim_wrapper.c | 656 +++++++++++++++++++++++++++++++
 4 files changed, 1057 insertions(+), 4 deletions(-)
 create mode 100644 infinity_wrapper.c
 create mode 100644 infinity_wrapper.h
 create mode 100644 swiftmpirdmaonestepsim_wrapper.c

diff --git a/Makefile b/Makefile
index 27c8f7a..3a5d360 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
-#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
+CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON
 #CFLAGS = -g -O3 -Wall -Iinfinity/include
-CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON
+#CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON
 #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
 
 INCLUDES = mpiuse.h atomic.h cycle.h clocks.h error.h
@@ -9,10 +9,10 @@ DEPS = Makefile $(SOURCES) $(INCLUDES)
 
 INFINITY = -Linfinity -linfinity -libverbs
 
-
 all: swiftmpistepsim swiftmpistepsim2 \
 	swiftmpirdmastepsim swiftmpirdmastepsim2 swiftmpirdmastepsim3 \
-	swiftmpirdmaonestepsim swiftmpirdmaonestepsim2
+	swiftmpirdmaonestepsim swiftmpirdmaonestepsim2 \
+	swiftmpirdmaonestepsim_wrapper
 
 swiftmpistepsim: swiftmpistepsim.c $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c $(SOURCES)
@@ -35,6 +35,10 @@ swiftmpirdmastepsim2: swiftmpirdmastepsim2.c $(DEPS)
 swiftmpirdmastepsim3: swiftmpirdmastepsim3.c $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpirdmastepsim3 swiftmpirdmastepsim3.c $(SOURCES) $(INFINITY)
 
+swiftmpirdmaonestepsim_wrapper: swiftmpirdmaonestepsim_wrapper.c $(DEPS) infinity_wrapper.h infinity_wrapper.c
+	mpicxx $(CFLAGS) -o swiftmpirdmaonestepsim_wrapper swiftmpirdmaonestepsim_wrapper.c \
+	infinity_wrapper.c $(SOURCES) $(INFINITY)
+
 clean:
 	rm -f swiftmpistepsim
 	rm -f swiftmpistepsim2
@@ -43,5 +47,6 @@ clean:
 	rm -f swiftmpirdmaonestepsim2
 	rm -f swiftmpirdmastepsim2
 	rm -f swiftmpirdmastepsim3
+	rm -f swiftmpirdmaonestepsim_wrapper
 
diff --git a/infinity_wrapper.c b/infinity_wrapper.c
new file mode 100644
index 0000000..105a476
--- /dev/null
+++ b/infinity_wrapper.c
@@ -0,0 +1,346 @@
+/*******************************************************************************
+ * This file is part of SWIFT.
+ * Copyright (c) 2020 Peter W. Draper
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ ******************************************************************************/
+
+/**
+ * @brief Simple C wrapper for the C++ infinity library, only provides an
+ * interface to meta capabilities we use.
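+ * The wrapper only deals in opaque void * handles and plain C types, so
+ * callers never see the infinity C++ classes.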
Note still exposed to C++ linkage + * from the infinity library so we must use a C++ compiler. + */ +/* Config parameters. */ +//#include "../config.h" +#define HAVE_INFINITY + +/* Standard includes. */ +#include <arpa/inet.h> +#ifdef WITH_MPI +#include <mpi.h> +#endif +#include <netdb.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +/* Infinity C++ headers. */ +#ifdef HAVE_INFINITY +#include <infinity/core/Context.h> +#include <infinity/memory/Buffer.h> +#include <infinity/memory/RegionToken.h> +#include <infinity/queues/QueuePair.h> +#include <infinity/queues/QueuePairFactory.h> +#include <infinity/requests/RequestToken.h> +#endif + +/* Local defines. */ +#include "infinity_wrapper.h" + +/* Local includes. */ +#include "error.h" + +/* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this as + * as we need to align in memory. */ +#define BLOCKTYPE size_t +#define MPI_BLOCKTYPE MPI_AINT +static const int BYTESINBLOCK = sizeof(BLOCKTYPE); + +/* Flags for controlling access. High end of size_t. */ +static size_t UNLOCKED = (((size_t)2 << 63) - 1); + +/* Struct of QPs and associated data. */ +struct qps_data { + int nr_qps; + infinity::core::Context *context; + infinity::queues::QueuePairFactory *factory; + infinity::queues::QueuePair **qps; + infinity::memory::Buffer **receive_buffers; + infinity::memory::RegionToken **remote_buffers; + infinity::memory::Buffer **readwrite_buffers; + infinity::memory::RegionToken **token_buffers; +}; + +/** + * @brief Find an IP address for the given hostname. + * + * @param hostname the hostname + * + * @result the IP address, note copy away to keep. + */ + +static char *toipaddr(char *hostname) { + + struct hostent *hostent = gethostbyname(hostname); + if (hostent == NULL) { + error("Failed to convert hostname '%s' to an IP address", hostname); + } + struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list; + return inet_ntoa(*addr_list[0]); +} + +/** + * @brief Create a QPs to connect a group of clients to a group of servers. + * + * Requires that infinity_create_servers is also running, otherwise we + * block waiting for the connections. + * + * @param servers a #mpi_servers struct with the server details. + * @param nr_servers the number of servers expected to connect. + * @param myrank the MPI rank of this process. + * @param verbose if 1 then report the connections made. + * + * @return handle for the QPs and related data. + */ +void *infinity_connect_clients(struct mpi_servers *servers, int nr_servers, + int myrank, int verbose) { +#ifdef HAVE_INFINITY + + /* Struct to hold all the persistent data. */ + struct qps_data *cqps = (struct qps_data *)calloc(1, sizeof(struct qps_data)); + + /* Need a factory to create QPs. */ + cqps->context = new infinity::core::Context(); + cqps->factory = new infinity::queues::QueuePairFactory(cqps->context); + + /* Create the QPs connecting to all the other ranks. */ + cqps->qps = (infinity::queues::QueuePair **) + calloc(nr_servers, sizeof(infinity::queues::QueuePair *)); + cqps->nr_qps = nr_servers; + + /* Space for the pointers to the remote memory. */ + cqps->remote_buffers = (infinity::memory::RegionToken **) + calloc(nr_servers, sizeof(infinity::memory::RegionToken *)); + + /* We need to listen for messages from the other rank servers that we can + * connect to them as they need to be up first. 
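+ * Each server posts an MPI_Isend (tagged with its own rank) just before it
+ * blocks in acceptIncomingConnection, so completion of the matching
+ * MPI_Irecv below tells us the remote port is bound and safe to connect to.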
*/ + int buf[1]; + MPI_Request reqs[nr_servers]; + for (int k = 0; k < nr_servers; k++) { + if (k != myrank) { + MPI_Irecv(buf, 1, MPI_INT, k, k, MPI_COMM_WORLD, &reqs[k]); + } else { + reqs[myrank] = MPI_REQUEST_NULL; + } + } + + /* Now we poll for any servers that are ready to connect. */ + int index; + MPI_Status status; + while (1) { + MPI_Waitany(nr_servers, reqs, &index, &status); + + /* All done when all requests have completed. */ + if (index == MPI_UNDEFINED) break; + + /* Got one, so connect. */ + char *ip = &servers->ip[index * infinity_max_server_ip]; + if (verbose) + message("%d waiting for connection to remote server %s %d on %d", myrank, + ip, index, BASE_PORT + myrank); + cqps->qps[index] = cqps->factory->connectToRemoteHost(ip, BASE_PORT + myrank); + if (verbose) + message("%d connected to remote server %s %d on %d", myrank, ip, index, + BASE_PORT + myrank); + + /* Remote buffer access */ + cqps->remote_buffers[index] = + (infinity::memory::RegionToken *)cqps->qps[index]->getUserData(); + } + + /* Result is opaque. */ + return (void *)cqps; + +#else + return NULL; +#endif +} + +/** + * @brief Send a buffer to a server listening on a QP. + * + * @param qphandle the handle from infinity_connect_clients. + * @param index index of the server to send to. + * @param buffer the buffer to send, should be block aligned. + * @param size the size of the buffer in bytes. + * @param offset the offset into the remote buffer, note in bytes not blocks. + */ +void infinity_send_data(void *qphandle, int index, void *buffer, size_t size, + size_t offset) { +#ifdef HAVE_INFINITY + struct qps_data *cqps = (struct qps_data *)qphandle; + + /* Need to assign to a buffer to register memory. XXX make this as big as + * necessary per server and reuse. */ + auto *sendBuffer = + new infinity::memory::Buffer(cqps->context, buffer, size); + + /* And send. */ + infinity::requests::RequestToken requestToken(cqps->context); + cqps->qps[index]->write(sendBuffer, + 0, // localOffset + cqps->remote_buffers[index], // destination + offset, // remoteOffset + size, // sizeInBytes + infinity::queues::OperationFlags(), + &requestToken); + requestToken.waitUntilCompleted(); + requestToken.reset(); + + /* Now we update the unlock field. */ + ((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED; + cqps->qps[index]->write(sendBuffer, + 0, // localOffset + cqps->remote_buffers[index], // destination + offset, // remoteOffset + BYTESINBLOCK, // sizeInBytes + infinity::queues::OperationFlags(), + &requestToken); + requestToken.waitUntilCompleted(); // Since we reuse the sendBuffer. + + delete sendBuffer; + +#endif + return; +} + +/* @brief Free the resource associated with handle. + * + * @param qphandle the handle from infinity_connect_clients. + */ +void infinity_clients_free(void *qphandle) { + +#ifdef HAVE_INFINITY + struct qps_data *cqps = (struct qps_data *)qphandle; + for (int k = 0; k < cqps->nr_qps; k++) delete cqps->qps[k]; + free(cqps->qps); + delete cqps->factory; + delete cqps->context; + free(cqps->receive_buffers); + free(cqps->remote_buffers); + if (cqps->readwrite_buffers != NULL) { + for (int k = 0; k < cqps->nr_qps; k++) delete cqps->readwrite_buffers[k]; + free(cqps->readwrite_buffers); + } + if (cqps->token_buffers != NULL) { + for (int k = 0; k < cqps->nr_qps; k++) delete cqps->token_buffers[k]; + free(cqps->token_buffers); + } + free(cqps); +#endif + return; +} + +/** + * @brief Create QPs for server to receive data from our clients. 
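+ * Each client gets its own registered buffer, and that buffer's RegionToken
+ * is handed to the client when its connection is accepted.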
+ * + * Requires that infinity_connect_clients is also ran, otherwise we + * block waiting for the connections. + * + * @param servers a #mpi_servers struct with the server details. + * @param nr_servers the number of servers we will create. + * @param sizes the sizes, in bytes, of the various windows needed to receive + * all the remote data from a client. Array size of nr_servers. + * @param myrank the MPI rank of this process. + * @param verbose if 1 then report the connections made. + * + * @return handle for the QPs and related data. + */ +void *infinity_create_servers(struct mpi_servers *servers, int nr_servers, + size_t *sizes, int myrank, int verbose) { + +#ifdef HAVE_INFINITY + /* Struct to hold all the persistent data. */ + struct qps_data *cqps = (struct qps_data *)calloc(1, sizeof(struct qps_data)); + + /* Need a factory to create QPs. */ + cqps->context = new infinity::core::Context(); + cqps->factory = new infinity::queues::QueuePairFactory(cqps->context); + + /* Create the QPs connecting to all the other ranks. */ + cqps->qps = (infinity::queues::QueuePair **) + calloc(nr_servers, sizeof(infinity::queues::QueuePair *)); + cqps->nr_qps = nr_servers; + + /* Create buffers to receive all the remote data. */ + cqps->readwrite_buffers = (infinity::memory::Buffer **) + calloc(nr_servers, sizeof(infinity::memory::Buffer *)); + cqps->token_buffers = (infinity::memory::RegionToken **) + calloc(nr_servers, sizeof(infinity::memory::RegionToken *)); + + for (int k = 0; k < nr_servers; k++) { + if (sizes[k] > 0) { + cqps->readwrite_buffers[k] = + new infinity::memory::Buffer(cqps->context, sizes[k]); + } else { + /* Dummy: not expecting any data, but setup anyway. */ + cqps->readwrite_buffers[k] = + new infinity::memory::Buffer(cqps->context, BYTESINBLOCK); + } + cqps->token_buffers[k] = cqps->readwrite_buffers[k]->createRegionToken(); + } + + /* Do the port binding for each other rank. */ + int buf[1]; + MPI_Request req; + for (int k = 0; k < nr_servers; k++) { + if (k != myrank) { + if (verbose) + message("%d binding to %d on port %d", myrank, k, BASE_PORT + k); + cqps->factory->bindToPort(BASE_PORT + k); + + /* Send message this port is about to block for a connection. */ + if (verbose) message("Blocking for first message on %d", BASE_PORT + k); + MPI_Isend(buf, 1, MPI_INT, k, myrank, MPI_COMM_WORLD, &req); + cqps->qps[k] = cqps->factory->acceptIncomingConnection + (cqps->token_buffers[k], sizeof(infinity::memory::RegionToken)); + if (verbose) + message("Accepting incoming connections on %d", BASE_PORT + k); + } + } + + return (void *)cqps; +#else + return NULL; +#endif +} + +/** + * @brief Check if data is ready, that is has arrived. + * + * @param qphandle the handle from infinity_create_servers. + * @param index index of the client we are checking. + * @param offset the offset of this data in the RDMA buffer, + * note in blocks not bytes. + * + * @result pointer to the start of the data, otherwise NULL. + */ +void *infinity_check_ready(void *qphandle, int index, size_t offset) { + + void *result = NULL; +#ifdef HAVE_INFINITY + struct qps_data *cqps = (struct qps_data *)qphandle; + + /* Get the data location. */ + BLOCKTYPE *dataptr = &((BLOCKTYPE *)cqps->readwrite_buffers[index]->getData())[offset]; + + /* Check if this has been unlocked. 
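+ * The sender only overwrites the first block with UNLOCKED after its payload
+ * write has completed, so seeing UNLOCKED here means the full message has
+ * arrived.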
*/ + BLOCKTYPE volatile lock = dataptr[0]; + if (lock == UNLOCKED) result = (void *)dataptr; + +#endif + return result; +} diff --git a/infinity_wrapper.h b/infinity_wrapper.h new file mode 100644 index 0000000..b32f862 --- /dev/null +++ b/infinity_wrapper.h @@ -0,0 +1,46 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Copyright (c) 2020 Peter W. Draper (p.w.draper@durham.ac.uk) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ +#ifndef INFINITY_WRAPPER_H +#define INFINITY_WRAPPER_H + +/* Config parameters. */ +//#include "../config.h" + +/* Base port no. Ranks use +rank. XXX we need to handle this more + * dynamically. */ +static int BASE_PORT = 27771; + +/* Maximum length of formatted server IP address. */ +#define infinity_max_server_ip 24 + +/* Struct of MPI server ip addresses as formatted strings.*/ +struct mpi_servers { + char *ip; +}; + +void *infinity_connect_clients(struct mpi_servers *servers, int nr_servers, + int myrank, int verbose); +void infinity_send_data(void *qphandle, int index, void *buffer, size_t size, + size_t offset); +void infinity_clients_free(void *qphandle); +void *infinity_create_servers(struct mpi_servers *servers, int nr_servers, + size_t *sizes, int myrank, int verbose); +void *infinity_check_ready(void *qphandle, int index, size_t offset); + +#endif /* INFINITY_WRAPPER_H */ diff --git a/swiftmpirdmaonestepsim_wrapper.c b/swiftmpirdmaonestepsim_wrapper.c new file mode 100644 index 0000000..9fd9a49 --- /dev/null +++ b/swiftmpirdmaonestepsim_wrapper.c @@ -0,0 +1,656 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Copyright (c) 2020 Peter W. Draper + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ + +// Pure RDMA version, we use MPI for process control and synchronization. +// Write variant, attempting to use eager-like one-sided sends. +// Variation has a single send and receive thread per rank. 
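+// Control flow: the recv thread creates the per-rank RDMA windows
+// (infinity_create_servers) and polls them for unlocked messages, while the
+// send thread connects to the remote windows (infinity_connect_clients) and
+// pushes each logged message with infinity_send_data at an offset agreed
+// during startup.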
+ +#include <arpa/inet.h> +#include <limits.h> +#include <mpi.h> +#include <netdb.h> +#include <pthread.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "infinity_wrapper.h" + +#include "atomic.h" +#include "clocks.h" +#include "error.h" +#include "mpiuse.h" + +/* Our rank for all to see. */ +int myrank = -1; + +/* CPU frequency estimate, shared so we do this once. */ +static long long cpufreq = 0; + +/* Number of ranks. */ +static int nr_ranks; + +/* Size of a block of memory. */ +#define BLOCKTYPE size_t +#define MPI_BLOCKTYPE MPI_AINT +static const int BYTESINBLOCK = sizeof(BLOCKTYPE); + +/* Size of message header in blocks. The lock/rank, subtype, size and tag. */ +static const size_t HEADER_SIZE = 4; + +/* Maximum Size of a message in blocks. */ +static size_t max_size = 0; + +/* Are we verbose. */ +static int verbose = 0; + +/* Scale to apply to the size of the messages we send. */ +static float messagescale = 1.0; + +/* Set a data pattern and check we get this back, slow... */ +static int datacheck = 0; + +/* Integer types of send and recv tasks, must match log. */ +static const int task_type_send = 22; +static const int task_type_recv = 23; + +/* 3D index of array. */ +#define INDEX3(nx, ny, x, y, z) (nx * ny * z + nx * y + x) + +/* 2D index of array. */ +#define INDEX2(nx, x, y) (nx * y + x) + +/* Bit shift to accomodate all the bits of the maximum rank id. */ +static int rank_shift = 0; + +/* Bit shift to accomodate all the bits of the maximum subtype. */ +static int subtype_shift = 0; + +/* Maximum no. of messages (logs). */ +static size_t max_logs = 0; + +/* Offsets of the ranktag regions within the receive windows and lists of the + * assocated tags. */ +static size_t *ranktag_sizes; +static size_t *ranktag_offsets; +static size_t *ranktag_lists; + +/* The local send queue. */ +static struct mpiuse_log_entry **volatile send_queue; +static int volatile nr_sends = 0; +static int volatile todo_send = 0; + +/* Lock for starting up completed. */ +static int volatile starting_up = 1; + +/* Local receive queues separated by rank. */ +static int volatile nr_recvs = 0; +static struct mpiuse_log_entry **volatile recv_queue; + +/** + * @brief Convert ranks, subtype and tag into a single unique value. + * + * Assumes there is enough space in a size_t for these values. + * + * @param subtype the subtype of the message + * @param sendrank the receive rank + * @param recvrank the receive rank + * @param tag the tag + * + * @result a unique value based on both values + */ +static size_t toranktag(int subtype, int sendrank, int recvrank, int tag) { + size_t result = subtype | sendrank << subtype_shift | + recvrank << (subtype_shift + rank_shift) | + tag << (subtype_shift * 2 + rank_shift); + return result; +} + +/** + * @brief Find an IP address for the given hostname. + * + * @param hostname the hostname + * + * @result the IP address, note copy away to keep. + */ + +static char *toipaddr(char *hostname) { + + struct hostent *hostent = gethostbyname(hostname); + if (hostent == NULL) { + error("Failed to convert hostname '%s' to an IP address", hostname); + } + struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list; + return inet_ntoa(*addr_list[0]); +} + +/** + * @brief Convert a byte count into a number of blocks, rounds up. + * + * @param nr_bytes the number of bytes. + * + * @result the number of blocks needed. 
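+ * For example, 9 bytes rounds up to 2 blocks when BYTESINBLOCK is 8, the
+ * usual sizeof(size_t).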
+ */ +static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) { + return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK; +} + +/** + * @brief Convert a block count into a number of bytes. + * + * @param nr_block the number of blocks. + * + * @result the number of bytes. + */ +static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) { + return (nr_blocks * BYTESINBLOCK); +} + +/** + * @brief fill a data area with given value. + * + * @param size size of data in bytes. + * @param data the data to fill. + * @param value the value to fill. + */ +static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) { + for (BLOCKTYPE i = 0; i < size; i++) { + data[i] = value; + } +} + +/** + * @brief test a filled data area for a value. + * + * @param size size of data in bytes. + * @param data the data to check. + * @param value the value expected. + * + * @result 1 on success, 0 otherwise. + */ +static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) { + for (size_t i = 0; i < size; i++) { + if (data[i] != value) { + message("see %zd expected %zd @ %zd (%zd to go)", data[i], value, i, + size); + return 0; + } + } + return 1; +} + +/** + * @brief Send thread, sends RDMA messages to the other ranks. + * + * Messages are all considered in order. + */ +static void *send_thread(void *arg) { + + // Connect QPs to the remote servers. + struct mpi_servers *servers = (struct mpi_servers *)arg; + void *qphandle = infinity_connect_clients(servers, nr_ranks, + myrank, 0); + + /* Extract the offset lists that we use. */ + int nr = 0; + int size = (max_logs / 16 + 1); + size_t *ranktags = (size_t *)malloc(size * sizeof(size_t)); + size_t *offsets = (size_t *)malloc(size * sizeof(size_t)); + + /* A tag that will match any subtype or tag with our rank for all the + * otherranks. */ + for (int k = 0; k < nr_ranks; k++) { + size_t matchranktag = toranktag(0, myrank, k, 0); + for (size_t j = 0; j < max_logs; j++) { + size_t ranktag = ranktag_lists[INDEX3(nr_ranks, nr_ranks, myrank, k, j)]; + if ((ranktag & matchranktag) == matchranktag) { + + /* Keep this one. */ + ranktags[nr] = ranktag; + offsets[nr] = ranktag_offsets[INDEX3(nr_ranks, nr_ranks, myrank, k, j)]; + nr++; + if (nr >= size) { + size += (max_logs / 16 + 1); + ranktags = (size_t *)realloc(ranktags, size * sizeof(size_t)); + offsets = (size_t *)realloc(offsets, size * sizeof(size_t)); + } + } + } + } + + // Startup complete, so start timing and release the receive thread. + MPI_Barrier(MPI_COMM_WORLD); // Vital for synchronization. + clocks_set_cpufreq(cpufreq); + starting_up = 0; + message("All synchronized"); + ticks starttics = getticks(); + + for (int k = 0; k < nr_sends; k++) { + struct mpiuse_log_entry *log = send_queue[k]; + + /* Data has the actual data and room for the header. */ + BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE; + BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK); + log->data = dataptr; + log->injtic = getticks(); + + /* Fill data with pattern. */ + if (datacheck) + datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE], log->tag); + + /* First element is the lock element, which can have any value other than + * UNLOCKED, the other elements define an MPI message. */ + dataptr[0] = myrank; + dataptr[1] = log->subtype; + dataptr[2] = log->size; + dataptr[3] = log->tag; + + /* Need to find the offset for this data in the remotes window. 
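+ * The offsets were laid out by the receiving rank in pick_logs() and shared
+ * through the MPI_Allreduce of ranktag_offsets, so a linear search over our
+ * cached ranktags recovers where this message must land in the remote window.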
*/ + size_t ranktag = + toranktag(log->subtype, log->rank, log->otherrank, log->tag); + log->offset = 0; + int found = 0; + + for (int j = 0; j < nr; j++) { + if (ranktags[j] == ranktag) { + log->offset = offsets[j]; + found = 1; + break; + } + } + if (!found) { + error( + "Failed sending a message of size %zd to %d/%d " + "@ %zd\n, no offset found for ranktag %zd", + datasize, log->otherrank, log->subtype, log->offset, ranktag); + } + + if (verbose) + message( + "sending message subtype %d from %d to %d todo: %d/%d, offset %ld " + "size %ld", + log->subtype, myrank, log->otherrank, nr_sends - k, nr_sends, + log->offset, datasize); + + // And send + infinity_send_data(qphandle, log->otherrank, dataptr, tobytes(datasize), + tobytes(log->offset)); + free(dataptr); + log->endtic = getticks(); + } + + message("took %.3f %s", clocks_from_ticks(getticks() - starttics), + clocks_getunit()); + + // Free data. + free(offsets); + free(ranktags); + infinity_clients_free(qphandle); + + return NULL; +} + +/** + * @brief recv thread, listens for remote sends from another rank. + */ +static void *recv_thread(void *arg) { + + struct mpi_servers *servers = (struct mpi_servers *)arg; + + /* Need sizes in bytes. */ + size_t *sizes = (size_t *)calloc(nr_ranks, sizeof(size_t)); + for (int k = 0; k < nr_ranks; k++) { + sizes[k] = tobytes(ranktag_sizes[k]); + } + void *qphandle = infinity_create_servers(servers, nr_ranks, sizes, + myrank, 0); + free(sizes); + message("All incomings up"); + + /* Now we wait for the remotes to connect before proceeding, otherwise the + * timings will be incorrect. */ + while (starting_up) + ; + + /* Ignore the timing on previous part, which is fixed. */ + ticks starttics = getticks(); + + /* We loop while new requests are being send and we still have messages + * to receive. */ + int todo_recv = nr_recvs; + while (todo_recv > 0) { + + /* Loop over remaining messages, checking if any have been unlocked. */ + int k = 0; + while (k < todo_recv) { + struct mpiuse_log_entry *log = recv_queue[k]; + + /* On the first attempt we start listening for this receive. */ + if (log->injtic == 0) log->injtic = getticks(); + + /* Check if data has arrived. */ + BLOCKTYPE *dataptr = (BLOCKTYPE *)infinity_check_ready(qphandle, + log->otherrank, + log->offset); + if (dataptr != NULL) { + // XXX should check these for correctness. + // int subtype = dataptr[1]; + // size_t size = dataptr[2]; + // int tag = dataptr[3]; + + if (verbose) + message( + "receive message subtype %d from %d to %d todo: %d/%d," + " offset %ld size %ld", + log->subtype, myrank, log->otherrank, todo_recv, nr_recvs, + log->offset, toblocks(log->size) + HEADER_SIZE); + + /* Check data sent data is unchanged and received data is as + * expected. */ + if (datacheck && !datacheck_test(toblocks(log->size), + &dataptr[HEADER_SIZE], log->tag)) { + message("Data mismatch on completion"); + } + + /* Done, clean up. */ + log->done = 1; + log->endtic = getticks(); + todo_recv--; + + /* And swap with the last log if we can. */ + if (k < todo_recv) { + recv_queue[k] = recv_queue[todo_recv]; + + /* Do this log again, we wanted to check it this loop. */ + k--; + } + } + k++; + } + } + + // Free data. + infinity_clients_free(qphandle); + + message("took %.3f %s", clocks_from_ticks(getticks() - starttics), + clocks_getunit()); + + /* Thread exits. */ + return NULL; +} + +/** + * @brief Comparison function for tags. 
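+ * (The sort key is the tic field, so the queues end up in their original
+ * injection order.)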
+ */ +static int cmp_logs(const void *p1, const void *p2) { + struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1; + struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2; + + if (l1->tic > l2->tic) return 1; + if (l1->tic < l2->tic) return -1; + return 0; +} + +/** + * @brief Pick out the relevant logging data for our rank. + */ +static size_t pick_logs() { + + size_t nlogs = mpiuse_nr_logs(); + size_t maxsize = 0; + + /* Queues of send and receive logs. */ + send_queue = (struct mpiuse_log_entry **)calloc( + nlogs, sizeof(struct mpiuse_log_entry *)); + nr_sends = 0; + recv_queue = (struct mpiuse_log_entry **)calloc( + nlogs, sizeof(struct mpiuse_log_entry *)); + nr_recvs = 0; + + for (size_t k = 0; k < nlogs; k++) { + struct mpiuse_log_entry *log = mpiuse_get_log(k); + if (log->activation) { + if (log->rank == myrank) { + log->done = 0; + log->injtic = 0; + log->endtic = 0; + log->data = NULL; + log->ranktag = + toranktag(log->subtype, log->otherrank, log->rank, log->tag); + + /* Scale size. */ + log->size *= messagescale; + + if (log->type == task_type_send) { + send_queue[nr_sends] = log; + nr_sends++; + } else if (log->type == task_type_recv) { + recv_queue[nr_recvs] = log; + nr_recvs++; + } else { + error("task type '%d' is not a known send or recv task", log->type); + } + } + + /* Across all ranks. */ + if (log->size > maxsize) maxsize = log->size; + } + } + + /* Sort into increasing tic. */ + qsort(recv_queue, nr_recvs, sizeof(struct mpiuse_log_entry *), cmp_logs); + qsort(send_queue, nr_sends, sizeof(struct mpiuse_log_entry *), cmp_logs); + + /* Offsets and ranktags. */ + ranktag_offsets = + (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t)); + ranktag_lists = + (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t)); + ranktag_sizes = + (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t)); + + /* Setup the ranktag offsets for our receive windows. Also define the sizes + * of the windows. */ + for (int k = 0; k < nr_recvs; k++) { + struct mpiuse_log_entry *log = recv_queue[k]; + ranktag_lists[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] = + log->ranktag; + ranktag_offsets[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] = + ranktag_sizes[log->otherrank]; + log->offset = ranktag_sizes[log->otherrank]; + + /* Need to use a multiple of blocks to keep alignment. */ + size_t size = toblocks(log->size) + HEADER_SIZE; + ranktag_sizes[log->otherrank] += size; + } + + if (verbose) { + message("maxsize = %zd, nr_sends = %d, nr_recvs = %d", maxsize, nr_sends, + nr_recvs); + } + + return maxsize; +} + +/** + * @brief usage help. + */ +static void usage(char *argv[]) { + fprintf(stderr, "Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat\n", + argv[0]); + fprintf(stderr, " options: -v verbose\n"); + fflush(stderr); +} + +/** + * @brief main function. + */ +int main(int argc, char *argv[]) { + + /* Start time for logging. This will be reset to restart time. */ + clocks_set_cpufreq(0); + cpufreq = clocks_get_cpufreq(); + + /* Initiate MPI. 
*/ + int prov = 0; + int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov); + if (res != MPI_SUCCESS) + error("Call to MPI_Init_thread failed with error %i.", res); + + res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks); + if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res); + + res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + if (res != MPI_SUCCESS) + error("Call to MPI_Comm_rank failed with error %i.", res); + + /* Handle the command-line, we expect a mpiuse data file to read and various + * options. */ + int opt; + while ((opt = getopt(argc, argv, "vds:")) != -1) { + switch (opt) { + case 'd': + datacheck = 1; + break; + case 'v': + verbose = 1; + break; + case 's': + messagescale = atof(optarg); + break; + default: + if (myrank == 0) usage(argv); + return 1; + } + } + if (optind >= argc - 1) { + if (myrank == 0) usage(argv); + return 1; + } + char *infile = argv[optind]; + char *logfile = argv[optind + 1]; + + /* Now we read the SWIFT MPI logger output that defines the communcations + * we will undertake and the time differences between injections into the + * queues. Note this has all ranks for a single steps, SWIFT outputs one MPI + * log per rank per step, so you need to combine all ranks from a step. */ + mpiuse_log_restore(infile); + int nranks = mpiuse_nr_ranks(); + + /* This should match the expected size. */ + if (nr_ranks != nranks) + error("The number of MPI ranks %d does not match the expected value %d", + nranks, nr_ranks); + + /* Index of most significant bits in the maximum rank and subtypes. + * Assumes GCC intrinsic. */ + rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks); + subtype_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(32); + + /* We all need to agree on a maximum count of logs, so we can exchange the + * offset arrays (would be ragged otherwise and difficult to exchange). */ + max_logs = mpiuse_nr_logs() / 2 + 1; + MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD); + + /* Extract the send and recv messages for our rank. */ + size_t maxsize = pick_logs(); + + /* Largest Size of a message. Needs to align on size_t. */ + max_size = toblocks(maxsize) + HEADER_SIZE; + + /* We need to share all the offsets for each communicator with all the other + * ranks so they can push data into the correct parts of our receive + * window. */ + MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets, nr_ranks * nr_ranks * max_logs, + MPI_AINT, MPI_SUM, MPI_COMM_WORLD); + MPI_Allreduce(MPI_IN_PLACE, ranktag_lists, nr_ranks * nr_ranks * max_logs, + MPI_AINT, MPI_SUM, MPI_COMM_WORLD); + + /* Now for the RDMA setup. We need the IP addresses of all the ranks. */ + + /* Each rank can find its name and IP. */ + char name[MPI_MAX_PROCESSOR_NAME]; + int namelen = 0; + MPI_Get_processor_name(name, &namelen); + char ip[infinity_max_server_ip]; + strncpy(ip, toipaddr(name), infinity_max_server_ip); + + /* And distribute, so we all know everyone's IPs. */ + struct mpi_servers servers; + servers.ip = (char *)malloc(sizeof(char) * nr_ranks * infinity_max_server_ip); + MPI_Allgather(ip, infinity_max_server_ip, MPI_BYTE, servers.ip, + infinity_max_server_ip, MPI_BYTE, MPI_COMM_WORLD); + + if (myrank == 0 && verbose) { + message("RDMA servers will listen on:"); + for (int j = 0; j < nr_ranks; j++) { + for (int k = 0; k < nr_ranks; k++) { + if (k != j) { + message(" %d: %s on port %d", j, + &servers.ip[j * infinity_max_server_ip], BASE_PORT + k); + } + } + } + } + + /* Time to start time. Try to make it synchronous across the ranks. 
*/ + MPI_Barrier(MPI_COMM_WORLD); + clocks_set_cpufreq(cpufreq); + if (myrank == 0) { + message("Start of MPI tests"); + message("=================="); + if (verbose) { + if (datacheck) + message("checking data pattern on send and recv completion"); + } + } + + /* Server rank that listens for connections and receives data. */ + pthread_t recvthread; + if (pthread_create(&recvthread, NULL, &recv_thread, &servers) != 0) + error("Failed to create recv thread."); + + /* Now we have a single thread to send the messages. */ + pthread_t sendthread; + if (pthread_create(&sendthread, NULL, &send_thread, &servers) != 0) + error("Failed to create send thread."); + + /* Wait until all threads have exited and all message exchanges have + * completed. */ + pthread_join(sendthread, NULL); + pthread_join(recvthread, NULL); + + /* Dump the updated MPI logs. */ + MPI_Barrier(MPI_COMM_WORLD); + if (myrank == 0) message("Dumping updated log"); + mpiuse_dump_logs(nranks, logfile); + + /* Shutdown MPI. */ + res = MPI_Finalize(); + if (res != MPI_SUCCESS) + error("call to MPI_Finalize failed with error %i.", res); + + /* Free resources. */ + free(servers.ip); + + if (myrank == 0) message("Bye"); + + return 0; +} -- GitLab
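For reference, a minimal sketch of how the wrapper API introduced by this patch can be driven, reduced to a two-rank exchange. This is illustrative only: the example_* names, the window sizes and the two-rank assumption are mine, and it presumes the MPI startup and IP exchange already done by main() in swiftmpirdmaonestepsim_wrapper.c.

#include <pthread.h>
#include <string.h>

#include "infinity_wrapper.h"

struct example_args {
  struct mpi_servers *servers; /* IPs of both ranks, as filled by MPI_Allgather */
  int myrank;
};

/* Server side: create a registered window per peer and poll it until the
 * peer's message has been unlocked. */
static void *example_server(void *arg) {
  struct example_args *a = (struct example_args *)arg;
  size_t sizes[2] = {1024, 1024}; /* bytes expected from each peer */
  void *h = infinity_create_servers(a->servers, 2, sizes, a->myrank, 1);
  int other = 1 - a->myrank;
  while (infinity_check_ready(h, other, 0) == NULL) /* offset 0, in blocks */
    ;
  infinity_clients_free(h);
  return NULL;
}

/* Client side: connect to the peers' windows and push one block-aligned
 * message to the start of the other rank's window. */
static void *example_client(void *arg) {
  struct example_args *a = (struct example_args *)arg;
  void *h = infinity_connect_clients(a->servers, 2, a->myrank, 1);
  int other = 1 - a->myrank;
  size_t payload[4]; /* block 0 is the lock word, the rest is data */
  memset(payload, 0, sizeof(payload));
  payload[0] = (size_t)a->myrank; /* anything but UNLOCKED */
  infinity_send_data(h, other, payload, sizeof(payload), 0);
  infinity_clients_free(h);
  return NULL;
}

On each rank both functions must run concurrently (e.g. launched with pthread_create, as the simulator does for its send and recv threads), because infinity_create_servers blocks accepting connections while infinity_connect_clients blocks waiting for the servers to come up.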