Skip to content
Snippets Groups Projects

Draft: Fast one-sided MPI version

Open Peter W. Draper requested to merge asyncreallyonesided-fast into master
1 file
+ 40
31
Compare changes
  • Side-by-side
  • Inline
+ 40
31
@@ -55,10 +55,10 @@ static int UNLOCKED = -3;
@@ -55,10 +55,10 @@ static int UNLOCKED = -3;
* as we need to align in memory. */
* as we need to align in memory. */
#define BLOCKTYPE size_t
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
#define MPI_BLOCKTYPE MPI_AINT
static int BYTESINBLOCK = sizeof(BLOCKTYPE);
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
/* Size of message header in blocks. The flag, size and tag. */
/* Size of message header in blocks. The flag, size and tag. */
static size_t HEADER_SIZE = 3;
static const size_t HEADER_SIZE = 3;
/* Size of a message board in blocks, we have one of these per rank per
/* Size of a message board in blocks, we have one of these per rank per
* communicator (i.e. per window). */
* communicator (i.e. per window). */
@@ -100,7 +100,7 @@ static int volatile todo_recv = 0;
@@ -100,7 +100,7 @@ static int volatile todo_recv = 0;
* @result the number of blocks needed.
* @result the number of blocks needed.
*/
*/
static int toblocks(BLOCKTYPE nr_bytes) {
static int toblocks(BLOCKTYPE nr_bytes) {
return (nr_bytes * (BYTESINBLOCK - 1)) / (BYTESINBLOCK * BYTESINBLOCK);
return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
}
/**
/**
@@ -110,35 +110,35 @@ static int toblocks(BLOCKTYPE nr_bytes) {
@@ -110,35 +110,35 @@ static int toblocks(BLOCKTYPE nr_bytes) {
*
*
* @result the number of bytes.
* @result the number of bytes.
*/
*/
static BLOCKTYPE tobytes(int nr_blocks) {
static BLOCKTYPE tobytes(int nr_blocks) { return (nr_blocks * BYTESINBLOCK); }
return (nr_blocks * BYTESINBLOCK);
}
/**
/**
* @brief fill a data area with a pattern that can be checked for changes.
* @brief fill a data area with our rank.
*
*
* @param size size of data in bytes.
* @param size size of data in bytes.
* @param data the data to fill.
* @param data the data to fill.
*/
*/
static void datacheck_fill(size_t size, void *data) {
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
unsigned char *p = (unsigned char *)data;
for (BLOCKTYPE i = 0; i < size; i++) {
for (size_t i = 0; i < size; i++) {
data[i] = myrank;
p[i] = 170; /* 10101010 in bits. */
}
}
}
}
/**
/**
* @brief test a filled data area for our pattern.
* @brief test a filled data area for a value.
*
*
* @param size size of data in bytes.
* @param size size of data in bytes.
* @param data the data to fill.
* @param data the data to check.
 
* @param rank the value to, i.e. original rank.
*
*
* @result 1 on success, 0 otherwise.
* @result 1 on success, 0 otherwise.
*/
*/
static int datacheck_test(size_t size, void *data) {
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
unsigned char *p = (unsigned char *)data;
for (size_t i = 0; i < size; i++) {
for (size_t i = 0; i < size; i++) {
if (p[i] != 170) return 0;
if (data[i] != rank) {
 
message("see %zd expected %d @ %zd", data[i], rank, i);
 
return 0;
 
}
}
}
return 1;
return 1;
}
}
@@ -162,7 +162,8 @@ static void *send_thread(void *arg) {
@@ -162,7 +162,8 @@ static void *send_thread(void *arg) {
log->data = dataptr;
log->data = dataptr;
/* Fill data with pattern. */
/* Fill data with pattern. */
if (datacheck) datacheck_fill(datasize, dataptr);
if (datacheck) datacheck_fill(toblocks(log->size),
 
&dataptr[HEADER_SIZE]);
/* First element is marked as LOCKED, so only we can update. */
/* First element is marked as LOCKED, so only we can update. */
dataptr[0] = LOCKED;
dataptr[0] = LOCKED;
@@ -170,10 +171,12 @@ static void *send_thread(void *arg) {
@@ -170,10 +171,12 @@ static void *send_thread(void *arg) {
dataptr[2] = log->tag;
dataptr[2] = log->tag;
/* And send data to other rank. */
/* And send data to other rank. */
MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE,
MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE,
MPI_REPLACE, mpi_window[log->subtype]);
MPI_REPLACE, mpi_window[log->subtype]);
if (ret != MPI_SUCCESS)
 
error("Failed to accumulate data: %d", ret);
 
/* Now we change the last element to UNLOCKED so that the remote end can
/* Now we change the last element to UNLOCKED so that the remote end can
* find out that the data has arrived. */
* find out that the data has arrived. */
BLOCKTYPE newval[1];
BLOCKTYPE newval[1];
@@ -186,7 +189,7 @@ static void *send_thread(void *arg) {
@@ -186,7 +189,7 @@ static void *send_thread(void *arg) {
MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
//if (oldval[0] == dataptr[0]) {
// if (oldval[0] == dataptr[0]) {
// message("sent a message to %d/%d (%zd:%zd:%zd @ %zd)", log->otherrank,
// message("sent a message to %d/%d (%zd:%zd:%zd @ %zd)", log->otherrank,
// log->subtype, dataptr[0], oldval[0], newval[0],
// log->subtype, dataptr[0], oldval[0], newval[0],
// MESSAGE_SIZE * myrank);
// MESSAGE_SIZE * myrank);
@@ -201,7 +204,7 @@ static void *send_thread(void *arg) {
@@ -201,7 +204,7 @@ static void *send_thread(void *arg) {
* so we can use MPI_Test to get some local progression. */
* so we can use MPI_Test to get some local progression. */
newval[0] = UNLOCKED;
newval[0] = UNLOCKED;
while (newval[0] != LOCKED) {
while (newval[0] != LOCKED) {
//MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
// MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
MPI_Request request;
MPI_Request request;
MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
@@ -213,7 +216,8 @@ static void *send_thread(void *arg) {
@@ -213,7 +216,8 @@ static void *send_thread(void *arg) {
}
}
}
}
message("sent and received... %d/%d/%d", k, nr_send, ((char *)log->data)[0]);
//message("sent and received... %d/%d/%d", k, nr_send,
 
// ((char *)log->data)[0]);
/* Ready the next send. */
/* Ready the next send. */
}
}
@@ -251,7 +255,8 @@ static void *recv_thread(void *arg) {
@@ -251,7 +255,8 @@ static void *recv_thread(void *arg) {
if (todo_recv <= 0) break;
if (todo_recv <= 0) break;
//MPI_Win_flush(n, mpi_window[j]); // XXX emergency measure
//MPI_Win_flush(n, mpi_window[j]); // XXX emergency measure
BLOCKTYPE lockval = mpi_ptr[j][n * MESSAGE_SIZE];
BLOCKTYPE *dataptr = &mpi_ptr[j][n * MESSAGE_SIZE];
 
BLOCKTYPE lockval = dataptr[0];
if (lockval == UNLOCKED) {
if (lockval == UNLOCKED) {
@@ -260,16 +265,19 @@ static void *recv_thread(void *arg) {
@@ -260,16 +265,19 @@ static void *recv_thread(void *arg) {
for (int k = 0; k < nr_recv; k++) {
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done && log->otherrank == n &&
if (log != NULL && !log->done && log->otherrank == n &&
log->subtype == j) {
log->subtype == j &&
 
log->size == dataptr[1] && log->tag == dataptr[2]) {
found = 1;
found = 1;
//message("We have a ready message %d/%d at %zd: lockval %zd",
//message("We have a ready message %d/%d at %zd: lockval %zd",
// log->rank, log->subtype, log->otherrank * MESSAGE_SIZE,
// log->rank, log->subtype, n * MESSAGE_SIZE,
// lockval);
// lockval);
/* Check data sent data is unchanged and received data is as
/* Check data sent data is unchanged and received data is as
* expected. */
* expected. */
if (datacheck && !datacheck_test(log->size, &mpi_ptr[j][n * MESSAGE_SIZE])) {
if (datacheck &&
 
!datacheck_test(toblocks(log->size),
 
&dataptr[HEADER_SIZE], n)) {
message("Data mismatch on completion");
message("Data mismatch on completion");
}
}
@@ -279,7 +287,7 @@ static void *recv_thread(void *arg) {
@@ -279,7 +287,7 @@ static void *recv_thread(void *arg) {
atomic_dec(&todo_recv);
atomic_dec(&todo_recv);
/* Now ready for next message. */
/* Now ready for next message. */
mpi_ptr[j][n * MESSAGE_SIZE] = LOCKED;
dataptr[0] = LOCKED;
break;
break;
}
}
@@ -358,8 +366,9 @@ static size_t pick_logs() {
@@ -358,8 +366,9 @@ static size_t pick_logs() {
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
if (verbose) message("maxsize = %zd, nr_send = %d, nr_recv = %d",
if (verbose)
maxsize, nr_send, nr_recv);
message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
 
nr_recv);
return maxsize;
return maxsize;
}
}
Loading