Skip to content
Snippets Groups Projects
Commit 4e2e7b57 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Passes datacheck tests

parent df8dc442
Branches
No related tags found
2 merge requests!11Draft: Fast one-sided MPI version,!8Draft: RDMA version with wrapped infinity calls
This commit is part of merge request !11. Comments created here will be created in the context of that merge request.
...@@ -55,10 +55,10 @@ static int UNLOCKED = -3; ...@@ -55,10 +55,10 @@ static int UNLOCKED = -3;
* as we need to align in memory. */ * as we need to align in memory. */
#define BLOCKTYPE size_t #define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT #define MPI_BLOCKTYPE MPI_AINT
static int BYTESINBLOCK = sizeof(BLOCKTYPE); static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Size of message header in blocks. The flag, size and tag. */ /* Size of message header in blocks. The flag, size and tag. */
static size_t HEADER_SIZE = 3; static const size_t HEADER_SIZE = 3;
/* Size of a message board in blocks, we have one of these per rank per /* Size of a message board in blocks, we have one of these per rank per
* communicator (i.e. per window). */ * communicator (i.e. per window). */
...@@ -100,7 +100,7 @@ static int volatile todo_recv = 0; ...@@ -100,7 +100,7 @@ static int volatile todo_recv = 0;
* @result the number of blocks needed. * @result the number of blocks needed.
*/ */
static int toblocks(BLOCKTYPE nr_bytes) { static int toblocks(BLOCKTYPE nr_bytes) {
return (nr_bytes * (BYTESINBLOCK - 1)) / (BYTESINBLOCK * BYTESINBLOCK); return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
} }
/** /**
...@@ -110,35 +110,35 @@ static int toblocks(BLOCKTYPE nr_bytes) { ...@@ -110,35 +110,35 @@ static int toblocks(BLOCKTYPE nr_bytes) {
* *
* @result the number of bytes. * @result the number of bytes.
*/ */
static BLOCKTYPE tobytes(int nr_blocks) { static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
return (nr_blocks * BYTESINBLOCK);
}
/** /**
* @brief fill a data area with a pattern that can be checked for changes. * @brief fill a data area with our rank.
* *
* @param size size of data in bytes. * @param size size of data in bytes.
* @param data the data to fill. * @param data the data to fill.
*/ */
static void datacheck_fill(size_t size, void *data) { static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
unsigned char *p = (unsigned char *)data; for (BLOCKTYPE i = 0; i < size; i++) {
for (size_t i = 0; i < size; i++) { data[i] = myrank;
p[i] = 170; /* 10101010 in bits. */
} }
} }
/** /**
* @brief test a filled data area for our pattern. * @brief test a filled data area for a value.
* *
* @param size size of data in bytes. * @param size size of data in bytes.
* @param data the data to fill. * @param data the data to check.
* @param rank the value to, i.e. original rank.
* *
* @result 1 on success, 0 otherwise. * @result 1 on success, 0 otherwise.
*/ */
static int datacheck_test(size_t size, void *data) { static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) { for (size_t i = 0; i < size; i++) {
if (p[i] != 170) return 0; if (data[i] != rank) {
message("see %zd expected %d @ %zd", data[i], rank, i);
return 0;
}
} }
return 1; return 1;
} }
...@@ -162,7 +162,8 @@ static void *send_thread(void *arg) { ...@@ -162,7 +162,8 @@ static void *send_thread(void *arg) {
log->data = dataptr; log->data = dataptr;
/* Fill data with pattern. */ /* Fill data with pattern. */
if (datacheck) datacheck_fill(datasize, dataptr); if (datacheck) datacheck_fill(toblocks(log->size),
&dataptr[HEADER_SIZE]);
/* First element is marked as LOCKED, so only we can update. */ /* First element is marked as LOCKED, so only we can update. */
dataptr[0] = LOCKED; dataptr[0] = LOCKED;
...@@ -170,10 +171,12 @@ static void *send_thread(void *arg) { ...@@ -170,10 +171,12 @@ static void *send_thread(void *arg) {
dataptr[2] = log->tag; dataptr[2] = log->tag;
/* And send data to other rank. */ /* And send data to other rank. */
MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank, int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE, MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE,
MPI_REPLACE, mpi_window[log->subtype]); MPI_REPLACE, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS)
error("Failed to accumulate data: %d", ret);
/* Now we change the last element to UNLOCKED so that the remote end can /* Now we change the last element to UNLOCKED so that the remote end can
* find out that the data has arrived. */ * find out that the data has arrived. */
BLOCKTYPE newval[1]; BLOCKTYPE newval[1];
...@@ -186,7 +189,7 @@ static void *send_thread(void *arg) { ...@@ -186,7 +189,7 @@ static void *send_thread(void *arg) {
MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
//if (oldval[0] == dataptr[0]) { // if (oldval[0] == dataptr[0]) {
// message("sent a message to %d/%d (%zd:%zd:%zd @ %zd)", log->otherrank, // message("sent a message to %d/%d (%zd:%zd:%zd @ %zd)", log->otherrank,
// log->subtype, dataptr[0], oldval[0], newval[0], // log->subtype, dataptr[0], oldval[0], newval[0],
// MESSAGE_SIZE * myrank); // MESSAGE_SIZE * myrank);
...@@ -201,7 +204,7 @@ static void *send_thread(void *arg) { ...@@ -201,7 +204,7 @@ static void *send_thread(void *arg) {
* so we can use MPI_Test to get some local progression. */ * so we can use MPI_Test to get some local progression. */
newval[0] = UNLOCKED; newval[0] = UNLOCKED;
while (newval[0] != LOCKED) { while (newval[0] != LOCKED) {
//MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); // MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
MPI_Request request; MPI_Request request;
MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank, MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
...@@ -213,7 +216,8 @@ static void *send_thread(void *arg) { ...@@ -213,7 +216,8 @@ static void *send_thread(void *arg) {
} }
} }
message("sent and received... %d/%d/%d", k, nr_send, ((char *)log->data)[0]); //message("sent and received... %d/%d/%d", k, nr_send,
// ((char *)log->data)[0]);
/* Ready the next send. */ /* Ready the next send. */
} }
...@@ -251,7 +255,8 @@ static void *recv_thread(void *arg) { ...@@ -251,7 +255,8 @@ static void *recv_thread(void *arg) {
if (todo_recv <= 0) break; if (todo_recv <= 0) break;
//MPI_Win_flush(n, mpi_window[j]); // XXX emergency measure //MPI_Win_flush(n, mpi_window[j]); // XXX emergency measure
BLOCKTYPE lockval = mpi_ptr[j][n * MESSAGE_SIZE]; BLOCKTYPE *dataptr = &mpi_ptr[j][n * MESSAGE_SIZE];
BLOCKTYPE lockval = dataptr[0];
if (lockval == UNLOCKED) { if (lockval == UNLOCKED) {
...@@ -260,16 +265,19 @@ static void *recv_thread(void *arg) { ...@@ -260,16 +265,19 @@ static void *recv_thread(void *arg) {
for (int k = 0; k < nr_recv; k++) { for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k]; struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done && log->otherrank == n && if (log != NULL && !log->done && log->otherrank == n &&
log->subtype == j) { log->subtype == j &&
log->size == dataptr[1] && log->tag == dataptr[2]) {
found = 1; found = 1;
//message("We have a ready message %d/%d at %zd: lockval %zd", //message("We have a ready message %d/%d at %zd: lockval %zd",
// log->rank, log->subtype, log->otherrank * MESSAGE_SIZE, // log->rank, log->subtype, n * MESSAGE_SIZE,
// lockval); // lockval);
/* Check data sent data is unchanged and received data is as /* Check data sent data is unchanged and received data is as
* expected. */ * expected. */
if (datacheck && !datacheck_test(log->size, &mpi_ptr[j][n * MESSAGE_SIZE])) { if (datacheck &&
!datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], n)) {
message("Data mismatch on completion"); message("Data mismatch on completion");
} }
...@@ -279,7 +287,7 @@ static void *recv_thread(void *arg) { ...@@ -279,7 +287,7 @@ static void *recv_thread(void *arg) {
atomic_dec(&todo_recv); atomic_dec(&todo_recv);
/* Now ready for next message. */ /* Now ready for next message. */
mpi_ptr[j][n * MESSAGE_SIZE] = LOCKED; dataptr[0] = LOCKED;
break; break;
} }
...@@ -358,8 +366,9 @@ static size_t pick_logs() { ...@@ -358,8 +366,9 @@ static size_t pick_logs() {
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs); qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs); qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
if (verbose) message("maxsize = %zd, nr_send = %d, nr_recv = %d", if (verbose)
maxsize, nr_send, nr_recv); message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
nr_recv);
return maxsize; return maxsize;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment