diff --git a/swiftmpistepsim.c b/swiftmpistepsim.c index 056d6c260300d61dec0959c1d78b3a69e06e2ca7..7f351342a465bb419b5e97935590b142de11b750 100644 --- a/swiftmpistepsim.c +++ b/swiftmpistepsim.c @@ -29,6 +29,10 @@ // // Also the send side has no associated window, as it only pushes data, so we // cannot seem to register that? +// +// Broken method with OpenMPI which has a limit on the number of +// memory regions that can be attached to a window (32). Runs with +// Intel MPI, but not as fast as ptp. #include <limits.h> @@ -44,7 +48,8 @@ #include "error.h" #include "mpiuse.h" -#define maxtags 1000 +// XXX broken for sure... +#define maxtags 6000 /* Global: Our rank for all to see. */ int myrank = -1; @@ -208,8 +213,8 @@ static void *send_thread(void *arg) { log->rank, log->otherrank, log->subtype, log->tag)]; log->offset = ranklog_offsets[INDEX4(nr_ranks, nr_ranks, task_subtype_count, log->rank, log->otherrank, log->subtype, index)]; - message("Using offset %zd at index %d (%d %d)", log->offset, index, - log->otherrank, log->tag ); + //message("Using offset %zd at index %d (%d %d)", log->offset, index, + // log->otherrank, log->tag ); /* And define header; dataptr[0] can be any value except UNLOCKED. */ dataptr[0] = 0; @@ -227,6 +232,13 @@ static void *send_thread(void *arg) { mpi_window[log->subtype], &log->req); if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data"); + // Push. Sadly all messages, not just this one. Restores order... But slow. + //if (k % 4 == 0 || k == nr_send - 1) { + // ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]); + // if (ret != MPI_SUCCESS) + // mpi_error_message(ret, "MPI_Win_flush failed"); + //} + /* Add to the requests queue. */ int ind = atomic_inc(&nr_req); req_queue[ind] = log; @@ -252,31 +264,25 @@ static void *req_thread(void *arg) { while (injecting || (!injecting && todo_req > 0)) { int nlogs = nr_req; + int ndone = 0; for (int k = 0; k < nlogs; k++) { struct mpiuse_log_entry *log = req_queue[k]; if (log != NULL && !log->done) { int res; MPI_Status stat; int err = MPI_Test(&log->req, &res, &stat); - if (err != MPI_SUCCESS) { - error("MPI_Test call failed"); - } + if (err != MPI_SUCCESS) error("MPI_Test call failed"); if (res) { - - // Get update. XXX is this needed? - int ret = - MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]); - if (ret != MPI_SUCCESS) - mpi_error_message(ret, "MPI_Win_flush failed"); + log->endtic = getticks(); /* Send the UNLOCKED message. */ BLOCKTYPE newval[1]; BLOCKTYPE oldval[1]; newval[0] = UNLOCKED; oldval[0] = 0; - ret = MPI_Compare_and_swap(&newval[0], log->data, &oldval[0], - MPI_BLOCKTYPE, log->otherrank, log->offset, - mpi_window[log->subtype]); + int ret = MPI_Compare_and_swap(&newval[0], log->data, &oldval[0], + MPI_BLOCKTYPE, log->otherrank, log->offset, + mpi_window[log->subtype]); if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Compare_and_swap error"); @@ -288,7 +294,8 @@ static void *req_thread(void *arg) { /* Done. */ log->done = 1; - log->endtic = getticks(); + ndone++; + log->tmin = getticks(); MPI_Free_mem(log->data); atomic_dec(&todo_req); } @@ -345,6 +352,7 @@ static void *recv_thread(void *arg) { /* Done, clean up. */ log->done = 1; log->endtic = getticks(); + log->tmin = getticks(); atomic_dec(&todo_recv); if (todo_recv == 0) break; @@ -412,7 +420,7 @@ static void pick_logs(void) { for (int k = 0; k < maxtags * task_subtype_count * nr_ranks *nr_ranks; k++) ranklog_indices[k] = -1; message("nlogs = %zd", nlogs); - for (int k = 0; k < nlogs; k++) { + for (int k = 0, kk = 0; k < nlogs; k++) { struct mpiuse_log_entry *log = mpiuse_get_log(k); if (log->rank == myrank && log->activation) { if (log->type == task_type_send || log->type == task_type_recv) { @@ -438,24 +446,25 @@ static void pick_logs(void) { /* Get MPI address and global log index. */ MPI_Get_address(log->data, &log->offset); //log->offset = (MPI_Aint) &log->data; - log->index = k; - + log->index = kk; + kk++; if (log->type == task_type_recv) { - message("recv log from rank: %d/%d/%d/%d %zd %zd", - log->otherrank, myrank, log->subtype, - log->tag, log->offset, log->index); + //message("recv log from rank: %d/%d/%d/%d %zd %zd", + // log->otherrank, myrank, log->subtype, + // log->tag, log->offset, log->index); /* Keep offset for sharing with send side. */ - ranklog_offsets[INDEX4(nr_ranks, nr_ranks, task_subtype_count, + ranklog_offsets[INDEX4(nr_ranks, nr_ranks, task_subtype_count, log->otherrank, myrank, - log->subtype, log->index)] = log->offset; - ranklog_indices[INDEX4(nr_ranks, nr_ranks, task_subtype_count, + log->subtype, log->index)] = log->offset; + ranklog_indices[INDEX4(nr_ranks, nr_ranks, task_subtype_count, log->otherrank, myrank, - log->subtype, log->tag)] = log->index; + log->subtype, log->tag)] = log->index; /* And attach to window. */ + message("%d recv log->size = %zd", kk, tobytes(log->size)); MPI_Win_attach(mpi_window[log->subtype], log->data, tobytes(log->size)); recv_queue[nr_recv] = log; @@ -465,7 +474,11 @@ static void pick_logs(void) { //message("send log to rank: %d %zd %d", log->otherrank, // log->offset, log->tag); - /* Sends local data that is not in a window. */ + /* And attach to window. Attempt to register this side as well. No + * difference. */ + message("%d send log->size = %zd", kk, tobytes(log->size)); + MPI_Win_attach(mpi_window[log->subtype], log->data, tobytes(log->size)); + send_queue[nr_send] = log; nr_send++; } @@ -476,7 +489,8 @@ static void pick_logs(void) { log->nr_tests = 0; log->tsum = 0.0; log->tmax = 0; - log->tmin = LONG_MAX; + log->tmin = 0; + log->injtic = 0; log->endtic = 0; // XXX test XXX @@ -498,9 +512,9 @@ static void pick_logs(void) { * @brief usage help. */ static void usage(char *argv[]) { - fprintf(stderr, "Usage: %s [-vfdc:s:] SWIFT_mpiuse-log-file.dat logfile.dat\n", + fprintf(stderr, "Usage: %s [-vdc:s:] SWIFT_mpiuse-log-file.dat logfile.dat\n", argv[0]); - fprintf(stderr, " options: -v verbose, -f fast injections, " + fprintf(stderr, " options: -v verbose, " "-d fill messages and check values on receive, " "-s <value> use fixed message of this size (bytes), " "-c <value> scale factor for all messages\n"); @@ -590,10 +604,12 @@ int main(int argc, char *argv[]) { /* We need to share all the offsets for each log so they can push data into * the correct parts of our receive window. */ size_t nlogs = mpiuse_nr_logs(); - MPI_Allreduce(MPI_IN_PLACE, ranklog_offsets, nr_ranks * nr_ranks * task_subtype_count * - nlogs, MPI_AINT, MPI_MAX, MPI_COMM_WORLD); - MPI_Allreduce(MPI_IN_PLACE, ranklog_indices, nr_ranks * nr_ranks * task_subtype_count * - maxtags, MPI_INT, MPI_MAX, MPI_COMM_WORLD); + size_t count = nr_ranks * nr_ranks * (size_t)task_subtype_count * (size_t)(nlogs/2); + message("1 count = %zd / %d", count, (int)count); + MPI_Allreduce(MPI_IN_PLACE, ranklog_offsets, count, MPI_AINT, MPI_MAX, MPI_COMM_WORLD); + count = nr_ranks * nr_ranks * (size_t) task_subtype_count * (size_t) maxtags; + message("2 count = %zd / %d", count, (int)count); + MPI_Allreduce(MPI_IN_PLACE, ranklog_indices, count, MPI_INT, MPI_MAX, MPI_COMM_WORLD); if (myrank == 0) { int count = 0; @@ -620,6 +636,7 @@ int main(int argc, char *argv[]) { /* Time to start time. Try to make it synchronous across the ranks. */ MPI_Barrier(MPI_COMM_WORLD); + sleep(3); clocks_set_cpufreq(0); if (myrank == 0) { message("Start of MPI tests"); @@ -664,7 +681,7 @@ int main(int argc, char *argv[]) { MPI_Barrier(MPI_COMM_WORLD); fflush(stdout); if (myrank == 0) message("Dumping updated log"); - mpiuse_dump_logs(nr_ranks, logfile); + mpiuse_dump_logs(nr_ranks, 0, logfile); /* Shutdown MPI. */ res = MPI_Finalize();