Commit a757d065 authored by Peter W. Draper

Some signs of life; receive needs work on the logic

parent f860ad09
2 merge requests: !11 Draft: Fast one-sided MPI version, !8 Draft: RDMA version with wrapped infinity calls
@@ -17,6 +17,17 @@
*
******************************************************************************/
// Simple approach: use the window as a message board, capable of receiving a
// single message per rank at a time, so it needs to be larger than the
// largest message, and we need one of these boards per rank.
//
// We poll all ranks waiting for a push update that unlocks a board; we then
// check the tag and size, which need to match one of the expected
// messages, at which point we copy that data away and release the board.
//
// On the send side we work synchronously, sending one message at a time and
// waiting for our board to be unlocked by the receiver.
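/* Editor's note: a minimal, hypothetical sketch of the board layout described
 * above; it is not part of this commit. The struct and helper names are
 * illustrative only, the real code reads the three leading size_t fields of
 * each slot directly. */
#include <stddef.h>

/* Header at the start of each per-rank board slot. */
struct board_header {
  size_t lock; /* LOCKED while the sender owns the slot, UNLOCKED once a
                  complete message is ready for the receiver. */
  size_t size; /* Payload size in bytes. */
  size_t tag;  /* Tag to match against an expected receive. */
};

/* Byte offset of the slot owned by rank sender within a window, where each
 * slot is message_size = HEADER_SIZE + size of the largest payload. */
static inline size_t board_offset(int sender, size_t message_size) {
  return (size_t)sender * message_size;
}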
#include <limits.h>
#include <mpi.h>
#include <pthread.h>
@@ -36,8 +47,16 @@ int myrank = -1;
/* Number of ranks. */
static int nr_ranks;
#define READY -2
#define DONE -10
/* Flags for controlling access. */
static int LOCKED = -2;
static int UNLOCKED = -3;
/* Size of message header. The flag, size and tag. */
static size_t HEADER_SIZE = sizeof(size_t) * 3;
/* Size of a message board; we have one of these per rank per communicator
* (i.e. per window). */
static size_t MESSAGE_SIZE = 0;
/* Are we verbose. */
static int verbose = 0;
@@ -57,17 +76,8 @@ static MPI_Comm subtypeMPI_comms[task_subtype_count];
static MPI_Win mpi_window[task_subtype_count];
static char *mpi_ptr[task_subtype_count];
// Simple approach, use the window as a message board, capable of receiving a
// single message at a time, so needs to be larger than the largest message.
//
// So we poll for all possible messages, until we see our tag and size
// at which point we copy that away and release the window.
//
// On the send side we work synchronously, sending a message at a time
// (could have one send per rank underway, as a further development).
//
// Synchronization seems two-sided, with exclusive locks on both sides
// and an atomic flag to control access.
/* Size of a board for a rank. */
static size_t board_size = 0;
/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
@@ -110,6 +120,8 @@ static int datacheck_test(size_t size, void *data) {
/**
* @brief Send thread, sends messages to other ranks one-by-one.
*
* Messages are all considered in order, regardless of the subtype.
*/
static void *send_thread(void *arg) {
@@ -118,34 +130,65 @@ static void *send_thread(void *arg) {
for (int k = 0; k < nr_send; k++) {
struct mpiuse_log_entry *log = send_queue[k];
log->data = calloc(log->size + 1, 1);
/* Data has room for the header plus the actual payload. */
log->data = calloc(HEADER_SIZE + log->size, 1);
size_t *dataptr = (size_t *)log->data;
/* Fill data with pattern. */
if (datacheck) datacheck_fill(log->size, log->data);
if (datacheck) datacheck_fill(HEADER_SIZE + log->size, dataptr);
/* Last char is marked as READY (to receive) */
((char *)log->data)[log->size] = READY;
/* First element is marked as LOCKED, so only we can update. */
dataptr[0] = LOCKED;
dataptr[1] = log->size;
dataptr[2] = log->tag;
/* And send data to other rank. */
MPI_Accumulate(log->data, log->size + 1, MPI_BYTE, log->otherrank, 0,
log->size + 1, MPI_BYTE, MPI_REPLACE,
MPI_Accumulate(log->data, HEADER_SIZE + log->size, MPI_BYTE,
log->otherrank, MESSAGE_SIZE * myrank,
HEADER_SIZE + log->size, MPI_BYTE, MPI_REPLACE,
mpi_window[log->subtype]);
/* Now we change the last element to DONE so that the remote end can
/* Now we change the lock flag to UNLOCKED so that the remote end can
* find out that the data has arrived. */
char newval[1];
char oldval[1];
newval[0] = DONE;
oldval[0] = DONE;
MPI_Compare_and_swap(&newval[0], &((char *)log->data)[log->size],
&oldval[0], MPI_BYTE, log->otherrank, log->size,
size_t newval[1];
size_t oldval[1];
newval[0] = UNLOCKED;
oldval[0] = 0;
MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_AINT,
log->otherrank, myrank * MESSAGE_SIZE,
mpi_window[log->subtype]);
//MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
MPI_Win_flush_all(mpi_window[log->subtype]);
// XXX need to make sure the data area is free to overwrite once more XXX
// flip the remote atomic.
if (oldval[0] == dataptr[0]) {
message("sent a message to %d/%d (%zd:%zd:%zd)", log->otherrank,
log->subtype, dataptr[0], oldval[0], newval[0]);
} else {
message("failed to send a message to %d/%d (%zd:%zd:%zd)", log->otherrank,
log->subtype, dataptr[0], oldval[0], newval[0]);
}
/* Wait for completion; that is when the remote end flips the flag back to
* LOCKED. We poll with a get, as the local window is only used for receiving.
* Use an Rget so we can use MPI_Test to get some local progression. */
while (dataptr[0] == LOCKED) {
MPI_Request request;
MPI_Rget(dataptr, 1, MPI_AINT, log->otherrank, 0, 1, MPI_AINT,
mpi_window[log->subtype], &request);
MPI_Win_flush_all(mpi_window[log->subtype]);
int flag = 0;
while (flag == 0) {
MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
}
message("Waiting for unlock (%zd)", dataptr[0]);
}
message("sent and received... %d/%d", k, nr_send);
/* Ready the next send. */
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
@@ -180,17 +223,15 @@ static void *recv_thread(void *arg) {
if (log != NULL && !log->done) {
ticks tics = getticks();
int arrived = 0;
//message("Checking at %zd", log->size);
if (mpi_ptr[log->subtype][log->size] == DONE) arrived = 1;
// Need to allow for some MPI progression, since we make no MPI calls here.
// Should not be needed if using a progression thread?
int flag;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
&flag, MPI_STATUS_IGNORE);
MPI_Win_flush_all(mpi_window[log->subtype]); // XXX emergency measure
// XXX match this to the expected receive...
/* See if this message is ready (should really check for incoming messages
* and match them to the queue, i.e. the other way around). */
int ready = 0;
size_t lockval = ((size_t *)&mpi_ptr[log->subtype][log->otherrank * MESSAGE_SIZE])[0];
//message("Checking %d/%d at %zd: lockval %zd", log->rank, log->subtype,
// log->otherrank * MESSAGE_SIZE, lockval);
if (lockval == UNLOCKED) ready = 1;
/* Increment etc. of statistics about time spent waiting. */
ticks dt = getticks() - tics;
@@ -205,11 +246,14 @@
if (dt < lmint) lmint = dt;
if (dt > lmaxt) lmaxt = dt;
if (arrived) {
if (ready) {
message("We have a ready message %d/%d at %zd: lockval %zd", log->rank, log->subtype,
log->otherrank * MESSAGE_SIZE, lockval);
/* Check the sent data is unchanged and the received data is as
* expected. */
if (datacheck && !datacheck_test(log->size, log->data)) {
error("Data mismatch on completion");
if (datacheck && !datacheck_test(log->size, &mpi_ptr[log->subtype][log->otherrank * MESSAGE_SIZE])) {
message("Data mismatch on completion");
}
/* Done, clean up. */
@@ -217,7 +261,16 @@
log->endtic = getticks();
free(log->data);
atomic_dec(&todo_recv);
/* Ready for next message. */
((size_t *)&mpi_ptr[log->subtype][log->otherrank * MESSAGE_SIZE])[0] = LOCKED;
}
/* Need to allow for some MPI progression, since we otherwise make no
* MPI calls. Should not be needed if using a progression thread? (See the
* sketch below.) */
int flag = 0;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
&flag, MPI_STATUS_IGNORE);
}
}
}
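/* Editor's note: a hedged sketch (not part of this commit) of the progression
 * thread mentioned in the comments above. Spinning on a non-blocking probe
 * gives the MPI library a chance to progress outstanding passive-target RMA
 * traffic; the names used here are hypothetical. */
#include <mpi.h>

static volatile int progression_stop = 0;

static void *progression_thread(void *arg) {
  (void)arg;
  while (!progression_stop) {
    int flag = 0;
    /* Any non-blocking MPI call polls the progress engine. */
    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
               MPI_STATUS_IGNORE);
  }
  return NULL;
}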
@@ -363,11 +416,14 @@ int main(int argc, char *argv[]) {
/* Extract the send and recv messages for our rank. */
size_t maxsize = pick_logs();
/* Size of a message board. */
MESSAGE_SIZE = maxsize + HEADER_SIZE;
/* Now for the one-sided setup... We need a buffer with space for the largest
* message. */
* message, plus one of these per rank. */
for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
MPI_Win_allocate(maxsize + sizeof(size_t), sizeof(int), MPI_INFO_NULL,
MPI_Win_allocate(MESSAGE_SIZE * nr_ranks, sizeof(int), MPI_INFO_NULL,
subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]);
/* Assert a shared lock with all the other processes on this window. */
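/* Editor's note: a hedged sketch of how this per-subtype window setup is
 * typically completed, assuming the shared lock referred to above is taken
 * with MPI_Win_lock_all. Variable names follow the file, but the snippet is
 * illustrative rather than the committed code (the diff is truncated here). */
  for (int i = 0; i < task_subtype_count; i++) {
    MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);

    /* One board of MESSAGE_SIZE bytes per rank, so any rank can push to us. */
    MPI_Win_allocate(MESSAGE_SIZE * nr_ranks, sizeof(int), MPI_INFO_NULL,
                     subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]);

    /* Passive-target access from every rank for the lifetime of the run. */
    MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
  }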