Commit 50944463 authored by Peter W. Draper

Capture non-progress. Still not handling the complexity of sending to fixed offsets from a number of other ranks; that needs a lot more guidance sent around the system.
parent 589dbc34
@@ -127,7 +127,8 @@ static void *inject_thread(void *arg) {
   message("%d: injection thread starts", *((int *)arg));
   ticks starttics = getticks();
-  int offset = 0;
+  int recv_offset = 0;
+  int send_offset = 0;
   while (ind_req < nr_reqs) {
     struct mpiuse_log_entry *log = reqs_queue[ind_req];
@@ -155,7 +156,7 @@ static void *inject_thread(void *arg) {
       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, log->otherrank, 0,
                    mpi_send_window[log->subtype]);
       size_t index = (log->otherrank * nr_ranks * nr_ranks) +
-                     (log->rank * nr_ranks) + ind_req;
+                     (log->rank * nr_ranks) + send_offset;
       MPI_Raccumulate(log->data, log->size, MPI_BYTE, log->otherrank,
                       mpi_offsets[index], log->size, MPI_BYTE, MPI_REPLACE,
                       mpi_send_window[log->subtype], &log->req);
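For context, a minimal sketch of the send-side pattern this hunk adjusts: push a byte payload into a remote window with MPI_Raccumulate and MPI_REPLACE under an exclusive passive-target lock, with the target displacement looked up per sender (the harness indexes mpi_offsets by the other rank, its own rank and a running send offset). The function, buffer and window names below are illustrative assumptions, not the harness's code, and the sketch waits on the request immediately instead of storing it in the log entry.

#include <mpi.h>

/* Sketch: write `size` bytes from `payload` into `win` on rank `target` at
 * byte displacement `disp` (window assumed to use disp_unit == 1 here). */
static void rma_send_bytes(void *payload, int size, int target, MPI_Aint disp,
                           MPI_Win win) {
  MPI_Request req;
  MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);

  /* MPI_REPLACE makes the accumulate behave like an element-wise put, while
   * keeping the ordering guarantees accumulates have over plain MPI_Put. */
  MPI_Raccumulate(payload, size, MPI_BYTE, target, disp, size, MPI_BYTE,
                  MPI_REPLACE, win, &req);

  /* Local completion: `payload` can be reused after this returns. */
  MPI_Wait(&req, MPI_STATUS_IGNORE);
  MPI_Win_unlock(target, win);
}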
@@ -171,20 +172,22 @@ static void *inject_thread(void *arg) {
       newval[0] = DONE;
       oldval[0] = DONE;
       MPI_Compare_and_swap(&newval[0], &((char *)log->data)[log->size],
-                           &oldval[0], MPI_BYTE, 0, log->size,
+                           &oldval[0], MPI_BYTE, log->otherrank, log->size,
                            mpi_send_window[log->subtype]);
       /* Proceed locally. */
-      MPI_Win_flush_local(0, mpi_send_window[log->subtype]);
+      MPI_Win_flush_local(log->otherrank, mpi_send_window[log->subtype]);
       MPI_Win_unlock(log->otherrank, mpi_send_window[log->subtype]);
+      send_offset += log->size + 1;
     } else {
       /* Add entry so we start checking for the remote send. */
       int ind = atomic_inc(&nr_recvs);
       recvs_queue[ind] = log;
-      log->offset = offset;
-      offset += log->size + 1;
+      log->offset = recv_offset;
+      recv_offset += log->size + 1;
       atomic_inc(&todo_recv);
     }
     if (err != MPI_SUCCESS) error("Failed to activate send or recv");
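The branch above also deposits a completion flag: one extra byte is reserved after every payload (hence the size + 1 stride on both offsets), and the sender sets it atomically so the receive side can poll for arrival. A minimal sketch of that flag write follows, under the assumption that the flag slot starts out zeroed; the harness itself compares against values held in log->data and its own DONE constant, so the details here are illustrative only.

#include <mpi.h>

#define DONE 1 /* placeholder flag value; the harness defines its own */

/* Sketch: atomically set the flag byte stored just past a payload of
 * `payload_size` bytes at displacement `payload_disp` on rank `target`. */
static void rma_mark_done(int target, MPI_Aint payload_disp, int payload_size,
                          MPI_Win win) {
  char newval = DONE;
  char expected = 0; /* assumes the window memory starts zero-initialised */
  char previous = 0;

  MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, 0, win);
  MPI_Compare_and_swap(&newval, &expected, &previous, MPI_BYTE, target,
                       payload_disp + payload_size, win);

  /* Complete locally so our buffers can be reused; the unlock then completes
   * the operation at the target. */
  MPI_Win_flush_local(target, win);
  MPI_Win_unlock(target, win);
}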
@@ -227,9 +230,9 @@ static void *recv_thread(void *arg) {
                    mpi_recv_window[log->subtype]);
       int arrived = 0;
-      message("Checking at %zd", log->offset);
+      //message("%d: Checking at %zd subtype: %d", k, log->offset, log->subtype);
       if (mpi_ptr[log->subtype][log->offset+log->size] == DONE) arrived = 1;
+      if (arrived) message("message has arrived?");
       MPI_Win_unlock(log->otherrank, mpi_recv_window[log->subtype]);
       /* Increment etc. of statistics about time spent waiting. */
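On the receive side the same flag byte is polled: the thread locks the window, reads the byte one past the expected payload, and treats the message as arrived once it equals DONE. A minimal sketch of that check, with winbase standing in for the harness's mpi_ptr[subtype] pointer; locking the local rank here is an assumption made for reading local window memory, where the harness locks log->otherrank.

#include <mpi.h>
#include <stddef.h>

#define DONE 1 /* placeholder flag value; the harness defines its own */

/* Sketch: return 1 once a payload of `size` bytes at `offset` in the local
 * window memory `winbase` has been flagged as complete by the sender. */
static int rma_poll_arrived(char *winbase, size_t offset, size_t size,
                            int myrank, MPI_Win win) {
  int arrived = 0;

  /* Lock our own window so the read does not race with an incoming
   * accumulate into the same region. */
  MPI_Win_lock(MPI_LOCK_EXCLUSIVE, myrank, 0, win);
  if (winbase[offset + size] == DONE) arrived = 1;
  MPI_Win_unlock(myrank, win);

  return arrived;
}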
@@ -428,19 +431,12 @@ int main(int argc, char *argv[]) {
    * communicator. */
   for (int i = 0; i < task_subtype_count; i++) {
     MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
-    //if (total_persubtype[i] > 0) {
-    message("allocate window for communicator %d", i);
-    MPI_Win_allocate(total_persubtype[i] + nr_ranks * sizeof(size_t),
-                     sizeof(int), MPI_INFO_NULL, subtypeMPI_comms[i],
-                     &mpi_ptr[i], &mpi_recv_window[i]);
-    //} else {
-    //mpi_ptr[i] = NULL;
-    //mpi_recv_window[i] = MPI_WIN_NULL;
-    //}
+    MPI_Win_allocate(total_persubtype[i] + nr_ranks * sizeof(size_t),
+                     sizeof(int), MPI_INFO_NULL, subtypeMPI_comms[i],
+                     &mpi_ptr[i], &mpi_recv_window[i]);

     /* Send window, just needs to attach to communicator, not data, since it
      * does not receive. */
-    message("create window for communicator %d", i);
     MPI_Win_create(NULL, 0, sizeof(int), MPI_INFO_NULL, subtypeMPI_comms[i],
                    &mpi_send_window[i]);
   }
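The final hunk tidies the window setup: every task subtype gets its own duplicated communicator, an allocated window exposing the receive buffer (payload bytes plus a size_t of bookkeeping per rank), and a zero-sized window that only ever originates one-sided sends. A self-contained sketch of that arrangement for a single communicator; the payload size and variable names are illustrative, not values from the harness.

#include <mpi.h>
#include <stddef.h>

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);

  int nr_ranks = 0;
  MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);

  /* One duplicated communicator per subtype in the harness; one here. */
  MPI_Comm comm;
  MPI_Comm_dup(MPI_COMM_WORLD, &comm);

  /* Receive window: room for the expected payload bytes plus a size_t of
   * bookkeeping per rank, mirroring the allocation in the diff. */
  MPI_Aint recv_bytes = 1024 * 1024 + nr_ranks * (MPI_Aint)sizeof(size_t);
  char *recv_base = NULL;
  MPI_Win recv_win;
  MPI_Win_allocate(recv_bytes, sizeof(int), MPI_INFO_NULL, comm, &recv_base,
                   &recv_win);

  /* Send window: attached to the communicator but exposing no memory, since
   * this side only originates accumulates and never receives into it. */
  MPI_Win send_win;
  MPI_Win_create(NULL, 0, sizeof(int), MPI_INFO_NULL, comm, &send_win);

  MPI_Win_free(&send_win);
  MPI_Win_free(&recv_win);
  MPI_Comm_free(&comm);
  MPI_Finalize();
  return 0;
}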