diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c
index c81a1e47da84c0232f3acf1b46afd35f7a32c0e5..bd2c1794bc80e20bb99e383868487670a32df679 100644
--- a/swiftmpirdmastepsim.c
+++ b/swiftmpirdmastepsim.c
@@ -17,6 +17,17 @@
  *
  ******************************************************************************/
 
+// Simple approach: use the window as a set of message boards, each capable
+// of receiving a single message at a time, so a board needs to be larger
+// than the largest message, and we need one of these boards per rank.
+//
+// So we poll all ranks, waiting for a push update to unlock a board; we then
+// check the tag and size, which need to match one of the expected messages,
+// at which point we copy the data away and release the board.
+//
+// On the send side we work synchronously, sending one message at a time and
+// waiting for our board to be unlocked by the receiver.
+
 #include <limits.h>
 #include <mpi.h>
 #include <pthread.h>
@@ -36,8 +47,16 @@ int myrank = -1;
 /* Number of ranks. */
 static int nr_ranks;
 
-#define READY -2
-#define DONE -10
+/* Flags for controlling access. */
+static int LOCKED = -2;
+static int UNLOCKED = -3;
+
+/* Size of the message header: the lock flag, size and tag. */
+static size_t HEADER_SIZE = sizeof(size_t) * 3;
+
+/* Size of a message board; we have one of these per rank per communicator
+ * (i.e. per window). */
+static size_t MESSAGE_SIZE = 0;
 
 /* Are we verbose. */
 static int verbose = 0;
@@ -57,17 +76,8 @@ static MPI_Comm subtypeMPI_comms[task_subtype_count];
 static MPI_Win mpi_window[task_subtype_count];
 static char *mpi_ptr[task_subtype_count];
 
-// Simple approach, use the window as a message board, capable of receiving a
-// single message at a time, so needs to be larger than the largest message.
-//
-// So we poll for all possible messages, until we see our tag and size
-// at which point we copy that away and release the window.
-//
-// On the send side we work synchronously, sending a message at a time
-// (could have one send per rank underway, as a further development).
-//
-// Synchronization seems two side, with exclusive locks on both sides
-// and an atomic flag to control access.
+/* Size of a board for a rank. */
+static size_t board_size = 0;
 
 /* The local send queue. */
 static struct mpiuse_log_entry **volatile send_queue;
@@ -110,6 +120,8 @@ static int datacheck_test(size_t size, void *data) {
 
 /**
  * @brief Send thread, sends messages to other ranks one-by-one.
+ *
+ * Messages are all processed in order, regardless of the subtype.
  */
 static void *send_thread(void *arg) {
 
@@ -118,34 +130,65 @@
 
   for (int k = 0; k < nr_send; k++) {
     struct mpiuse_log_entry *log = send_queue[k];
-    log->data = calloc(log->size + 1, 1);
+
+    /* The buffer holds the header as well as the actual data. */
+    log->data = calloc(HEADER_SIZE + log->size, 1);
+    size_t *dataptr = (size_t *)log->data;
 
     /* Fill data with pattern. */
-    if (datacheck) datacheck_fill(log->size, log->data);
+    if (datacheck) datacheck_fill(HEADER_SIZE + log->size, dataptr);
 
-    /* Last char is marked as READY (to receive) */
-    ((char *)log->data)[log->size] = READY;
+    /* First element is marked as LOCKED, so only we can update it. */
+    dataptr[0] = LOCKED;
+    dataptr[1] = log->size;
+    dataptr[2] = log->tag;
 
     /* And send data to other rank. */
-    MPI_Accumulate(log->data, log->size + 1, MPI_BYTE, log->otherrank, 0,
-                   log->size + 1, MPI_BYTE, MPI_REPLACE,
+    MPI_Accumulate(log->data, HEADER_SIZE + log->size, MPI_BYTE,
+                   log->otherrank, MESSAGE_SIZE * myrank,
+                   HEADER_SIZE + log->size, MPI_BYTE, MPI_REPLACE,
                    mpi_window[log->subtype]);
 
-    /* Now we change the last element to DONE so that the remote end can
+    /* Now we change the first element to UNLOCKED so that the remote end can
      * find out that the data has arrived. */
-    char newval[1];
-    char oldval[1];
-    newval[0] = DONE;
-    oldval[0] = DONE;
-    MPI_Compare_and_swap(&newval[0], &((char *)log->data)[log->size],
-                         &oldval[0], MPI_BYTE, log->otherrank, log->size,
+    size_t newval[1];
+    size_t oldval[1];
+    newval[0] = UNLOCKED;
+    oldval[0] = 0;
+    MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_AINT,
+                         log->otherrank, myrank * MESSAGE_SIZE,
                          mpi_window[log->subtype]);
+    //MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
     MPI_Win_flush_all(mpi_window[log->subtype]);
 
-    // XXX need to make sure the data area is free to overwrite once more XXX
-    // flip the remote atomic.
+    if (oldval[0] == dataptr[0]) {
+      message("sent a message to %d/%d (%zd:%zd:%zd)", log->otherrank,
+              log->subtype, dataptr[0], oldval[0], newval[0]);
+    } else {
+      message("failed to send a message to %d/%d (%zd:%zd:%zd)",
+              log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0]);
+    }
+
+    /* Wait for completion, which is when the remote end flips back to
+     * LOCKED. We poll on a get, as the local window is only used for
+     * receiving. Use an Rget so we can use MPI_Test for local progression. */
+    while (dataptr[0] == LOCKED) {
+      MPI_Request request;
+      MPI_Rget(dataptr, 1, MPI_AINT, log->otherrank, 0, 1, MPI_AINT,
+               mpi_window[log->subtype], &request);
+      MPI_Win_flush_all(mpi_window[log->subtype]);
+
+      int flag = 0;
+      while (flag == 0) {
+        MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
+      }
+      message("Waiting for unlock (%zd)", dataptr[0]);
+    }
+
+    message("sent and received... %d/%d", k, nr_send);
+    /* Ready the next send. */
   }
 
   message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
@@ -180,17 +223,15 @@ static void *recv_thread(void *arg) {
 
       if (log != NULL && !log->done) {
         ticks tics = getticks();
-        int arrived = 0;
-        //message("Checking at %zd", log->size);
-        if (mpi_ptr[log->subtype][log->size] == DONE) arrived = 1;
-
-        // Need to allow for some MPI progession... Since we make no
-        // MPI calls. Should not be needed if using a progression thread?
-        int flag;
-        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
-                   &flag, MPI_STATUS_IGNORE);
+        MPI_Win_flush_all(mpi_window[log->subtype]);  // XXX emergency measure
 
-        // XXX match this to the expected receive...
+        /* See if this message is ready (we should really check for arrived
+         * messages and match those to the queue, i.e. the other way around). */
+        int ready = 0;
+        size_t lockval = ((size_t *)&mpi_ptr[log->subtype][log->otherrank * MESSAGE_SIZE])[0];
+        //message("Checking %d/%d at %zd: lockval %zd", log->rank, log->subtype,
+        //        log->otherrank * MESSAGE_SIZE, lockval);
+        if (lockval == UNLOCKED) ready = 1;
 
         /* Increment etc. of statistics about time spent waiting. */
         ticks dt = getticks() - tics;
@@ -205,11 +246,14 @@
         if (dt < lmint) lmint = dt;
         if (dt > lmaxt) lmaxt = dt;
 
-        if (arrived) {
+        if (ready) {
+          message("We have a ready message %d/%d at %zd: lockval %zd", log->rank,
+                  log->subtype, log->otherrank * MESSAGE_SIZE, lockval);
+
           /* Check data sent data is unchanged and received data is as
            * expected. */
-          if (datacheck && !datacheck_test(log->size, log->data)) {
-            error("Data mismatch on completion");
+          if (datacheck && !datacheck_test(log->size, &mpi_ptr[log->subtype][log->otherrank * MESSAGE_SIZE])) {
+            message("Data mismatch on completion");
           }
 
           /* Done, clean up. */
@@ -217,7 +261,16 @@
           log->endtic = getticks();
           free(log->data);
           atomic_dec(&todo_recv);
+
+          /* Ready for next message. */
+          ((size_t *)&mpi_ptr[log->subtype][log->otherrank * MESSAGE_SIZE])[0] = LOCKED;
         }
+
+        /* Need to allow for some MPI progression, since we make no other
+         * MPI calls. Should not be needed if using a progression thread? */
+        int flag = 0;
+        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
+                   &flag, MPI_STATUS_IGNORE);
       }
     }
   }
@@ -363,11 +416,14 @@ int main(int argc, char *argv[]) {
   /* Extract the send and recv messages for our rank. */
   size_t maxsize = pick_logs();
 
+  /* Size of a message board. */
+  MESSAGE_SIZE = maxsize + HEADER_SIZE;
+
   /* Now for the one-sided setup... We need a buffer with space largest
-   * message. */
+   * message, plus one of these per rank. */
   for (int i = 0; i < task_subtype_count; i++) {
     MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
-    MPI_Win_allocate(maxsize + sizeof(size_t), sizeof(int), MPI_INFO_NULL,
+    MPI_Win_allocate(MESSAGE_SIZE * nr_ranks, sizeof(int), MPI_INFO_NULL,
                      subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]);
 
     /* Assert a shared lock with all the other processes on this window. */
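For orientation, one slot in a window can be pictured as the struct below. This is only an illustrative sketch: the patch manipulates raw size_t words in the window directly and never declares such a type, and the payload capacity is the maxsize computed from the logs.

/* Illustrative layout of one message board slot. Rank r's messages always
 * land at byte offset r * MESSAGE_SIZE in the target window, so each window
 * holds nr_ranks of these slots. */
struct board {
  size_t lock;    /* LOCKED while the sender owns the slot, UNLOCKED once a
                   * message has been published. */
  size_t size;    /* Payload size in bytes, matched against the log. */
  size_t tag;     /* Message tag, matched against the expected receive. */
  char payload[]; /* Up to maxsize bytes of message data. */
};

/* The receive side peeks at the board of rank r roughly like this:
 *   struct board *b = (struct board *)&mpi_ptr[subtype][r * MESSAGE_SIZE]; */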
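And a minimal, self-contained sketch of the push/publish/release handshake between two ranks. It makes the same assumption as the patch that the size_t flag can be operated on as MPI_AINT; the file name, BOARD_SIZE, PAYLOAD and the tag value are invented for the demo, a displacement unit of 1 is used so displacements are plain byte offsets, and MPI_Win_sync stands in for the flush-based polling of local window memory. It also omits the sender's wait for the board to be released again; this is not the simulator code itself.

/* board_demo.c: sketch of the one-sided message-board handshake.
 * Build: mpicc board_demo.c -o board_demo; run: mpirun -np 2 ./board_demo */
#include <mpi.h>
#include <stdio.h>
#include <string.h>

#define LOCKED ((size_t)-2)
#define UNLOCKED ((size_t)-3)
#define PAYLOAD 64
#define BOARD_SIZE (3 * sizeof(size_t) + PAYLOAD) /* flag, size, tag + data */

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank, nranks;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nranks);

  /* One board slot per peer; disp_unit 1 keeps displacements in bytes. */
  char *baseptr;
  MPI_Win win;
  MPI_Win_allocate((MPI_Aint)(BOARD_SIZE * nranks), 1, MPI_INFO_NULL,
                   MPI_COMM_WORLD, &baseptr, &win);

  /* All boards start LOCKED, i.e. owned by their writers. */
  for (int i = 0; i < nranks; i++)
    ((size_t *)&baseptr[i * BOARD_SIZE])[0] = LOCKED;

  MPI_Win_lock_all(0, win); /* shared lock on every rank, as in the patch */
  MPI_Barrier(MPI_COMM_WORLD);

  if (rank == 0 && nranks > 1) {
    /* Header plus payload, pushed into our slot on rank 1. */
    size_t msg[BOARD_SIZE / sizeof(size_t)];
    msg[0] = LOCKED;
    msg[1] = PAYLOAD; /* size */
    msg[2] = 42;      /* tag */
    strcpy((char *)&msg[3], "hello");
    MPI_Accumulate(msg, (int)BOARD_SIZE, MPI_BYTE, 1,
                   (MPI_Aint)(rank * BOARD_SIZE), (int)BOARD_SIZE, MPI_BYTE,
                   MPI_REPLACE, win);
    MPI_Win_flush(1, win); /* the data must land before we publish it */

    /* Atomically flip the remote flag to UNLOCKED to publish the message. */
    size_t newval = UNLOCKED, cmpval = LOCKED, oldval = 0;
    MPI_Compare_and_swap(&newval, &cmpval, &oldval, MPI_AINT, 1,
                         (MPI_Aint)(rank * BOARD_SIZE), win);
    MPI_Win_flush(1, win);
  } else if (rank == 1) {
    /* Poll rank 0's slot in our own window until the flag flips. */
    size_t *hdr = (size_t *)&baseptr[0 * BOARD_SIZE];
    while (hdr[0] != UNLOCKED) MPI_Win_sync(win); /* refresh local view */
    printf("tag %zu size %zu: %s\n", hdr[2], hdr[1], (char *)&hdr[3]);
    hdr[0] = LOCKED; /* release the board for the next message */
  }

  MPI_Win_unlock_all(win);
  MPI_Win_free(&win);
  MPI_Finalize();
  return 0;
}

The compare-and-swap only succeeds while the target word still reads LOCKED, which is what makes the publish step safe against a receiver that has not yet released the board.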