Commit 0b36f3f7 authored by Peter W. Draper

Add accumulator only variation

No faster, but makes more sense
parent 0cd76249
Branch: reallyonesided
@@ -2,7 +2,7 @@
CFLAGS = -g -O0 -Wall
-all: swiftmpistepsim swiftmpirdmastepsim
+all: swiftmpistepsim swiftmpirdmastepsim swiftmpirdmastepsim-acc-only
swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
mpicc $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c
@@ -10,8 +10,12 @@ swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h c
swiftmpirdmastepsim: swiftmpirdmastepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
mpicc $(CFLAGS) -o swiftmpirdmastepsim swiftmpirdmastepsim.c mpiuse.c clocks.c
+swiftmpirdmastepsim-acc-only: swiftmpirdmastepsim-acc-only.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
+mpicc $(CFLAGS) -o swiftmpirdmastepsim-acc-only swiftmpirdmastepsim-acc-only.c mpiuse.c clocks.c
clean:
rm -f swiftmpistepsim
rm -f swiftmpirdmastepsim
+rm -f swiftmpirdmastepsim-acc-only
swiftmpirdmastepsim-acc-only.c
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
// Fully one-sided approach with passive target communication. Only the
// sending side updates the window buffer, and since we have threaded access
// we can only move data using flushes under a shared lock that is held open
// permanently. The send side has no associated window of its own, as it only
// pushes data.
//
// So each rank needs a receive window with room for all the expected sends,
// plus additional header elements per message for controlling the readiness
// of the data (a flag updated by a separate accumulate that should only land
// after the main data has been sent) and for correctness checks.
//
// In this implementation the size of the receive buffer per rank is just the
// sum of the sizes of all the messages we know we are about to get. The
// position of each message in that buffer is determined by the send rank,
// the receive rank and the tag, giving a list of offsets into the buffer
// keyed by the ranktag, which we need to share with any rank that is
// expected to send us data. We share these offsets using normal MPI over the
// global communicator (they could instead be kept in a further extension of
// the window itself, but that would need synchronizing across all ranks).
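//
// For illustration, the layout of a single message region inside a receive
// window is roughly (in units of BLOCKTYPE, see HEADER_SIZE below):
//
//   [0] flag  -- set to UNLOCKED by the sender once the payload has landed
//   [1] size  -- payload size in bytes, used only as a sanity check
//   [2] tag   -- message tag, used only as a sanity check
//   [3...]    -- payload, log->size bytes rounded up to whole blocks
//
// Regions for all messages of a subtype are packed back-to-back in that
// subtype's window, in the order of the sorted receive logs.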
#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Maximum number of communicator windows per rank, for SWIFT this is the
* number of subtypes. */
#define task_subtype_count 22
/* 3D index of array. */
#define INDEX3(nx, ny, x, y, z) (nx * ny * z + nx * y + x)
/* 2D index of array. */
#define INDEX2(nx, x, y) (nx * y + x)
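/* For illustration, the ranktag arrays below are addressed as
 * INDEX3(task_subtype_count, nr_ranks, subtype, rank, k), so the subtype
 * index varies fastest, then the rank, then the log index k. With a
 * hypothetical nr_ranks of 4, entry (subtype=1, rank=2, k=3) would sit at
 * 22*4*3 + 22*2 + 1 = 309. */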
/* Our rank for all to see. */
int myrank = -1;
/* Number of ranks. */
static int nr_ranks;
/* Bit shift to accommodate all the bits of the maximum rank id. */
static int rank_shift = 0;
/* Maximum no. of messages (logs). */
static size_t max_logs = 0;
/* Flags for controlling access. High end of size_t. */
static size_t UNLOCKED = (((size_t)2 << 63) - 1);
/* Size of a block of memory. All addressable memory chunks need to be a
* multiple of this as we need to align sends and receives in memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Size of message header in blocks. The unlocked flag, size and tag. Note
* size and tag are just for sanity checks. The flag value controls access to
* the main data areas. */
static const size_t HEADER_SIZE = 3;
/* Are we verbose. */
static int verbose = 0;
/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
/* Global communicators for each of the subtypes. */
static MPI_Comm subtypeMPI_comms[task_subtype_count];
/* And the windows for one-sided communications. */
static MPI_Win mpi_window[task_subtype_count];
static BLOCKTYPE *mpi_ptr[task_subtype_count] = {NULL};
/* Offsets of the ranktag regions within the windows and lists of the
* associated tags. */
static size_t ranktag_sizes[task_subtype_count] = {0};
static size_t *ranktag_counts;
static size_t *ranktag_offsets;
static size_t *ranktag_lists;
/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_send = 0;
static int volatile todo_send = 0;
/* The local receive queue. */
static struct mpiuse_log_entry **volatile recv_queue;
static int volatile nr_recv = 0;
static int volatile todo_recv = 0;
/**
* @brief Convert two ranks and tag into a single unique value.
*
* Assumes there is enough space in a size_t for these values.
*
* @param sendrank the send rank
* @param recvrank the receive rank
* @param tag the tag
*
* @result a value unique to the combination of the two ranks and the tag
*/
static size_t toranktag(int sendrank, int recvrank, int tag) {
size_t result = (size_t)sendrank | (size_t)recvrank << rank_shift |
(size_t)tag << (rank_shift * 2);
return result;
}
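/* Worked example with hypothetical values: for nr_ranks == 4 (and a 32-bit
 * int) the rank_shift computed in main() is 32 - __builtin_clz(4) = 3, so
 * toranktag(1, 2, 5) == 1 | (2 << 3) | (5 << 6) == 337. The triple stays
 * unique provided the shifted tag still fits in a size_t. */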
/**
* @brief Convert a byte count into a number of blocks, rounds up.
*
* @param nr_bytes the number of bytes.
*
* @result the number of blocks needed.
*/
static int toblocks(BLOCKTYPE nr_bytes) {
return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
/**
* @brief Convert a block count into a number of bytes.
*
* @param nr_blocks the number of blocks.
*
* @result the number of bytes.
*/
static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
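/* For instance, with the usual 8-byte size_t blocks toblocks(17) rounds up
 * to 3 blocks and tobytes(3) returns 24 bytes. */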
/**
* @brief fill a data area with our rank.
*
* @param size size of data in blocks.
* @param data the data to fill.
*/
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
for (BLOCKTYPE i = 0; i < size; i++) {
data[i] = myrank;
}
}
/**
* @brief test a filled data area for a value, reports if any unexpected value
* is found.
*
* @param size size of data in blocks.
* @param data the data to check.
* @param rank the expected value, i.e. the originating rank.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
for (size_t i = 0; i < size; i++) {
if (data[i] != (size_t)rank) {
message("see %zd expected %d @ %zd", data[i], rank, i);
return 0;
}
}
return 1;
}
/**
* @brief Send thread, sends messages to other ranks one-by-one with the
* correct offsets into the remote windows.
*
* Messages are all considered in order, regardless of the subtype.
*/
static void *send_thread(void *arg) {
message("%d: send thread starts with %d messages", *((int *)arg), nr_send);
ticks starttics = getticks();
for (int k = 0; k < nr_send; k++) {
struct mpiuse_log_entry *log = send_queue[k];
if (log == NULL) error("NULL send message queued (%d/%d)", k, nr_send);
/* Data has the actual data and room for the header. */
BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
BLOCKTYPE *dataptr = calloc(datasize, BYTESINBLOCK);
log->data = dataptr;
/* Fill data with pattern. */
if (datacheck) datacheck_fill(datasize, dataptr);
/* And define header; dataptr[0] can be any value except UNLOCKED. */
dataptr[0] = 0;
dataptr[1] = log->size;
dataptr[2] = log->tag;
/* Need to find the offset for this data in the remote's window. We match
* subtype, tag and rank. Need to search the ranktag_lists for our ranktag
* value. XXX bisection search XXX */
size_t ranktag = toranktag(log->rank, log->otherrank, log->tag);
size_t counts = ranktag_counts[INDEX2(task_subtype_count, log->subtype,
log->otherrank)];
size_t offset = 0;
int found = 0;
counts = max_logs; // XXX do we still need this?
for (size_t j = 0; j < counts; j++) {
if (ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype,
log->otherrank, j)] == ranktag) {
offset = ranktag_offsets[INDEX3(task_subtype_count, nr_ranks,
log->subtype, log->otherrank, j)];
found = 1;
break;
}
}
if (!found) {
error(
"Failed sending a message of size %zd to %d/%d "
"@ %zd\n, no offset found for ranktag %zd, counts = %zd",
datasize, log->otherrank, log->subtype, offset, ranktag, counts);
}
/* And send data to other rank. Assumes remote has set dataptr[0]
* to 0. */
int ret = MPI_Accumulate(&dataptr[1], datasize - 1, MPI_BLOCKTYPE,
log->otherrank, offset + 1, datasize - 1,
MPI_BLOCKTYPE, MPI_REPLACE,
mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
/* Need to flush before further modifications of this window. No overlap now,
* so do we still need this? */
//ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]);
//if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
/* And send first element to other rank, order should be guaranteed. */
dataptr[0] = UNLOCKED;
ret = MPI_Accumulate(dataptr, 1, MPI_BLOCKTYPE, log->otherrank,
offset, 1, MPI_BLOCKTYPE, MPI_REPLACE,
mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
return NULL;
}
/**
* @brief Recv thread, checks for messages in the window from other ranks.
*/
static void *recv_thread(void *arg) {
message(
"%d: recv thread starts, checking for %d messages %d "
"ranks %d communicators",
*((int *)arg), nr_recv, nr_ranks, task_subtype_count);
ticks starttics = getticks();
/* No. of receives to process. */
todo_recv = nr_recv;
/* We loop while new requests are being sent and we still have messages
* to receive. */
while (todo_recv > 0) {
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done) {
/* Get offset into subtype for this message. */
size_t offset = log->offset;
/* Check if that part of the window has been unlocked. */
BLOCKTYPE *dataptr = &mpi_ptr[log->subtype][offset];
BLOCKTYPE volatile lock = dataptr[0];
if (lock == UNLOCKED) {
/* OK, so data should be ready for use, check the tag and size. */
if ((size_t)log->size == dataptr[1] &&
(size_t)log->tag == dataptr[2]) {
if (verbose) /* Check the sent data is unchanged. */
if (datacheck) {
if (!datacheck_test(toblocks(log->size), &dataptr[HEADER_SIZE],
log->otherrank)) {
error("Data mismatch on completion");
}
}
/* Done, clean up. */
log->done = 1;
atomic_dec(&todo_recv);
if (todo_recv == 0) break;
} else {
error("Unlocked data has incorrect tag or size: %zd/%zd %d/%zd",
log->size, dataptr[1], log->tag, dataptr[2]);
}
}
/* Need to allow for some MPI progression. Since we make no other MPI calls
* (by design the receive side is a passive target, so only the sender should
* make calls that move data) use a no-op probe. */
int flag = 0;
int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
MPI_STATUS_IGNORE);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Iprobe failed");
}
}
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
}
/**
* @brief Comparison function for ranktags.
*/
static int cmp_ranktags(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
if (l1->ranktag > l2->ranktag) return 1;
if (l1->ranktag < l2->ranktag) return -1;
return 0;
}
/**
* @brief Comparison function for tags.
*/
static int cmp_tags(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
if (l1->tag > l2->tag) return 1;
if (l1->tag < l2->tag) return -1;
return 0;
}
/**
* @brief Pick out the relevant logging data for our rank.
*/
static void pick_logs() {
int nlogs = mpiuse_nr_logs();
/* Duplicate of logs. Bit large... */
send_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_send = 0;
recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recv = 0;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->activation) {
if (log->rank == myrank) {
log->done = 0;
log->data = NULL;
log->ranktag = toranktag(log->otherrank, log->rank, log->tag);
if (log->type == task_type_send) {
send_queue[nr_send] = log;
nr_send++;
} else if (log->type == task_type_recv) {
recv_queue[nr_recv] = log;
nr_recv++;
} else {
error("task type '%d' is not a known send or recv task", log->type);
}
}
}
}
/* Sort recv into increasing ranktag and send into tag order (don't
* want to sort by rank, that would synchronize the sends). */
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_ranktags);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_tags);
/* Offsets and ranktags. */
ranktag_offsets =
calloc(task_subtype_count * nr_ranks * max_logs, sizeof(size_t));
ranktag_lists =
calloc(task_subtype_count * nr_ranks * max_logs, sizeof(size_t));
ranktag_counts = calloc(task_subtype_count * nr_ranks, sizeof(size_t));
/* Setup the ranktag offsets for our receive windows. Also define the sizes
* of the windows. */
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank,
k)] = log->ranktag;
ranktag_offsets[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank,
k)] = ranktag_sizes[log->subtype];
log->offset = ranktag_sizes[log->subtype];
/* Need to use a multiple of blocks to keep alignment. */
size_t size = toblocks(log->size) + HEADER_SIZE;
ranktag_sizes[log->subtype] += size;
ranktag_counts[INDEX2(task_subtype_count, log->subtype, myrank)]++;
}
}
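/* After pick_logs() each rank knows, per subtype, the offset of every message
 * it expects to receive; those offsets still have to be made visible to the
 * sending ranks, which is done by the Allreduce exchanges in main(). */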
/**
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat\n",
argv[0]);
fprintf(stderr, " options: -v verbose\n");
fflush(stderr);
}
/**
* @brief main function.
*/
int main(int argc, char *argv[]) {
/* Initiate MPI. */
int prov = 0;
int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
if (res != MPI_SUCCESS)
error("Call to MPI_Init_thread failed with error %i.", res);
res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (res != MPI_SUCCESS)
error("Call to MPI_Comm_rank failed with error %i.", res);
/* Handle the command line; we expect an mpiuse data file to read and various
* options. */
int opt;
while ((opt = getopt(argc, argv, "vd")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
break;
case 'v':
verbose = 1;
break;
default:
if (myrank == 0) usage(argv);
return 1;
}
}
if (optind >= argc - 1) {
if (myrank == 0) usage(argv);
return 1;
}
char *infile = argv[optind];
char *logfile = argv[optind + 1];
/* Now we read the SWIFT MPI logger output that defines the communications
* we will undertake and the time differences between injections into the
* queues. Note this has all ranks for a single step; SWIFT outputs one MPI
* log per rank per step, so you need to combine all ranks from a step. */
mpiuse_log_restore(infile);
int nranks = mpiuse_nr_ranks();
/* This should match the expected size. */
if (nr_ranks != nranks)
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_ranks);
/* Index of most significant bit in the maximum rank id. Assumes GCC
* intrinsic. */
rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks);
/* We all need to agree on a maximum count of logs, so we can exchange the
* offset arrays (would be ragged otherwise and difficult to exchange). */
max_logs = mpiuse_nr_logs() / 2 + 1;
MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
/* Extract the send and recv messages for our rank and populate the queues. */
pick_logs();
/* Now for the one-sided setup... Each rank needs a buffer per communicator
* with space for all the expected messages. */
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
size_t size = tobytes(ranktag_sizes[i]);
if (size == 0) size = BYTESINBLOCK;
MPI_Win_allocate(size, BYTESINBLOCK, MPI_INFO_NULL, subtypeMPI_comms[i],
&mpi_ptr[i], &mpi_window[i]);
memset(mpi_ptr[i], 170, tobytes(ranktag_sizes[i]));
/* Assert a shared lock with all the other processes on this window.
* Strictly needed as we use threads, so cannot lock or unlock as
* a means of synchronization. */
MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
}
/* We need to share all the offsets for each communicator with all the other
* ranks so they can push data into the correct parts of our receive
* window. */
MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
task_subtype_count * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, ranktag_counts, task_subtype_count * nr_ranks,
MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, ranktag_lists,
task_subtype_count * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
MPI_COMM_WORLD);
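/* This summation works because the arrays start zeroed and each rank only
 * fills the slots indexed by its own rank, so MPI_SUM simply merges the
 * per-rank contributions into a complete table on every rank. */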
/* Time to start timing. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
clocks_set_cpufreq(0);
if (myrank == 0) {
message("Start of MPI tests");
message("==================");
if (verbose) {
if (datacheck)
message("checking data pattern on send and recv completion");
}
}
/* Make two threads, one for sending and one for receiving. */
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all message exchanges have
* completed. */
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Free the window locks. Only after we all arrive. */
MPI_Barrier(MPI_COMM_WORLD);
for (int i = 0; i < task_subtype_count; i++) {
MPI_Win_unlock_all(mpi_window[i]);
MPI_Win_free(&mpi_window[i]);
}
/* Dump the updated MPI logs. */
fflush(stdout);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);
/* Shutdown MPI. */
res = MPI_Finalize();
if (res != MPI_SUCCESS)
error("call to MPI_Finalize failed with error %i.", res);
if (myrank == 0) message("Bye");
return 0;
}