Commit 990efdfd authored by Peter W. Draper

Extract infinity into wrapper functions

parent 5bc5307b
Merge request !8 (Draft): RDMA version with wrapped infinity calls
#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON
#CFLAGS = -g -O3 -Wall -Iinfinity/include
CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON
#CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON
#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
INCLUDES = mpiuse.h atomic.h cycle.h clocks.h error.h
@@ -9,10 +9,10 @@ DEPS = Makefile $(SOURCES) $(INCLUDES)
INFINITY = -Linfinity -linfinity -libverbs
all: swiftmpistepsim swiftmpistepsim2 \
swiftmpirdmastepsim swiftmpirdmastepsim2 swiftmpirdmastepsim3 \
swiftmpirdmaonestepsim swiftmpirdmaonestepsim2
swiftmpirdmaonestepsim swiftmpirdmaonestepsim2 \
swiftmpirdmaonestepsim_wrapper
swiftmpistepsim: swiftmpistepsim.c $(DEPS)
mpicxx $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c $(SOURCES)
@@ -35,6 +35,10 @@ swiftmpirdmastepsim2: swiftmpirdmastepsim2.c $(DEPS)
swiftmpirdmastepsim3: swiftmpirdmastepsim3.c $(DEPS)
mpicxx $(CFLAGS) -o swiftmpirdmastepsim3 swiftmpirdmastepsim3.c $(SOURCES) $(INFINITY)
swiftmpirdmaonestepsim_wrapper: swiftmpirdmaonestepsim_wrapper.c $(DEPS) infinity_wrapper.h infinity_wrapper.c
mpicxx $(CFLAGS) -o swiftmpirdmaonestepsim_wrapper swiftmpirdmaonestepsim_wrapper.c \
infinity_wrapper.c $(SOURCES) $(INFINITY)
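# Typical run of the wrapper simulator (a sketch only; the rank count and file
# names are illustrative). It expects a combined SWIFT mpiuse log for one step
# and an output log name, see usage() in the source:
#   mpirun -np 4 ./swiftmpirdmaonestepsim_wrapper mpiuse_step1_all_ranks.dat rdma_log.dat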
clean:
rm -f swiftmpistepsim
rm -f swiftmpistepsim2
@@ -43,5 +47,6 @@ clean:
rm -f swiftmpirdmaonestepsim2
rm -f swiftmpirdmastepsim2
rm -f swiftmpirdmastepsim3
rm -f swiftmpirdmaonestepsim_wrapper
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/**
* @brief Simple C wrapper for the C++ infinity library; it only provides an
* interface to the capabilities we use. Note we are still exposed to C++
* linkage from the infinity library, so this file must be built with a C++
* compiler.
*/
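/*
 * A minimal usage sketch (not part of the build; nr_ranks, otherrank, sizes,
 * buffer and the offsets are illustrative, error handling is omitted):
 *
 *   // Receiving side: create the servers, then poll a slot for arrival.
 *   void *srv = infinity_create_servers(&servers, nr_ranks, sizes, myrank, 0);
 *   void *data;
 *   while ((data = infinity_check_ready(srv, otherrank, block_offset)) == NULL)
 *     ;  // spin until the remote write unlocks this slot
 *
 *   // Sending side: connect to the servers, push a buffer, clean up.
 *   void *cli = infinity_connect_clients(&servers, nr_ranks, myrank, 0);
 *   infinity_send_data(cli, otherrank, buffer, size_in_bytes, byte_offset);
 *   infinity_clients_free(cli);
 */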
/* Config parameters. */
//#include "../config.h"
#define HAVE_INFINITY
/* Standard includes. */
#include <arpa/inet.h>
#ifdef WITH_MPI
#include <mpi.h>
#endif
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Infinity C++ headers. */
#ifdef HAVE_INFINITY
#include <infinity/core/Context.h>
#include <infinity/memory/Buffer.h>
#include <infinity/memory/RegionToken.h>
#include <infinity/queues/QueuePair.h>
#include <infinity/queues/QueuePairFactory.h>
#include <infinity/requests/RequestToken.h>
#endif
/* Local defines. */
#include "infinity_wrapper.h"
/* Local includes. */
#include "error.h"
/* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this
* as we need to align in memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Flags for controlling access. High end of size_t. */
static size_t UNLOCKED = (((size_t)2 << 63) - 1);
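/* Block 0 of each message slot acts as the lock word: the sender first writes
* the whole message (with block 0 still locked), then overwrites block 0 with
* UNLOCKED, and the receiver polls that word (see infinity_check_ready) to
* detect arrival. */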
/* Struct of QPs and associated data. */
struct qps_data {
int nr_qps;
infinity::core::Context *context;
infinity::queues::QueuePairFactory *factory;
infinity::queues::QueuePair **qps;
infinity::memory::Buffer **receive_buffers;
infinity::memory::RegionToken **remote_buffers;
infinity::memory::Buffer **readwrite_buffers;
infinity::memory::RegionToken **token_buffers;
};
/**
* @brief Find an IP address for the given hostname.
*
* @param hostname the hostname
*
* @result the IP address. Note this points at a static buffer, so copy it to keep it.
*/
static char *toipaddr(char *hostname) {
struct hostent *hostent = gethostbyname(hostname);
if (hostent == NULL) {
error("Failed to convert hostname '%s' to an IP address", hostname);
}
struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list;
return inet_ntoa(*addr_list[0]);
}
/**
* @brief Create QPs connecting a group of clients to a group of servers.
*
* Requires that infinity_create_servers is also running, otherwise we
* block waiting for the connections.
*
* @param servers a #mpi_servers struct with the server details.
* @param nr_servers the number of servers expected to connect.
* @param myrank the MPI rank of this process.
* @param verbose if 1 then report the connections made.
*
* @return handle for the QPs and related data.
*/
void *infinity_connect_clients(struct mpi_servers *servers, int nr_servers,
int myrank, int verbose) {
#ifdef HAVE_INFINITY
/* Struct to hold all the persistent data. */
struct qps_data *cqps = (struct qps_data *)calloc(1, sizeof(struct qps_data));
/* Need a factory to create QPs. */
cqps->context = new infinity::core::Context();
cqps->factory = new infinity::queues::QueuePairFactory(cqps->context);
/* Create the QPs connecting to all the other ranks. */
cqps->qps = (infinity::queues::QueuePair **)
calloc(nr_servers, sizeof(infinity::queues::QueuePair *));
cqps->nr_qps = nr_servers;
/* Space for the pointers to the remote memory. */
cqps->remote_buffers = (infinity::memory::RegionToken **)
calloc(nr_servers, sizeof(infinity::memory::RegionToken *));
/* We need to listen for messages from the other ranks' servers saying that
* we can connect to them, as they need to be up first. */
int buf[1];
MPI_Request reqs[nr_servers];
for (int k = 0; k < nr_servers; k++) {
if (k != myrank) {
MPI_Irecv(buf, 1, MPI_INT, k, k, MPI_COMM_WORLD, &reqs[k]);
} else {
reqs[myrank] = MPI_REQUEST_NULL;
}
}
/* Now we poll for any servers that are ready to connect. */
int index;
MPI_Status status;
while (1) {
MPI_Waitany(nr_servers, reqs, &index, &status);
/* All done when all requests have completed. */
if (index == MPI_UNDEFINED) break;
/* Got one, so connect. */
char *ip = &servers->ip[index * infinity_max_server_ip];
if (verbose)
message("%d waiting for connection to remote server %s %d on %d", myrank,
ip, index, BASE_PORT + myrank);
cqps->qps[index] = cqps->factory->connectToRemoteHost(ip, BASE_PORT + myrank);
if (verbose)
message("%d connected to remote server %s %d on %d", myrank, ip, index,
BASE_PORT + myrank);
/* Remote buffer access */
cqps->remote_buffers[index] =
(infinity::memory::RegionToken *)cqps->qps[index]->getUserData();
}
/* Result is opaque. */
return (void *)cqps;
#else
return NULL;
#endif
}
/**
* @brief Send a buffer to a server listening on a QP.
*
* @param qphandle the handle from infinity_connect_clients.
* @param index index of the server to send to.
* @param buffer the buffer to send, should be block aligned.
* @param size the size of the buffer in bytes.
* @param offset the offset into the remote buffer, note in bytes not blocks.
*/
void infinity_send_data(void *qphandle, int index, void *buffer, size_t size,
size_t offset) {
#ifdef HAVE_INFINITY
struct qps_data *cqps = (struct qps_data *)qphandle;
/* Need to assign to a buffer to register memory. XXX make this as big as
* necessary per server and reuse. */
auto *sendBuffer =
new infinity::memory::Buffer(cqps->context, buffer, size);
/* And send. */
infinity::requests::RequestToken requestToken(cqps->context);
cqps->qps[index]->write(sendBuffer,
0, // localOffset
cqps->remote_buffers[index], // destination
offset, // remoteOffset
size, // sizeInBytes
infinity::queues::OperationFlags(),
&requestToken);
requestToken.waitUntilCompleted();
requestToken.reset();
/* Now we update the unlock field. */
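/* The payload write above has completed (we waited on the token), so
* rewriting block 0 as UNLOCKED publishes the message to the polling
* receiver (see infinity_check_ready). */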
((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED;
cqps->qps[index]->write(sendBuffer,
0, // localOffset
cqps->remote_buffers[index], // destination
offset, // remoteOffset
BYTESINBLOCK, // sizeInBytes
infinity::queues::OperationFlags(),
&requestToken);
requestToken.waitUntilCompleted(); // Since we reuse the sendBuffer.
delete sendBuffer;
#endif
return;
}
/**
* @brief Free the resources associated with a handle.
*
* @param qphandle the handle from infinity_connect_clients or
* infinity_create_servers.
*/
void infinity_clients_free(void *qphandle) {
#ifdef HAVE_INFINITY
struct qps_data *cqps = (struct qps_data *)qphandle;
for (int k = 0; k < cqps->nr_qps; k++) delete cqps->qps[k];
free(cqps->qps);
delete cqps->factory;
delete cqps->context;
free(cqps->receive_buffers);
free(cqps->remote_buffers);
if (cqps->readwrite_buffers != NULL) {
for (int k = 0; k < cqps->nr_qps; k++) delete cqps->readwrite_buffers[k];
free(cqps->readwrite_buffers);
}
if (cqps->token_buffers != NULL) {
for (int k = 0; k < cqps->nr_qps; k++) delete cqps->token_buffers[k];
free(cqps->token_buffers);
}
free(cqps);
#endif
return;
}
/**
* @brief Create QPs for server to receive data from our clients.
*
* Requires that infinity_connect_clients is also run, otherwise we
* block waiting for the connections.
*
* @param servers a #mpi_servers struct with the server details.
* @param nr_servers the number of servers we will create.
* @param sizes the sizes, in bytes, of the various windows needed to receive
* all the remote data from a client. Array size of nr_servers.
* @param myrank the MPI rank of this process.
* @param verbose if 1 then report the connections made.
*
* @return handle for the QPs and related data.
*/
void *infinity_create_servers(struct mpi_servers *servers, int nr_servers,
size_t *sizes, int myrank, int verbose) {
#ifdef HAVE_INFINITY
/* Struct to hold all the persistent data. */
struct qps_data *cqps = (struct qps_data *)calloc(1, sizeof(struct qps_data));
/* Need a factory to create QPs. */
cqps->context = new infinity::core::Context();
cqps->factory = new infinity::queues::QueuePairFactory(cqps->context);
/* Create the QPs connecting to all the other ranks. */
cqps->qps = (infinity::queues::QueuePair **)
calloc(nr_servers, sizeof(infinity::queues::QueuePair *));
cqps->nr_qps = nr_servers;
/* Create buffers to receive all the remote data. */
cqps->readwrite_buffers = (infinity::memory::Buffer **)
calloc(nr_servers, sizeof(infinity::memory::Buffer *));
cqps->token_buffers = (infinity::memory::RegionToken **)
calloc(nr_servers, sizeof(infinity::memory::RegionToken *));
for (int k = 0; k < nr_servers; k++) {
if (sizes[k] > 0) {
cqps->readwrite_buffers[k] =
new infinity::memory::Buffer(cqps->context, sizes[k]);
} else {
/* Dummy: we are not expecting any data, but set one up anyway. */
cqps->readwrite_buffers[k] =
new infinity::memory::Buffer(cqps->context, BYTESINBLOCK);
}
cqps->token_buffers[k] = cqps->readwrite_buffers[k]->createRegionToken();
}
/* Do the port binding for each other rank. */
int buf[1];
MPI_Request req;
for (int k = 0; k < nr_servers; k++) {
if (k != myrank) {
if (verbose)
message("%d binding to %d on port %d", myrank, k, BASE_PORT + k);
cqps->factory->bindToPort(BASE_PORT + k);
/* Send a message saying this port is about to block for a connection. */
if (verbose) message("Blocking for first message on %d", BASE_PORT + k);
MPI_Isend(buf, 1, MPI_INT, k, myrank, MPI_COMM_WORLD, &req);
cqps->qps[k] = cqps->factory->acceptIncomingConnection
(cqps->token_buffers[k], sizeof(infinity::memory::RegionToken));
if (verbose)
message("Accepting incoming connections on %d", BASE_PORT + k);
}
}
return (void *)cqps;
#else
return NULL;
#endif
}
/**
* @brief Check if the data is ready, that is, has arrived.
*
* @param qphandle the handle from infinity_create_servers.
* @param index index of the client we are checking.
* @param offset the offset of this data in the RDMA buffer,
* note in blocks not bytes.
*
* @result pointer to the start of the data, otherwise NULL.
*/
void *infinity_check_ready(void *qphandle, int index, size_t offset) {
void *result = NULL;
#ifdef HAVE_INFINITY
struct qps_data *cqps = (struct qps_data *)qphandle;
/* Get the data location. */
BLOCKTYPE *dataptr = &((BLOCKTYPE *)cqps->readwrite_buffers[index]->getData())[offset];
/* Check if this has been unlocked. */
BLOCKTYPE volatile lock = dataptr[0];
if (lock == UNLOCKED) result = (void *)dataptr;
#endif
return result;
}
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#ifndef INFINITY_WRAPPER_H
#define INFINITY_WRAPPER_H
/* Config parameters. */
//#include "../config.h"
/* Base port number; the port actually used is BASE_PORT plus the rank of the
* connecting client. XXX we need to handle this more dynamically. */
static int BASE_PORT = 27771;
/* Maximum length of formatted server IP address. */
#define infinity_max_server_ip 24
/* Struct of MPI server ip addresses as formatted strings.*/
struct mpi_servers {
char *ip;
};
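/* The ip field holds nr_ranks formatted addresses in one flat allocation,
* each infinity_max_server_ip bytes long; rank k's address starts at
* ip + k * infinity_max_server_ip. */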
void *infinity_connect_clients(struct mpi_servers *servers, int nr_servers,
int myrank, int verbose);
void infinity_send_data(void *qphandle, int index, void *buffer, size_t size,
size_t offset);
void infinity_clients_free(void *qphandle);
void *infinity_create_servers(struct mpi_servers *servers, int nr_servers,
size_t *sizes, int myrank, int verbose);
void *infinity_check_ready(void *qphandle, int index, size_t offset);
#endif /* INFINITY_WRAPPER_H */
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
// Pure RDMA version, we use MPI for process control and synchronization.
// Write variant, attempting to use eager-like one-sided sends.
// Variation has a single send and receive thread per rank.
#include <arpa/inet.h>
#include <limits.h>
#include <mpi.h>
#include <netdb.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "infinity_wrapper.h"
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Our rank for all to see. */
int myrank = -1;
/* CPU frequency estimate, shared so we do this once. */
static long long cpufreq = 0;
/* Number of ranks. */
static int nr_ranks;
/* Size of a block of memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Size of message header in blocks. The lock/rank, subtype, size and tag. */
static const size_t HEADER_SIZE = 4;
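/* Header layout in blocks: [0] lock word (the sending rank until unlocked),
* [1] subtype, [2] size in bytes, [3] tag; the payload follows at block
* HEADER_SIZE. */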
/* Maximum size of a message in blocks. */
static size_t max_size = 0;
/* Are we verbose. */
static int verbose = 0;
/* Scale to apply to the size of the messages we send. */
static float messagescale = 1.0;
/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
/* 3D index of array. */
#define INDEX3(nx, ny, x, y, z) (nx * ny * z + nx * y + x)
/* 2D index of array. */
#define INDEX2(nx, x, y) (nx * y + x)
/* Bit shift to accommodate all the bits of the maximum rank id. */
static int rank_shift = 0;
/* Bit shift to accommodate all the bits of the maximum subtype. */
static int subtype_shift = 0;
/* Maximum no. of messages (logs). */
static size_t max_logs = 0;
/* Offsets of the ranktag regions within the receive windows and lists of the
* associated tags. */
static size_t *ranktag_sizes;
static size_t *ranktag_offsets;
static size_t *ranktag_lists;
/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_sends = 0;
static int volatile todo_send = 0;
/* Flag that is cleared once start-up has completed. */
static int volatile starting_up = 1;
/* Local receive queues separated by rank. */
static int volatile nr_recvs = 0;
static struct mpiuse_log_entry **volatile recv_queue;
/**
* @brief Convert ranks, subtype and tag into a single unique value.
*
* Assumes there is enough space in a size_t for these values.
*
* @param subtype the subtype of the message
* @param sendrank the send rank
* @param recvrank the receive rank
* @param tag the tag
*
* @result a unique value based on both values
*/
static size_t toranktag(int subtype, int sendrank, int recvrank, int tag) {
size_t result = subtype | sendrank << subtype_shift |
recvrank << (subtype_shift + rank_shift) |
tag << (subtype_shift * 2 + rank_shift);
return result;
}
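/* Worked example (values are illustrative): with nr_ranks = 2, main() sets
* rank_shift = 2 and subtype_shift = 6, so toranktag(3, 1, 0, 5) packs to
* 3 | 1 << 6 | 0 << 8 | 5 << 14 = 81987. */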
/**
* @brief Find an IP address for the given hostname.
*
* @param hostname the hostname
*
* @result the IP address. Note this points at a static buffer, so copy it to keep it.
*/
static char *toipaddr(char *hostname) {
struct hostent *hostent = gethostbyname(hostname);
if (hostent == NULL) {
error("Failed to convert hostname '%s' to an IP address", hostname);
}
struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list;
return inet_ntoa(*addr_list[0]);
}
/**
* @brief Convert a byte count into a number of blocks, rounds up.
*
* @param nr_bytes the number of bytes.
*
* @result the number of blocks needed.
*/
static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
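/* For example, with 8-byte blocks (BLOCKTYPE is size_t on a typical 64-bit
* build) toblocks(20) == 3, i.e. the 20 bytes are padded up to 24. */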
/**
* @brief Convert a block count into a number of bytes.
*
* @param nr_block the number of blocks.
*
* @result the number of bytes.
*/
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
return (nr_blocks * BYTESINBLOCK);
}
/**
* @brief Fill a data area with a given value.
*
* @param size size of the data in blocks (BLOCKTYPE elements), not bytes.
* @param data the data to fill.
* @param value the value to fill.
*/
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
for (BLOCKTYPE i = 0; i < size; i++) {
data[i] = value;
}
}
/**
* @brief Test a filled data area for a value.
*
* @param size size of the data in blocks (BLOCKTYPE elements), not bytes.
* @param data the data to check.
* @param value the value expected.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
for (size_t i = 0; i < size; i++) {
if (data[i] != value) {
message("see %zd expected %zd @ %zd (%zd to go)", data[i], value, i,
size - i);
return 0;
}
}
return 1;
}
/**
* @brief Send thread, sends RDMA messages to the other ranks.
*
* Messages are all considered in order.
*/
static void *send_thread(void *arg) {
// Connect QPs to the remote servers.
struct mpi_servers *servers = (struct mpi_servers *)arg;
void *qphandle = infinity_connect_clients(servers, nr_ranks,
myrank, 0);
/* Extract the offset lists that we use. */
int nr = 0;
int size = (max_logs / 16 + 1);
size_t *ranktags = (size_t *)malloc(size * sizeof(size_t));
size_t *offsets = (size_t *)malloc(size * sizeof(size_t));
/* A ranktag that will match any subtype or tag sent from our rank to each
* of the other ranks. */
for (int k = 0; k < nr_ranks; k++) {
size_t matchranktag = toranktag(0, myrank, k, 0);
for (size_t j = 0; j < max_logs; j++) {
size_t ranktag = ranktag_lists[INDEX3(nr_ranks, nr_ranks, myrank, k, j)];
if ((ranktag & matchranktag) == matchranktag) {
/* Keep this one. */
ranktags[nr] = ranktag;
offsets[nr] = ranktag_offsets[INDEX3(nr_ranks, nr_ranks, myrank, k, j)];
nr++;
if (nr >= size) {
size += (max_logs / 16 + 1);
ranktags = (size_t *)realloc(ranktags, size * sizeof(size_t));
offsets = (size_t *)realloc(offsets, size * sizeof(size_t));
}
}
}
}
// Startup complete, so start timing and release the receive thread.
MPI_Barrier(MPI_COMM_WORLD); // Vital for synchronization.
clocks_set_cpufreq(cpufreq);
starting_up = 0;
message("All synchronized");
ticks starttics = getticks();
for (int k = 0; k < nr_sends; k++) {
struct mpiuse_log_entry *log = send_queue[k];
/* Data has the actual data and room for the header. */
BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK);
log->data = dataptr;
log->injtic = getticks();
/* Fill data with pattern. */
if (datacheck)
datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE], log->tag);
/* First element is the lock element, which can have any value other than
* UNLOCKED; the other elements define an MPI message. */
dataptr[0] = myrank;
dataptr[1] = log->subtype;
dataptr[2] = log->size;
dataptr[3] = log->tag;
/* Need to find the offset for this data in the remotes window. */
size_t ranktag =
toranktag(log->subtype, log->rank, log->otherrank, log->tag);
log->offset = 0;
int found = 0;
for (int j = 0; j < nr; j++) {
if (ranktags[j] == ranktag) {
log->offset = offsets[j];
found = 1;
break;
}
}
if (!found) {
error(
"Failed sending a message of size %zd to %d/%d "
"@ %zd, no offset found for ranktag %zd",
datasize, log->otherrank, log->subtype, log->offset, ranktag);
}
if (verbose)
message(
"sending message subtype %d from %d to %d todo: %d/%d, offset %ld "
"size %ld",
log->subtype, myrank, log->otherrank, nr_sends - k, nr_sends,
log->offset, datasize);
// And send
infinity_send_data(qphandle, log->otherrank, dataptr, tobytes(datasize),
tobytes(log->offset));
free(dataptr);
log->endtic = getticks();
}
message("took %.3f %s", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
// Free data.
free(offsets);
free(ranktags);
infinity_clients_free(qphandle);
return NULL;
}
/**
* @brief recv thread, listens for remote sends from another rank.
*/
static void *recv_thread(void *arg) {
struct mpi_servers *servers = (struct mpi_servers *)arg;
/* Need sizes in bytes. */
size_t *sizes = (size_t *)calloc(nr_ranks, sizeof(size_t));
for (int k = 0; k < nr_ranks; k++) {
sizes[k] = tobytes(ranktag_sizes[k]);
}
void *qphandle = infinity_create_servers(servers, nr_ranks, sizes,
myrank, 0);
free(sizes);
message("All incomings up");
/* Now we wait for the remotes to connect before proceeding, otherwise the
* timings will be incorrect. */
while (starting_up)
;
/* Ignore the timing on previous part, which is fixed. */
ticks starttics = getticks();
/* We loop while new requests are being sent and we still have messages
* to receive. */
int todo_recv = nr_recvs;
while (todo_recv > 0) {
/* Loop over remaining messages, checking if any have been unlocked. */
int k = 0;
while (k < todo_recv) {
struct mpiuse_log_entry *log = recv_queue[k];
/* On the first attempt we start listening for this receive. */
if (log->injtic == 0) log->injtic = getticks();
/* Check if data has arrived. */
BLOCKTYPE *dataptr = (BLOCKTYPE *)infinity_check_ready(qphandle,
log->otherrank,
log->offset);
if (dataptr != NULL) {
// XXX should check these for correctness.
// int subtype = dataptr[1];
// size_t size = dataptr[2];
// int tag = dataptr[3];
if (verbose)
message(
"receive message subtype %d from %d to %d todo: %d/%d,"
" offset %ld size %ld",
log->subtype, myrank, log->otherrank, todo_recv, nr_recvs,
log->offset, toblocks(log->size) + HEADER_SIZE);
/* Check that the received data matches the expected pattern. */
if (datacheck && !datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], log->tag)) {
message("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
log->endtic = getticks();
todo_recv--;
/* And swap with the last log if we can. */
if (k < todo_recv) {
recv_queue[k] = recv_queue[todo_recv];
/* Do this log again, we wanted to check it this loop. */
k--;
}
}
k++;
}
}
// Free data.
infinity_clients_free(qphandle);
message("took %.3f %s", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
}
/**
* @brief Comparison function for tags.
*/
static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
if (l1->tic > l2->tic) return 1;
if (l1->tic < l2->tic) return -1;
return 0;
}
/**
* @brief Pick out the relevant logging data for our rank.
*/
static size_t pick_logs() {
size_t nlogs = mpiuse_nr_logs();
size_t maxsize = 0;
/* Queues of send and receive logs. */
send_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_sends = 0;
recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recvs = 0;
for (size_t k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->activation) {
if (log->rank == myrank) {
log->done = 0;
log->injtic = 0;
log->endtic = 0;
log->data = NULL;
log->ranktag =
toranktag(log->subtype, log->otherrank, log->rank, log->tag);
/* Scale size. */
log->size *= messagescale;
if (log->type == task_type_send) {
send_queue[nr_sends] = log;
nr_sends++;
} else if (log->type == task_type_recv) {
recv_queue[nr_recvs] = log;
nr_recvs++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
/* Across all ranks. */
if (log->size > maxsize) maxsize = log->size;
}
}
/* Sort into increasing tic. */
qsort(recv_queue, nr_recvs, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_sends, sizeof(struct mpiuse_log_entry *), cmp_logs);
/* Offsets and ranktags. */
ranktag_offsets =
(size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t));
ranktag_lists =
(size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t));
ranktag_sizes =
(size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t));
/* Setup the ranktag offsets for our receive windows. Also define the sizes
* of the windows. */
for (int k = 0; k < nr_recvs; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
ranktag_lists[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] =
log->ranktag;
ranktag_offsets[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] =
ranktag_sizes[log->otherrank];
log->offset = ranktag_sizes[log->otherrank];
/* Need to use a multiple of blocks to keep alignment. */
size_t size = toblocks(log->size) + HEADER_SIZE;
ranktag_sizes[log->otherrank] += size;
}
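/* At this point ranktag_sizes[r] holds the total window size, in blocks,
* needed for everything arriving from rank r, and each receive log records
* its block offset within that window. */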
if (verbose) {
message("maxsize = %zd, nr_sends = %d, nr_recvs = %d", maxsize, nr_sends,
nr_recvs);
}
return maxsize;
}
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [-vd] [-s scale] SWIFT_mpiuse-log-file.dat logfile.dat\n",
argv[0]);
fprintf(stderr, " options: -v verbose, -d check data, -s scale message sizes\n");
fflush(stderr);
}
/**
* @brief main function.
*/
int main(int argc, char *argv[]) {
/* Start time for logging. This will be reset to restart time. */
clocks_set_cpufreq(0);
cpufreq = clocks_get_cpufreq();
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Handle the command-line, we expect a mpiuse data file to read and various
* options. */
int opt;
while ((opt = getopt(argc, argv, "vds:")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
break;
case 'v':
verbose = 1;
break;
case 's':
messagescale = atof(optarg);
break;
default:
if (myrank == 0) usage(argv);
return 1;
}
}
if (optind >= argc - 1) {
if (myrank == 0) usage(argv);
return 1;
}
char *infile = argv[optind];
char *logfile = argv[optind + 1];
/* Now we read the SWIFT MPI logger output that defines the communications
* we will undertake and the time differences between injections into the
* queues. Note this has all ranks for a single step; SWIFT outputs one MPI
* log per rank per step, so you need to combine all ranks from a step. */
mpiuse_log_restore(infile);
int nranks = mpiuse_nr_ranks();
/* This should match the expected size. */
if (nr_ranks != nranks)
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_ranks);
/* Index of most significant bits in the maximum rank and subtypes.
* Assumes GCC intrinsic. */
rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks);
subtype_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(32);
/* We all need to agree on a maximum count of logs, so we can exchange the
* offset arrays (would be ragged otherwise and difficult to exchange). */
max_logs = mpiuse_nr_logs() / 2 + 1;
MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
/* Extract the send and recv messages for our rank. */
size_t maxsize = pick_logs();
/* Largest size of a message in blocks. Needs to align on size_t. */
max_size = toblocks(maxsize) + HEADER_SIZE;
/* We need to share all the offsets for each communicator with all the other
* ranks so they can push data into the correct parts of our receive
* window. */
MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets, nr_ranks * nr_ranks * max_logs,
MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, ranktag_lists, nr_ranks * nr_ranks * max_logs,
MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
/* Now for the RDMA setup. We need the IP addresses of all the ranks. */
/* Each rank can find its name and IP. */
char name[MPI_MAX_PROCESSOR_NAME];
int namelen = 0;
MPI_Get_processor_name(name, &namelen);
char ip[infinity_max_server_ip];
strncpy(ip, toipaddr(name), infinity_max_server_ip);
/* And distribute, so we all know everyone's IPs. */
struct mpi_servers servers;
servers.ip = (char *)malloc(sizeof(char) * nr_ranks * infinity_max_server_ip);
MPI_Allgather(ip, infinity_max_server_ip, MPI_BYTE, servers.ip,
infinity_max_server_ip, MPI_BYTE, MPI_COMM_WORLD);
if (myrank == 0 && verbose) {
message("RDMA servers will listen on:");
for (int j = 0; j < nr_ranks; j++) {
for (int k = 0; k < nr_ranks; k++) {
if (k != j) {
message(" %d: %s on port %d", j,
&servers.ip[j * infinity_max_server_ip], BASE_PORT + k);
}
}
}
}
/* Time to start the clock. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
clocks_set_cpufreq(cpufreq);
if (myrank == 0) {
message("Start of MPI tests");
message("==================");
if (verbose) {
if (datacheck)
message("checking data pattern on send and recv completion");
}
}
/* Server rank that listens for connections and receives data. */
pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &servers) != 0)
error("Failed to create recv thread.");
/* Now we have a single thread to send the messages. */
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &servers) != 0)
error("Failed to create send thread.");
/* Wait until all threads have exited and all message exchanges have
* completed. */
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Dump the updated MPI logs. */
MPI_Barrier(MPI_COMM_WORLD);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
/* Free resources. */
free(servers.ip);
if (myrank == 0) message("Bye");
return 0;
}