diff --git a/Makefile b/Makefile index 38b9759e03e50b26dce48ef87239ad0467c611d8..27c8f7a5f03d3b834ab0e998acfc78f179540edd 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined -CFLAGS = -g -O3 -Wall -Iinfinity/include -#CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_ASSERT_ON +#CFLAGS = -g -O3 -Wall -Iinfinity/include +CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread INCLUDES = mpiuse.h atomic.h cycle.h clocks.h error.h @@ -12,7 +12,7 @@ INFINITY = -Linfinity -linfinity -libverbs all: swiftmpistepsim swiftmpistepsim2 \ swiftmpirdmastepsim swiftmpirdmastepsim2 swiftmpirdmastepsim3 \ - swiftmpirdmaonestepsim + swiftmpirdmaonestepsim swiftmpirdmaonestepsim2 swiftmpistepsim: swiftmpistepsim.c $(DEPS) mpicxx $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c $(SOURCES) @@ -26,6 +26,9 @@ swiftmpirdmastepsim: swiftmpirdmastepsim.c $(DEPS) swiftmpirdmaonestepsim: swiftmpirdmaonestepsim.c $(DEPS) mpicxx $(CFLAGS) -o swiftmpirdmaonestepsim swiftmpirdmaonestepsim.c $(SOURCES) $(INFINITY) +swiftmpirdmaonestepsim2: swiftmpirdmaonestepsim2.c $(DEPS) + mpicxx $(CFLAGS) -o swiftmpirdmaonestepsim2 swiftmpirdmaonestepsim2.c $(SOURCES) $(INFINITY) + swiftmpirdmastepsim2: swiftmpirdmastepsim2.c $(DEPS) mpicxx $(CFLAGS) -o swiftmpirdmastepsim2 swiftmpirdmastepsim2.c $(SOURCES) $(INFINITY) @@ -37,6 +40,7 @@ clean: rm -f swiftmpistepsim2 rm -f swiftmpirdmastepsim rm -f swiftmpirdmaonestepsim + rm -f swiftmpirdmaonestepsim2 rm -f swiftmpirdmastepsim2 rm -f swiftmpirdmastepsim3 diff --git a/swiftmpirdmaonestepsim2.c b/swiftmpirdmaonestepsim2.c new file mode 100644 index 0000000000000000000000000000000000000000..1f1ef4bf55be47286673885721df3ff519de1605 --- /dev/null +++ b/swiftmpirdmaonestepsim2.c @@ -0,0 +1,749 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Copyright (c) 2020 Peter W. Draper + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ + +// Pure RDMA version, we use MPI for process control and synchronization. +// Write variant, attempting to use eager-like one-sided sends. +// Variation has a single send and receive thread per rank. 
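+//
+// Illustration only (the field names below are made up; the code in this
+// file works on a raw BLOCKTYPE array rather than a struct): each message
+// lands at a pre-agreed offset in the receiver's RDMA window and looks
+// roughly like
+//
+//   struct eager_message {
+//     size_t lock;        // sender rank while in flight, UNLOCKED when done
+//     size_t subtype;     // task subtype, should match the receive log
+//     size_t size;        // payload size in bytes
+//     size_t tag;         // MPI tag
+//     size_t payload[];   // log->size bytes, rounded up to whole blocks
+//   };
+//
+// The receive thread simply polls the lock word of every expected message.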
+
+#include <arpa/inet.h>
+#include <limits.h>
+#include <mpi.h>
+#include <netdb.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <infinity/core/Context.h>
+#include <infinity/memory/Buffer.h>
+#include <infinity/memory/RegionToken.h>
+#include <infinity/queues/QueuePair.h>
+#include <infinity/queues/QueuePairFactory.h>
+#include <infinity/requests/RequestToken.h>
+
+#include "atomic.h"
+#include "clocks.h"
+#include "error.h"
+#include "mpiuse.h"
+
+/* Our rank for all to see. */
+int myrank = -1;
+
+/* CPU frequency estimate, shared so we do this once. */
+static long long cpufreq = 0;
+
+/* Number of ranks. */
+static int nr_ranks;
+
+/* Base port no. Ranks use +rank. XXX argument, autodiscovery? XXX */
+static int BASE_PORT = 27771;
+
+/* Size of a block of memory. */
+#define BLOCKTYPE size_t
+#define MPI_BLOCKTYPE MPI_AINT
+static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
+
+/* Flags for controlling access. High end of size_t. */
+static size_t UNLOCKED = (((size_t)2 << 63) - 1);
+
+/* Size of message header in blocks. The lock/rank, subtype, size and tag. */
+static const size_t HEADER_SIZE = 4;
+
+/* Maximum size of a message in blocks. */
+static size_t max_size = 0;
+
+/* Are we verbose? */
+static int verbose = 0;
+
+/* Scale to apply to the size of the messages we send. */
+static float messagescale = 1.0;
+
+/* Set a data pattern and check we get this back, slow... */
+static int datacheck = 0;
+
+/* Integer types of send and recv tasks, must match log. */
+static const int task_type_send = 22;
+static const int task_type_recv = 23;
+
+/* 3D index of array. */
+#define INDEX3(nx, ny, x, y, z) (nx * ny * z + nx * y + x)
+
+/* 2D index of array. */
+#define INDEX2(nx, x, y) (nx * y + x)
+
+/* Bit shift to accommodate all the bits of the maximum rank id. */
+static int rank_shift = 0;
+
+/* Bit shift to accommodate all the bits of the maximum subtype. */
+static int subtype_shift = 0;
+
+/* Maximum no. of messages (logs). */
+static size_t max_logs = 0;
+
+/* Offsets of the ranktag regions within the receive windows and lists of the
+ * associated tags. */
+static size_t *ranktag_sizes;
+static size_t *ranktag_offsets;
+static size_t *ranktag_lists;
+
+/* The local send queue. */
+static struct mpiuse_log_entry **volatile send_queue;
+static int volatile nr_sends = 0;
+static int volatile todo_send = 0;
+
+/* Flag that is set while starting up, cleared once startup has completed. */
+static int volatile starting_up = 1;
+
+/* Local receive queues separated by rank. */
+static int volatile nr_recvs = 0;
+static struct mpiuse_log_entry **volatile recv_queue;
+
+/**
+ * @brief Convert ranks, subtype and tag into a single unique value.
+ *
+ * Assumes there is enough space in a size_t for these values.
+ *
+ * @param subtype the subtype of the message
+ * @param sendrank the send rank
+ * @param recvrank the receive rank
+ * @param tag the tag
+ *
+ * @result a unique value combining all the inputs
+ */
+static size_t toranktag(int subtype, int sendrank, int recvrank, int tag) {
+  size_t result = subtype | sendrank << subtype_shift |
+                  recvrank << (subtype_shift + rank_shift) |
+                  tag << (subtype_shift * 2 + rank_shift);
+  return result;
+}
+
+/**
+ * @brief Find an IP address for the given hostname.
+ *
+ * @param hostname the hostname
+ *
+ * @result the IP address, note copy this away to keep it.
+ */ + +static char *toipaddr(char *hostname) { + + struct hostent *hostent = gethostbyname(hostname); + if (hostent == NULL) { + error("Failed to convert hostname '%s' to an IP address", hostname); + } + struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list; + return inet_ntoa(*addr_list[0]); +} + +/** + * @brief Convert a byte count into a number of blocks, rounds up. + * + * @param nr_bytes the number of bytes. + * + * @result the number of blocks needed. + */ +static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) { + return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK; +} + +/** + * @brief Convert a block count into a number of bytes. + * + * @param nr_block the number of blocks. + * + * @result the number of bytes. + */ +static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) { + return (nr_blocks * BYTESINBLOCK); +} + +/** + * @brief fill a data area with given value. + * + * @param size size of data in bytes. + * @param data the data to fill. + * @param value the value to fill. + */ +static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) { + for (BLOCKTYPE i = 0; i < size; i++) { + data[i] = value; + } +} + +/** + * @brief test a filled data area for a value. + * + * @param size size of data in bytes. + * @param data the data to check. + * @param value the value expected. + * + * @result 1 on success, 0 otherwise. + */ +static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) { + for (size_t i = 0; i < size; i++) { + if (data[i] != value) { + message("see %zd expected %zd @ %zd (%zd to go)", data[i], value, i, + size); + return 0; + } + } + return 1; +} + +/* Struct of server ip addresses as formatted strings.*/ +struct servers { + char *ip; +}; + +/** + * @brief Send thread, sends RDMA messages to the other ranks. + * + * Messages are all considered in order. + */ +static void *send_thread(void *arg) { + + // Get the destination IPs and ranks. + struct servers *servers = (struct servers *)arg; + + // Need a factory to create QPs. + infinity::core::Context *context = new infinity::core::Context(); + infinity::queues::QueuePairFactory *qpFactory = + new infinity::queues::QueuePairFactory(context); + + // Create the QPs connecting to all the other ranks. + infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **) + calloc(nr_ranks, sizeof(infinity::queues::QueuePair *)); + + // And pointers to the remote memory. + infinity::memory::RegionToken **remoteBufferToken = (infinity::memory::RegionToken **) + calloc(nr_ranks, sizeof(infinity::memory::RegionToken *)); + + // We need to listen for messages from the other rank servers that we can + // connect to them as they need to be up first. + int buf[1]; + MPI_Request reqs[nr_ranks]; + reqs[myrank] = MPI_REQUEST_NULL; + for (int k = 0; k < nr_ranks; k++) { + if (k != myrank) { + MPI_Irecv(buf, 1, MPI_INT, k, k, MPI_COMM_WORLD, &reqs[k]); + } + } + + /* Now we poll for any servers that are ready to connect. */ + int index; + MPI_Status status; + while (1) { + MPI_Waitany(nr_ranks, reqs, &index, &status); + + // All done when all requests have completed. + if (index == MPI_UNDEFINED) break; + + // Got one, so connect. 
+ char *ip = &servers->ip[index * MPI_MAX_PROCESSOR_NAME]; + if (verbose) + message("%d waiting for connection to remote server %s %d on %d", myrank, + ip, index, BASE_PORT + myrank); + qps[index] = qpFactory->connectToRemoteHost(ip, BASE_PORT + myrank); + if (verbose) + message("%d connected to remote server %s %d on %d", myrank, ip, + index, BASE_PORT + myrank); + remoteBufferToken[index] = (infinity::memory::RegionToken *)qps[index]->getUserData(); + if (remoteBufferToken[index] == NULL) { + message("remoteBufferToken for %d is NULL", index); + } else { + message("remoteBufferToken for %d not NULL", index); + } + } + + /* Register some memory for use by RDMA, make it large enough for our + * biggest message. */ + auto *sendBuffer = new infinity::memory::Buffer(context, tobytes(max_size)); + + // Startup complete, so start timing and release the receive thread. + MPI_Barrier(MPI_COMM_WORLD); // Vital for synchronization. + clocks_set_cpufreq(cpufreq); + starting_up = 0; + message("All synchronized"); + ticks starttics = getticks(); + + for (int k = 0; k < nr_sends; k++) { + struct mpiuse_log_entry *log = send_queue[k]; + + /* Data has the actual data and room for the header. */ + BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE; + BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK); + log->data = dataptr; + log->injtic = getticks(); + + /* Fill data with pattern. */ + if (datacheck) + datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE], log->tag); + + /* First element is the lock element, which can have any value other than + * UNLOCKED, the other elements define an MPI message. */ + dataptr[0] = myrank; + dataptr[1] = log->subtype; + dataptr[2] = log->size; + dataptr[3] = log->tag; + + /* Copy this to the registered memory. */ + memcpy(sendBuffer->getData(), dataptr, tobytes(datasize)); + + /* Need to find the offset for this data in the remotes window. */ + size_t ranktag = toranktag(log->subtype, log->rank, log->otherrank, log->tag); + log->offset = 0; + int found = 0; + + // XXX must be faster way of doing this... Go back to per rank lists. + for (size_t j = 0; j < max_logs; j++) { + if (ranktag_lists[INDEX3(nr_ranks, nr_ranks, myrank, log->otherrank, j)] == ranktag) { + log->offset = ranktag_offsets[INDEX3(nr_ranks, nr_ranks, myrank, log->otherrank, j)]; + found = 1; + break; + } + } + if (!found) { + error( + "Failed sending a message of size %zd to %d/%d " + "@ %zd\n, no offset found for ranktag %zd", + datasize, log->otherrank, log->subtype, log->offset, ranktag); + } + + if (verbose) + message("sending message subtype %d from %d to %d todo: %d/%d, offset %ld size %ld", + log->subtype, myrank, log->otherrank, nr_sends - k, nr_sends, log->offset, datasize); + + // And send + infinity::requests::RequestToken requestToken(context); + qps[log->otherrank]->write(sendBuffer, + 0, // localOffset + remoteBufferToken[log->otherrank], // destination + tobytes(log->offset), // remoteOffset + tobytes(datasize), // sizeInBytes + infinity::queues::OperationFlags(), &requestToken); + requestToken.waitUntilCompleted(); + requestToken.reset(); + + // Now we update the unlock field. + ((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED; + qps[log->otherrank]->write(sendBuffer, + 0, // localOffset + remoteBufferToken[log->otherrank], // destination + tobytes(log->offset), // remoteOffset + BYTESINBLOCK, // sizeInBytes + infinity::queues::OperationFlags(), &requestToken); + requestToken.waitUntilCompleted(); // Since we reuse the sendBuffer. 
+ // requestToken.reset(); + + log->endtic = getticks(); + } + + message("took %.3f %s", clocks_from_ticks(getticks() - starttics), clocks_getunit()); + + for (int k = 0; k < nr_ranks; k++) delete qps[k]; + free(qps); + delete sendBuffer; + delete qpFactory; + delete context; + + return NULL; +} + +/** + * @brief recv thread, listens for remote sends from another rank. + */ +static void *recv_thread(void *arg) { + + struct servers *servers = (struct servers *)arg; + + // Need a factory to create QPs. + infinity::core::Context *context = new infinity::core::Context(); + infinity::queues::QueuePairFactory *qpFactory = + new infinity::queues::QueuePairFactory(context); + + // Create the QPs connecting to all the other ranks. + infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **) + calloc(nr_ranks, sizeof(infinity::queues::QueuePair *)); + + // Create buffers to receive all the remote data. + infinity::memory::Buffer **bufferToReadWrite = (infinity::memory::Buffer **) + calloc(nr_ranks, sizeof(infinity::memory::Buffer *)); + infinity::memory::RegionToken **bufferToken = (infinity::memory::RegionToken **) + calloc(nr_ranks, sizeof(infinity::memory::RegionToken *)); + for (int k = 0; k < nr_ranks; k++) { + if (ranktag_sizes[k] > 0) { + bufferToReadWrite[k] = new infinity::memory::Buffer(context, tobytes(ranktag_sizes[k])); + bufferToken[k] = bufferToReadWrite[k]->createRegionToken(); + } else { + // Dummy. + bufferToReadWrite[k] = new infinity::memory::Buffer(context, BYTESINBLOCK); + bufferToken[k] = bufferToReadWrite[k]->createRegionToken(); + } + } + + // Do the port binding for each other rank. + int buf[1]; + MPI_Request req; + for (int k = 0; k < nr_ranks; k++) { + if (k != myrank) { + if (verbose) + message("%d binding to %d on port %d", myrank, k, BASE_PORT + k); + qpFactory->bindToPort(BASE_PORT + k); + + // Send message this port is about to block for a connection. + if (verbose) message("Blocking for first message on %d", BASE_PORT + k); + MPI_Isend(buf, 1, MPI_INT, k, myrank, MPI_COMM_WORLD, &req); + qps[k] = qpFactory->acceptIncomingConnection(bufferToken[k], + sizeof(infinity::memory::RegionToken)); + if (verbose) + message("Accepting incoming connections on %d", BASE_PORT + k); + } + } + message("All incomings up"); + + /* Now we wait for the remotes to connect before proceeding, otherwise the + * timings will be incorrect. */ + while(starting_up) + ; + + /* Ignore the timing on previous part, which is fixed. */ + ticks starttics = getticks(); + + /* We loop while new requests are being send and we still have messages + * to receive. */ + int todo_recv = nr_recvs; + while (todo_recv > 0) { + + /* Loop over remaining messages, checking if any have been unlocked. */ + for (int k = 0; k < nr_recvs; k++) { + struct mpiuse_log_entry *log = recv_queue[k]; + if (log != NULL && !log->done) { + + /* On the first attempt we start listening for this receive. */ + if (log->injtic == 0) log->injtic = getticks(); + + /* Get find the data for this message. */ + BLOCKTYPE *dataptr = + &((BLOCKTYPE *)bufferToReadWrite[log->otherrank]->getData())[log->offset]; + + /* Check if this has been unlocked. */ + BLOCKTYPE volatile lock = dataptr[0]; + + if (lock == UNLOCKED) { + + // XXX should check these for correctness. 
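+          /* One possible (untested) form of that check, left disabled like
+           * the extraction lines below: compare the header written by the
+           * sender against what this receive log expects. */
+          // if ((int)dataptr[1] != log->subtype ||
+          //     dataptr[2] != (size_t)log->size ||
+          //     (int)dataptr[3] != log->tag)
+          //   error("Header mismatch from rank %d", log->otherrank);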
+ // int subtype = dataptr[1]; + // size_t size = dataptr[2]; + // int tag = dataptr[3]; + + if (verbose) + message( + "receive message subtype %d from %d to %d todo: %d/%d," + " offset %ld size %ld", + log->subtype, myrank, log->otherrank, todo_recv, nr_recvs, + log->offset, toblocks(log->size) + HEADER_SIZE); + + /* Check data sent data is unchanged and received data is as + * expected. */ + if (datacheck && !datacheck_test(toblocks(log->size), + &dataptr[HEADER_SIZE], log->tag)) { + message("Data mismatch on completion"); + } + + /* Done, clean up. */ + log->done = 1; + log->endtic = getticks(); + todo_recv--; + } + } + } + } + + message("took %.3f %s", clocks_from_ticks(getticks() - starttics), clocks_getunit()); + + for (int k = 0; k < nr_ranks; k++) { + delete qps[k]; + delete bufferToReadWrite[k]; + delete bufferToken[k]; + } + free(qps); + delete context; + delete qpFactory; + + /* Thread exits. */ + return NULL; +} + +/** + * @brief Comparison function for tags. + */ +static int cmp_logs(const void *p1, const void *p2) { + struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1; + struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2; + + if (l1->tic > l2->tic) return 1; + if (l1->tic < l2->tic) return -1; + return 0; +} + +/** + * @brief Pick out the relevant logging data for our rank. + */ +static size_t pick_logs() { + + size_t nlogs = mpiuse_nr_logs(); + size_t maxsize = 0; + + /* Queues of send and receive logs. */ + send_queue = (struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *)); + nr_sends = 0; + recv_queue = (struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *)); + nr_recvs = 0; + + for (size_t k = 0; k < nlogs; k++) { + struct mpiuse_log_entry *log = mpiuse_get_log(k); + if (log->activation) { + if (log->rank == myrank) { + log->done = 0; + log->injtic = 0; + log->endtic = 0; + log->data = NULL; + log->ranktag = toranktag(log->subtype, log->otherrank, log->rank, log->tag); + + /* Scale size. */ + log->size *= messagescale; + + if (log->type == task_type_send) { + send_queue[nr_sends] = log; + nr_sends++; + } else if (log->type == task_type_recv) { + recv_queue[nr_recvs] = log; + nr_recvs++; + } else { + error("task type '%d' is not a known send or recv task", log->type); + } + } + + /* Across all ranks. */ + if (log->size > maxsize) maxsize = log->size; + } + } + + /* Sort into increasing tic. */ + qsort(recv_queue, nr_recvs, sizeof(struct mpiuse_log_entry *), cmp_logs); + qsort(send_queue, nr_sends, sizeof(struct mpiuse_log_entry *), cmp_logs); + + /* Offsets and ranktags. */ + ranktag_offsets = + (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t)); + ranktag_lists = + (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t)); + ranktag_sizes = + (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t)); + + /* Setup the ranktag offsets for our receive windows. Also define the sizes + * of the windows. */ + for (int k = 0; k < nr_recvs; k++) { + struct mpiuse_log_entry *log = recv_queue[k]; + ranktag_lists[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, + k)] = log->ranktag; + ranktag_offsets[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, + k)] = ranktag_sizes[log->otherrank]; + log->offset = ranktag_sizes[log->otherrank]; + + /* Need to use a multiple of blocks to keep alignment. 
 */
+    size_t size = toblocks(log->size) + HEADER_SIZE;
+    ranktag_sizes[log->otherrank] += size;
+  }
+
+  if (verbose) {
+    message("maxsize = %zd, nr_sends = %d, nr_recvs = %d", maxsize, nr_sends,
+            nr_recvs);
+  }
+
+  return maxsize;
+}
+
+/**
+ * @brief usage help.
+ */
+static void usage(char *argv[]) {
+  fprintf(stderr, "Usage: %s [-vd] [-s scale] SWIFT_mpiuse-log-file.dat logfile.dat\n",
+          argv[0]);
+  fprintf(stderr, " options: -v verbose\n");
+  fprintf(stderr, "          -d check data patterns\n");
+  fprintf(stderr, "          -s scale factor to apply to message sizes\n");
+  fflush(stderr);
+}
+
+/**
+ * @brief main function.
+ */
+int main(int argc, char *argv[]) {
+
+  /* Start time for logging. This will be reset to restart time. */
+  clocks_set_cpufreq(0);
+  cpufreq = clocks_get_cpufreq();
+
+  /* Initialize MPI. */
+  int prov = 0;
+  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Init_thread failed with error %i.", res);
+
+  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
+  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
+
+  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Comm_rank failed with error %i.", res);
+
+  /* Handle the command-line; we expect an mpiuse data file to read and
+   * various options. */
+  int opt;
+  while ((opt = getopt(argc, argv, "vds:")) != -1) {
+    switch (opt) {
+      case 'd':
+        datacheck = 1;
+        break;
+      case 'v':
+        verbose = 1;
+        break;
+      case 's':
+        messagescale = atof(optarg);
+        break;
+      default:
+        if (myrank == 0) usage(argv);
+        return 1;
+    }
+  }
+  if (optind >= argc - 1) {
+    if (myrank == 0) usage(argv);
+    return 1;
+  }
+  char *infile = argv[optind];
+  char *logfile = argv[optind + 1];
+
+  /* Now we read the SWIFT MPI logger output that defines the communications
+   * we will undertake and the time differences between injections into the
+   * queues. Note this has all ranks for a single step, SWIFT outputs one MPI
+   * log per rank per step, so you need to combine all ranks from a step. */
+  mpiuse_log_restore(infile);
+  int nranks = mpiuse_nr_ranks();
+
+  /* This should match the number of ranks we are running with. */
+  if (nr_ranks != nranks)
+    error("The number of MPI ranks %d does not match the expected value %d",
+          nranks, nr_ranks);
+
+  /* Index of the most significant bit in the maximum rank and subtype.
+   * Assumes GCC intrinsic. */
+  rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks);
+  subtype_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(32);
+
+  /* We all need to agree on a maximum count of logs, so we can exchange the
+   * offset arrays (would be ragged otherwise and difficult to exchange). */
+  max_logs = mpiuse_nr_logs() / 2 + 1;
+  MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
+
+  /* Extract the send and recv messages for our rank. */
+  size_t maxsize = pick_logs();
+
+  /* Largest size of a message. Needs to align on size_t. */
+  max_size = toblocks(maxsize) + HEADER_SIZE;
+
+  /* We need to share all the offsets for each communicator with all the other
+   * ranks so they can push data into the correct parts of our receive
+   * window. */
+  MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
+                nr_ranks * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
+                MPI_COMM_WORLD);
+  MPI_Allreduce(MPI_IN_PLACE, ranktag_lists, nr_ranks * nr_ranks * max_logs,
+                MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
+
+  /* Now for the RDMA setup. We need the IP addresses of all the ranks. */
+
+  /* Each rank can find its name and IP.
*/ + char name[MPI_MAX_PROCESSOR_NAME]; + int namelen = 0; + MPI_Get_processor_name(name, &namelen); + char ip[MPI_MAX_PROCESSOR_NAME]; + strncpy(ip, toipaddr(name), MPI_MAX_PROCESSOR_NAME); + + /* And distribute, so we all know everyone's IPs. */ + struct servers servers; + servers.ip = + (char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME); + MPI_Allgather(ip, MPI_MAX_PROCESSOR_NAME, MPI_BYTE, servers.ip, + MPI_MAX_PROCESSOR_NAME, MPI_BYTE, MPI_COMM_WORLD); + + if (myrank == 0 && verbose) { + message("RDMA servers will listen on:"); + for (int j = 0; j < nr_ranks; j++) { + for (int k = 0; k < nr_ranks; k++) { + if (k != j) { + message(" %d: %s on port %d", j, + &servers.ip[j * MPI_MAX_PROCESSOR_NAME], BASE_PORT + k); + } + } + } + } + + /* Time to start time. Try to make it synchronous across the ranks. */ + MPI_Barrier(MPI_COMM_WORLD); + clocks_set_cpufreq(cpufreq); + if (myrank == 0) { + message("Start of MPI tests"); + message("=================="); + if (verbose) { + if (datacheck) + message("checking data pattern on send and recv completion"); + } + } + + /* Server rank that listens for connections and receives data. */ + pthread_t recvthread; + if (pthread_create(&recvthread, NULL, &recv_thread, &servers) != 0) + error("Failed to create recv thread."); + + /* Now we have a single thread to send the messages. */ + pthread_t sendthread; + if (pthread_create(&sendthread, NULL, &send_thread, &servers) != 0) + error("Failed to create send thread."); + + /* Wait until all threads have exited and all message exchanges have + * completed. */ + pthread_join(sendthread, NULL); + pthread_join(recvthread, NULL); + + /* Dump the updated MPI logs. */ + MPI_Barrier(MPI_COMM_WORLD); + if (myrank == 0) message("Dumping updated log"); + mpiuse_dump_logs(nranks, logfile); + + /* Shutdown MPI. */ + res = MPI_Finalize(); + if (res != MPI_SUCCESS) + error("call to MPI_Finalize failed with error %i.", res); + + /* Free resources. */ + free(servers.ip); + + if (myrank == 0) message("Bye"); + + return 0; +}
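Note on the offset exchange above: each rank only fills its own, disjoint entries of the zero-initialised ranktag_offsets and ranktag_lists arrays, so the summing MPI_Allreduce simply merges the per-rank tables. A minimal standalone sketch of that trick (not part of the simulator; the values and the MPI_AINT-for-size_t choice mirror assumptions made in the code above):

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);

  int rank = 0, nranks = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nranks);

  /* Zeroed table with one slot per rank; only the owner fills its slot. */
  size_t *table = (size_t *)calloc(nranks, sizeof(size_t));
  table[rank] = (size_t)(rank + 1) * 100;

  /* All other contributions are zero, so a SUM reduction acts as a merge. */
  MPI_Allreduce(MPI_IN_PLACE, table, nranks, MPI_AINT, MPI_SUM, MPI_COMM_WORLD);

  if (rank == 0)
    for (int k = 0; k < nranks; k++) printf("slot %d = %zu\n", k, table[k]);

  free(table);
  MPI_Finalize();
  return 0;
}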