/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
// Fully one-sided approach with passive target communication. This means only
// the sending side updates the window buffer, and since we have threaded
// access we can only use flushes with a shared lock that is permanently open
// to move data. The send side has no window, as it only pushes data.
//
// So each rank needs a receive window that has room for all the expected
// sends, plus an additional element for controlling the readiness of the data
// (this is an atomic send that should be guaranteed to only be updated after
// the send of the main data).
//
// In this implementation the size of the receive buffer per rank is just the
// sum of all the messages we know we are about to get. The order of that
// buffer is determined by the rank and tags, which gives us a list of offsets
// into the buffer mapped by the ranktag, which we need to share with any rank
// that is expected to send us data. We share this data using normal MPI
// (it could instead be kept in another extension of the window, but that
// would need synchronizing across all ranks, or it could be shared over the
// global communicator in a similar fashion).
//
// XXX ranktag per subtype is not unique; we need to include the size.
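//
// As an illustrative sketch (layout inferred from the code below, not a
// definitive specification), each message region in a receive window is a
// whole number of BLOCKTYPE blocks: HEADER_SIZE blocks of header followed by
// the padded payload, packed back-to-back per subtype:
//
//   | flag | size | tag | payload ...      | flag | size | tag | payload ...
//   '----- header -----'                   '----- header -----'
//
// The sender pushes header and payload with MPI_Accumulate, flushes, then
// flips the flag block to UNLOCKED with MPI_Compare_and_swap so the receiver
// can poll the flag to detect that the whole message has arrived.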
#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Maximum number of communicator windows per rank, for SWIFT this is the
* number of subtypes. */
#define task_subtype_count 22
/* 3D index of array. */
#define INDEX3(nx, ny, x, y, z) (nx * ny * z + nx * y + x)
/* 2D index of array. */
#define INDEX2(nx, x, y) (nx * y + x)
/* Our rank for all to see. */
int myrank = -1;
/* Number of ranks. */
static int nr_ranks;
/* Maximum no. of messages (logs). */
static size_t max_logs = 0;
/* Flags for controlling access. */
static size_t UNLOCKED = (((size_t)2 << 63) - 1);
/* Size of a block of memory. All addressable memory chunks need to be a
 * multiple of this, as we need to align sends in memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Size of message header in blocks. The flag, size and tag. Note size and tag
* are just for sanity checks. The flag value controls access to the main data
* areas. */
static const size_t HEADER_SIZE = 3;
/* Are we verbose. */
static int verbose = 0;
/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
/* Global communicators for each of the subtypes. */
static MPI_Comm subtypeMPI_comms[task_subtype_count];
/* And the windows for one-sided communications. */
static MPI_Win mpi_window[task_subtype_count];
static BLOCKTYPE *mpi_ptr[task_subtype_count] = {NULL};
/* Offsets of the ranktag regions within the windows and lists of the
 * associated tags. */
static size_t ranktag_sizes[task_subtype_count] = {0};
static size_t *ranktag_counts;
static size_t *ranktag_offsets;
static size_t *ranktag_lists;
/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_send = 0;
static int volatile todo_send = 0;
/* The local receive queue. */
static struct mpiuse_log_entry **volatile recv_queue;
static int volatile nr_recv = 0;
static int volatile todo_recv = 0;
/**
* @brief Convert ranks and tag into a single unique value.
*
* Assumes there is enough space in a size_t for these values.
*
* @param sendrank the send rank
* @param recvrank the receive rank
* @param tag the tag
*
 * @result a value unique to this combination of ranks and tag
*/
static size_t toranktag(int sendrank, int recvrank, int tag) {
int shift = (sizeof(int) * 8) - __builtin_clz(nr_ranks); /* XXX could precalc. */
//message("nr_ranks = %d, shift = %d", nr_ranks, shift);
  size_t result = (size_t)sendrank | ((size_t)recvrank << shift) |
                  ((size_t)tag << (shift * 2));
return result;
}
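/* Illustrative example (numbers chosen here, not taken from the original
 * code): with nr_ranks == 4 and a 32-bit int, __builtin_clz(4) == 29 so
 * shift == 3, and toranktag(1, 2, 5) == 1 | (2 << 3) | (5 << 6) == 337.
 * The send rank occupies the lowest shift bits, the receive rank the next
 * shift bits and the tag the remaining bits of the size_t. */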
static char *tokey(struct mpiuse_log_entry *log) {
static char buf[256];
  sprintf(buf, "%d/%d/%d/%zd on %d ranktags %zd/%zd",
          log->otherrank, log->tag, log->subtype, log->size, log->rank,
          toranktag(log->rank, log->otherrank, log->tag),
          toranktag(log->otherrank, log->rank, log->tag));
return buf;
}
/**
* @brief Convert a byte count into a number of blocks, rounds up.
*
* @param nr_bytes the number of bytes.
*
* @result the number of blocks needed.
*/
static int toblocks(BLOCKTYPE nr_bytes) {
return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
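/* For example, assuming an 8-byte size_t (typical on LP64 systems):
 * toblocks(20) == 3, so a 20-byte payload is padded up to 3 blocks (24 bytes)
 * before HEADER_SIZE blocks are added for the message header. */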
/**
* @brief Convert a block count into a number of bytes.
*
 * @param nr_blocks the number of blocks.
*
* @result the number of bytes.
*/
static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
/**
 * @brief Fill a data area with our rank.
 *
 * @param size size of the data in blocks (BLOCKTYPE elements).
 * @param data the data to fill.
*/
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
for (BLOCKTYPE i = 0; i < size; i++) {
data[i] = myrank;
}
}
/**
 * @brief Test a filled data area for a value.
 *
 * @param size size of the data in blocks (BLOCKTYPE elements).
 * @param data the data to check.
 * @param rank the expected value, i.e. the originating rank.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
for (size_t i = 0; i < size; i++) {
if (data[i] != (size_t)rank) {
message("see %zd expected %d @ %zd", data[i], rank, i);
return 0;
}
}
return 1;
}
/**
* @brief Send thread, sends messages to other ranks one-by-one with the
* correct offsets into the remote windows.
*
* Messages are all considered in order, regardless of the subtype.
*/
static void *send_thread(void *arg) {
message("%d: send thread starts with %d messages", *((int *)arg), nr_send);
ticks starttics = getticks();
for (int k = 0; k < nr_send; k++) {
struct mpiuse_log_entry *log = send_queue[k];
if (log == NULL) error("NULL send message queued (%d/%d)", k, nr_send);
/* Data has the actual data and room for the header. */
BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
BLOCKTYPE *dataptr = calloc(datasize, BYTESINBLOCK);
log->data = dataptr;
/* Fill data with pattern. */
if (datacheck) datacheck_fill(datasize, dataptr);
/* And define header; dataptr[0] can be any value except UNLOCKED. */
dataptr[0] = 0;
dataptr[1] = log->size;
dataptr[2] = log->tag;
    /* Need to find the offset for this data in the remote rank's window. We
     * match subtype, tag and rank, searching the ranktag_lists for our
     * ranktag value. XXX bisection search XXX */
size_t ranktag = toranktag(log->rank, log->otherrank, log->tag);
//message("looking for %s", tokey(log));
size_t counts = ranktag_counts[INDEX2(task_subtype_count, log->subtype,
log->otherrank)];
size_t offset = 0;
int found = 0;
struct mpiuse_log_entry *keeplog = NULL;
    counts = max_logs; /* XXX search the full list, not just the expected count. */
for (size_t j = 0; j < counts; j++) {
if (ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype,
log->otherrank, j)] == ranktag) {
if (found) message("duplicate %zd: %s of %s", ranktag, tokey(log), tokey(keeplog));
offset = ranktag_offsets[INDEX3(task_subtype_count, nr_ranks,
log->subtype, log->otherrank, j)];
keeplog = log;
//message("%d/%d located offset %zd one of %zd ranktag: %zd", log->otherrank,
// log->subtype, offset, counts, ranktag);
found = 1;
//break;
}
}
    if (!found) {
      fflush(stdout);
      fflush(stderr);
      error("Failed sending a message of size %zd to %d/%d @ %zd, no offset"
            " found for ranktag %zd, counts = %zd",
            datasize, log->otherrank, log->subtype, offset, ranktag, counts);
    }
//message("Sending a message of size %zd to %d/%d @ %zd", datasize,
// log->otherrank, log->subtype, offset);
/* And send data to other rank. */
int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
offset, datasize, MPI_BLOCKTYPE, MPI_REPLACE,
mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
/* XXX Emergency measure. */
ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
/* Now we change the first element to UNLOCKED so that the remote end can
* find out that the data has arrived. */
BLOCKTYPE newval[1];
BLOCKTYPE oldval[1];
newval[0] = UNLOCKED;
oldval[0] = 0;
ret =
MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE,
log->otherrank, offset, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS)
mpi_error_message(ret, "MPI_Compare_and_swap error");
ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
//if (verbose) {
// if (oldval[0] == dataptr[0]) {
// message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)",
// log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
// offset, k, nr_send);
// } else {
// message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd %d/%d",
// log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
// offset, k, nr_send);
// }
//}
//sleep(1);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
return NULL;
}
/**
* @brief Recv thread, checks for messages in the window from other ranks.
*/
static void *recv_thread(void *arg) {
message(
"%d: recv thread starts, checking for %d messages %d "
"ranks %d communicators",
*((int *)arg), nr_recv, nr_ranks, task_subtype_count);
ticks starttics = getticks();
/* Global statistics. */
//int lncalls = 0;
//double lsum = 0.0;
//ticks lmint = INT_MAX;
//ticks lmaxt = 0;
/* No. of receives to process. */
todo_recv = nr_recv;
  /* We loop while new requests are being sent and we still have messages
   * to receive. */
while (todo_recv > 0) {
//sleep(5);
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done) {
/* Get offset into subtype for this message. */
size_t offset = log->offset;
/* Check if that part of the window has been unlocked. */
BLOCKTYPE *dataptr = &mpi_ptr[log->subtype][offset];
BLOCKTYPE volatile lock = dataptr[0];
//message("Checking: %s @ %zd", (dataptr[0] == UNLOCKED?"unlocked":"locked"), offset);
//if (atomic_cas(&dataptr[0], UNLOCKED, UNLOCKED)) {
if (lock == UNLOCKED) {
//message("Checked: %s @ %zd", (dataptr[0] == UNLOCKED?"unlocked":"locked"), offset);
/* OK, so data should be ready for use, check the tag and size. */
if ((size_t)log->size == dataptr[1] && (size_t)log->tag == dataptr[2]) {
          //if (verbose)
          //  message("receive message %d/%d from %d @ offset %zd: "
          //          "dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
          //          log->otherrank, offset, dataptr[0], log->size,
          //          toblocks(log->size) + HEADER_SIZE);
          /* Check the sent data is unchanged. */
if (datacheck) {
if (!datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], log->otherrank)) {
fflush(stdout);
fflush(stderr);
error("Data mismatch on completion");
}
}
/* Done, clean up. */
log->done = 1;
atomic_dec(&todo_recv);
if (todo_recv == 0) break;
} else {
message("bad unlocked message %d/%d from %d @ offset %zd: "
"dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
log->otherrank, offset, dataptr[0], log->size,
toblocks(log->size) + HEADER_SIZE);
error("Unlocked data has incorrect tag or size: %zd/%zd %d/%zd",
log->size, dataptr[1], log->tag, dataptr[2]);
}
}
//else {
// message("unlocked message %d/%d from %d @ offset %zd: "
// "dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
// log->otherrank, offset, dataptr[0], log->size,
// toblocks(log->size) + HEADER_SIZE);
//}
        /* Need to allow for some MPI progression. Since we make no MPI calls
         * (by intent the receive side is a passive target, so only the sender
         * should make calls that move data), use a no-op probe. */
int flag = 0;
int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
MPI_STATUS_IGNORE);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Iprobe failed");
}
}
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
}
/**
* @brief Comparison function for ranktags.
*/
static int cmp_ranktags(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
if (l1->ranktag > l2->ranktag) return 1;
if (l1->ranktag < l2->ranktag) return -1;
return 0;
}
/**
* @brief Comparison function for tags.
*/
static int cmp_tags(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
if (l1->tag > l2->tag) return 1;
if (l1->tag < l2->tag) return -1;
return 0;
}
/**
* @brief Pick out the relevant logging data for our rank.
*/
static void pick_logs() {
int nlogs = mpiuse_nr_logs();
/* Duplicate of logs. Bit large... */
send_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_send = 0;
recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recv = 0;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->activation) {
if (log->rank == myrank) {
log->done = 0;
log->data = NULL;
log->ranktag = toranktag(log->otherrank, log->rank, log->tag);
//message("add %s", tokey(log));
if (log->type == task_type_send) {
send_queue[nr_send] = log;
nr_send++;
} else if (log->type == task_type_recv) {
recv_queue[nr_recv] = log;
nr_recv++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
}
}
/* Sort recv into increasing ranktag and send into tag order (don't
* want to sort by rank, that would synchronize the sends). */
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_ranktags);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_tags);
/* Offsets and ranktags. */
ranktag_offsets =
calloc(task_subtype_count * nr_ranks * max_logs, sizeof(size_t));
ranktag_lists =
calloc(task_subtype_count * nr_ranks * max_logs, sizeof(size_t));
ranktag_counts = calloc(task_subtype_count * nr_ranks, sizeof(size_t));
/* Setup the ranktag offsets for our receive windows. Also define the sizes
* of the windows. */
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank, k)] = log->ranktag;
ranktag_offsets[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank, k)] = ranktag_sizes[log->subtype];
log->offset = ranktag_sizes[log->subtype];
/* Need to use a multiple of blocks to keep alignment. */
size_t size = toblocks(log->size) + HEADER_SIZE;
//message("%d increment offset by %zd blocks", log->subtype, size);
ranktag_sizes[log->subtype] += size;
ranktag_counts[INDEX2(task_subtype_count, log->subtype, myrank)]++;
}
if (verbose)
message("local logs picked, nr_send = %d, nr_recv = %d ", nr_send, nr_recv);
}
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat\n",
argv[0]);
fprintf(stderr, " options: -v verbose\n");
fflush(stderr);
}
/**
* @brief main function.
*/
int main(int argc, char *argv[]) {
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
  /* Handle the command-line, we expect an mpiuse data file to read and
   * various options. */
int opt;
while ((opt = getopt(argc, argv, "vd")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
break;
case 'v':
verbose = 1;
break;
default:
if (myrank == 0) usage(argv);
return 1;
}
}
if (optind >= argc - 1) {
if (myrank == 0) usage(argv);
return 1;
}
char *infile = argv[optind];
char *logfile = argv[optind + 1];
  /* Now we read the SWIFT MPI logger output that defines the communications
   * we will undertake and the time differences between injections into the
   * queues. Note this has all ranks for a single step; SWIFT outputs one MPI
   * log per rank per step, so you need to combine all ranks from a step. */
mpiuse_log_restore(infile);
int nranks = mpiuse_nr_ranks();
/* This should match the expected size. */
if (nr_ranks != nranks)
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_ranks);
/* We all need to agree on a maximum count of logs, so we can exchange the
* offset arrays (would be ragged otherwise). */
max_logs = mpiuse_nr_logs() / 2 + 1;
MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
if (verbose) message("maximum log count = %zd", max_logs);
/* Extract the send and recv messages for our rank. */
pick_logs();
/* Now for the one-sided setup... Each rank needs a buffer per communicator
* with space for all the expected messages. */
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
size_t size = tobytes(ranktag_sizes[i]);
if (size == 0) size = BYTESINBLOCK;
MPI_Win_allocate(size, BYTESINBLOCK, MPI_INFO_NULL,
subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]);
memset(mpi_ptr[i], 170, tobytes(ranktag_sizes[i]));
//if (verbose)
// message("Allocated window of size %zd for subtype %d",
// ranktag_sizes[i], i);
/* Assert a shared lock with all the other processes on this
* window. Needed as we use threads, so cannot lock or unlock as a means
* of synchronization. */
MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
}
//message("Windows allocated");
  /* We need to share all the offsets for each communicator with all the other
   * ranks so they can push data into the correct parts of our receive
   * window. */
MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
task_subtype_count * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, ranktag_counts, task_subtype_count * nr_ranks,
MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, ranktag_lists,
task_subtype_count * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
MPI_COMM_WORLD);
//message("check: local logs picked, nr_send = %d, nr_recv = %d ", nr_send, nr_recv);
  /* Time to start the clock. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
clocks_set_cpufreq(0);
if (myrank == 0) {
message("Start of MPI tests");
message("==================");
if (verbose) {
if (datacheck)
message("checking data pattern on send and recv completion");
}
}
/* Make two threads, one for send and one for receiving. */
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
/* XXX could have more than one of these... With a partition of the
* subtypes. */
pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all message exchanges have
* completed. */
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Free the window locks. Once we all arrive. */
MPI_Barrier(MPI_COMM_WORLD);
for (int i = 0; i < task_subtype_count; i++) {
MPI_Win_unlock_all(mpi_window[i]);
MPI_Win_free(&mpi_window[i]);
}
/* Dump the updated MPI logs. */
fflush(stdout);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
if (myrank == 0) message("Bye");
return 0;
}