diff --git a/mpiuse.h b/mpiuse.h index 2ed512c1fe9eee06e6b2c55a90bd16fc773b0654..5cb705ca7f5441d548a37cb46dd86be56c82b365 100644 --- a/mpiuse.h +++ b/mpiuse.h @@ -59,6 +59,9 @@ struct mpiuse_log_entry { /* Interaction is complete. */ int done; + /* Completion is pending. */ + int pending; + /* Pointer to the data. */ void *data; diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c index 8169235c93edbaa2630b0a046515594ea77820e2..49086579db994aac2b9ae917c4c73de2428a869f 100644 --- a/swiftmpirdmastepsim.c +++ b/swiftmpirdmastepsim.c @@ -286,11 +286,17 @@ static void *req_thread(void *arg) { message("%d: req thread starts with %d messages", *((int *)arg), nr_req); ticks starttics = getticks(); + /* Indices of messages that have completed, but we're not ready to complete + * yet. */ + static int nr_cache = 100; + int indices[100]; + int nr_ready = 0; + while (injecting || (!injecting && todo_req > 0)) { int nlogs = nr_req; for (int k = 0; k < nlogs; k++) { struct mpiuse_log_entry *log = req_queue[k]; - if (log != NULL && !log->done) { + if (log != NULL && !log->done && !log->pending) { int res; MPI_Status stat; int err = MPI_Test(&log->req, &res, &stat); @@ -298,38 +304,78 @@ static void *req_thread(void *arg) { error("MPI_Test call failed"); } if (res) { + indices[nr_ready] = k; + nr_ready++; + log->pending = 1; + if (nr_ready == nr_cache || todo_req == 0) { - // Start new epoch on window? - int ret = + /* Time to complete some messages. */ + int ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]); - if (ret != MPI_SUCCESS) - mpi_error_message(ret, "MPI_Win_flush failed"); - - /* Send the UNLOCKED message. */ - BLOCKTYPE newval[1]; - BLOCKTYPE oldval[1]; - newval[0] = UNLOCKED; - oldval[0] = 0; - ret = MPI_Compare_and_swap(&newval[0], log->data, &oldval[0], - MPI_BLOCKTYPE, log->otherrank, log->offset, - mpi_window[log->subtype]); - - if (ret != MPI_SUCCESS) - mpi_error_message(ret, "MPI_Compare_and_swap error"); - - /* And complete. */ - ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]); - if (ret != MPI_SUCCESS) - mpi_error_message(ret, "MPI_Win_flush failed"); - - /* Done. */ - log->done = 1; - log->endtic = getticks(); - free(log->data); - atomic_dec(&todo_req); + if (ret != MPI_SUCCESS) + mpi_error_message(ret, "MPI_Win_flush failed"); + + message("releasing %d messages", nr_ready); + + for (int j = 0; j < nr_ready; j++) { + struct mpiuse_log_entry *log = req_queue[indices[j]]; + + /* Send the UNLOCKED message. */ + BLOCKTYPE newval[1]; + BLOCKTYPE oldval[1]; + newval[0] = UNLOCKED; + oldval[0] = 0; + ret = MPI_Compare_and_swap(&newval[0], log->data, &oldval[0], + MPI_BLOCKTYPE, log->otherrank, log->offset, + mpi_window[log->subtype]); + + if (ret != MPI_SUCCESS) + mpi_error_message(ret, "MPI_Compare_and_swap error"); + + /* Done. */ + log->done = 1; + log->endtic = getticks(); + free(log->data); + atomic_dec(&todo_req); + } + nr_ready = 0; + + /* And flush messages to release. */ + ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]); + if (ret != MPI_SUCCESS) + mpi_error_message(ret, "MPI_Win_flush failed"); + } } } } + if (nr_ready > 0) { + + message("Completing tail: %d", nr_ready); + + /* Tail of messages to complete. */ + for (int j = 0; j < nr_ready; j++) { + struct mpiuse_log_entry *log = req_queue[indices[j]]; + + /* Send the UNLOCKED message. */ + BLOCKTYPE newval[1]; + BLOCKTYPE oldval[1]; + newval[0] = UNLOCKED; + oldval[0] = 0; + int ret = MPI_Compare_and_swap(&newval[0], log->data, &oldval[0], + MPI_BLOCKTYPE, log->otherrank, log->offset, + mpi_window[log->subtype]); + + if (ret != MPI_SUCCESS) + mpi_error_message(ret, "MPI_Compare_and_swap error"); + + /* Done. */ + log->done = 1; + log->endtic = getticks(); + free(log->data); + atomic_dec(&todo_req); + } + nr_ready = 0; + } } message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), @@ -446,13 +492,13 @@ static void pick_logs() { /* Duplicate of logs. Bit large... */ req_queue = (struct mpiuse_log_entry **)calloc( - nlogs, sizeof(struct mpiuse_log_entry *)); + nlogs / 2 + 1, sizeof(struct mpiuse_log_entry *)); nr_req = 0; send_queue = (struct mpiuse_log_entry **)calloc( - nlogs, sizeof(struct mpiuse_log_entry *)); + nlogs / 2 + 1, sizeof(struct mpiuse_log_entry *)); nr_send = 0; recv_queue = (struct mpiuse_log_entry **)calloc( - nlogs, sizeof(struct mpiuse_log_entry *)); + nlogs / 2 + 1, sizeof(struct mpiuse_log_entry *)); nr_recv = 0; for (int k = 0; k < nlogs; k++) { @@ -460,6 +506,7 @@ static void pick_logs() { if (log->activation) { if (log->rank == myrank) { log->done = 0; + log->pending = 0; log->injtic = 0; log->endtic = 0; log->data = NULL;