diff --git a/Makefile b/Makefile
index 7756f83a1db34e27f3bf06278d902837b88d268a..3994eb4deb925d5b74d85511e0124d9b014977de 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,6 @@
-CFLAGS = -g -O0 -Wall
+#CFLAGS = -g -O0 -Wall -std=gnu11 -fsanitize=address -fno-omit-frame-pointer
+CFLAGS = -g -O0 -Wall -std=gnu11
+
 all: swiftmpistepsim swiftmpifakestepsim
diff --git a/error.h b/error.h
index bda6a62106d387cfa62076d9e5299b8a11d44f3d..0770d80b215c01ebee009fb8c523f0d4849fffa2 100644
--- a/error.h
+++ b/error.h
@@ -5,18 +5,32 @@ extern int myrank;

 /* Exit in error macro. */
-#define error(s, ...)                                                   \
-  ({                                                                    \
-    fflush(stdout);                                                     \
-    fprintf(stderr, "[%03i] %s %s:%s():%i: " s "\n", myrank,            \
+#define error(s, ...)                                                    \
+  ({                                                                     \
+    fflush(stdout);                                                      \
+    fprintf(stderr, "[%03i] %s %s:%s():%i: " s "\n", myrank,             \
             clocks_get_timesincestart(), __FILE__, __FUNCTION__, __LINE__, \
-            ##__VA_ARGS__);                                              \
-    MPI_Abort(MPI_COMM_WORLD, -1);                                       \
+            ##__VA_ARGS__);                                               \
+    MPI_Abort(MPI_COMM_WORLD, -1);                                        \
   })

 /* Print a message */
-#define message(s, ...)                                                  \
-  ({                                                                     \
-    printf("[%04i] %s %s: " s "\n", myrank, clocks_get_timesincestart(), \
-           __FUNCTION__, ##__VA_ARGS__);                                 \
+#define message(s, ...)                                                   \
+  ({                                                                      \
+    printf("[%04i] %s %s: " s "\n", myrank, clocks_get_timesincestart(),  \
+           __FUNCTION__, ##__VA_ARGS__);                                  \
+    fflush(stderr);                                                       \
+    fflush(stdout);                                                       \
   })
+
+/* Print MPI error string. */
+#define mpi_error_message(res, s, ...)                                    \
+  ({                                                                      \
+    fflush(stdout);                                                       \
+    fprintf(stderr, "[%04i] %s %s:%s():%i: " s "\n", myrank,              \
+            clocks_get_timesincestart(), __FILE__, __FUNCTION__, __LINE__, \
+            ##__VA_ARGS__);                                               \
+    char buf[MPI_MAX_ERROR_STRING];                                       \
+    int len = 0;                                                          \
+    MPI_Error_string(res, buf, &len);                                     \
+    fprintf(stderr, "%s\n\n", buf);                                       \
+  })
+
diff --git a/mpiuse.h b/mpiuse.h
index 71d8982f0cbb020dd6a563d2f8cb756ccacf3dd5..862e83829230f57f082e2e51bac646903eda5f73 100644
--- a/mpiuse.h
+++ b/mpiuse.h
@@ -82,6 +82,12 @@ struct mpiuse_log_entry {

   /* Minimum time in a test. */
   ticks tmin;
+
+  /* Remote offset. */
+  MPI_Aint offset;
+
+  /* Local log index, should be global. */
+  size_t index;
 };

 /* Flags for the types of request when generating fakes. */
diff --git a/swiftmpistepsim.c b/swiftmpistepsim.c
index 318e634cbbc2655a1d7f7e73515a18bccce2fb32..056d6c260300d61dec0959c1d78b3a69e06e2ca7 100644
--- a/swiftmpistepsim.c
+++ b/swiftmpistepsim.c
@@ -1,6 +1,6 @@
 /*******************************************************************************
  * This file is part of SWIFT.
- * Copyright (c) 2019 Peter W. Draper
+ * Copyright (c) 2020 Peter W. Draper
  *
  * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published
@@ -17,11 +17,26 @@
  *
  ******************************************************************************/

+// This one-sided variation attempts to avoid single contiguous memory
+// allocations, and instead shares the memory directly with MPI in the hope
+// that we can avoid a memory copy into the device registered memory.
+//
+// We want to keep as close to the usual MPI_Isend/Irecv/Test usage as
+// possible.
+//
+// Note that this means only the sending side updates the window buffer, and
+// since we have threaded access we can only use flushes with a shared lock
+// that is permanently open to move data.
+//
+// Also the send side has no associated window, as it only pushes data, so it
+// seems we cannot register that memory.
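The note above leans on MPI-3 passive-target synchronisation, so it is worth recalling that lifecycle. A minimal sketch only; `win` and `target` are illustrative names, not identifiers from this patch:

    /* Open a permanent shared-access epoch on every rank; with threads in
     * play we never lock/unlock again until shutdown. */
    MPI_Win_lock_all(MPI_MODE_NOCHECK, win);

    /* ... any rank/thread may now push data with puts/accumulates ... */

    /* Complete outstanding RMA operations at one target without closing
     * the epoch. */
    MPI_Win_flush(target, win);

    /* Close the epoch only at shutdown, once all ranks have finished. */
    MPI_Win_unlock_all(win);

This is exactly the shape the patch adopts below: lock_all at window creation, flushes while the threads run, unlock_all behind a final barrier.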
+
 #include <limits.h>
 #include <mpi.h>
 #include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <string.h>
 #include <unistd.h>

 #include "atomic.h"
@@ -29,15 +44,31 @@
 #include "error.h"
 #include "mpiuse.h"

+/* Maximum MPI tag value expected in the logs (bounds the indices table). */
+#define maxtags 1000
+
 /* Global: Our rank for all to see. */
 int myrank = -1;

+/* Number of ranks. */
+static int nr_ranks;
+
+/* Flag for controlling access. All bits set, i.e. the high end of size_t. */
+static size_t UNLOCKED = ~(size_t)0;
+
+/* Size of a block of memory. All addressable memory chunks need to be a
+ * multiple of this as we need to align sends and receives in memory. */
+#define BLOCKTYPE size_t
+#define MPI_BLOCKTYPE MPI_AINT
+static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
+
+/* Size of message header in blocks. The unlocked flag, size and tag. Note
+ * size and tag are just for sanity checks. The flag value controls access to
+ * the main data areas. */
+static const size_t HEADER_SIZE = 3;
+
 /* Are we verbose. */
 static int verbose = 0;

-/* Attempt to keep original injection time differences. */
-static int usetics = 1;
-
 /* Scale to apply to the size of the messages we send. */
 static float messagescale = 1.0;

@@ -53,301 +84,291 @@ static const int task_type_send = 25;
 static const int task_type_recv = 26;

 /* Global communicators for each of the subtypes. */
-static const int task_subtype_count = 34;  // Just some upper limit on subtype.
+#define task_subtype_count 34  // Just some upper limit on subtype.
 static MPI_Comm subtypeMPI_comms[task_subtype_count];

-/* The local queues. */
-static struct mpiuse_log_entry **volatile reqs_queue;
-static int volatile ind_req = 0;
-static int volatile nr_reqs = 0;
+/* Windows for one-sided communications. */
+static MPI_Win mpi_window[task_subtype_count];
+
+/* Offsets of the logs for all ranks. */
+static MPI_Aint *ranklog_offsets;
+static int *ranklog_indices;
+#define INDEX4(nx, ny, nt, x, y, z, t) \
+  ((nx) * (ny) * (nt) * (t) + (nx) * (ny) * (z) + (nx) * (y) + (x))
+
+/* The local send queue. */
+static struct mpiuse_log_entry **volatile send_queue;
+static int volatile nr_send = 0;
+static int volatile todo_send = 0;
 static int volatile injecting = 1;
-static struct mpiuse_log_entry **volatile recvs_queue;
-static int volatile nr_recvs = 0;
-static int volatile ind_recv = 0;
+
+/* The local receive queue. */
+static struct mpiuse_log_entry **volatile recv_queue;
+static int volatile nr_recv = 0;
 static int volatile todo_recv = 0;
-static struct mpiuse_log_entry **volatile sends_queue;
-static int volatile nr_sends = 0;
-static int volatile ind_send = 0;
-static int volatile todo_send = 0;

-/* CPU frequency of the machine that created the MPI log. */
-// XXX need to store this in the data file.
-static double log_clocks_cpufreq = 2194844448.0;
+/* The local requests queue. */
+static struct mpiuse_log_entry **volatile req_queue;
+static int volatile ind_req = 0;
+static int volatile nr_req = 0;
+static int volatile todo_req = 0;
+
+/**
+ * @brief Convert a byte count into a number of blocks, rounds up.
+ *
+ * @param nr_bytes the number of bytes.
+ *
+ * @result the number of blocks needed.
+ */
+static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
+  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
+}
+
+/**
+ * @brief Convert a block count into a number of bytes.
+ *
+ * @param nr_blocks the number of blocks.
+ *
+ * @result the number of bytes.
+ */
+static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
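The block arithmetic is easiest to see with a concrete case. A minimal worked example using the helpers above (the 10-byte payload is hypothetical):

    /* A 10 byte payload occupies toblocks(10) == 2 blocks of 8 bytes, and
     * gains HEADER_SIZE == 3 control blocks, so 5 blocks in all:
     *   block 0:  lock flag, 0 while in transit, UNLOCKED when complete
     *   block 1:  message size in blocks (sanity check)
     *   block 2:  message tag (sanity check)
     *   block 3+: the payload itself */
    BLOCKTYPE nr_blocks = toblocks(10) + HEADER_SIZE; /* == 5 */
    BLOCKTYPE nr_bytes = tobytes(nr_blocks);          /* == 40 */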
 /**
- * @brief fill a data area with a pattern that can be checked for changes.
+ * @brief fill a data area with our rank.
  *
- * @param size size of data in bytes.
+ * @param size size of data in blocks.
  * @param data the data to fill.
  */
-static void datacheck_fill(size_t size, void *data) {
-  unsigned char *p = (unsigned char *)data;
-  for (size_t i = 0; i < size; i++) {
-    p[i] = 170; /* 10101010 in bits. */
+static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
+  for (BLOCKTYPE i = 0; i < size; i++) {
+    data[i] = myrank;
   }
 }

 /**
- * @brief test a filled data area for our pattern.
+ * @brief zero a data area.
  *
- * @param size size of data in bytes.
+ * @param size size of data in blocks.
- * @param data the data to fill.
+ * @param data the data to zero.
+ */
+static void datacheck_zero(BLOCKTYPE size, BLOCKTYPE *data) {
+  for (BLOCKTYPE i = 0; i < size; i++) {
+    data[i] = 0;
+  }
+}
+
+/**
+ * @brief test a filled data area for a value, reports if any unexpected
+ * value is found.
+ *
+ * @param size size of data in blocks.
+ * @param data the data to check.
+ * @param rank the value to check for, i.e. the original rank.
  *
  * @result 1 on success, 0 otherwise.
  */
-static int datacheck_test(size_t size, void *data) {
-  unsigned char *p = (unsigned char *)data;
+static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
   for (size_t i = 0; i < size; i++) {
-    if (p[i] != 170) return 0;
+    if (data[i] != (size_t)rank) {
+      message("see %zd expected %d @ %zd", data[i], rank, i);
+      return 0;
+    }
   }
   return 1;
 }

 /**
- * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
+ * @brief Send thread, initiates the sending of messages to other ranks
+ * one-by-one with the correct offsets into the remote windows.
  *
- * The requests are initiated in the time order of the original log and an
- * attempt to start these with the same interval gap is made if usetics is
- * set, otherwise we just do them as quickly as possible.
+ * Messages are all considered in order, regardless of the subtype.
  */
-static void *inject_thread(void *arg) {
+static void *send_thread(void *arg) {

-  if (verbose) message("%d: injection thread starts", *((int *)arg));
+  message("%d: send thread starts with %d messages", *((int *)arg), nr_send);
   ticks starttics = getticks();

-  /* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
-  ticks basetic = reqs_queue[0]->tic;
-  ticks looptics = 0;
-  double deadtime = 0.0;
-  struct timespec sleep;
-  sleep.tv_sec = 0;
-
-  while (ind_req < nr_reqs) {
-    struct mpiuse_log_entry *log = reqs_queue[ind_req];
-
-    if (usetics) {
-      /* Expect time between this request and the previous one. */
-      ticks dt = log->tic - basetic;
-      basetic = log->tic;
-
-      /* We guess some time below which we should not attempt to wait,
-       * otherwise we'll start to overrun, and just inject the next call if we
-       * are below that (we time the ticks this loop takes without any waiting
-       * and use that). Otherwise we wait a while. Note we need to convert the
-       * ticks of the log file into nanoseconds, that requires the original
-       * CPU frequency. */
-      if (dt > looptics) {
-
-        /* Remember to be fair and remove the looptics, then convert to
-         * nanoseconds. */
-        double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
-        if (ns < 1.0e9) {
-          sleep.tv_nsec = (long)ns;
-        } else {
-          /* Wait more than one second. Must be an error, but complain and
-           * continue. */
-          sleep.tv_nsec = (long)1.0e9;
-          message("wait greater than one second");
-        }
-        nanosleep(&sleep, NULL);
-        deadtime += sleep.tv_nsec;
-      }
-    }
-
-    /* Initialise new log elements. */
-    log->done = 0;
-    log->nr_tests = 0;
-    log->tsum = 0.0;
-    log->tmax = 0;
-    log->tmin = LONG_MAX;
-    log->endtic = 0;
-    log->injtic = getticks();
-
-    /* Differences to SWIFT: MPI_BYTE not the MPI_Type. */
-    int err = 0;
-    if (log->type == task_type_send) {
-      log->data = calloc(log->size, 1);
-
-      /* Fill data with pattern. */
-      if (datacheck) datacheck_fill(log->size, log->data);
-
-      /* And send. */
-      err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
-                      subtypeMPI_comms[log->subtype], &log->req);
+  injecting = 1;
+  for (int k = 0; k < nr_send; k++) {
+    struct mpiuse_log_entry *log = send_queue[k];
+    if (log == NULL) error("NULL send message queued (%d/%d)", k, nr_send);

-      /* Add a new send request. */
-      int ind = atomic_inc(&nr_sends);
-      sends_queue[ind] = log;
-      atomic_inc(&todo_send);
+    /* Data has the actual data and room for the header. */
+    BLOCKTYPE datasize = log->size;
+    BLOCKTYPE *dataptr = log->data;

-    } else {
-
-      /* Ready to receive. */
-      log->data = calloc(log->size, 1);
-      err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
-                      subtypeMPI_comms[log->subtype], &log->req);
+    /* Update the log for the initiation of this message. */
+    log->injtic = getticks();

-      /* Add a new recv request. */
-      int ind = atomic_inc(&nr_recvs);
-      recvs_queue[ind] = log;
-      atomic_inc(&todo_recv);
-    }
-    if (err != MPI_SUCCESS) error("Failed to activate send or recv");
-
-    ind_req++;
-
-    /* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
-     * equally timed. Note we include a nanosleep, they are slow. */
-    if (looptics == 0 && usetics) {
-      sleep.tv_nsec = 1;
-      nanosleep(&sleep, NULL);
-      looptics = (getticks() - starttics);
-      if (verbose)
-        message("injection loop took %.3f %s.", clocks_from_ticks(looptics),
-                clocks_getunit());
-    }
+    /* Fill data with pattern. */
+    if (datacheck) datacheck_fill(datasize, dataptr);
+
+    /* Need to find the offset for this data in the remote window. */
+    /* XXX Index of recv log on the remote side? */
+    int index = ranklog_indices[INDEX4(nr_ranks, nr_ranks, task_subtype_count,
+                                       log->rank, log->otherrank,
+                                       log->subtype, log->tag)];
+    log->offset = ranklog_offsets[INDEX4(nr_ranks, nr_ranks,
+                                         task_subtype_count, log->rank,
+                                         log->otherrank, log->subtype,
+                                         index)];
+    message("Using offset %zd at index %d (%d %d)", log->offset, index,
+            log->otherrank, log->tag);
+
+    /* And define header; dataptr[0] can be any value except UNLOCKED. */
+    dataptr[0] = 0;
+    dataptr[1] = log->size;
+    dataptr[2] = log->tag;
+
+    /* And start the send of data to the other rank. Note in blocks. */
+    //int ret = MPI_Raccumulate(&dataptr[1], datasize - 1, MPI_BLOCKTYPE,
+    //                          log->otherrank, log->offset + 1, datasize - 1,
+    //                          MPI_BLOCKTYPE, MPI_REPLACE,
+    //                          mpi_window[log->subtype], &log->req);
+    int ret = MPI_Raccumulate(dataptr, datasize, MPI_BLOCKTYPE,
+                              log->otherrank, log->offset, datasize,
+                              MPI_BLOCKTYPE, MPI_REPLACE,
+                              mpi_window[log->subtype], &log->req);
+    if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
+
+    /* Add to the requests queue. */
+    int ind = atomic_inc(&nr_req);
+    req_queue[ind] = log;
+    atomic_inc(&todo_req);
   }

-  /* All done, thread exiting. */
-  if (verbose) {
-    message("%d injections completed, sends = %d, recvs = %d", ind_req,
-            nr_sends, nr_recvs);
-    message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
-    if (usetics) message("deadtime %.3f ms", deadtime / 1.0e6);
-  }
+  /* All done. */
+  atomic_dec(&injecting);
+  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());
-  atomic_dec(&injecting);

+  return NULL;
 }
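The patch does not say so explicitly, but a plausible reason for MPI_Raccumulate with MPI_REPLACE here, rather than the more obvious MPI_Rput, is atomicity: accumulate-family operations (which include MPI_Compare_and_swap) are element-wise atomic with respect to each other, while puts carry no such guarantee. A put-based variant would look like this sketch, but could race with the later flag update on block 0:

    /* NOT used: a plain put loses per-element atomicity with the
     * compare-and-swap that later flips dataptr[0] to UNLOCKED. */
    int ret = MPI_Rput(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
                       log->offset, datasize, MPI_BLOCKTYPE,
                       mpi_window[log->subtype], &log->req);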
-
 /**
- * @brief main loop to run over a queue of MPI requests and test for when they
- * complete. Returns the total amount of time spent in calls to MPI_Test and
- * the number of times it was called.
- *
- * @param logs the list of logs pointing to requests.
- * @param nr_logs pointer to the variable containing the current number of
- * logs.
- * @param todos pointer to the variable containing the number of requests that
- * are still active.
- * @param sum the total number of ticks spent in calls to MPI_Test.
- * @param ncalls the total number of calls to MPI_Test.
- * @param mint the minimum ticks an MPI_Test call took.
- * @param maxt the maximum ticks an MPI_Test call took.
+ * @brief Requests thread, checks for local completion of the send
+ * accumulates and then sets the remote atomic flag so that the message can
+ * be received.
  */
-static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
-                         int volatile *todos, double *sum, int *ncalls,
-                         ticks *mint, ticks *maxt) {
-
-  /* Global MPI_Test statistics. */
-  int lncalls = 0;
-  double lsum = 0.0;
-  ticks lmint = LONG_MAX;
-  ticks lmaxt = 0;
-
-  /* We loop while new requests are being injected and we still have requests
-   * to complete. */
-  while (injecting || (!injecting && *todos > 0)) {
-    int nlogs = *nr_logs;
+static void *req_thread(void *arg) {
+
+  message("%d: req thread starts with %d messages", *((int *)arg), nr_req);
+  ticks starttics = getticks();
+
+  while (injecting || (!injecting && todo_req > 0)) {
+    int nlogs = nr_req;
     for (int k = 0; k < nlogs; k++) {
-      struct mpiuse_log_entry *log = logs[k];
+      struct mpiuse_log_entry *log = req_queue[k];
       if (log != NULL && !log->done) {
-        ticks tics = getticks();
         int res;
         MPI_Status stat;
         int err = MPI_Test(&log->req, &res, &stat);
         if (err != MPI_SUCCESS) {
           error("MPI_Test call failed");
         }
-
-        /* Increment etc. of statistics about time in MPI_Test. */
-        ticks dt = getticks() - tics;
-        log->tsum += (double)dt;
-        lsum += (double)dt;
-
-        log->nr_tests++;
-        lncalls++;
-
-        if (dt < log->tmin) log->tmin = dt;
-        if (dt > log->tmax) log->tmax = dt;
-        if (dt < lmint) lmint = dt;
-        if (dt > lmaxt) lmaxt = dt;
-
         if (res) {
-          /* Check data sent data is unchanged and received data is as
-           * expected. */
-          if (datacheck && !datacheck_test(log->size, log->data)) {
-            error("Data mismatch on completion");
-          }
-
-          /* Done, clean up. */
+          /* Get update. XXX is this needed? */
+          int ret =
+              MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]);
+          if (ret != MPI_SUCCESS)
+            mpi_error_message(ret, "MPI_Win_flush_local failed");
+
+          /* Send the UNLOCKED message. */
+          BLOCKTYPE newval[1];
+          BLOCKTYPE oldval[1];
+          newval[0] = UNLOCKED;
+          oldval[0] = 0;
+          ret = MPI_Compare_and_swap(&newval[0], log->data, &oldval[0],
+                                     MPI_BLOCKTYPE, log->otherrank,
+                                     log->offset, mpi_window[log->subtype]);
+          if (ret != MPI_SUCCESS)
+            mpi_error_message(ret, "MPI_Compare_and_swap error");
+
+          /* And complete. */
+          ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]);
+          if (ret != MPI_SUCCESS)
+            mpi_error_message(ret, "MPI_Win_flush_local failed");
+
+          /* Done. */
           log->done = 1;
           log->endtic = getticks();
-          free(log->data);
-          atomic_dec(todos);
+          MPI_Free_mem(log->data);
+          atomic_dec(&todo_req);
         }
       }
     }
   }

-  /* All done. */
-  *sum = lsum;
-  *ncalls = lncalls;
-  *mint = lmint;
-  *maxt = lmaxt;
-  return;
-}
-
-/**
- * @brief Send thread, checks if MPI_Isend requests have completed.
- */
-static void *send_thread(void *arg) {
-
-  if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
-  ticks starttics = getticks();
-
-  int ncalls;
-  double sum;
-  ticks mint;
-  ticks maxt;
-  queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt);
-
-  message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
-      "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
-      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
-  if (verbose)
-    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
-            clocks_getunit());
+  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());

-  /* Thread exits. */
   return NULL;
 }
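Taken together, send_thread and req_thread give each message a three-step handshake against the receiver's window. A condensed sketch for a single message, reusing names from the surrounding code with error handling elided:

    /* 1: push flag (0), size, tag and payload in one element-atomic update. */
    MPI_Raccumulate(dataptr, datasize, MPI_BLOCKTYPE, otherrank, offset,
                    datasize, MPI_BLOCKTYPE, MPI_REPLACE, win, &req);

    /* 2: poll until the accumulate is locally complete, then flush so the
     *    local buffer can be reused or freed. */
    int done = 0;
    while (!done) MPI_Test(&req, &done, MPI_STATUS_IGNORE);
    MPI_Win_flush_local(otherrank, win);

    /* 3: compare the remote flag with our local copy (dataptr[0] == 0) and
     *    swap in UNLOCKED; the receiver polls that word and only then trusts
     *    the size, tag and payload blocks. */
    BLOCKTYPE unlocked = UNLOCKED, old = 0;
    MPI_Compare_and_swap(&unlocked, dataptr, &old, MPI_BLOCKTYPE, otherrank,
                         offset, win);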
 /**
- * @brief Recv thread, checks if MPI_Irecv requests have completed.
+ * @brief Recv thread, checks for messages in the window from other ranks.
  */
 static void *recv_thread(void *arg) {

-  if (verbose) message("%d: recv thread starts", *((int *)arg));
+  message(
+      "%d: recv thread starts, checking for %d messages %d "
+      "ranks %d communicators",
+      *((int *)arg), nr_recv, nr_ranks, task_subtype_count);
   ticks starttics = getticks();

-  int ncalls;
-  double sum;
-  ticks mint;
-  ticks maxt;
-  queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt);
+  /* No. of receives to process. */
+  todo_recv = nr_recv;

-  message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
-      "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
-      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
-  if (verbose)
-    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
-            clocks_getunit());
+  /* We loop while new requests are being sent and we still have messages
+   * to receive. */
+  while (todo_recv > 0) {
+    for (int k = 0; k < nr_recv; k++) {
+      struct mpiuse_log_entry *log = recv_queue[k];
+      if (log != NULL && !log->done) {
+
+        /* On the first attempt we start listening for this receive. */
+        if (log->injtic == 0) log->injtic = getticks();
+
+        /* Check if that part of the window has been unlocked. */
+        BLOCKTYPE *dataptr = log->data;
+        BLOCKTYPE volatile lock = dataptr[0];
+        if (lock == UNLOCKED) {
+
+          /* OK, so data should be ready for use, check the tag and size. */
+          if ((size_t)log->size == dataptr[1] &&
+              (size_t)log->tag == dataptr[2]) {
+
+            /* Check received data is as expected. */
+            if (datacheck) {
+              if (!datacheck_test(log->size - HEADER_SIZE,
+                                  &dataptr[HEADER_SIZE], log->otherrank)) {
+                error("Data mismatch on completion");
+              }
+            }
+
+            /* Done, clean up. */
+            log->done = 1;
+            log->endtic = getticks();
+            atomic_dec(&todo_recv);
+            if (todo_recv == 0) break;
+
+          } else {
+            error("Unlocked data has incorrect tag or size: %zd/%zd %d/%zd",
+                  log->size, dataptr[1], log->tag, dataptr[2]);
+          }
+        }
+
+        /* Need to allow for some MPI progression. Since we make no MPI calls
+         * (by intent the receive is a passive target, so only the sender
+         * should make calls that move data) use a no-op call. Only need to
+         * do this when not injecting. */
+        if (!injecting) {
+          int flag = 0;
+          int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
+                               &flag, MPI_STATUS_IGNORE);
+          if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Iprobe failed");
+        }
+      }
+    }
+  }
+
+  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());

   /* Thread exits. */
   return NULL;
 }
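Two portability caveats on this polling loop, both hedged since they depend on the MPI implementation: the volatile read of dataptr[0] is only guaranteed to see remote updates under the unified memory model, and the MPI_Iprobe no-op is just one conventional way to drive progress. Under the separate model the public and private window copies would also need reconciling inside the loop, along these lines (`win` is a placeholder for the relevant mpi_window entry):

    /* Query the window's memory model; only MPI_WIN_SEPARATE needs the
     * explicit sync while polling. */
    void *attr = NULL;
    int flag = 0;
    MPI_Win_get_attr(win, MPI_WIN_MODEL, &attr, &flag);
    if (flag && *(int *)attr == MPI_WIN_SEPARATE) {
      MPI_Win_sync(win); /* reconcile public and private copies */
    }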
@@ -375,15 +396,21 @@ static void pick_logs(void) {
   size_t nlogs = mpiuse_nr_logs();

   /* Duplicate of logs. */
-  reqs_queue = (struct mpiuse_log_entry **)calloc(
+  req_queue = (struct mpiuse_log_entry **)calloc(
       nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_reqs = 0;
-  sends_queue = (struct mpiuse_log_entry **)calloc(
+  nr_req = 0;
+  send_queue = (struct mpiuse_log_entry **)calloc(
       nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_sends = 0;
-  recvs_queue = (struct mpiuse_log_entry **)calloc(
+  nr_send = 0;
+  recv_queue = (struct mpiuse_log_entry **)calloc(
       nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_recvs = 0;
+  nr_recv = 0;
+
+  /* Arrays of offsets and indices for all ranks. */
+  ranklog_offsets = (MPI_Aint *)calloc(
+      nlogs * task_subtype_count * nr_ranks * nr_ranks, sizeof(MPI_Aint));
+  ranklog_indices = (int *)calloc(
+      maxtags * task_subtype_count * nr_ranks * nr_ranks, sizeof(int));
+  for (int k = 0; k < maxtags * task_subtype_count * nr_ranks * nr_ranks; k++)
+    ranklog_indices[k] = -1;
+  message("nlogs = %zd", nlogs);

   for (int k = 0; k < nlogs; k++) {
     struct mpiuse_log_entry *log = mpiuse_get_log(k);
@@ -394,12 +421,66 @@
       if (messagesize > 0) log->size = messagesize;

       /* Scale size. */
-      log->size *= messagescale ;
+      log->size *= messagescale;
+
+      /* Into blocks. */
+      log->size = toblocks(log->size);
+
+      /* Make room for control words. */
+      log->size += HEADER_SIZE;
+
+      /* Allocate memory for use in log. */
+      int ret = MPI_Alloc_mem(tobytes(log->size), MPI_INFO_NULL, &log->data);
+      if (ret != MPI_SUCCESS) {
+        mpi_error_message(ret, "MPI_Alloc_mem failed (%zd)", log->size);
+      }

-      /* And keep this log. */
-      log->data = NULL;
-      reqs_queue[nr_reqs] = log;
-      nr_reqs++;
+      /* Get MPI address and global log index. */
+      MPI_Get_address(log->data, &log->offset);
+      //log->offset = (MPI_Aint) &log->data;
+      log->index = k;
+
+      if (log->type == task_type_recv) {
+
+        message("recv log from rank: %d/%d/%d/%d %zd %zd",
+                log->otherrank, myrank, log->subtype,
+                log->tag, log->offset, log->index);
+
+        /* Keep offset for sharing with send side. */
+        ranklog_offsets[INDEX4(nr_ranks, nr_ranks, task_subtype_count,
+                               log->otherrank, myrank,
+                               log->subtype, log->index)] = log->offset;
+        ranklog_indices[INDEX4(nr_ranks, nr_ranks, task_subtype_count,
+                               log->otherrank, myrank,
+                               log->subtype, log->tag)] = log->index;
+
+        /* And attach to window. */
+        MPI_Win_attach(mpi_window[log->subtype], log->data,
+                       tobytes(log->size));
+
+        recv_queue[nr_recv] = log;
+        nr_recv++;
+      } else {
+
+        //message("send log to rank: %d %zd %d", log->otherrank,
+        //        log->offset, log->tag);
+
+        /* Sends use local data that is not in a window. */
+        send_queue[nr_send] = log;
+        nr_send++;
+      }
+
+      log->done = 0;
+      BLOCKTYPE *dataptr = log->data;
+      dataptr[0] = 0;
+      log->nr_tests = 0;
+      log->tsum = 0.0;
+      log->tmax = 0;
+      log->tmin = LONG_MAX;
+      log->endtic = 0;
+
+      // XXX test XXX
+      datacheck_zero(log->size, log->data);

     } else {
       error("task type '%d' is not a known send or recv task", log->type);
     }
   }

   /* Sort into increasing time. */
-  qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
+  qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
+  qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);

-  /* Check. */
-  for (int k = 0; k < nr_reqs - 1; k++) {
-    if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
-      message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
-              reqs_queue[k + 1]->tic);
-  }
 }
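pick_logs now doubles as the registration step for the dynamic windows. The per-log pattern it applies is the standard MPI-3 one; isolated, it reads as follows (a sketch only, with error handling elided and a hypothetical block count):

    void *buf = NULL;
    MPI_Aint disp = 0;
    BLOCKTYPE nr_blocks = 5; /* e.g. toblocks(bytes) + HEADER_SIZE */

    /* Registered allocation, so the implementation can pin the pages. */
    MPI_Alloc_mem(tobytes(nr_blocks), MPI_INFO_NULL, &buf);

    /* Attach to the dynamic window, then publish the absolute address:
     * with MPI_Win_create_dynamic, remote displacements are addresses. */
    MPI_Win_attach(win, buf, tobytes(nr_blocks));
    MPI_Get_address(buf, &disp);

    /* disp is what a sender must use as the target displacement, which is
     * why the offsets are exchanged between ranks below. */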

 /**
@@ -453,14 +529,11 @@ int main(int argc, char *argv[]) {
   /* Handle the command-line, we expect a mpiuse data file to read and various
    * options. */
   int opt;
-  while ((opt = getopt(argc, argv, "vfdc:s:")) != -1) {
+  while ((opt = getopt(argc, argv, "vdc:s:")) != -1) {
     switch (opt) {
       case 'd':
         datacheck = 1;
         break;
-      case 'f':
-        usetics = 0;
-        break;
       case 'v':
         verbose = 1;
         break;
@@ -487,21 +560,64 @@
    * queues. Note this has all ranks for a single step, SWIFT outputs one MPI
    * log per rank per step, so you need to combine all ranks from a step. */
   mpiuse_log_restore(infile);
-  int nranks = mpiuse_nr_ranks();
+  nr_ranks = mpiuse_nr_ranks();

   /* This should match the expected size. */
-  if (nr_nodes != nranks)
+  if (nr_nodes != nr_ranks)
     error("The number of MPI ranks %d does not match the expected value %d",
-          nranks, nr_nodes);
+          nr_ranks, nr_nodes);

-  /* Create communicators for each subtype of the tasks. */
+  /* Now for the one-sided setup... Each rank needs a communicator and a
+   * dynamic window to attach the memory to. */
   for (int i = 0; i < task_subtype_count; i++) {
     MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
+    int ret = MPI_Win_create_dynamic(MPI_INFO_NULL, subtypeMPI_comms[i],
+                                     &mpi_window[i]);
+    if (ret != MPI_SUCCESS) {
+      mpi_error_message(ret, "MPI_Win_create_dynamic failed");
+    }
+
+    /* Assert a shared lock with all the other processes on this window.
+     * Strictly needed as we use threads, so cannot lock or unlock as
+     * a means of synchronization. */
+    MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
   }

-  /* Each rank requires its own queue, so extract them. */
+  /* Each rank requires its own queue, so extract them, also associate memory
+   * with windows. */
   pick_logs();

+  /* We need to share all the offsets for each log so they can push data into
+   * the correct parts of our receive window. */
+  size_t nlogs = mpiuse_nr_logs();
+  MPI_Allreduce(MPI_IN_PLACE, ranklog_offsets,
+                nr_ranks * nr_ranks * task_subtype_count * nlogs, MPI_AINT,
+                MPI_MAX, MPI_COMM_WORLD);
+  MPI_Allreduce(MPI_IN_PLACE, ranklog_indices,
+                nr_ranks * nr_ranks * task_subtype_count * maxtags, MPI_INT,
+                MPI_MAX, MPI_COMM_WORLD);
+
+  if (myrank == 0) {
+    int count = 0;
+    for (int l = 0; l < nr_ranks; l++) {
+      for (int k = 0; k < nr_ranks; k++) {
+        for (int i = 0; i < task_subtype_count; i++) {
+          for (int j = 0; j < maxtags; j++) {
+            int index = ranklog_indices[INDEX4(nr_ranks, nr_ranks,
+                                               task_subtype_count,
+                                               l, k, i, j)];
+            if (index >= 0) {
+              MPI_Aint offset = ranklog_offsets[INDEX4(nr_ranks, nr_ranks,
+                                                       task_subtype_count,
+                                                       l, k, i, index)];
+              if (offset > 0) {
+                message("%d/%d/%d/%d %zd %d", l, k, i, j, offset, index);
+                count++;
+              }
+            }
+          }
+        }
+      }
+    }
+    message("offsets = %d", count);
+  }
+
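The exchange above works because each slot of the two tables is written by exactly one rank and left at its sentinel everywhere else (0 for offsets, -1 for indices), so an in-place MPI_MAX allreduce behaves as a union of the sparse per-rank entries. The same trick in miniature, assuming four ranks and a hypothetical table:

    /* Rank r fills only slot r; every other slot stays at the sentinel. */
    int table[4] = {-1, -1, -1, -1};
    table[myrank] = 100 + myrank;

    /* After the reduce every rank sees {100, 101, 102, 103}. */
    MPI_Allreduce(MPI_IN_PLACE, table, 4, MPI_INT, MPI_MAX, MPI_COMM_WORLD);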
   /* Time to start timing. Try to make it synchronous across the ranks. */
   MPI_Barrier(MPI_COMM_WORLD);
   clocks_set_cpufreq(0);
@@ -512,33 +628,43 @@
     message("Using fixed message size of %zd", messagesize);
     if (messagescale != 1.0f)
       message("Using message scale of %f", messagescale);
-    if (!usetics) message("Using fast untimed injections");
     if (datacheck)
       message("Checking data pattern on send and recv completion");
   }

-  /* Make three threads, one for injecting tasks and two to check for
-   * completions of the sends and recv independently. */
-  pthread_t injectthread;
-  if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
-    error("Failed to create injection thread.");
+  /* Make three threads, one for injecting the sends, one to listen for
+   * injection completion and one for testing receives. */
   pthread_t sendthread;
   if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
     error("Failed to create send thread.");
+  pthread_t reqthread;
+  if (pthread_create(&reqthread, NULL, &req_thread, &myrank) != 0)
+    error("Failed to create req thread.");
   pthread_t recvthread;
   if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
     error("Failed to create recv thread.");

-  /* Wait until all threads have exited and all MPI requests have completed. */
-  pthread_join(injectthread, NULL);
+  /* Wait until all threads have exited and all message exchanges have
+   * completed. */
+  pthread_join(reqthread, NULL);
   pthread_join(sendthread, NULL);
   pthread_join(recvthread, NULL);
+
+  /* XXX Should free all the log memory, or do that as we go... */
+
+  /* Free the window locks. Only after we all arrive. */
+  MPI_Barrier(MPI_COMM_WORLD);
+  for (int i = 0; i < task_subtype_count; i++) {
+    MPI_Win_unlock_all(mpi_window[i]);
+    MPI_Win_free(&mpi_window[i]);
+  }

   /* Dump the updated MPI logs. */
   MPI_Barrier(MPI_COMM_WORLD);
   fflush(stdout);
   if (myrank == 0) message("Dumping updated log");
-  mpiuse_dump_logs(nranks, logfile);
+  mpiuse_dump_logs(nr_ranks, logfile);

   /* Shutdown MPI. */
   res = MPI_Finalize();
diff --git a/testdata/EAGLE_25-mpiuse_report-step18302-16ranks-inter-step.dat.gz b/testdata/EAGLE_25-mpiuse_report-step18302-16ranks-inter-step.dat.gz
deleted file mode 100644
index cdc12c1155c307218f0c8f6a37d6d683b1b9686f..0000000000000000000000000000000000000000
Binary files a/testdata/EAGLE_25-mpiuse_report-step18302-16ranks-inter-step.dat.gz and /dev/null differ