Commit b6ea8a14 authored by Peter W. Draper


Attempt at using a technique that uses the local memory buffers instead of a large receive buffer per subtype
parent 5d810c1b
1 merge request: !10 Draft: Ragged one side registered buffers.
CFLAGS = -g -O0 -Wall
#CFLAGS = -g -O0 -Wall -std=gnu11 -fsanitize=address -fno-omit-frame-pointer
CFLAGS = -g -O0 -Wall -std=gnu11
all: swiftmpistepsim swiftmpifakestepsim
......
......@@ -5,18 +5,32 @@
extern int myrank;
/* Exit in error macro. */
#define error(s, ...) \
({ \
fflush(stdout); \
fprintf(stderr, "[%03i] %s %s:%s():%i: " s "\n", myrank, \
#define error(s, ...) \
({ \
fflush(stdout); \
fprintf(stderr, "[%03i] %s %s:%s():%i: " s "\n", myrank, \
clocks_get_timesincestart(), __FILE__, __FUNCTION__, __LINE__, \
##__VA_ARGS__); \
MPI_Abort(MPI_COMM_WORLD, -1); \
##__VA_ARGS__); \
MPI_Abort(MPI_COMM_WORLD, -1); \
})
/* Print a message */
#define message(s, ...) \
({ \
#define message(s, ...) \
({ \
printf("[%04i] %s %s: " s "\n", myrank, clocks_get_timesincestart(), \
__FUNCTION__, ##__VA_ARGS__); \
__FUNCTION__, ##__VA_ARGS__); fflush(stderr); fflush(stdout); \
})
/* Print MPI error string. */
#define mpi_error_message(res, s, ...) \
({ \
fflush(stdout); \
fprintf(stderr, "[%04i] %s %s:%s():%i: " s "\n", myrank, \
clocks_get_timesincestart(), __FILE__, __FUNCTION__, __LINE__, \
##__VA_ARGS__); \
int len = 1024; \
char buf[len]; \
MPI_Error_string(res, buf, &len); \
fprintf(stderr, "%s\n\n", buf); \
})
......@@ -82,6 +82,12 @@ struct mpiuse_log_entry {
/* Minimum time in a test. */
ticks tmin;
/* Remote offset. */
MPI_Aint offset;
/* Local log index, should be global. */
size_t index;
};
/* Flags for the types of request when generating fakes. */
......
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2019 Peter W. Draper
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
......@@ -17,11 +17,26 @@
*
******************************************************************************/
// The one-sided variation attempts not to use single contiguous memory
// allocations, and instead shares the memory directly with MPI in the hope
// that we can avoid a memory copy into the device registered memory.
//
// Want to keep as close to the usual MPI_Isend/Irecv/Test usage as possible.
//
// Note that this means only the sending side updates the window buffer and
// since we have threaded access we can only use flushes with a shared lock
// that is permanently open to move data.
//
// Also the send side has no associated window, as it only pushes data, so it
// is not clear that we can register that memory.
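//
// In outline, a single exchange works as follows: the recv side allocates
// each log's buffer with MPI_Alloc_mem, attaches it to the dynamic window of
// its subtype and shares the offset with the other ranks; the send side
// MPI_Raccumulates the header and data into that region, flushes, and finally
// flips the first block to UNLOCKED with an MPI_Compare_and_swap; the recv
// side simply polls that first block until it sees UNLOCKED.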
#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "atomic.h"
......@@ -29,15 +44,31 @@
#include "error.h"
#include "mpiuse.h"
#define maxtags 1000
/* Global: Our rank for all to see. */
int myrank = -1;
/* Number of ranks. */
static int nr_ranks;
/* Flags for controlling access. High end of size_t. */
static size_t UNLOCKED = (((size_t)2 << 63) - 1);
/* Size of a block of memory. All addressable memory chunks need to be a
* multiple of this as we need to align sends and receives in memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Size of message header in blocks. The unlocked flag, size and tag. Note
* size and tag are just for sanity checks. The flag value controls access to
* the main data areas. */
static const size_t HEADER_SIZE = 3;
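/* So each registered buffer is laid out as: block 0 the lock flag (anything
* but UNLOCKED until the send completes), block 1 the message size in blocks,
* block 2 the tag, and the payload from block 3 onwards. */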
/* Are we verbose. */
static int verbose = 0;
/* Attempt to keep original injection time differences. */
static int usetics = 1;
/* Scale to apply to the size of the messages we send. */
static float messagescale = 1.0;
......@@ -53,301 +84,291 @@ static const int task_type_send = 25;
static const int task_type_recv = 26;
/* Global communicators for each of the subtypes. */
static const int task_subtype_count = 34; // Just some upper limit on subtype.
#define task_subtype_count 34 // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[task_subtype_count];
/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue;
static int volatile ind_req = 0;
static int volatile nr_reqs = 0;
/* Windows for one-sided communications. */
static MPI_Win mpi_window[task_subtype_count];
/* Offsets of the logs for all ranks. */
static MPI_Aint *ranklog_offsets;
static int *ranklog_indices;
#define INDEX4(nx, ny, nt, x, y, z, t) (nx * ny * nt * t + nx * ny * z + nx * y + x)
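/* Flattened index into a conceptual [t][z][y][x] array with x varying
* fastest; here x is the sending rank, y the receiving rank, z the subtype
* and t the tag or log index. */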
/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_send = 0;
static int volatile todo_send = 0;
static int volatile injecting = 1;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0;
static int volatile ind_recv = 0;
/* The local receive queue. */
static struct mpiuse_log_entry **volatile recv_queue;
static int volatile nr_recv = 0;
static int volatile todo_recv = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends = 0;
static int volatile ind_send = 0;
static int volatile todo_send = 0;
/* CPU frequency of the machine that created the MPI log. */
// XXX need to store this in the data file.
static double log_clocks_cpufreq = 2194844448.0;
/* The local requests queue. */
static struct mpiuse_log_entry **volatile req_queue;
static int volatile ind_req = 0;
static int volatile nr_req = 0;
static int volatile todo_req = 0;
/**
* @brief Convert a byte count into a number of blocks, rounds up.
*
* @param nr_bytes the number of bytes.
*
* @result the number of blocks needed.
*/
static int toblocks(BLOCKTYPE nr_bytes) {
return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
/**
* @brief Convert a block count into a number of bytes.
*
* @param nr_blocks the number of blocks.
*
* @result the number of bytes.
*/
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
/**
* @brief fill a data area with a pattern that can be checked for changes.
* @brief fill a data area with our rank.
*
* @param size size of data in bytes.
* @param size size of data in blocks.
* @param data the data to fill.
*/
static void datacheck_fill(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
p[i] = 170; /* 10101010 in bits. */
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
for (BLOCKTYPE i = 0; i < size; i++) {
data[i] = myrank;
}
}
/**
* @brief test a filled data area for our pattern.
* @brief zero a data area.
*
* @param size size of data in bytes.
* @param size size of data in blocks.
* @param data the data to fill.
*/
static void datacheck_zero(BLOCKTYPE size, BLOCKTYPE *data) {
for (BLOCKTYPE i = 0; i < size; i++) {
data[i] = 0;
}
}
/**
* @brief test a filled data area for a value, reports if any unexpected value
* is found.
*
* @param size size of data in blocks.
* @param data the data to check.
* @param rank the value to check for, i.e. the original rank.
*
* @result 1 on success, 0 otherwise.
*/
static int datacheck_test(size_t size, void *data) {
unsigned char *p = (unsigned char *)data;
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
for (size_t i = 0; i < size; i++) {
if (p[i] != 170) return 0;
if (data[i] != (size_t)rank) {
message("see %zd expected %d @ %zd", data[i], rank, i);
return 0;
}
}
return 1;
}
/**
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
* @brief Send thread, initiates the sending of messages to other ranks
* one-by-one with the correct offsets into the remote windows.
*
* The requests are initiated in the time order of the original log and an
* attempt to start these with the same interval gap is made if usetics is
* set, otherwise we just do them as quickly as possible.
* Messages are all considered in order, regardless of the subtype.
*/
static void *inject_thread(void *arg) {
static void *send_thread(void *arg) {
if (verbose) message("%d: injection thread starts", *((int *)arg));
message("%d: send thread starts with %d messages", *((int *)arg), nr_send);
ticks starttics = getticks();
/* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
ticks basetic = reqs_queue[0]->tic;
ticks looptics = 0;
double deadtime = 0.0;
struct timespec sleep;
sleep.tv_sec = 0;
while (ind_req < nr_reqs) {
struct mpiuse_log_entry *log = reqs_queue[ind_req];
if (usetics) {
/* Expect time between this request and the previous one. */
ticks dt = log->tic - basetic;
basetic = log->tic;
/* We guess some time below which we should not attempt to wait,
* otherwise we'll start to overrun, and just inject the next call if we
* are below that (we time the ticks this loop takes without any waiting
* and use that). Otherwise we wait a while. Note we need to convert the
* ticks of the log file into nanoseconds, that requires the original
* CPU frequency. */
if (dt > looptics) {
/* Remember to be fair and remove the looptics, then convert to
* nanoseconds. */
double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
if (ns < 1.0e9) {
sleep.tv_nsec = (long)ns;
} else {
/* Wait more than one second. Must be an error, but complain and
* continue. */
sleep.tv_nsec = (long)1.0e9;
message("wait greater than one second");
}
nanosleep(&sleep, NULL);
deadtime += sleep.tv_nsec;
}
}
/* Initialise new log elements. */
log->done = 0;
log->nr_tests = 0;
log->tsum = 0.0;
log->tmax = 0;
log->tmin = LONG_MAX;
log->endtic = 0;
log->injtic = getticks();
/* Differences to SWIFT: MPI_BYTE not the MPI_Type. */
int err = 0;
if (log->type == task_type_send) {
log->data = calloc(log->size, 1);
/* Fill data with pattern. */
if (datacheck) datacheck_fill(log->size, log->data);
/* And send. */
err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
subtypeMPI_comms[log->subtype], &log->req);
injecting = 1;
for (int k = 0; k < nr_send; k++) {
struct mpiuse_log_entry *log = send_queue[k];
if (log == NULL) error("NULL send message queued (%d/%d)", k, nr_send);
/* Add a new send request. */
int ind = atomic_inc(&nr_sends);
sends_queue[ind] = log;
atomic_inc(&todo_send);
/* The data buffer holds the actual data plus room for the header. */
BLOCKTYPE datasize = log->size;
BLOCKTYPE *dataptr = log->data;
} else {
/* Ready to receive. */
log->data = calloc(log->size, 1);
err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
subtypeMPI_comms[log->subtype], &log->req);
/* Update the log for the initiation of this message. */
log->injtic = getticks();
/* Add a new recv request. */
int ind = atomic_inc(&nr_recvs);
recvs_queue[ind] = log;
atomic_inc(&todo_recv);
}
if (err != MPI_SUCCESS) error("Failed to activate send or recv");
ind_req++;
/* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
* equally timed. Note we include a nanosleep, they are slow. */
if (looptics == 0 && usetics) {
sleep.tv_nsec = 1;
nanosleep(&sleep, NULL);
looptics = (getticks() - starttics);
if (verbose)
message("injection loop took %.3f %s.", clocks_from_ticks(looptics),
clocks_getunit());
}
/* Fill data with pattern. */
if (datacheck) datacheck_fill(datasize, dataptr);
/* Need to find the offset for this data in remote. */
/* XXX Index of recv log on the remote side? */
int index = ranklog_indices[INDEX4(nr_ranks, nr_ranks, task_subtype_count,
log->rank, log->otherrank, log->subtype, log->tag)];
log->offset = ranklog_offsets[INDEX4(nr_ranks, nr_ranks, task_subtype_count,
log->rank, log->otherrank, log->subtype, index)];
message("Using offset %zd at index %d (%d %d)", log->offset, index,
log->otherrank, log->tag );
/* And define header; dataptr[0] can be any value except UNLOCKED. */
dataptr[0] = 0;
dataptr[1] = log->size;
dataptr[2] = log->tag;
/* And start the send of data to other rank. Note in blocks. */
//int ret = MPI_Raccumulate(&dataptr[1], datasize - 1, MPI_BLOCKTYPE,
// log->otherrank, log->offset + 1, datasize - 1,
// MPI_BLOCKTYPE, MPI_REPLACE,
// mpi_window[log->subtype], &log->req);
int ret = MPI_Raccumulate(dataptr, datasize, MPI_BLOCKTYPE,
log->otherrank, log->offset, datasize,
MPI_BLOCKTYPE, MPI_REPLACE,
mpi_window[log->subtype], &log->req);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
/* Add to the requests queue. */
int ind = atomic_inc(&nr_req);
req_queue[ind] = log;
atomic_inc(&todo_req);
}
/* All done, thread exiting. */
if (verbose) {
message("%d injections completed, sends = %d, recvs = %d", ind_req,
nr_sends, nr_recvs);
message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
if (usetics) message("deadtime %.3f ms", deadtime / 1.0e6);
}
/* All done. */
atomic_dec(&injecting);
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
atomic_dec(&injecting);
return NULL;
}
/**
* @brief main loop to run over a queue of MPI requests and test for when they
* complete. Returns the total amount of time spent in calls to MPI_Test and
* the number of times it was called.
*
* @param logs the list of logs pointing to requests.
* @param nr_logs pointer to the variable containing the current number of
* logs.
* @param todos pointer to the variable containing the number of requests that
* are still active.
* @param sum the total number of ticks spent in calls to MPI_Test.
* @param ncalls the total number of calls to MPI_Test.
* @param mint the minimum ticks an MPI_Test call took.
* @param maxt the maximum ticks an MPI_Test call took.
* @brief Requests thread, checks for local completion of the send accumulates
* and then sets the remote atomic so that the message can be received.
*/
static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
int volatile *todos, double *sum, int *ncalls,
ticks *mint, ticks *maxt) {
/* Global MPI_Test statistics. */
int lncalls = 0;
double lsum = 0.0;
ticks lmint = LONG_MAX;
ticks lmaxt = 0;
/* We loop while new requests are being injected and we still have requests
* to complete. */
while (injecting || (!injecting && *todos > 0)) {
int nlogs = *nr_logs;
static void *req_thread(void *arg) {
message("%d: req thread starts with %d messages", *((int *)arg), nr_req);
ticks starttics = getticks();
while (injecting || (!injecting && todo_req > 0)) {
int nlogs = nr_req;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = logs[k];
struct mpiuse_log_entry *log = req_queue[k];
if (log != NULL && !log->done) {
ticks tics = getticks();
int res;
MPI_Status stat;
int err = MPI_Test(&log->req, &res, &stat);
if (err != MPI_SUCCESS) {
error("MPI_Test call failed");
}
/* Increment etc. of statistics about time in MPI_Test. */
ticks dt = getticks() - tics;
log->tsum += (double)dt;
lsum += (double)dt;
log->nr_tests++;
lncalls++;
if (dt < log->tmin) log->tmin = dt;
if (dt > log->tmax) log->tmax = dt;
if (dt < lmint) lmint = dt;
if (dt > lmaxt) lmaxt = dt;
if (res) {
/* Check data sent data is unchanged and received data is as
* expected. */
if (datacheck && !datacheck_test(log->size, log->data)) {
error("Data mismatch on completion");
}
/* Done, clean up. */
// Get update. XXX is this needed?
int ret =
MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS)
mpi_error_message(ret, "MPI_Win_flush failed");
/* Send the UNLOCKED message. */
BLOCKTYPE newval[1];
BLOCKTYPE oldval[1];
newval[0] = UNLOCKED;
oldval[0] = 0;
ret = MPI_Compare_and_swap(&newval[0], log->data, &oldval[0],
MPI_BLOCKTYPE, log->otherrank, log->offset,
mpi_window[log->subtype]);
if (ret != MPI_SUCCESS)
mpi_error_message(ret, "MPI_Compare_and_swap error");
/* And complete. */
ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS)
mpi_error_message(ret, "MPI_Win_flush failed");
/* Done. */
log->done = 1;
log->endtic = getticks();
free(log->data);
atomic_dec(todos);
MPI_Free_mem(log->data);
atomic_dec(&todo_req);
}
}
}
}
/* All done. */
*sum = lsum;
*ncalls = lncalls;
*mint = lmint;
*maxt = lmaxt;
return;
}
/**
* @brief Send thread, checks if MPI_Isend requests have completed.
*/
static void *send_thread(void *arg) {
if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
ticks starttics = getticks();
int ncalls;
double sum;
ticks mint;
ticks maxt;
queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt);
message(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
"%.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
}
/**
* @brief Recv thread, checks if MPI_Irecv requests have completed.
* @brief Recv thread, checks for messages in the window from other ranks.
*/
static void *recv_thread(void *arg) {
if (verbose) message("%d: recv thread starts", *((int *)arg));
message(
"%d: recv thread starts, checking for %d messages %d "
"ranks %d communicators",
*((int *)arg), nr_recv, nr_ranks, task_subtype_count);
ticks starttics = getticks();
int ncalls;
double sum;
ticks mint;
ticks maxt;
queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt);
/* No. of receives to process. */
todo_recv = nr_recv;
message(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
"%.3f (%s)",
ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* We loop while new requests are being sent and we still have messages
* to receive. */
while (todo_recv > 0) {
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done) {
/* On the first attempt we start listening for this receive. */
if (log->injtic == 0) log->injtic = getticks();
/* Check if that part of the window has been unlocked. */
BLOCKTYPE *dataptr = log->data;
BLOCKTYPE volatile lock = dataptr[0];
if (lock == UNLOCKED) {
/* OK, so data should be ready for use, check the tag and size. */
if ((size_t)log->size == dataptr[1] &&
(size_t)log->tag == dataptr[2]) {
if (verbose) /* Check data sent data is unchanged. */
if (datacheck) {
if (!datacheck_test(log->size - HEADER_SIZE, &dataptr[HEADER_SIZE],
log->otherrank)) {
error("Data mismatch on completion");
}
}
/* Done, clean up. */
log->done = 1;
log->endtic = getticks();
atomic_dec(&todo_recv);
if (todo_recv == 0) break;
} else {
error("Unlocked data has incorrect tag or size: %zd/%zd %d/%zd",
log->size, dataptr[1], log->tag, dataptr[2]);
}
}
/* Need to allow for some MPI progression. Since we make no MPI calls
* (by intent the receive is a passive target, so only the sender should
* make calls that move data) use a no-op call. Only need to do this
* when not injecting. */
if (!injecting) {
int flag = 0;
int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
&flag, MPI_STATUS_IGNORE);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Iprobe failed");
}
}
}
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
/* Thread exits. */
return NULL;
......@@ -375,15 +396,21 @@ static void pick_logs(void) {
size_t nlogs = mpiuse_nr_logs();
/* Duplicate of logs. */
reqs_queue = (struct mpiuse_log_entry **)calloc(
req_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_reqs = 0;
sends_queue = (struct mpiuse_log_entry **)calloc(
nr_req = 0;
send_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_sends = 0;
recvs_queue = (struct mpiuse_log_entry **)calloc(
nr_send = 0;
recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recvs = 0;
nr_recv = 0;
/* Arrays of offsets and indices for all ranks. */
ranklog_offsets = (MPI_Aint *)calloc(nlogs * task_subtype_count * nr_ranks * nr_ranks, sizeof(MPI_Aint));
ranklog_indices = (int *)calloc(maxtags * task_subtype_count * nr_ranks * nr_ranks, sizeof(int));
for (int k = 0; k < maxtags * task_subtype_count * nr_ranks * nr_ranks; k++) ranklog_indices[k] = -1;
message("nlogs = %zd", nlogs);
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
......@@ -394,12 +421,66 @@ static void pick_logs(void) {
if (messagesize > 0) log->size = messagesize;
/* Scale size. */
log->size *= messagescale ;
log->size *= messagescale;
/* Into blocks. */
log->size = toblocks(log->size);
/* Make room for control words. */
log->size += HEADER_SIZE;
/* Allocate memory for use in log. */
int ret = MPI_Alloc_mem(tobytes(log->size), MPI_INFO_NULL, &log->data);
if (ret != MPI_SUCCESS) {
mpi_error_message(ret, "MPI_Alloc_mem failed (%zd)", log->size);
}
/* And keep this log. */
log->data = NULL;
reqs_queue[nr_reqs] = log;
nr_reqs++;
/* Get MPI address and global log index. */
MPI_Get_address(log->data, &log->offset);
//log->offset = (MPI_Aint) &log->data;
log->index = k;
if (log->type == task_type_recv) {
message("recv log from rank: %d/%d/%d/%d %zd %zd",
log->otherrank, myrank, log->subtype,
log->tag, log->offset, log->index);
/* Keep offset for sharing with send side. */
ranklog_offsets[INDEX4(nr_ranks, nr_ranks, task_subtype_count,
log->otherrank, myrank,
log->subtype, log->index)] = log->offset;
ranklog_indices[INDEX4(nr_ranks, nr_ranks, task_subtype_count,
log->otherrank, myrank,
log->subtype, log->tag)] = log->index;
/* And attach to window. */
MPI_Win_attach(mpi_window[log->subtype], log->data, tobytes(log->size));
recv_queue[nr_recv] = log;
nr_recv++;
} else {
//message("send log to rank: %d %zd %d", log->otherrank,
// log->offset, log->tag);
/* Sends local data that is not in a window. */
send_queue[nr_send] = log;
nr_send++;
}
log->done = 0;
BLOCKTYPE *dataptr = log->data;
dataptr[0] = 0;
log->nr_tests = 0;
log->tsum = 0.0;
log->tmax = 0;
log->tmin = LONG_MAX;
log->endtic = 0;
// XXX test XXX
datacheck_zero(log->size, log->data);
} else {
error("task type '%d' is not a known send or recv task", log->type);
......@@ -408,14 +489,9 @@ static void pick_logs(void) {
}
/* Sort into increasing time. */
qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
/* Check. */
for (int k = 0; k < nr_reqs - 1; k++) {
if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
reqs_queue[k + 1]->tic);
}
}
/**
......@@ -453,14 +529,11 @@ int main(int argc, char *argv[]) {
/* Handle the command-line, we expect a mpiuse data file to read and various
* options. */
int opt;
while ((opt = getopt(argc, argv, "vfdc:s:")) != -1) {
while ((opt = getopt(argc, argv, "vdc:s:")) != -1) {
switch (opt) {
case 'd':
datacheck = 1;
break;
case 'f':
usetics = 0;
break;
case 'v':
verbose = 1;
break;
......@@ -487,21 +560,64 @@ int main(int argc, char *argv[]) {
* queues. Note this has all ranks for a single steps, SWIFT outputs one MPI
* log per rank per step, so you need to combine all ranks from a step. */
mpiuse_log_restore(infile);
int nranks = mpiuse_nr_ranks();
nr_ranks = mpiuse_nr_ranks();
/* This should match the expected size. */
if (nr_nodes != nranks)
if (nr_nodes != nr_ranks)
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_nodes);
nr_ranks, nr_nodes);
/* Create communicators for each subtype of the tasks. */
/* Now for the one-sided setup... Each rank needs a communicator and a
* dynamic window to attach the memory to. */
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
int ret = MPI_Win_create_dynamic(MPI_INFO_NULL, subtypeMPI_comms[i],
&mpi_window[i]);
if (ret != MPI_SUCCESS) {
mpi_error_message(ret, "MPI_Win_create_dynamic failed");
}
/* Assert a shared lock with all the other processes on this window.
* Strictly needed as we use threads, so cannot lock or unlock as
* a means of synchronization. */
MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
}
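/* From here on all data movement happens inside these permanently open
* access epochs, with flushes used to complete individual operations. */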
/* Each rank requires its own queue, so extract them. */
/* Each rank requires its own queue, so extract them, also associate memory
* with windows. */
pick_logs();
/* We need to share all the offsets for each log so they can push data into
* the correct parts of our receive window. */
size_t nlogs = mpiuse_nr_logs();
MPI_Allreduce(MPI_IN_PLACE, ranklog_offsets, nr_ranks * nr_ranks * task_subtype_count *
nlogs, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, ranklog_indices, nr_ranks * nr_ranks * task_subtype_count *
maxtags, MPI_INT, MPI_MAX, MPI_COMM_WORLD);
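/* The MPI_MAX reductions work because each rank only fills in its own recv
* entries; everywhere else the arrays still hold the initial 0 or -1, so the
* owning rank's value wins in every slot. */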
if (myrank == 0) {
int count = 0;
for (int l = 0; l < nr_ranks; l++) {
for (int k = 0; k < nr_ranks; k++) {
for (int i = 0; i < task_subtype_count; i++) {
for (int j = 0; j < maxtags; j++) {
int index = ranklog_indices[INDEX4(nr_ranks, nr_ranks,
task_subtype_count, l, k, i, j)];
if (index >= 0) {
MPI_Aint offset = ranklog_offsets[INDEX4(nr_ranks, nr_ranks,
task_subtype_count, l, k, i, index)];
if (offset > 0) {
message("%d/%d/%d/%d %zd %d", l, k , i, j, offset, index);
count++;
}
}
}
}
}
}
message("offsets = %d", count);
}
/* Time to start time. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
clocks_set_cpufreq(0);
......@@ -512,33 +628,43 @@ int main(int argc, char *argv[]) {
message("Using fixed message size of %zd", messagesize);
if (messagescale != 1.0f)
message("Using message scale of %f", messagescale);
if (!usetics) message("Using fast untimed injections");
if (datacheck)
message("Checking data pattern on send and recv completion");
}
/* Make three threads, one for injecting tasks and two to check for
* completions of the sends and recv independently. */
pthread_t injectthread;
if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
error("Failed to create injection thread.");
/* Make three threads, one for injecting the sends, one to listen for
* injection completion and one for testing receives. */
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
pthread_t reqthread;
if (pthread_create(&reqthread, NULL, &req_thread, &myrank) != 0)
error("Failed to create send thread.");
pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread, NULL);
/* Wait until all threads have exited and all message exchanges have
* completed. */
pthread_join(reqthread, NULL);
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* XXX Should free all the log memory, or do that as we go... */
/* Free the window locks. Only after we all arrive. */
MPI_Barrier(MPI_COMM_WORLD);
for (int i = 0; i < task_subtype_count; i++) {
MPI_Win_unlock_all(mpi_window[i]);
MPI_Win_free(&mpi_window[i]);
}
/* Dump the updated MPI logs. */
MPI_Barrier(MPI_COMM_WORLD);
fflush(stdout);
if (myrank == 0) message("Dumping updated log");
mpiuse_dump_logs(nranks, logfile);
mpiuse_dump_logs(nr_ranks, logfile);
/* Shutdown MPI. */
res = MPI_Finalize();
......
File deleted