Commit 9c25a2e1 authored by Peter W. Draper

Only poll for active ranks and subtypes

Doesn't gain much
parent edf07c53
2 merge requests: !11 "Draft: Fast one-sided MPI version", !8 "Draft: RDMA version with wrapped infinity calls"
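The change is small: instead of the receive thread sweeping every rank and every communicator subtype on each pass, pick_logs() now records which ranks and subtypes actually appear in the logs, compacts those flags into dense index lists, and the polling loops walk only those lists. Below is a minimal, self-contained sketch of that pattern, not code from this repository; the array sizes, the rank_flag/subtype_flag values and the board array are invented for illustration.

/* Sketch only: compact per-rank and per-subtype activity flags into index
 * lists, then poll only the active combinations. All names and sizes here
 * are hypothetical. */
#include <stdio.h>

#define NR_RANKS 8
#define NR_SUBTYPES 4

/* Pretend message boards: non-zero means "something to handle". */
static int board[NR_RANKS][NR_SUBTYPES];

int main(void) {
  /* Flags that would be set while scanning the logs (hard-wired here). */
  int rank_flag[NR_RANKS] = {0, 1, 0, 1, 0, 0, 1, 0};
  int subtype_flag[NR_SUBTYPES] = {1, 0, 0, 1};

  /* Compact the flags into index lists. */
  int active_ranks[NR_RANKS], nr_active_ranks = 0;
  for (int k = 0; k < NR_RANKS; k++)
    if (rank_flag[k]) active_ranks[nr_active_ranks++] = k;

  int active_subtypes[NR_SUBTYPES], nr_active_subtypes = 0;
  for (int k = 0; k < NR_SUBTYPES; k++)
    if (subtype_flag[k]) active_subtypes[nr_active_subtypes++] = k;

  board[3][0] = 1; /* pretend rank 3 posted a subtype-0 message */

  /* Poll only the active combinations instead of the full grid. */
  for (int n = 0; n < nr_active_ranks; n++) {
    int rank = active_ranks[n];
    for (int j = 0; j < nr_active_subtypes; j++) {
      int subtype = active_subtypes[j];
      if (board[rank][subtype])
        printf("message waiting from rank %d, subtype %d\n", rank, subtype);
    }
  }
  return 0;
}

Each sweep then costs nr_active_ranks * nr_active_subtypes probes instead of nr_ranks * task_subtype_count, which presumably helps little when most ranks and subtypes are already active, consistent with the "Doesn't gain much" note above.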
@@ -92,6 +92,12 @@ static struct mpiuse_log_entry **volatile recv_queue;
 static int volatile nr_recv = 0;
 static int volatile todo_recv = 0;
 
+/* Flags of active ranks and subtypes. */
+static int *volatile active_ranks = NULL;
+static int volatile nr_active_ranks = 0;
+static int *volatile active_subtypes = NULL;
+static int volatile nr_active_subtypes = 0;
+
 /**
  * @brief Convert a byte count into a number of blocks, rounds up.
  *
@@ -207,7 +213,6 @@ static void *send_thread(void *arg) {
     * on a get, as the local window is only used for receiving. Use an Rget
     * so we can use MPI_Test to get some local progression. */
    newval[0] = UNLOCKED;
-    size_t counter = 0;
    while (newval[0] != LOCKED) {
      MPI_Request request;
@@ -218,21 +223,13 @@
      if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Rget failed");
 
      /* After the rget to make sure we get a chance at completion. */
-      ret =
-          MPI_Win_flush(log->otherrank, mpi_window[log->subtype]); // emergency
+      ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
      if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
 
      int flag = 0;
      while (flag == 0) {
        ret = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
        if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Test failed");
-        counter++;
-        if (counter > 1000000) {
-          message("message sent to %d/%d %zd %d/%d stuck in test",
-                  log->otherrank, log->subtype, newval[0], k, nr_send);
-          counter = 0;
-        }
      }
    }
@@ -253,8 +250,8 @@
static void *recv_thread(void *arg) {
 
  message(
-      "%d: recv thread starts, checking for %d messages %d ranks "
-      " %d communicators",
+      "%d: recv thread starts, checking for %d messages %d "
+      "ranks %d communicators",
      *((int *)arg), nr_recv, nr_ranks, task_subtype_count);
  ticks starttics = getticks();
@@ -270,34 +267,37 @@ static void *recv_thread(void *arg) {
  /* We loop while new requests are being send and we still have messages
   * to receive. */
  while (todo_recv > 0) {
-    for (int n = 0; n < nr_ranks; n++) {
+    for (int n = 0; n < nr_active_ranks; n++) {
      if (todo_recv <= 0) break;
-      if (n == myrank) continue;
-      for (int j = 0; j < task_subtype_count; j++) {
+      int rank = active_ranks[n];
+      if (rank == myrank) continue;
+      for (int j = 0; j < nr_active_subtypes; j++) {
        if (todo_recv <= 0) break;
+        int subtype = active_subtypes[j];
 
-        BLOCKTYPE *dataptr = &mpi_ptr[j][n * MESSAGE_SIZE];
+        BLOCKTYPE *dataptr = &mpi_ptr[subtype][rank * MESSAGE_SIZE];
        if (dataptr[0] == UNLOCKED) {
 
          /* We have a message waiting to be handled, find the log. */
          int found = 0;
          for (int k = 0; k < nr_recv; k++) {
            struct mpiuse_log_entry *log = recv_queue[k];
-            if (log != NULL && !log->done && log->otherrank == n &&
-                log->subtype == j && log->size == dataptr[1] &&
+            if (log != NULL && !log->done && log->otherrank == rank &&
+                log->subtype == subtype && log->size == dataptr[1] &&
                log->tag == dataptr[2]) {
              found = 1;
 
              if (verbose)
                message("receive message %d/%d from %d @ %zd: dataptr[0] %zd",
-                        log->rank, log->subtype, n, n * MESSAGE_SIZE,
+                        log->rank, log->subtype, rank, rank * MESSAGE_SIZE,
                        dataptr[0]);
 
              /* Check data sent data is unchanged and received data is as
               * expected. */
              if (datacheck && !datacheck_test(toblocks(log->size),
-                                               &dataptr[HEADER_SIZE], n)) {
+                                               &dataptr[HEADER_SIZE], rank)) {
                message("Data mismatch on completion");
              }
@@ -363,6 +363,12 @@ static size_t pick_logs() {
      nlogs, sizeof(struct mpiuse_log_entry *));
  nr_recv = 0;
 
+  /* Flags for active elements. */
+  active_ranks = (int *)calloc(nr_ranks, sizeof(int));
+  nr_active_ranks = 0;
+  active_subtypes = (int *)calloc(task_subtype_count, sizeof(int));
+  nr_active_subtypes = 0;
+
  for (int k = 0; k < nlogs; k++) {
    struct mpiuse_log_entry *log = mpiuse_get_log(k);
    if (log->activation) {
@@ -378,19 +384,47 @@
        } else {
          error("task type '%d' is not a known send or recv task", log->type);
        }
+
+        /* Activative rank and subtype. */
+        active_ranks[log->otherrank] = 1;
+        active_subtypes[log->subtype] = 1;
      }
 
      /* Across all ranks. */
      if (log->size > maxsize) maxsize = log->size;
    }
  }
 
+  /* Active ranks and subtypes into indices. */
+  nr_active_ranks = 0;
+  for (int k = 0; k < nr_ranks; k++) {
+    if (active_ranks[k]) {
+      active_ranks[nr_active_ranks] = k;
+      message("active rank: %d", k);
+      nr_active_ranks++;
+    }
+  }
+  nr_active_subtypes = 0;
+  for (int k = 0; k < task_subtype_count; k++) {
+    if (active_subtypes[k]) {
+      active_subtypes[nr_active_subtypes] = k;
+      message("active subtype: %d", k);
+      nr_active_subtypes++;
+    }
+  }
+
  /* Sort into increasing tag. */
  qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
  qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
 
-  if (verbose)
+  if (verbose) {
    message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
            nr_recv);
+    message("active ranks = %d, active subtypes = %d", nr_active_ranks,
+            nr_active_subtypes);
+  }
 
  return maxsize;
}
@@ -459,6 +493,8 @@ int main(int argc, char *argv[]) {
  /* Extract the send and recv messages for our rank. */
  size_t maxsize = pick_logs();
+  message("active ranks = %d, active subtypes = %d", nr_active_ranks,
+          nr_active_subtypes);
 
  /* Size of a message board. Needs to align on size_t. */
  MESSAGE_SIZE = toblocks(maxsize) + HEADER_SIZE;
......