Skip to content
Snippets Groups Projects
Commit 38d2a38b authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Recast to work in blocks, really sizeof(size_t), alignment is a requirement

parent b050d77f
No related branches found
No related tags found
2 merge requests!11Draft: Fast one-sided MPI version,!8Draft: RDMA version with wrapped infinity calls
...@@ -51,12 +51,17 @@ static int nr_ranks; ...@@ -51,12 +51,17 @@ static int nr_ranks;
static int LOCKED = -2; static int LOCKED = -2;
static int UNLOCKED = -3; static int UNLOCKED = -3;
/* Size of message header. The flag, size and tag. */ /* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this as
//static size_t HEADER_SIZE = sizeof(size_t) * 3; * as we need to align in memory. */
static size_t HEADER_SIZE = 1; #define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Size of a message board, we have one of these per rank per communicator /* Size of message header in blocks. The flag, size and tag. */
* (i.e. per window). */ static size_t HEADER_SIZE = 3;
/* Size of a message board in blocks, we have one of these per rank per
* communicator (i.e. per window). */
static size_t MESSAGE_SIZE = 0; static size_t MESSAGE_SIZE = 0;
/* Are we verbose. */ /* Are we verbose. */
...@@ -75,7 +80,7 @@ static MPI_Comm subtypeMPI_comms[task_subtype_count]; ...@@ -75,7 +80,7 @@ static MPI_Comm subtypeMPI_comms[task_subtype_count];
/* And the windows for one-sided communications. */ /* And the windows for one-sided communications. */
static MPI_Win mpi_window[task_subtype_count]; static MPI_Win mpi_window[task_subtype_count];
static char *mpi_ptr[task_subtype_count]; static BLOCKTYPE *mpi_ptr[task_subtype_count];
/* Size of a board for a rank. */ /* Size of a board for a rank. */
static size_t board_size = 0; static size_t board_size = 0;
...@@ -90,6 +95,28 @@ static struct mpiuse_log_entry **volatile recv_queue; ...@@ -90,6 +95,28 @@ static struct mpiuse_log_entry **volatile recv_queue;
static int volatile nr_recv = 0; static int volatile nr_recv = 0;
static int volatile todo_recv = 0; static int volatile todo_recv = 0;
/**
 * @brief Convert a byte count into a number of blocks, rounds up.
 *
 * @param nr_bytes the number of bytes.
 *
 * @result the number of blocks needed to hold nr_bytes bytes.
 */
