diff --git a/Makefile b/Makefile
index dc5c4108364626ac1b00bb0be0640d09a1b06a5c..946793524e03a290f90f2b13ae3d764168ca7430 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,7 @@ CFLAGS = -g -O0 -Wall
 
-all: swiftmpistepsim swiftmpirdmastepsim
+all: swiftmpistepsim swiftmpirdmastepsim swiftmpirdmastepsim-acc-only
 
 swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
 	mpicc $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c
 
@@ -10,8 +10,12 @@ swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h c
 swiftmpirdmastepsim: swiftmpirdmastepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
 	mpicc $(CFLAGS) -o swiftmpirdmastepsim swiftmpirdmastepsim.c mpiuse.c clocks.c
 
+swiftmpirdmastepsim-acc-only: swiftmpirdmastepsim-acc-only.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
+	mpicc $(CFLAGS) -o swiftmpirdmastepsim-acc-only swiftmpirdmastepsim-acc-only.c mpiuse.c clocks.c
+
 clean:
 	rm -f swiftmpistepsim
 	rm -f swiftmpirdmastepsim
+	rm -f swiftmpirdmastepsim-acc-only
diff --git a/swiftmpirdmastepsim-acc-only.c b/swiftmpirdmastepsim-acc-only.c
new file mode 100644
index 0000000000000000000000000000000000000000..ec5e29f6c374b6a0931ec6a661f5b4d99583ace3
--- /dev/null
+++ b/swiftmpirdmastepsim-acc-only.c
@@ -0,0 +1,581 @@
+/*******************************************************************************
+ * This file is part of SWIFT.
+ * Copyright (c) 2020 Peter W. Draper
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ ******************************************************************************/
+
+// Fully one-sided approach with passive target communication. This means only
+// the sending side updates the window buffer, and since we have threaded
+// access we can only use flushes with a shared lock that is permanently open
+// to move data. The send side has no associated window, as it only pushes
+// data.
+//
+// So each rank needs a receive window with room for all the expected sends,
+// plus additional elements for controlling the readiness of the data (an
+// atomic update that should be guaranteed to happen only after the send of
+// the main data) and for correctness checks.
+//
+// In this implementation the size of the receive buffer per rank is just the
+// sum of the sizes of all the messages we know we are about to get. The
+// order of that buffer is determined by the send and receive ranks and the
+// tag, which gives us a list of offsets into the buffer mapped by the
+// ranktag. These offsets need to be shared with any rank that is expected to
+// send us data. We'll send this data using normal MPI (it could instead be
+// done as another extension of the window, but then we'd need to synchronize
+// that across all ranks, or we could use the global communicator to share it
+// in a similar fashion).
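+//
+// As an illustration (the slot offsets are computed in pick_logs(), this is
+// just the intended layout): each expected message occupies one slot in the
+// receive window of its subtype,
+//
+//   | lock | size | tag | payload blocks ... | lock | size | tag | ...
+//   ^-- offset for one ranktag               ^-- offset for the next
+//
+// where the lock block is written last by the sender and holds UNLOCKED once
+// the payload is in place, so the receiver polls it to detect completion.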
+
+#include <limits.h>
+#include <mpi.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "atomic.h"
+#include "clocks.h"
+#include "error.h"
+#include "mpiuse.h"
+
+/* Maximum number of communicator windows per rank, for SWIFT this is the
+ * number of subtypes. */
+#define task_subtype_count 22
+
+/* 3D index of array. */
+#define INDEX3(nx, ny, x, y, z) (nx * ny * z + nx * y + x)
+
+/* 2D index of array. */
+#define INDEX2(nx, x, y) (nx * y + x)
+
+/* Our rank for all to see. */
+int myrank = -1;
+
+/* Number of ranks. */
+static int nr_ranks;
+
+/* Bit shift to accommodate all the bits of the maximum rank id. */
+static int rank_shift = 0;
+
+/* Maximum no. of messages (logs). */
+static size_t max_logs = 0;
+
+/* Flags for controlling access. High end of size_t. */
+static size_t UNLOCKED = (((size_t)2 << 63) - 1);
+
+/* Size of a block of memory. All addressable memory chunks need to be a
+ * multiple of this as we need to align sends and receives in memory. */
+#define BLOCKTYPE size_t
+#define MPI_BLOCKTYPE MPI_AINT
+static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
+
+/* Size of message header in blocks. The unlocked flag, size and tag. Note
+ * size and tag are just for sanity checks. The flag value controls access to
+ * the main data areas. */
+static const size_t HEADER_SIZE = 3;
+
+/* Are we verbose. */
+static int verbose = 0;
+
+/* Set a data pattern and check we get this back, slow... */
+static int datacheck = 0;
+
+/* Integer types of send and recv tasks, must match log. */
+static const int task_type_send = 22;
+static const int task_type_recv = 23;
+
+/* Global communicators for each of the subtypes. */
+static MPI_Comm subtypeMPI_comms[task_subtype_count];
+
+/* And the windows for one-sided communications. */
+static MPI_Win mpi_window[task_subtype_count];
+static BLOCKTYPE *mpi_ptr[task_subtype_count] = {NULL};
+
+/* Offsets of the ranktag regions within the windows and lists of the
+ * associated tags. */
+static size_t ranktag_sizes[task_subtype_count] = {0};
+static size_t *ranktag_counts;
+static size_t *ranktag_offsets;
+static size_t *ranktag_lists;
+
+/* The local send queue. */
+static struct mpiuse_log_entry **volatile send_queue;
+static int volatile nr_send = 0;
+static int volatile todo_send = 0;
+
+/* The local receive queue. */
+static struct mpiuse_log_entry **volatile recv_queue;
+static int volatile nr_recv = 0;
+static int volatile todo_recv = 0;
+
+/**
+ * @brief Convert two ranks and a tag into a single unique value.
+ *
+ * Assumes there is enough space in a size_t for these values.
+ *
+ * @param sendrank the send rank
+ * @param recvrank the receive rank
+ * @param tag the tag
+ *
+ * @result a unique value based on all three values
+ */
+static size_t toranktag(int sendrank, int recvrank, int tag) {
+  size_t result = sendrank | recvrank << rank_shift | tag << (rank_shift * 2);
+  return result;
+}
+
+/**
+ * @brief Convert a byte count into a number of blocks, rounds up.
+ *
+ * @param nr_bytes the number of bytes.
+ *
+ * @result the number of blocks needed.
+ */
+static int toblocks(BLOCKTYPE nr_bytes) {
+  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
+}
+
+/**
+ * @brief Convert a block count into a number of bytes.
+ *
+ * @param nr_blocks the number of blocks.
+ *
+ * @result the number of bytes.
+ */
+static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
+
+/**
+ * @brief Fill a data area with our rank.
+ *
+ * @param size size of data in blocks.
+ * @param data the data to fill.
+ */
+static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
+  for (BLOCKTYPE i = 0; i < size; i++) {
+    data[i] = myrank;
+  }
+}
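+
+/* Note the scheme: the sender fills its whole slot (header blocks included)
+ * with its own rank before overwriting the header, so on the receive side
+ * every payload block should compare equal to the sending rank. */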
+
+/**
+ * @brief Test a filled data area for a value, reports if any unexpected
+ * value is found.
+ *
+ * @param size size of data in blocks.
+ * @param data the data to check.
+ * @param rank the value to check for, i.e. the original rank.
+ *
+ * @result 1 on success, 0 otherwise.
+ */
+static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
+  for (size_t i = 0; i < size; i++) {
+    if (data[i] != (size_t)rank) {
+      message("see %zd expected %d @ %zd", data[i], rank, i);
+      return 0;
+    }
+  }
+  return 1;
+}
+
+/**
+ * @brief Send thread, sends messages to other ranks one-by-one with the
+ * correct offsets into the remote windows.
+ *
+ * Messages are all considered in order, regardless of the subtype.
+ */
+static void *send_thread(void *arg) {
+
+  message("%d: send thread starts with %d messages", *((int *)arg), nr_send);
+  ticks starttics = getticks();
+
+  for (int k = 0; k < nr_send; k++) {
+    struct mpiuse_log_entry *log = send_queue[k];
+    if (log == NULL) error("NULL send message queued (%d/%d)", k, nr_send);
+
+    /* Data has the actual data and room for the header. */
+    BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
+    BLOCKTYPE *dataptr = calloc(datasize, BYTESINBLOCK);
+    log->data = dataptr;
+
+    /* Fill data with pattern. */
+    if (datacheck) datacheck_fill(datasize, dataptr);
+
+    /* And define header; dataptr[0] can be any value except UNLOCKED. */
+    dataptr[0] = 0;
+    dataptr[1] = log->size;
+    dataptr[2] = log->tag;
+
+    /* Need to find the offset for this data in the remote's window. We match
+     * subtype, tag and rank, searching the ranktag_lists for our ranktag
+     * value. XXX bisection search XXX */
+    size_t ranktag = toranktag(log->rank, log->otherrank, log->tag);
+    size_t counts = ranktag_counts[INDEX2(task_subtype_count, log->subtype,
+                                          log->otherrank)];
+    size_t offset = 0;
+
+    int found = 0;
+    counts = max_logs;  // XXX do we still need this?
+    for (size_t j = 0; j < counts; j++) {
+      if (ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype,
+                               log->otherrank, j)] == ranktag) {
+        offset = ranktag_offsets[INDEX3(task_subtype_count, nr_ranks,
+                                        log->subtype, log->otherrank, j)];
+        found = 1;
+        break;
+      }
+    }
+    if (!found) {
+      error(
+          "Failed sending a message of size %zd to %d/%d "
+          "@ %zd, no offset found for ranktag %zd, counts = %zd",
+          datasize, log->otherrank, log->subtype, offset, ranktag, counts);
+    }
+
+    /* And send the data to the other rank. Assumes the remote lock word is
+     * not already set to UNLOCKED. */
+    int ret = MPI_Accumulate(&dataptr[1], datasize - 1, MPI_BLOCKTYPE,
+                             log->otherrank, offset + 1, datasize - 1,
+                             MPI_BLOCKTYPE, MPI_REPLACE,
+                             mpi_window[log->subtype]);
+    if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
+
+    /* Need to flush before further modifications of this window. No overlap
+     * now, so do we still need this? */
+    //ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]);
+    //if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
+
+    /* And send the first element to the other rank; order should be
+     * guaranteed. */
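+    /* (Writing UNLOCKED into the first block is what publishes the message:
+     * the receive side polls that word and only reads the header and payload
+     * once it flips, see recv_thread().) */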
+    dataptr[0] = UNLOCKED;
+    ret = MPI_Accumulate(dataptr, 1, MPI_BLOCKTYPE, log->otherrank,
+                         offset, 1, MPI_BLOCKTYPE, MPI_REPLACE,
+                         mpi_window[log->subtype]);
+    if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
+
+    ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]);
+    if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
+  }
+
+  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());
+
+  return NULL;
+}
+
+/**
+ * @brief Recv thread, checks for messages in the window from other ranks.
+ */
+static void *recv_thread(void *arg) {
+
+  message(
+      "%d: recv thread starts, checking for %d messages, %d "
+      "ranks and %d communicators",
+      *((int *)arg), nr_recv, nr_ranks, task_subtype_count);
+  ticks starttics = getticks();
+
+  /* No. of receives to process. */
+  todo_recv = nr_recv;
+
+  /* We loop while new requests are being sent and we still have messages
+   * to receive. */
+  while (todo_recv > 0) {
+    for (int k = 0; k < nr_recv; k++) {
+      struct mpiuse_log_entry *log = recv_queue[k];
+      if (log != NULL && !log->done) {
+
+        /* Get offset into subtype for this message. */
+        size_t offset = log->offset;
+
+        /* Check if that part of the window has been unlocked. */
+        BLOCKTYPE *dataptr = &mpi_ptr[log->subtype][offset];
+        BLOCKTYPE volatile lock = dataptr[0];
+        if (lock == UNLOCKED) {
+
+          /* OK, so data should be ready for use, check the tag and size. */
+          if ((size_t)log->size == dataptr[1] &&
+              (size_t)log->tag == dataptr[2]) {
+
+            /* Check the sent data is unchanged. */
+            if (datacheck) {
+              if (!datacheck_test(toblocks(log->size), &dataptr[HEADER_SIZE],
+                                  log->otherrank)) {
+                error("Data mismatch on completion");
+              }
+            }
+
+            /* Done, clean up. */
+            log->done = 1;
+            atomic_dec(&todo_recv);
+            if (todo_recv == 0) break;
+
+          } else {
+            error("Unlocked data has incorrect tag or size: %zd/%zd %d/%zd",
+                  log->size, dataptr[1], log->tag, dataptr[2]);
+          }
+        }
+
+        /* Need to allow for some MPI progression. Since we make no MPI calls
+         * (by design the receive side is a passive target, so only the
+         * sender should make calls that move data) use a no-op call. */
+        int flag = 0;
+        int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
+                             &flag, MPI_STATUS_IGNORE);
+        if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Iprobe failed");
+      }
+    }
+  }
+  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());
+
+  /* Thread exits. */
+  return NULL;
+}
+
+/**
+ * @brief Comparison function for ranktags.
+ */
+static int cmp_ranktags(const void *p1, const void *p2) {
+  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
+  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
+
+  if (l1->ranktag > l2->ranktag) return 1;
+  if (l1->ranktag < l2->ranktag) return -1;
+  return 0;
+}
+
+/**
+ * @brief Comparison function for tags.
+ */
+static int cmp_tags(const void *p1, const void *p2) {
+  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
+  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
+
+  if (l1->tag > l2->tag) return 1;
+  if (l1->tag < l2->tag) return -1;
+  return 0;
+}
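+
+/* As a worked example of the ranktag ordering (assuming 4 ranks, so
+ * rank_shift is 3): toranktag(1, 2, 5) packs to 1 | 2<<3 | 5<<6 = 337, so
+ * sorting recvs by ranktag orders them primarily by tag (the high bits),
+ * then by receive rank, then by send rank. */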
+
+/**
+ * @brief Pick out the relevant logging data for our rank.
+ */
+static void pick_logs() {
+
+  int nlogs = mpiuse_nr_logs();
+
+  /* Duplicates of the logs. A bit large, as nlogs covers all ranks... */
+  send_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_send = 0;
+  recv_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_recv = 0;
+
+  for (int k = 0; k < nlogs; k++) {
+    struct mpiuse_log_entry *log = mpiuse_get_log(k);
+    if (log->activation) {
+      if (log->rank == myrank) {
+        log->done = 0;
+        log->data = NULL;
+        log->ranktag = toranktag(log->otherrank, log->rank, log->tag);
+        if (log->type == task_type_send) {
+          send_queue[nr_send] = log;
+          nr_send++;
+        } else if (log->type == task_type_recv) {
+          recv_queue[nr_recv] = log;
+          nr_recv++;
+        } else {
+          error("task type '%d' is not a known send or recv task", log->type);
+        }
+      }
+    }
+  }
+
+  /* Sort recv into increasing ranktag and send into tag order (we don't
+   * want to sort by rank, that would synchronize the sends). */
+  qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_ranktags);
+  qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_tags);
+
+  /* Offsets and ranktags. */
+  ranktag_offsets =
+      calloc(task_subtype_count * nr_ranks * max_logs, sizeof(size_t));
+  ranktag_lists =
+      calloc(task_subtype_count * nr_ranks * max_logs, sizeof(size_t));
+  ranktag_counts = calloc(task_subtype_count * nr_ranks, sizeof(size_t));
+
+  /* Set up the ranktag offsets for our receive windows. Also define the
+   * sizes of the windows. */
+  for (int k = 0; k < nr_recv; k++) {
+    struct mpiuse_log_entry *log = recv_queue[k];
+    ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank,
+                         k)] = log->ranktag;
+    ranktag_offsets[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank,
+                           k)] = ranktag_sizes[log->subtype];
+    log->offset = ranktag_sizes[log->subtype];
+
+    /* Need to use a multiple of blocks to keep alignment. */
+    size_t size = toblocks(log->size) + HEADER_SIZE;
+    ranktag_sizes[log->subtype] += size;
+
+    ranktag_counts[INDEX2(task_subtype_count, log->subtype, myrank)]++;
+  }
+}
+
+/**
+ * @brief usage help.
+ */
+static void usage(char *argv[]) {
+  fprintf(stderr, "Usage: %s [-vd] SWIFT_mpiuse-log-file.dat logfile.dat\n",
+          argv[0]);
+  fprintf(stderr, " options: -v verbose, -d datacheck\n");
+  fflush(stderr);
+}
+
+/**
+ * @brief main function.
+ */
+int main(int argc, char *argv[]) {
+
+  /* Initiate MPI. */
+  int prov = 0;
+  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Init_thread failed with error %i.", res);
+
+  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
+  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
+
+  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Comm_rank failed with error %i.", res);
+
+  /* Handle the command-line, we expect an mpiuse data file to read and
+   * various options. */
+  int opt;
+  while ((opt = getopt(argc, argv, "vd")) != -1) {
+    switch (opt) {
+      case 'd':
+        datacheck = 1;
+        break;
+      case 'v':
+        verbose = 1;
+        break;
+      default:
+        if (myrank == 0) usage(argv);
+        return 1;
+    }
+  }
+  if (optind >= argc - 1) {
+    if (myrank == 0) usage(argv);
+    return 1;
+  }
+  char *infile = argv[optind];
+  char *logfile = argv[optind + 1];
+
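+  /* A typical invocation (the file names are hypothetical) would be:
+   *
+   *   mpirun -np 4 ./swiftmpirdmastepsim-acc-only -v -d \
+   *       mpiuse_report-step1.dat mpiuse_log.dat */
+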
+  /* Now we read the SWIFT MPI logger output that defines the communications
+   * we will undertake and the time differences between injections into the
+   * queues. Note this has all the ranks for a single step; SWIFT outputs one
+   * MPI log per rank per step, so you need to combine all the ranks from a
+   * step. */
+  mpiuse_log_restore(infile);
+  int nranks = mpiuse_nr_ranks();
+
+  /* This should match the expected size. */
+  if (nr_ranks != nranks)
+    error("The number of MPI ranks %d does not match the expected value %d",
+          nr_ranks, nranks);
+
+  /* Number of bits needed to hold the maximum rank id. Assumes the GCC
+   * intrinsic. */
+  rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks);
+
+  /* We all need to agree on a maximum count of logs, so we can exchange the
+   * offset arrays (would be ragged otherwise and difficult to exchange). */
+  max_logs = mpiuse_nr_logs() / 2 + 1;
+  MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
+
+  /* Extract the send and recv messages for our rank and populate the
+   * queues. */
+  pick_logs();
+
+  /* Now for the one-sided setup... Each rank needs a buffer per communicator
+   * with space for all the expected messages. */
+  for (int i = 0; i < task_subtype_count; i++) {
+    MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
+    size_t size = tobytes(ranktag_sizes[i]);
+    if (size == 0) size = BYTESINBLOCK;
+    MPI_Win_allocate(size, BYTESINBLOCK, MPI_INFO_NULL, subtypeMPI_comms[i],
+                     &mpi_ptr[i], &mpi_window[i]);
+
+    memset(mpi_ptr[i], 170, tobytes(ranktag_sizes[i]));
+
+    /* Assert a shared lock with all the other processes on this window.
+     * Strictly needed as we use threads, so cannot lock or unlock as
+     * a means of synchronization. */
+    MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
+  }
+
+  /* We need to share all the offsets for each communicator with all the
+   * other ranks so they can push data into the correct parts of our receive
+   * window. */
+  MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
+                task_subtype_count * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
+                MPI_COMM_WORLD);
+  MPI_Allreduce(MPI_IN_PLACE, ranktag_counts, task_subtype_count * nr_ranks,
+                MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
+  MPI_Allreduce(MPI_IN_PLACE, ranktag_lists,
+                task_subtype_count * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
+                MPI_COMM_WORLD);
+
+  /* Time to start the clock. Try to make it synchronous across the ranks. */
+  MPI_Barrier(MPI_COMM_WORLD);
+  clocks_set_cpufreq(0);
+  if (myrank == 0) {
+    message("Start of MPI tests");
+    message("==================");
+    if (verbose) {
+      if (datacheck)
+        message("checking data pattern on send and recv completion");
+    }
+  }
+
+  /* Make two threads, one for sending and one for receiving. */
+  pthread_t sendthread;
+  if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
+    error("Failed to create send thread.");
+  pthread_t recvthread;
+  if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
+    error("Failed to create recv thread.");
+
+  /* Wait until all threads have exited and all message exchanges have
+   * completed. */
+  pthread_join(sendthread, NULL);
+  pthread_join(recvthread, NULL);
+
+  /* Free the window locks. Only after we all arrive. */
+  MPI_Barrier(MPI_COMM_WORLD);
+  for (int i = 0; i < task_subtype_count; i++) {
+    MPI_Win_unlock_all(mpi_window[i]);
+    MPI_Win_free(&mpi_window[i]);
+  }
+
+  /* Dump the updated MPI logs. */
+  fflush(stdout);
+  if (myrank == 0) message("Dumping updated log");
+  mpiuse_dump_logs(nranks, logfile);
+
+  /* Shutdown MPI. */
+  res = MPI_Finalize();
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Finalize failed with error %i.", res);
+
+  if (myrank == 0) message("Bye");
+
+  return 0;
+}