Draft: Fast one-sided MPI version

Open Peter W. Draper requested to merge asyncreallyonesided-fast into master
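For reviewers, a minimal sketch of the board protocol the patch comments describe may help: one board per rank in each window, a three-word header (flag, size, tag) ahead of the payload, a sender that publishes by flipping the flag with MPI_Compare_and_swap, and a receiver that polls the flag and re-locks the board once the message is copied away. All sizes and names below are illustrative assumptions, not code from the patch; like the patch, it assumes sizeof(size_t) == sizeof(MPI_Aint).

/* Sketch of the message-board protocol: not the patch code itself. */
#include <mpi.h>
#include <stdio.h>

#define LOCKED ((size_t)-2)
#define UNLOCKED ((size_t)-3)
#define HEADER_WORDS 3   /* Flag, size and tag, each one size_t. */
#define PAYLOAD_WORDS 8  /* Hypothetical largest message, in size_t words. */
#define BOARD_WORDS (HEADER_WORDS + PAYLOAD_WORDS)

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank = 0, nr_ranks = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);

  /* One board per rank, displacement unit of one word. */
  size_t *boards = NULL;
  MPI_Win win;
  MPI_Win_allocate(BOARD_WORDS * nr_ranks * sizeof(size_t), sizeof(size_t),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &boards, &win);
  for (int k = 0; k < BOARD_WORDS * nr_ranks; k++) boards[k] = LOCKED;
  MPI_Win_lock_all(0, win);
  MPI_Barrier(MPI_COMM_WORLD);

  if (rank == 0 && nr_ranks > 1) {
    /* Sender: push header plus payload into our board on rank 1, then flip
     * the flag from LOCKED to UNLOCKED to publish the message. */
    size_t msg[HEADER_WORDS + 2] = {LOCKED, 2 * sizeof(size_t), 42, 0xaa, 0xbb};
    MPI_Accumulate(msg, sizeof(msg), MPI_BYTE, 1, rank * BOARD_WORDS,
                   sizeof(msg), MPI_BYTE, MPI_REPLACE, win);
    MPI_Win_flush(1, win);  /* Payload lands before the unlock below. */
    size_t unlocked = UNLOCKED, locked = LOCKED, old = 0;
    MPI_Compare_and_swap(&unlocked, &locked, &old, MPI_AINT, 1,
                         rank * BOARD_WORDS, win);
    MPI_Win_flush(1, win);
  } else if (rank == 1) {
    /* Receiver: poll rank 0's board until the flag reads UNLOCKED, copy the
     * message away, then re-lock the board for the next send. */
    volatile size_t *flag = &boards[0 * BOARD_WORDS];
    while (*flag != UNLOCKED) MPI_Win_sync(win); /* Memory consistency. */
    printf("message with tag %zu, size %zu bytes\n", boards[2], boards[1]);
    boards[0 * BOARD_WORDS] = LOCKED; /* The patch's sender polls this with
                                       * MPI_Rget to reuse the board. */
  }

  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Win_unlock_all(win);
  MPI_Win_free(&win);
  MPI_Finalize();
  return 0;
}

The ordering is the crux: the payload accumulate is flushed before the unlocking compare-and-swap, so a receiver that observes UNLOCKED can safely read the header and payload behind it.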
1 file  +99 −43
@@ -17,6 +17,17 @@
  *
  ******************************************************************************/
+// Simple approach: use the window as a message board, capable of receiving a
+// single message per rank at a time, so it needs to be larger than the largest
+// message, and we need one of these boards per rank.
+//
+// So we poll all ranks, waiting for a push update that unlocks a board; we
+// then check the tag and size, which need to match one of the expected
+// messages, at which point we copy the message away and release the board.
+//
+// On the send side we work synchronously, sending one message at a time and
+// waiting for our board to be unlocked by the receiver.
 #include <limits.h>
 #include <mpi.h>
 #include <pthread.h>
@@ -36,8 +47,16 @@ int myrank = -1;
 /* Number of ranks. */
 static int nr_ranks;
-#define READY -2
-#define DONE -10
+/* Flags for controlling access. */
+static int LOCKED = -2;
+static int UNLOCKED = -3;
+/* Size of the message header: the flag, size and tag. */
+static size_t HEADER_SIZE = sizeof(size_t) * 3;
+/* Size of a message board; we have one of these per rank per communicator
+ * (i.e. per window). */
+static size_t MESSAGE_SIZE = 0;
 /* Are we verbose. */
 static int verbose = 0;
@@ -57,17 +76,8 @@ static MPI_Comm subtypeMPI_comms[task_subtype_count];
 static MPI_Win mpi_window[task_subtype_count];
 static char *mpi_ptr[task_subtype_count];
-// Simple approach, use the window as a message board, capable of receiving a
-// single message at a time, so needs to be larger than the largest message.
-//
-// So we poll for all possible messages, until we see our tag and size
-// at which point we copy that away and release the window.
-//
-// On the send side we work synchronously, sending a message at a time
-// (could have one send per rank underway, as a further development).
-//
-// Synchronization seems two side, with exclusive locks on both sides
-// and an atomic flag to control access.
 /* Size of a board for a rank. */
 static size_t board_size = 0;
 /* The local send queue. */
 static struct mpiuse_log_entry **volatile send_queue;
@@ -110,6 +120,8 @@ static int datacheck_test(size_t size, void *data) {
 /**
  * @brief Send thread, sends messages to other ranks one-by-one.
  *
+ * Messages are all considered in order, regardless of the subtype.
  */
 static void *send_thread(void *arg) {
@@ -118,34 +130,65 @@ static void *send_thread(void *arg) {
   for (int k = 0; k < nr_send; k++) {
     struct mpiuse_log_entry *log = send_queue[k];
-    log->data = calloc(log->size + 1, 1);
+    /* Data has the actual data and room for the header. */
+    log->data = calloc(HEADER_SIZE + log->size, 1);
+    size_t *dataptr = (size_t *)log->data;
     /* Fill data with pattern. */
-    if (datacheck) datacheck_fill(log->size, log->data);
+    if (datacheck) datacheck_fill(HEADER_SIZE + log->size, dataptr);
-    /* Last char is marked as READY (to receive) */
-    ((char *)log->data)[log->size] = READY;
+    /* First element is marked as LOCKED, so only we can update. */
+    dataptr[0] = LOCKED;
+    dataptr[1] = log->size;
+    dataptr[2] = log->tag;
     /* And send data to other rank. */
-    MPI_Accumulate(log->data, log->size + 1, MPI_BYTE, log->otherrank, 0,
-                   log->size + 1, MPI_BYTE, MPI_REPLACE,
+    MPI_Accumulate(log->data, HEADER_SIZE + log->size, MPI_BYTE,
+                   log->otherrank, MESSAGE_SIZE * myrank,
+                   HEADER_SIZE + log->size, MPI_BYTE, MPI_REPLACE,
                    mpi_window[log->subtype]);
-    /* Now we change the last element to DONE so that the remote end can
+    /* Now we change the first element to UNLOCKED so that the remote end can
      * find out that the data has arrived. */
-    char newval[1];
-    char oldval[1];
-    newval[0] = DONE;
-    oldval[0] = DONE;
-    MPI_Compare_and_swap(&newval[0], &((char *)log->data)[log->size],
-                         &oldval[0], MPI_BYTE, log->otherrank, log->size,
+    size_t newval[1];
+    size_t oldval[1];
+    newval[0] = UNLOCKED;
+    oldval[0] = 0;
+    MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_AINT,
+                         log->otherrank, myrank * MESSAGE_SIZE,
                          mpi_window[log->subtype]);
+    //MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
+    MPI_Win_flush_all(mpi_window[log->subtype]);
+    // XXX need to make sure the data area is free to overwrite once more XXX
+    // flip the remote atomic.
+    if (oldval[0] == dataptr[0]) {
+      message("sent a message to %d/%d (%zd:%zd:%zd)", log->otherrank,
+              log->subtype, dataptr[0], oldval[0], newval[0]);
+    } else {
+      message("failed to send a message to %d/%d (%zd:%zd:%zd)",
+              log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0]);
+    }
+    /* Wait for completion, which is when the remote end flips back to LOCKED.
+     * We poll on a get, as the local window is only used for receiving. Use
+     * an Rget so we can use MPI_Test to get some local progression. */
+    while (dataptr[0] == LOCKED) {
+      MPI_Request request;
+      MPI_Rget(dataptr, 1, MPI_AINT, log->otherrank, 0, 1, MPI_AINT,
+               mpi_window[log->subtype], &request);
+      MPI_Win_flush_all(mpi_window[log->subtype]);
+      int flag = 0;
+      while (flag == 0) {
+        MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
+      }
+      message("Waiting for unlock (%zd)", dataptr[0]);
+    }
+    message("sent and received... %d/%d", k, nr_send);
+    /* Ready the next send. */
   }
   message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
@@ -180,17 +223,15 @@ static void *recv_thread(void *arg) {
       if (log != NULL && !log->done) {
         ticks tics = getticks();
-        int arrived = 0;
-        //message("Checking at %zd", log->size);
-        if (mpi_ptr[log->subtype][log->size] == DONE) arrived = 1;
-        // Need to allow for some MPI progession... Since we make no
-        // MPI calls. Should not be needed if using a progression thread?
-        int flag;
-        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
-                   &flag, MPI_STATUS_IGNORE);
+        MPI_Win_flush_all(mpi_window[log->subtype]); // XXX emergency measure
+        // XXX match this to the expected receive...
+        /* See if this message is ready (we should really check for incoming
+         * messages and match them to the queue, i.e. the other way around). */
+        int ready = 0;
+        size_t lockval =
+            ((size_t *)&mpi_ptr[log->subtype][log->otherrank * MESSAGE_SIZE])[0];
+        //message("Checking %d/%d at %zd: lockval %zd", log->rank, log->subtype,
+        //        log->otherrank * MESSAGE_SIZE, lockval);
+        if (lockval == UNLOCKED) ready = 1;
         /* Increment etc. of statistics about time spent waiting. */
         ticks dt = getticks() - tics;
@@ -205,11 +246,14 @@ static void *recv_thread(void *arg) {
         if (dt < lmint) lmint = dt;
         if (dt > lmaxt) lmaxt = dt;
-        if (arrived) {
+        if (ready) {
+          message("We have a ready message %d/%d at %zd: lockval %zd",
+                  log->rank, log->subtype, log->otherrank * MESSAGE_SIZE,
+                  lockval);
           /* Check that the sent data is unchanged and the received data is
            * as expected. */
-          if (datacheck && !datacheck_test(log->size, log->data)) {
-            error("Data mismatch on completion");
+          if (datacheck &&
+              !datacheck_test(log->size,
+                              &mpi_ptr[log->subtype][log->otherrank * MESSAGE_SIZE])) {
+            message("Data mismatch on completion");
           }
           /* Done, clean up. */
@@ -217,7 +261,16 @@ static void *recv_thread(void *arg) {
           log->endtic = getticks();
           free(log->data);
           atomic_dec(&todo_recv);
+          /* Ready for the next message. */
+          ((size_t *)&mpi_ptr[log->subtype][log->otherrank * MESSAGE_SIZE])[0] =
+              LOCKED;
         }
+        /* Need to allow for some MPI progression, since we make no other MPI
+         * calls. Should not be needed if using a progression thread? */
+        int flag = 0;
+        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
+                   &flag, MPI_STATUS_IGNORE);
       }
     }
   }
@@ -363,11 +416,14 @@ int main(int argc, char *argv[]) {
   /* Extract the send and recv messages for our rank. */
   size_t maxsize = pick_logs();
+  /* Size of a message board. */
+  MESSAGE_SIZE = maxsize + HEADER_SIZE;
   /* Now for the one-sided setup... We need a buffer with space for the
-   * largest message. */
+   * largest message, plus one of these boards per rank. */
   for (int i = 0; i < task_subtype_count; i++) {
     MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
-    MPI_Win_allocate(maxsize + sizeof(size_t), sizeof(int), MPI_INFO_NULL,
+    MPI_Win_allocate(MESSAGE_SIZE * nr_ranks, sizeof(int), MPI_INFO_NULL,
                      subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]);
     /* Assert a shared lock with all the other processes on this window. */