static int toblocks(BLOCKTYPE nr_bytes) {
  /* Ceiling division: any partial trailing block still needs a whole block.
   * The previous formula ((nr_bytes * (BYTESINBLOCK - 1)) /
   * (BYTESINBLOCK * BYTESINBLOCK)) rounded DOWN and returned 0 for small
   * counts (e.g. 8 bytes with 8-byte blocks gave 0), under-sizing buffers. */
  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
/**
* @brief Convert a block count into a number of bytes.
*
* @param nr_block the number of blocks.
*
* @result the number of bytes.
*/
static BLOCKTYPE tobytes(int nr_blocks) {
return (nr_blocks * BYTESINBLOCK);
}
/** /**
* @brief fill a data area with a pattern that can be checked for changes. * @brief fill a data area with a pattern that can be checked for changes.
* *
...@@ -133,42 +160,43 @@ static void *send_thread(void *arg) { ...@@ -133,42 +160,43 @@ static void *send_thread(void *arg) {
struct mpiuse_log_entry *log = send_queue[k]; struct mpiuse_log_entry *log = send_queue[k];
/* Data has the actual data and room for the header. */ /* Data has the actual data and room for the header. */
log->data = calloc(HEADER_SIZE + log->size, 1); BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
//size_t *dataptr = (size_t *)log->data; BLOCKTYPE *dataptr = calloc(datasize, BYTESINBLOCK);
log->data = dataptr;
/* Fill data with pattern. */ /* Fill data with pattern. */
if (datacheck) datacheck_fill(HEADER_SIZE + log->size, log->data); if (datacheck) datacheck_fill(datasize, dataptr);
/* First element is marked as LOCKED, so only we can update. */ /* First element is marked as LOCKED, so only we can update. */
((char *)log->data)[0] = LOCKED; dataptr[0] = LOCKED;
//dataptr[1] = log->size; dataptr[1] = log->size;
//dataptr[2] = log->tag; dataptr[2] = log->tag;
/* And send data to other rank. */ /* And send data to other rank. */
MPI_Accumulate(log->data, HEADER_SIZE + log->size, MPI_BYTE, log->otherrank, MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
MESSAGE_SIZE * myrank, HEADER_SIZE + log->size, MPI_BYTE, MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE,
MPI_REPLACE, mpi_window[log->subtype]); MPI_REPLACE, mpi_window[log->subtype]);
/* Now we change the last element to UNLOCKED so that the remote end can /* Now we change the last element to UNLOCKED so that the remote end can
* find out that the data has arrived. */ * find out that the data has arrived. */
char newval[1]; BLOCKTYPE newval[1];
char oldval[1]; BLOCKTYPE oldval[1];
newval[0] = UNLOCKED; newval[0] = UNLOCKED;
oldval[0] = 0; oldval[0] = 0;
MPI_Compare_and_swap(&newval[0], log->data, &oldval[0], MPI_BYTE, MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE,
log->otherrank, myrank * MESSAGE_SIZE, log->otherrank, MESSAGE_SIZE * myrank,
mpi_window[log->subtype]); mpi_window[log->subtype]);
// MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); // MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
MPI_Win_flush_all(mpi_window[log->subtype]); MPI_Win_flush_all(mpi_window[log->subtype]);
if (oldval[0] == ((char *)log->data)[0]) { if (oldval[0] == dataptr[0]) {
message("sent a message to %d/%d (%d:%d:%d @ %zd)", log->otherrank, message("sent a message to %d/%d (%zd:%zd:%zd @ %zd)", log->otherrank,
log->subtype, ((char *)log->data)[0], oldval[0], newval[0], log->subtype, dataptr[0], oldval[0], newval[0],
MESSAGE_SIZE * myrank); MESSAGE_SIZE * myrank);
} else { } else {
message("failed to send a message to %d/%d (%d:%d:%d) @ %zd", log->otherrank, message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd",
log->subtype, ((char *)log->data)[0], oldval[0], newval[0], log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
MESSAGE_SIZE * myrank); MESSAGE_SIZE * myrank);
} }
...@@ -180,8 +208,9 @@ static void *send_thread(void *arg) { ...@@ -180,8 +208,9 @@ static void *send_thread(void *arg) {
MPI_Win_flush_all(mpi_window[log->subtype]); MPI_Win_flush_all(mpi_window[log->subtype]);
MPI_Request request; MPI_Request request;
MPI_Rget(&newval[0], 1, MPI_BYTE, log->otherrank, myrank * MESSAGE_SIZE, MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
1, MPI_BYTE, mpi_window[log->subtype], &request); MESSAGE_SIZE * myrank, 1, MPI_BLOCKTYPE,
mpi_window[log->subtype], &request);
int flag = 0; int flag = 0;
while (flag == 0) { while (flag == 0) {
...@@ -222,23 +251,27 @@ static void *recv_thread(void *arg) { ...@@ -222,23 +251,27 @@ static void *recv_thread(void *arg) {
* to receive. */ * to receive. */
while (todo_recv > 0) { while (todo_recv > 0) {
for (int n = 0; n < nr_ranks; n++) { for (int n = 0; n < nr_ranks; n++) {
if (todo_recv <= 0) break;
if (n == myrank) continue; if (n == myrank) continue;
for (int j = 0; j < task_subtype_count; j++) { for (int j = 0; j < task_subtype_count; j++) {
if (todo_recv <= 0) break;
MPI_Win_flush_all(mpi_window[j]); // XXX emergency measure MPI_Win_flush_all(mpi_window[j]); // XXX emergency measure
char lockval = mpi_ptr[j][n * MESSAGE_SIZE]; BLOCKTYPE lockval = mpi_ptr[j][n * MESSAGE_SIZE];
message("lockval check %d/%d at %zd: lockval %d", n, j, message("lockval check %d/%d at %zd: lockval %zd", n, j,
n * MESSAGE_SIZE, lockval); n * MESSAGE_SIZE, lockval);
for (int kk = n * MESSAGE_SIZE; kk < (n+1) * MESSAGE_SIZE; kk++) { for (BLOCKTYPE kk = n * MESSAGE_SIZE; kk < (n+1) * MESSAGE_SIZE; kk++) {
if (mpi_ptr[j][kk] != 0) { if (mpi_ptr[j][kk] != 0) {
message("non-zero %d at %d", mpi_ptr[j][kk], kk); message("non-zero %zd at %zd", mpi_ptr[j][kk], kk);
} }
} }
if (lockval == UNLOCKED) { if (lockval == UNLOCKED) {
message("unlock message %d/%d at %zd: lockval %d", n, j, message("unlock message %d/%d at %zd: lockval %zd, possibles: %d", n, j,
n * MESSAGE_SIZE, lockval); n * MESSAGE_SIZE, lockval, todo_recv);
/* We have a message waiting to be handled, find the log. */ /* We have a message waiting to be handled, find the log. */
int found = 0; int found = 0;
...@@ -248,14 +281,13 @@ static void *recv_thread(void *arg) { ...@@ -248,14 +281,13 @@ static void *recv_thread(void *arg) {
log->subtype == j) { log->subtype == j) {
found = 1; found = 1;
message("We have a ready message %d/%d at %zd: lockval %d", message("We have a ready message %d/%d at %zd: lockval %zd",
log->rank, log->subtype, log->otherrank * MESSAGE_SIZE, log->rank, log->subtype, log->otherrank * MESSAGE_SIZE,
lockval); lockval);
/* Check data sent data is unchanged and received data is as /* Check data sent data is unchanged and received data is as
* expected. */ * expected. */
if (datacheck && if (datacheck && !datacheck_test(log->size, &mpi_ptr[j][n * MESSAGE_SIZE])) {
!datacheck_test(log->size,&mpi_ptr[j][n * MESSAGE_SIZE])) {
message("Data mismatch on completion"); message("Data mismatch on completion");
} }
...@@ -265,9 +297,11 @@ static void *recv_thread(void *arg) { ...@@ -265,9 +297,11 @@ static void *recv_thread(void *arg) {
atomic_dec(&todo_recv); atomic_dec(&todo_recv);
/* Now ready for next message. */ /* Now ready for next message. */
((char *)&mpi_ptr[j][n * MESSAGE_SIZE])[0] = LOCKED; mpi_ptr[j][n * MESSAGE_SIZE] = LOCKED;
break; break;
} else {
message("%d miss: %d/%d/%d", k, log->otherrank, log->subtype, log->done);
} }
} }
if (!found) { if (!found) {
...@@ -324,18 +358,14 @@ static size_t pick_logs() { ...@@ -324,18 +358,14 @@ static size_t pick_logs() {
struct mpiuse_log_entry *log = mpiuse_get_log(k); struct mpiuse_log_entry *log = mpiuse_get_log(k);
if (log->activation) { if (log->activation) {
if (log->rank == myrank) { if (log->rank == myrank) {
log->done = 0;
log->data = NULL;
if (log->type == task_type_send) { if (log->type == task_type_send) {
log->data = NULL;
send_queue[nr_send] = log; send_queue[nr_send] = log;
nr_send++; nr_send++;
} else if (log->type == task_type_recv) { } else if (log->type == task_type_recv) {
log->data = NULL;
recv_queue[nr_recv] = log; recv_queue[nr_recv] = log;
nr_recv++; nr_recv++;
} else { } else {
error("task type '%d' is not a known send or recv task", log->type); error("task type '%d' is not a known send or recv task", log->type);
} }
...@@ -419,15 +449,16 @@ int main(int argc, char *argv[]) { ...@@ -419,15 +449,16 @@ int main(int argc, char *argv[]) {
/* Extract the send and recv messages for our rank. */ /* Extract the send and recv messages for our rank. */
size_t maxsize = pick_logs(); size_t maxsize = pick_logs();
/* Size of a message board. */ /* Size of a message board. Needs to align on size_t. */
MESSAGE_SIZE = maxsize + HEADER_SIZE; MESSAGE_SIZE = toblocks(maxsize) + HEADER_SIZE;
/* Now for the one-sided setup... We need a buffer with space largest /* Now for the one-sided setup... We need a buffer with space largest
* message, plus one of these per rank. */ * message, plus one of these per rank. */
for (int i = 0; i < task_subtype_count; i++) { for (int i = 0; i < task_subtype_count; i++) {
MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]); MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
MPI_Win_allocate(MESSAGE_SIZE * nr_ranks, sizeof(int), MPI_INFO_NULL, MPI_Win_allocate(tobytes(MESSAGE_SIZE * nr_ranks), BYTESINBLOCK,
subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]); MPI_INFO_NULL, subtypeMPI_comms[i], &mpi_ptr[i],
&mpi_window[i]);
/* Assert a shared lock with all the other processes on this window. */ /* Assert a shared lock with all the other processes on this window. */
MPI_Win_lock_all(0, mpi_window[i]); MPI_Win_lock_all(0, mpi_window[i]);
...@@ -462,6 +493,7 @@ int main(int argc, char *argv[]) { ...@@ -462,6 +493,7 @@ int main(int argc, char *argv[]) {
/* Free the window locks. */ /* Free the window locks. */
for (int i = 0; i < task_subtype_count; i++) { for (int i = 0; i < task_subtype_count; i++) {
MPI_Win_unlock_all(mpi_window[i]); MPI_Win_unlock_all(mpi_window[i]);
MPI_Win_free(&mpi_window[i]);
} }
/* Dump the updated MPI logs. */ /* Dump the updated MPI logs. */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment