Commit 0cd76249 authored by Peter W. Draper

Cleanup

parent fae46c95
Merge request !11 (Draft): Fast one-sided MPI version
@@ -20,23 +20,22 @@
// Fully one sided approach with passive target communication. This means only
// the sending side updates the window buffer and since we have threaded
// access we can only use flushes with a shared lock that is permanently open
// to move data. The send side has no window, as it only pushes data.
// to move data. The send side has no associated window, as it only pushes data.
//
// So each rank needs a receive window that has room for all the expected
// sends, plus an additional element for controlling the readyness of the data
// sends, plus additional elements for controlling the readiness of the data
// (this is an atomic send that should be guaranteed to only be updated after
// the send of the main data).
// the send of the main data) and correctness.
//
// In this implementation the size of the receive buffer per rank is just the
// sum of all the messages we know we are about to get. The order of that
// buffer is determined by the rank and tags, which gives us a list of offsets
// into the buffer mapped by the ranktag, which we need to share with any rank
// that is expected to send us data. We'll send this data using normal MPI
// (could be done either as another extension into the window, which we get,
// but we'd need to synchronize that across all ranks, or we could use the
// global communicator to share this in a similar fashion).
//
// XXX ranktag per subtype is not unique we need to include the size.
// buffer is determined by the send and receive rank and the tag, which gives
// us a list of offsets into the buffer mapped by the ranktag, which we need
// to share with any rank that is expected to send us data. We'll send this
// data using normal MPI (could be done either as another extension into the
// window, which we get, but we'd need to synchronize that across all ranks,
// or we could use the global communicator to share this in a similar
// fashion).
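To picture the layout described above: each expected message occupies one contiguous slot in the receiver's window, a three-block header (readiness flag, payload size, tag) followed by the payload rounded up to whole blocks, and the offset table simply records the block at which each slot starts. A minimal standalone illustration under those assumptions (the names and message sizes here are hypothetical, not taken from this code):

#include <stddef.h>
#include <stdio.h>

/* Hypothetical sketch of one message slot in a receive window: three header
 * blocks (readiness flag, size, tag) then the payload in whole 8-byte
 * blocks. */
enum { SLOT_FLAG, SLOT_SIZE, SLOT_TAG, SLOT_HEADER_BLOCKS };

/* Blocks occupied by a slot holding payload_bytes of data. */
static size_t slot_blocks(size_t payload_bytes) {
  return SLOT_HEADER_BLOCKS +
         (payload_bytes + sizeof(size_t) - 1) / sizeof(size_t);
}

int main(void) {
  /* Two expected messages of 100 and 40 bytes: slots start at blocks 0 and
   * 16, and the whole window needs 24 blocks. */
  size_t sizes[2] = {100, 40};
  size_t offset = 0;
  for (int k = 0; k < 2; k++) {
    printf("message %d starts at block %zu\n", k, offset);
    offset += slot_blocks(sizes[k]);
  }
  printf("window size: %zu blocks\n", offset);
  return 0;
}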
#include <limits.h>
#include <mpi.h>
@@ -67,21 +66,24 @@ int myrank = -1;
/* Number of ranks. */
static int nr_ranks;
/* Bit shift to accommodate all the bits of the maximum rank id. */
static int rank_shift = 0;
/* Maximum no. of messages (logs). */
static size_t max_logs = 0;
/* Flags for controlling access. */
/* Flags for controlling access. High end of size_t. */
static size_t UNLOCKED = (((size_t)2 << 63) - 1);
/* Size of a block of memory. All addressable memory chunks need to be a
* multiple of this as as we need to align sends in memory. */
* multiple of this as we need to align sends and receives in memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
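The toblocks()/tobytes() helpers used below convert between byte counts and whole blocks; their bodies fall outside the hunks shown here, so this is only a plausible sketch of the rounding-up conversion implied by the comments, built on the BYTESINBLOCK constant above:

/* Plausible sketch only; the real toblocks()/tobytes() bodies are not shown
 * in this diff. Round a byte count up to whole blocks, and back to bytes. */
static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
  return nr_blocks * BYTESINBLOCK;
}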
/* Size of message header in blocks. The flag, size and tag. Note size and tag
* are just for sanity checks. The flag value controls access to the main data
* areas. */
/* Size of message header in blocks. The unlocked flag, size and tag. Note
* size and tag are just for sanity checks. The flag value controls access to
* the main data areas. */
static const size_t HEADER_SIZE = 3;
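The flag block is what makes the passive-target scheme safe: the payload, size and tag must be visible at the target before the flag is set. A hypothetical sketch of that ordering on the send side, using standard one-sided calls (the function name and arguments are illustrative, not this file's send path):

#include <mpi.h>
#include <stddef.h>

/* Hypothetical sketch: push the size/tag and payload blocks first, flush
 * them, and only then write the readiness flag as a single atomic element,
 * so a reader that sees the flag can trust the rest of the slot. */
static void push_slot(const size_t *slot, int nblocks, int target,
                      MPI_Aint slot_offset, size_t unlocked_value,
                      MPI_Win win) {
  /* Blocks 1..nblocks-1: size, tag and payload (everything but the flag). */
  MPI_Accumulate(slot + 1, nblocks - 1, MPI_AINT, target, slot_offset + 1,
                 nblocks - 1, MPI_AINT, MPI_REPLACE, win);
  MPI_Win_flush(target, win);

  /* Block 0: the readiness flag, written atomically and flushed last. */
  size_t previous;
  MPI_Fetch_and_op(&unlocked_value, &previous, MPI_AINT, target, slot_offset,
                   MPI_REPLACE, win);
  MPI_Win_flush(target, win);
}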
/* Are we verbose. */
@@ -106,7 +108,6 @@ static BLOCKTYPE *mpi_ptr[task_subtype_count] = {NULL};
static size_t ranktag_sizes[task_subtype_count] = {0};
static size_t *ranktag_counts;
static size_t *ranktag_offsets;
;
static size_t *ranktag_lists;
/* The local send queue. */
@@ -120,7 +121,7 @@ static int volatile nr_recv = 0;
static int volatile todo_recv = 0;
/**
* @brief Convert ranks and tag into a single unique value.
* @brief Convert two ranks and tag into a single unique value.
*
* Assumes there is enough space in a size_t for these values.
*
@@ -131,21 +132,10 @@ static int volatile todo_recv = 0;
* @result a unique value based on both values
*/
static size_t toranktag(int sendrank, int recvrank, int tag) {
int shift = (sizeof(int) * 8) - __builtin_clz(nr_ranks); /* XXX could precalc. */
//message("nr_ranks = %d, shift = %d", nr_ranks, shift);
size_t result = sendrank | recvrank << shift | tag << (shift * 2);
size_t result = sendrank | recvrank << rank_shift | tag << (rank_shift * 2);
return result;
}
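As a worked example of the packing (values chosen for illustration only): with nr_ranks = 4 the rank_shift computed in main() is 3, so sender 2, receiver 1 and tag 5 combine into a single value as checked below.

#include <assert.h>
#include <stddef.h>

/* Illustrative check: 2 | 1 << 3 | 5 << 6  ==  2 + 8 + 320  ==  330. */
static void toranktag_example(void) {
  const int example_shift = 3; /* rank_shift for nr_ranks = 4 */
  size_t ranktag = (size_t)2 | (size_t)1 << example_shift |
                   (size_t)5 << (2 * example_shift);
  assert(ranktag == 330);
  (void)ranktag;
}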
static char *tokey(struct mpiuse_log_entry *log) {
static char buf[256];
sprintf(buf, "%d/%d/%d/%zd on %d ranktags %zd/%zd",
log->otherrank, log->tag, log->subtype, log->size, log->rank,
toranktag(log->rank, log->otherrank, log->tag), toranktag(log->rank, log->otherrank, log->tag));
return buf;
}
/**
* @brief Convert a byte count into a number of blocks, rounds up.
*
@@ -179,7 +169,8 @@ static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
}
/**
* @brief test a filled data area for a value.
* @brief test a filled data area for a value, reports if any unexpected value
* is found.
*
* @param size size of data in bytes.
* @param data the data to check.
@@ -229,37 +220,27 @@ static void *send_thread(void *arg) {
* subtype, tag and rank. Need to search the ranktag_lists for our ranktag
* value. XXX bisection search XXX */
size_t ranktag = toranktag(log->rank, log->otherrank, log->tag);
//message("looking for %s", tokey(log));
size_t counts = ranktag_counts[INDEX2(task_subtype_count, log->subtype,
log->otherrank)];
size_t offset = 0;
int found = 0;
struct mpiuse_log_entry *keeplog = NULL;
counts = max_logs;
int found = 0;
counts = max_logs; // XXX do we still need this?
for (size_t j = 0; j < counts; j++) {
if (ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype,
log->otherrank, j)] == ranktag) {
if (found) message("duplicate %zd: %s of %s", ranktag, tokey(log), tokey(keeplog));
offset = ranktag_offsets[INDEX3(task_subtype_count, nr_ranks,
log->subtype, log->otherrank, j)];
keeplog = log;
//message("%d/%d located offset %zd one of %zd ranktag: %zd", log->otherrank,
// log->subtype, offset, counts, ranktag);
found = 1;
//break;
break;
}
}
if (!found) {
fflush(stdout); fflush(stderr);
error("Failed sending a message of size %zd to %d/%d @ %zd\n, no offset"
" found for ranktag %zd, counts = %zd", datasize, log->otherrank, log->subtype,
offset, ranktag, counts);
error(
"Failed sending a message of size %zd to %d/%d "
"@ %zd\n, no offset found for ranktag %zd, counts = %zd",
datasize, log->otherrank, log->subtype, offset, ranktag, counts);
}
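The XXX note above flags this linear scan as a candidate for a bisection search. If each per-(subtype, rank) list were stored as ranktag/offset pairs sorted by ranktag, the lookup could use the standard library's bsearch; a hypothetical sketch, not part of this commit:

#include <stddef.h>
#include <stdlib.h>

/* Hypothetical sorted lookup table: one entry per expected message. */
struct ranktag_entry {
  size_t ranktag;
  size_t offset;
};

static int compare_ranktag(const void *a, const void *b) {
  const struct ranktag_entry *ea = a;
  const struct ranktag_entry *eb = b;
  return (ea->ranktag > eb->ranktag) - (ea->ranktag < eb->ranktag);
}

/* Returns the stored offset, or (size_t)-1 when the ranktag is absent. */
static size_t lookup_offset(const struct ranktag_entry *list, size_t nentries,
                            size_t ranktag) {
  struct ranktag_entry key = {ranktag, 0};
  const struct ranktag_entry *hit =
      bsearch(&key, list, nentries, sizeof(key), compare_ranktag);
  return hit != NULL ? hit->offset : (size_t)-1;
}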
//message("Sending a message of size %zd to %d/%d @ %zd", datasize,
// log->otherrank, log->subtype, offset);
/* And send data to other rank. */
int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
@@ -286,19 +267,6 @@ static void *send_thread(void *arg) {
ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
//if (verbose) {
// if (oldval[0] == dataptr[0]) {
// message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)",
// log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
// offset, k, nr_send);
// } else {
// message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd %d/%d",
// log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
// offset, k, nr_send);
// }
//}
//sleep(1);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
@@ -318,19 +286,12 @@ static void *recv_thread(void *arg) {
*((int *)arg), nr_recv, nr_ranks, task_subtype_count);
ticks starttics = getticks();
/* Global statistics. */
//int lncalls = 0;
//double lsum = 0.0;
//ticks lmint = INT_MAX;
//ticks lmaxt = 0;
/* No. of receives to process. */
todo_recv = nr_recv;
/* We loop while new requests are being sent and we still have messages
* to receive. */
while (todo_recv > 0) {
//sleep(5);
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done) {
@@ -341,28 +302,18 @@
/* Check if that part of the window has been unlocked. */
BLOCKTYPE *dataptr = &mpi_ptr[log->subtype][offset];
BLOCKTYPE volatile lock = dataptr[0];
//message("Checking: %s @ %zd", (dataptr[0] == UNLOCKED?"unlocked":"locked"), offset);
//if (atomic_cas(&dataptr[0], UNLOCKED, UNLOCKED)) {
if (lock == UNLOCKED) {
//message("Checked: %s @ %zd", (dataptr[0] == UNLOCKED?"unlocked":"locked"), offset);
/* OK, so data should be ready for use, check the tag and size. */
if ((size_t)log->size == dataptr[1] && (size_t)log->tag == dataptr[2]) {
if (verbose)
//message("receive message %d/%d from %d @ offset %zd: "
// "dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
// log->otherrank, offset, dataptr[0], log->size,
// toblocks(log->size) + HEADER_SIZE);
/* Check data sent data is unchanged. */
if (datacheck) {
if (!datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], log->otherrank)) {
fflush(stdout);
fflush(stderr);
error("Data mismatch on completion");
if ((size_t)log->size == dataptr[1] &&
(size_t)log->tag == dataptr[2]) {
/* Check the sent data is unchanged. */
if (datacheck) {
if (!datacheck_test(toblocks(log->size), &dataptr[HEADER_SIZE],
log->otherrank)) {
error("Data mismatch on completion");
}
}
}
/* Done, clean up. */
log->done = 1;
@@ -370,20 +321,10 @@
if (todo_recv == 0) break;
} else {
message("bad unlocked message %d/%d from %d @ offset %zd: "
"dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
log->otherrank, offset, dataptr[0], log->size,
toblocks(log->size) + HEADER_SIZE);
error("Unlocked data has incorrect tag or size: %zd/%zd %d/%zd",
log->size, dataptr[1], log->tag, dataptr[2]);
}
}
//else {
// message("unlocked message %d/%d from %d @ offset %zd: "
// "dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
// log->otherrank, offset, dataptr[0], log->size,
// toblocks(log->size) + HEADER_SIZE);
//}
}
/* Need to allow for some MPI progression. Since we make no MPI calls
* (by intent receive is a passive target so only the sender should
@@ -448,7 +389,6 @@ static void pick_logs() {
log->done = 0;
log->data = NULL;
log->ranktag = toranktag(log->otherrank, log->rank, log->tag);
//message("add %s", tokey(log));
if (log->type == task_type_send) {
send_queue[nr_send] = log;
nr_send++;
@@ -478,20 +418,18 @@ static void pick_logs() {
* of the windows. */
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank, k)] = log->ranktag;
ranktag_offsets[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank, k)] = ranktag_sizes[log->subtype];
ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank,
k)] = log->ranktag;
ranktag_offsets[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank,
k)] = ranktag_sizes[log->subtype];
log->offset = ranktag_sizes[log->subtype];
/* Need to use a multiple of blocks to keep alignment. */
size_t size = toblocks(log->size) + HEADER_SIZE;
//message("%d increment offset by %zd blocks", log->subtype, size);
ranktag_sizes[log->subtype] += size;
ranktag_counts[INDEX2(task_subtype_count, log->subtype, myrank)]++;
}
if (verbose)
message("local logs picked, nr_send = %d, nr_recv = %d ", nr_send, nr_recv);
}
/**
@@ -557,39 +495,36 @@ int main(int argc, char *argv[]) {
error("The number of MPI ranks %d does not match the expected value %d",
nranks, nr_ranks);
/* Bit shift wide enough to accommodate all the bits of the maximum rank id.
* Assumes the GCC __builtin_clz intrinsic. */
rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks);
/* We all need to agree on a maximum count of logs, so we can exchange the
* offset arrays (would be ragged otherwise). */
* offset arrays (would be ragged otherwise and difficult to exchange). */
max_logs = mpiuse_nr_logs() / 2 + 1;
MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
if (verbose) message("maximum log count = %zd", max_logs);
/* Extract the send and recv messages for our rank. */
/* Extract the send and recv messages for our rank and populate the queues. */
pick_logs();
/* Now for the one-sided setup... Each rank needs a buffer per communicator
* with space for all the expected messages. */
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
size_t size = tobytes(ranktag_sizes[i]);
if (size == 0) size = BYTESINBLOCK;
MPI_Win_allocate(size, BYTESINBLOCK, MPI_INFO_NULL,
subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]);
memset(mpi_ptr[i], 170, tobytes(ranktag_sizes[i]));
//if (verbose)
// message("Allocated window of size %zd for subtype %d",
// ranktag_sizes[i], i);
/* Assert a shared lock with all the other processes on this
* window. Needed as we use threads, so cannot lock or unlock as a means
* of synchronization. */
MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
size_t size = tobytes(ranktag_sizes[i]);
if (size == 0) size = BYTESINBLOCK;
MPI_Win_allocate(size, BYTESINBLOCK, MPI_INFO_NULL, subtypeMPI_comms[i],
&mpi_ptr[i], &mpi_window[i]);
memset(mpi_ptr[i], 170, tobytes(ranktag_sizes[i]));
/* Assert a shared lock with all the other processes on this window.
* Strictly needed as we use threads, so cannot lock or unlock as
* a means of synchronization. */
MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
}
//message("Windows allocated");
/* We need to share all the offsets for each communucator with all the other
/* We need to share all the offsets for each communicator with all the other
* ranks so they can push data into the correct parts of our receive
* window. */
MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
@@ -600,7 +535,6 @@ int main(int argc, char *argv[]) {
MPI_Allreduce(MPI_IN_PLACE, ranktag_lists,
task_subtype_count * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
MPI_COMM_WORLD);
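These reductions act as an all-gather because each rank only fills its own [subtype][myrank][k] entries and leaves every other entry zero, so an MPI_SUM across ranks reassembles the complete tables everywhere. A toy standalone version of that trick (the helper name and table shape are hypothetical):

#include <mpi.h>
#include <stdlib.h>

/* Toy illustration: each rank owns one row of an nranks x ncols table,
 * leaves the other rows zero, and an in-place MPI_SUM rebuilds the full
 * table on every rank. */
static size_t *share_rows(int myrank, int nranks, int ncols,
                          const size_t *myrow) {
  size_t *table = calloc((size_t)nranks * ncols, sizeof(size_t));
  for (int j = 0; j < ncols; j++) table[(size_t)myrank * ncols + j] = myrow[j];
  MPI_Allreduce(MPI_IN_PLACE, table, nranks * ncols, MPI_AINT, MPI_SUM,
                MPI_COMM_WORLD);
  return table;
}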
//message("check: local logs picked, nr_send = %d, nr_recv = %d ", nr_send, nr_recv);
/* Time to start timing. Try to make it synchronous across the ranks. */
MPI_Barrier(MPI_COMM_WORLD);
@@ -618,9 +552,6 @@ int main(int argc, char *argv[]) {
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
/* XXX could have more than one of these... With a partition of the
* subtypes. */
pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread.");
@@ -630,7 +561,7 @@ int main(int argc, char *argv[]) {
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
/* Free the window locks. Once we all arrive. */
/* Free the window locks. Only after we all arrive. */
MPI_Barrier(MPI_COMM_WORLD);
for (int i = 0; i < task_subtype_count; i++) {
MPI_Win_unlock_all(mpi_window[i]);