diff --git a/mpiuse.h b/mpiuse.h
index 707ab27547b14fc701fcfd53254d29d603007891..e05cf9c6f46fd3a9646fd89dcb6e82004e1a7ea2 100644
--- a/mpiuse.h
+++ b/mpiuse.h
@@ -82,6 +82,9 @@ struct mpiuse_log_entry {
 
   /* Minimum time in a test. */
   ticks tmin;
+
+  /* Insertion index. */
+  size_t offset;
 };
 
 /* API. */
diff --git a/swiftmpirdmastepsim.c b/swiftmpirdmastepsim.c
index 40de9b31c0eb351cc90ffbd8e68b65ac01b134e0..80597519647abf34eaa2fb821c8a445a70595c09 100644
--- a/swiftmpirdmastepsim.c
+++ b/swiftmpirdmastepsim.c
@@ -30,15 +30,18 @@
 #include "error.h"
 #include "mpiuse.h"
 
-/* Global: Our rank for all to see. */
+/* Our rank for all to see. */
 int myrank = -1;
 
+/* Number of ranks. */
+static int nr_ranks;
+
+#define READY -2
+#define DONE -10
+
 /* Are we verbose. */
 static int verbose = 0;
 
-/* Attempt to keep original injection time differences. */
-static int usetics = 1;
-
 /* Set a data pattern and check we get this back, slow... */
 static int datacheck = 0;
 
@@ -51,8 +54,9 @@ static const int task_type_recv = 23;
 static MPI_Comm subtypeMPI_comms[task_subtype_count];
 
 /* And the windows for one-sided communications. */
-static MPI_Win mpi_window[task_subtype_count];
-static void *mpi_ptr[task_subtype_count];
+static MPI_Win mpi_recv_window[task_subtype_count];
+static MPI_Win mpi_send_window[task_subtype_count];
+static char *mpi_ptr[task_subtype_count];
+static size_t *mpi_offsets;
 
 // The plan, each rank has a window per communicator, this is to receive
@@ -74,25 +78,17 @@ static size_t *mpi_offsets;
 // need to associate a buffer with it, just sending the memory from each
 // message with the appropriate offset.
 
-/* The local queues, these are rank specific. */
-static struct mpiuse_log_entry **volatile reqs_queue;
-static int volatile ind_req = 0;
-static int volatile nr_reqs = 0;
-static int volatile injecting = 1;
-
+/* The local receive queue. */
 static struct mpiuse_log_entry **volatile recvs_queue;
 static int volatile nr_recvs = 0;
 static int volatile ind_recv = 0;
 static int volatile todo_recv = 0;
 
-static struct mpiuse_log_entry **volatile sends_queue;
-static int volatile nr_sends = 0;
-static int volatile ind_send = 0;
-static int volatile todo_send = 0;
-
-/* CPU frequency of the machine that created the MPI log. */
-// XXX need to store this in the data file.
-static double log_clocks_cpufreq = 2194844448.0;
+/* Requests from the messages log that we need to process on this rank. */
+static struct mpiuse_log_entry **volatile reqs_queue;
+static int volatile ind_req = 0;
+static int volatile nr_reqs = 0;
+static int volatile injecting = 1;
 
 /**
  * @brief fill a data area with a pattern that can be checked for changes.
@@ -124,56 +120,17 @@ static int datacheck_test(size_t size, void *data) {
 }
 
 /**
- * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
- *
- * The requests are initiated in the time order of the original log and an
- * attempt to start these with the same interval gap is made if usetics is
- * set, otherwise we just do them as quickly as possible.
+ * @brief Injection thread, sends messages to other ranks.
  */
 static void *inject_thread(void *arg) {
 
-  if (verbose) message("%d: injection thread starts", *((int *)arg));
+  message("%d: injection thread starts", *((int *)arg));
   ticks starttics = getticks();
 
-  /* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
-  ticks basetic = reqs_queue[0]->tic;
-  ticks looptics = 0;
-  double deadtime = 0.0;
-  struct timespec sleep;
-  sleep.tv_sec = 0;
-
+  int offset = 0;
   while (ind_req < nr_reqs) {
     struct mpiuse_log_entry *log = reqs_queue[ind_req];
 
-    if (usetics) {
-      /* Expect time between this request and the previous one. */
-      ticks dt = log->tic - basetic;
-      basetic = log->tic;
-
-      /* We guess some time below which we should not attempt to wait,
-       * otherwise we'll start to overrun, and just inject the next call if we
-       * are below that (we time the ticks this loop takes without any waiting
-       * and use that). Otherwise we wait a while. Note we need to convert the
-       * ticks of the log file into nanoseconds, that requires the original
-       * CPU frequency. */
-      if (dt > looptics) {
-
-        /* Remember to be fair and remove the looptics, then convert to
-         * nanoseconds. */
-        double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
-        if (ns < 1.0e9) {
-          sleep.tv_nsec = (long)ns;
-        } else {
-          /* Wait more than one second. Must be an error, but complain and
-           * continue. */
-          sleep.tv_nsec = (long)1.0e9;
-          message("wait greater than one second");
-        }
-        nanosleep(&sleep, NULL);
-        deadtime += sleep.tv_nsec;
-      }
-    }
-
     /* Initialise new log elements. */
     log->done = 0;
     log->nr_tests = 0;
@@ -186,102 +143,96 @@ static void *inject_thread(void *arg) {
     /* Differences to SWIFT: MPI_BYTE not the MPI_Type. */
     int err = 0;
     if (log->type == task_type_send) {
-      log->data = calloc(log->size, 1);
+      log->data = calloc(log->size + sizeof(int), 1);
 
       /* Fill data with pattern. */
       if (datacheck) datacheck_fill(log->size, log->data);
 
-      /* And send. */
-      err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
-                      subtypeMPI_comms[log->subtype], &log->req);
-
-      /* Add a new send request. */
-      int ind = atomic_inc(&nr_sends);
-      sends_queue[ind] = log;
-      atomic_inc(&todo_send);
+      /* Last char is marked as READY (to receive) */
+      ((char *)log->data)[log->size] = READY;
+
+      /* XXX And send data to other rank at the appropriate offset. */
+      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, log->otherrank, 0,
+                   mpi_send_window[log->subtype]);
+      size_t index = (log->otherrank * nr_ranks * nr_ranks) +
+                     (log->rank * nr_ranks) + ind_req;
+      MPI_Raccumulate(log->data, log->size, MPI_BYTE, log->otherrank,
+                      mpi_offsets[index], log->size, MPI_BYTE, MPI_REPLACE,
+                      mpi_send_window[log->subtype], &log->req);
+
+      /* Wait for local completion? */
+      int flag = 0;
+      while (flag == 0) MPI_Test(&log->req, &flag, MPI_STATUS_IGNORE);
+
+      /* Now we change the last element to DONE so that the remote end can
+       * find out that the data has arrived. */
+      char newval[1];
+      char oldval[1];
+      newval[0] = DONE;
+      oldval[0] = DONE;
+      MPI_Compare_and_swap(&newval[0], &((char *)log->data)[log->size],
+                           &oldval[0], MPI_BYTE, 0, log->size,
+                           mpi_send_window[log->subtype]);
+
+      /* Proceed locally. */
+      MPI_Win_flush_local(0, mpi_send_window[log->subtype]);
+      MPI_Win_unlock(log->otherrank, mpi_send_window[log->subtype]);
 
     } else {
 
-      /* Ready to receive. */
-      log->data = calloc(log->size, 1);
-      err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
-                      subtypeMPI_comms[log->subtype], &log->req);
-
-      /* Add a new recv request. */
+      /* Add entry so we start checking for the remote send. */
      int ind = atomic_inc(&nr_recvs);
      recvs_queue[ind] = log;
+      log->offset = offset;
+      offset += log->size + 1;
      atomic_inc(&todo_recv);
    }
    if (err != MPI_SUCCESS) error("Failed to activate send or recv");
    ind_req++;
-
-    /* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
-     * equally timed. Note we include a nanosleep, they are slow. */
-    if (looptics == 0 && usetics) {
-      sleep.tv_nsec = 1;
-      nanosleep(&sleep, NULL);
-      looptics = (getticks() - starttics);
-      if (verbose)
-        message("injection loop took %.3f %s.", clocks_from_ticks(looptics),
-                clocks_getunit());
-    }
   }
 
-  /* All done, thread exiting. */
-  if (verbose) {
-    message("%d injections completed, sends = %d, recvs = %d", ind_req,
-            nr_sends, nr_recvs);
-    message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
-    if (usetics) message("deadtime %.3f ms", deadtime / 1.0e6);
-  }
   message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
           clocks_getunit());
+
+  /* All injected, so stop checking for new receives. */
   atomic_dec(&injecting);
   return NULL;
 }
 
 /**
- * @brief main loop to run over a queue of MPI requests and test for when they
- * complete. Returns the total amount of time spent in calls to MPI_Test and
- * the number of times it was called.
- *
- * @param logs the list of logs pointing to requests.
- * @param nr_logs pointer to the variable containing the current number of
- * logs.
- * @param todos pointer to the variable containing the number of requests that
- * are still active.
- * @param sum the total number of ticks spent in calls to MPI_Test.
- * @param ncalls the total number of calls to MPI_Test.
- * @param mint the minimum ticks an MPI_Test call took.
- * @param maxt the maximum ticks an MPI_Test call took.
+ * @brief Recv thread, checks whether the remote sends have arrived.
  */
-static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
-                         int volatile *todos, double *sum, int *ncalls,
-                         ticks *mint, ticks *maxt) {
+static void *recv_thread(void *arg) {
 
-  /* Global MPI_Test statistics. */
+  message("%d: recv thread starts", *((int *)arg));
+  ticks starttics = getticks();
+
+  /* Global statistics. */
   int lncalls = 0;
   double lsum = 0.0;
   ticks lmint = INT_MAX;
   ticks lmaxt = 0;
 
   /* We loop while new requests are being injected and we still have requests
-   * to complete. */
-  while (injecting || (!injecting && *todos > 0)) {
-    int nlogs = *nr_logs;
+   * to receive. */
+  while (injecting || (!injecting && todo_recv > 0)) {
+    int nlogs = nr_recvs;
     for (int k = 0; k < nlogs; k++) {
-      struct mpiuse_log_entry *log = logs[k];
+      struct mpiuse_log_entry *log = recvs_queue[k];
       if (log != NULL && !log->done) {
         ticks tics = getticks();
-        int res;
-        MPI_Status stat;
-        int err = MPI_Test(&log->req, &res, &stat);
-        if (err != MPI_SUCCESS) {
-          error("MPI_Test call failed");
-        }
 
-        /* Increment etc. of statistics about time in MPI_Test. */
+        MPI_Win_lock(MPI_LOCK_SHARED, log->otherrank, 0,
+                     mpi_recv_window[log->subtype]);
+
+        /* Has the sender set the trailing flag byte to DONE yet? */
+        int arrived = 0;
+        if (verbose) message("Checking at %zu", log->offset);
+        if (mpi_ptr[log->subtype][log->offset+log->size] == DONE) arrived = 1;
+
+        MPI_Win_unlock(log->otherrank, mpi_recv_window[log->subtype]);
+
+        /* Increment etc. of statistics about time spent waiting. */
         ticks dt = getticks() - tics;
         log->tsum += (double)dt;
         lsum += (double)dt;
@@ -294,7 +245,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
         if (dt < lmint) lmint = dt;
         if (dt > lmaxt) lmaxt = dt;
 
-        if (res) {
+        if (arrived) {
           /* Check data sent data is unchanged and received data is as
            * expected. */
           if (datacheck && !datacheck_test(log->size, log->data)) {
@@ -305,66 +256,18 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
           log->done = 1;
           log->endtic = getticks();
           free(log->data);
-          atomic_dec(todos);
+          atomic_dec(&todo_recv);
         }
       }
     }
   }
 
-  /* All done. */
-  *sum = lsum;
-  *ncalls = lncalls;
-  *mint = lmint;
-  *maxt = lmaxt;
-  return;
-}
-
-/**
- * @brief Send thread, checks if MPI_Isend requests have completed.
- */
-static void *send_thread(void *arg) {
-
-  if (verbose) message("%d: send thread starts (%d)", *((int *)arg), injecting);
-  ticks starttics = getticks();
-
-  int ncalls;
-  double sum;
-  ticks mint;
-  ticks maxt;
-  queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt);
-
   message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
+      "%d MPI waiting calls, which took: %.3f, mean time %.3f, min time %.3f, "
+      "max time "
       "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
-      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
-  if (verbose)
-    message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
-            clocks_getunit());
-
-  /* Thread exits. */
-  return NULL;
-}
-
-/**
- * @brief Recv thread, checks if MPI_Irecv requests have completed.
- */
-static void *recv_thread(void *arg) {
-
-  if (verbose) message("%d: recv thread starts", *((int *)arg));
-  ticks starttics = getticks();
-
-  int ncalls;
-  double sum;
-  ticks mint;
-  ticks maxt;
-  queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt);
-
-  message(
-      "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
-      "%.3f (%s)",
-      ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
-      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
+      lncalls, clocks_from_ticks(lsum), clocks_from_ticks(lsum / lncalls),
+      clocks_from_ticks(lmint), clocks_from_ticks(lmaxt), clocks_getunit());
   if (verbose)
     message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
            clocks_getunit());
@@ -391,14 +294,12 @@ static int cmp_logs(const void *p1, const void *p2) {
  * The final list is sorted into increasing tag so we should have the
  * neccesary order for sending.
  *
- * @param nr_ranks the number of MPI ranks.
  * @param count_persubtype count of messages per subtype.
  * @param total_persubtype total memory needed per subtype.
  * @param count_persubtype_perrank message counts per rank per subtype.
  * @param total_persubtype_perrank total memory per rank per subtype.
  */
-static void pick_logs(int nr_ranks, size_t *count_persubtype,
-                      size_t *total_persubtype,
+static void pick_logs(size_t *count_persubtype, size_t *total_persubtype,
                       size_t *count_persubtype_perrank,
                       size_t *total_persubtype_perrank) {
   size_t nlogs = mpiuse_nr_logs();
@@ -407,9 +308,6 @@ static void pick_logs(int nr_ranks, size_t *count_persubtype,
   reqs_queue = (struct mpiuse_log_entry **)calloc(
       nlogs, sizeof(struct mpiuse_log_entry *));
   nr_reqs = 0;
-  sends_queue = (struct mpiuse_log_entry **)calloc(
-      nlogs, sizeof(struct mpiuse_log_entry *));
-  nr_sends = 0;
   recvs_queue = (struct mpiuse_log_entry **)calloc(
       nlogs, sizeof(struct mpiuse_log_entry *));
   nr_recvs = 0;
@@ -424,18 +322,15 @@ static void pick_logs(int nr_ranks, size_t *count_persubtype,
         reqs_queue[nr_reqs] = log;
         nr_reqs++;
 
-        /* Totals of memory use for receives, + sizeof(int) for atomics, plus
-         * we round to sizeof(int) boundaries. */
+        /* Totals of memory use for receives, + char for atomic flags */
         if (log->type == task_type_recv) {
-          size_t sint = sizeof(int);
-          size_t size = (log->size + (sint - 1)) / sint * sint + sint;
           count_persubtype[log->subtype]++;
-          total_persubtype[log->subtype] += size;
+          total_persubtype[log->subtype] += log->size + 1;
 
           size_t index = log->otherrank * nr_ranks + log->subtype;
           count_persubtype_perrank[index]++;
-          total_persubtype_perrank[index] += size;
+          total_persubtype_perrank[index] += log->size + 1;
         }
 
       } else {
@@ -449,9 +344,9 @@ static void pick_logs(int nr_ranks, size_t *count_persubtype,
 
   /* Check. */
   for (int k = 0; k < nr_reqs - 1; k++) {
-    if (reqs_queue[k]->tic > reqs_queue[k + 1]->tic)
-      message("reqs_queue: %lld > %lld", reqs_queue[k]->tic,
-              reqs_queue[k + 1]->tic);
+    if (reqs_queue[k]->tag > reqs_queue[k + 1]->tag)
+      message("reqs_queue: %d > %d", reqs_queue[k]->tag,
+              reqs_queue[k + 1]->tag);
   }
 }
 
@@ -476,7 +371,6 @@ int main(int argc, char *argv[]) {
   if (res != MPI_SUCCESS)
     error("Call to MPI_Init_thread failed with error %i.", res);
 
-  int nr_ranks = 0;
   res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
   if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
 
@@ -487,14 +381,11 @@ int main(int argc, char *argv[]) {
   /* Handle the command-line, we expect a mpiuse data file to read and various
    * options. */
   int opt;
-  while ((opt = getopt(argc, argv, "vfd")) != -1) {
+  while ((opt = getopt(argc, argv, "vd")) != -1) {
     switch (opt) {
       case 'd':
        datacheck = 1;
        break;
-      case 'f':
-        usetics = 0;
-        break;
      case 'v':
        verbose = 1;
        break;
@@ -529,28 +420,45 @@ int main(int argc, char *argv[]) {
       calloc(task_subtype_count * nr_ranks, sizeof(size_t));
   size_t *total_persubtype_perrank =
       calloc(task_subtype_count * nr_ranks, sizeof(size_t));
-  pick_logs(nr_ranks, count_persubtype, total_persubtype,
-            count_persubtype_perrank, total_persubtype_perrank);
+  pick_logs(count_persubtype, total_persubtype, count_persubtype_perrank,
+            total_persubtype_perrank);
 
   /* Now for the one-sided setup... We need a buffer with space for receives
    * from all ranks in a communicator, plus an additional nr_ranks counts per
    * communicator. */
   for (int i = 0; i < task_subtype_count; i++) {
     MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
-    MPI_Win_allocate(total_persubtype[i] + nr_ranks * sizeof(size_t),
-                     sizeof(int), MPI_INFO_NULL, subtypeMPI_comms[i],
-                     mpi_ptr[i], &mpi_window[i]);
+    //if (total_persubtype[i] > 0) {
+    message("allocate window for communicator %d", i);
+    MPI_Win_allocate(total_persubtype[i] + nr_ranks * sizeof(size_t),
+                     sizeof(int), MPI_INFO_NULL, subtypeMPI_comms[i],
+                     &mpi_ptr[i], &mpi_recv_window[i]);
+    //} else {
+    //mpi_ptr[i] = NULL;
+    //mpi_recv_window[i] = MPI_WIN_NULL;
+    //}
+
+    /* Send window, just needs to attach to communicator, not data, since it
+     * does not receive. */
+    message("create window for communicator %d", i);
+    MPI_Win_create(NULL, 0, sizeof(int), MPI_INFO_NULL, subtypeMPI_comms[i],
+                   &mpi_send_window[i]);
   }
+  message("Windows created");
 
   /* Each rank needs to be informed what the offsets of its sends to this rank
    * should be. */
-  mpi_offsets = calloc(sizeof(size_t), nr_ranks * task_subtype_count);
+  mpi_offsets =
+      calloc(sizeof(size_t), nr_ranks * nr_ranks * task_subtype_count);
   for (int k = 0; k < task_subtype_count; k++) {
     for (int i = 0; i < nr_ranks; i++) {
-      size_t index = i * nr_ranks + k;
+      size_t index = (myrank * nr_ranks * nr_ranks) + (i * nr_ranks) + k;
       mpi_offsets[index] += total_persubtype_perrank[index];
     }
   }
+  MPI_Allreduce(MPI_IN_PLACE, mpi_offsets,
+                nr_ranks * nr_ranks * task_subtype_count, MPI_AINT, MPI_SUM,
+                MPI_COMM_WORLD);
 
   /* Time to start time. Try to make it synchronous across the ranks. */
   MPI_Barrier(MPI_COMM_WORLD);
@@ -559,27 +467,22 @@ int main(int argc, char *argv[]) {
     message("Start of MPI tests");
     message("==================");
     if (verbose) {
-      if (!usetics) message("using fast untimed injections");
       if (datacheck)
         message("checking data pattern on send and recv completion");
     }
   }
 
-  /* Make three threads, one for injecting tasks and two to check for
-   * completions of the sends and recv independently. */
+  /* Make two threads, one for injecting tasks and one to check for
+   * completions of the remote sends. */
   pthread_t injectthread;
   if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
     error("Failed to create injection thread.");
-  pthread_t sendthread;
-  if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
-    error("Failed to create send thread.");
   pthread_t recvthread;
   if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
     error("Failed to create recv thread.");
 
   /* Wait until all threads have exited and all MPI requests have completed. */
   pthread_join(injectthread, NULL);
-  pthread_join(sendthread, NULL);
   pthread_join(recvthread, NULL);
 
   /* Dump the updated MPI logs. */
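For reference, a minimal sketch of the one-sided READY/DONE handshake the new send and recv paths rely on: the sender pushes the payload into the receiver's window and then flips the trailing flag byte of the slot to DONE, while the receiver polls that byte. The window name, the fixed PAYLOAD size, and the use of two plain MPI_Accumulate calls (for payload and flag) are assumptions made for this example; the patch itself uses MPI_Raccumulate followed by MPI_Compare_and_swap, with per-subtype windows and per-rank offsets.

/* handshake_sketch.c: run with at least two ranks, e.g. mpirun -np 2. */
#include <mpi.h>
#include <stdio.h>
#include <string.h>

#define PAYLOAD 256
#define READY -2
#define DONE -10

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  /* Each rank exposes one message slot: PAYLOAD bytes plus one flag byte.
   * Signed char so the negative READY/DONE values compare reliably. */
  signed char *base = NULL;
  MPI_Win win;
  MPI_Win_allocate(PAYLOAD + 1, 1, MPI_INFO_NULL, MPI_COMM_WORLD, &base, &win);
  memset(base, 0, PAYLOAD + 1);
  base[PAYLOAD] = READY;
  MPI_Barrier(MPI_COMM_WORLD);

  if (rank == 0) {
    /* Sender: put the payload, then flip the remote flag byte to DONE.
     * Accumulates from one origin to one target are ordered by default, so
     * the flag cannot overtake the payload. */
    char data[PAYLOAD];
    memset(data, 42, PAYLOAD);
    signed char done = DONE;
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 1, 0, win);
    MPI_Accumulate(data, PAYLOAD, MPI_BYTE, 1, 0, PAYLOAD, MPI_BYTE,
                   MPI_REPLACE, win);
    MPI_Accumulate(&done, 1, MPI_BYTE, 1, PAYLOAD, 1, MPI_BYTE, MPI_REPLACE,
                   win);
    MPI_Win_unlock(1, win);
  } else if (rank == 1) {
    /* Receiver: poll the flag byte of the local window until it reads DONE,
     * locking our own rank so the local reads are properly synchronised. */
    int arrived = 0;
    signed char first = 0;
    while (!arrived) {
      MPI_Win_lock(MPI_LOCK_SHARED, 1, 0, win);
      if (base[PAYLOAD] == DONE) {
        arrived = 1;
        first = base[0];
      }
      MPI_Win_unlock(1, win);
    }
    printf("rank 1: payload arrived, first byte = %d\n", first);
  }

  MPI_Win_free(&win);
  MPI_Finalize();
  return 0;
}

The flag byte is only written after the payload, so observing DONE is enough to know the whole slot is valid; the patch reaches the same point with an MPI_Compare_and_swap on the final byte instead of a second accumulate.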
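The other half of the scheme is that every sender has to learn the offset of its slot in each receiver's window ("Each rank needs to be informed what the offsets of its sends to this rank should be"). A minimal sketch of that exchange is below, assuming a flat [receiver][sender] table of long long offsets reduced with MPI_SUM and a pretend fixed slot size; the patch keeps an extra per-subtype dimension and reduces size_t values as MPI_AINT.

/* offsets_sketch.c: share per-receiver slot offsets with an all-reduce. */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank = 0, nr_ranks = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);

  /* offsets[receiver * nr_ranks + sender] = start of sender's slot in the
   * receiver's window. Each rank only knows its own row to begin with. */
  long long *offsets = calloc(nr_ranks * nr_ranks, sizeof(long long));
  long long running = 0;
  for (int sender = 0; sender < nr_ranks; sender++) {
    offsets[rank * nr_ranks + sender] = running;
    running += 1024; /* pretend each sender gets a 1024-byte slot */
  }

  /* The rows not owned by this rank are zero, so a sum reduction leaves every
   * rank with the complete table. */
  MPI_Allreduce(MPI_IN_PLACE, offsets, nr_ranks * nr_ranks, MPI_LONG_LONG,
                MPI_SUM, MPI_COMM_WORLD);

  if (rank == 0)
    printf("rank 0 writes to rank %d at offset %lld\n", nr_ranks - 1,
           offsets[(nr_ranks - 1) * nr_ranks + 0]);

  free(offsets);
  MPI_Finalize();
  return 0;
}

Because each rank fills only its own row before the reduction, the MPI_SUM acts as an allgather of the rows, which is the same trick the patch's MPI_Allreduce on mpi_offsets is aiming for.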