diff --git a/mpiuse.c b/mpiuse.c index e25981defe39914f325f895eb382fd2d7c1e3827..49ca14571faaffe7af2e871ca772a9209f10224c 100644 --- a/mpiuse.c +++ b/mpiuse.c @@ -174,9 +174,11 @@ void mpiuse_log_restore(const char *filename) { * @brief dump the logs for all ranks to a file. * * @param nranks the number of ranks. + * @param standard only write a standard log, this can be used as input to + * other runs. * @param dumpfile the file to write */ -void mpiuse_dump_logs(int nranks, const char *dumpfile) { +void mpiuse_dump_logs(int nranks, int standard, const char *dumpfile) { /* Make sure output file is empty, only on one rank. */ FILE *fd; @@ -184,13 +186,21 @@ void mpiuse_dump_logs(int nranks, const char *dumpfile) { fd = fopen(dumpfile, "w"); /* Header. */ - fprintf(fd, - "# logticin logtic injtic endtic dtic step rank otherrank itype " - " isubtype tag size nr_tests tsum tmin tmax\n"); + if (standard) { + fprintf(fd, + "# stic etic dtic step rank otherrank type itype " + " subtype isubtype activation tag size sum\n"); + } else { + fprintf(fd, + "# logticin logtic injtic endtic dtic step rank otherrank itype " + " isubtype tag size nr_tests tsum tmin tmax\n"); + } fclose(fd); } MPI_Barrier(MPI_COMM_WORLD); + const char *types[] = {"send", "recv"}; + /* Loop over all ranks, one by one, getting each rank to append their * logs. */ for (int k = 0; k < nranks; k++) { @@ -210,20 +220,31 @@ void mpiuse_dump_logs(int nranks, const char *dumpfile) { * version to match the expected injection times for this new run. */ size_t nlogs = mpiuse_log_count; ticks basetics = 0; + long long sum = 0; for (size_t k = 0; k < nlogs; k++) { struct mpiuse_log_entry *log = &mpiuse_log[k]; if (log->rank == myrank && log->endtic > 0) { if (basetics == 0) basetics = log->tic; - fprintf(fd, - "%lld %.4f %.4f %.4f %.6f %d %d %d %d %d %d %zd %d %.4f %.6f " - "%.6f\n", - log->tic, clocks_from_ticks(log->tic - basetics), - clocks_from_ticks(log->injtic - clocks_start_ticks), - clocks_from_ticks(log->endtic - clocks_start_ticks), - clocks_from_ticks(log->endtic - log->injtic), log->step, - log->rank, log->otherrank, log->type, log->subtype, log->tag, - log->size, log->nr_tests, clocks_from_ticks(log->tsum), - clocks_from_ticks(log->tmin), clocks_from_ticks(log->tmax)); + if (standard) { + fprintf(fd, "%lld %lld %lld %d %d %d %s %d %s %d %d %d %lld %lld\n", + log->injtic, log->injtic, log->endtic - log->injtic, + log->step, log->rank, log->otherrank, + types[log->type - SEND_TYPE], log->type, "none", + log->subtype, log->activation, log->tag, log->size, sum); + sum += log->size; + } else { + fprintf( + fd, + "%lld %.4f %.4f %.4f %.6f %d %d %d %d %d %d %zd %d %.4f %.6f " + "%.6f\n", + log->tic, clocks_from_ticks(log->tic - basetics), + clocks_from_ticks(log->injtic - clocks_start_ticks), + clocks_from_ticks(log->endtic - clocks_start_ticks), + clocks_from_ticks(log->endtic - log->injtic), log->step, + log->rank, log->otherrank, log->type, log->subtype, log->tag, + log->size, log->nr_tests, clocks_from_ticks(log->tsum), + clocks_from_ticks(log->tmin), clocks_from_ticks(log->tmax)); + } } } fclose(fd); @@ -400,10 +421,10 @@ void mpiuse_log_generate(int nr_nodes, int nr_logs, int size, int random, for (int i = 0; i < nr_nodes; i++) { for (int j = 0; j < nr_nodes; j++) { if (i != j) { - mpiuse_log_allocation(i, 1, k, SEND_TYPE, NO_SUBTYPE, 1, - (size_t)logsize, j, tag); - mpiuse_log_allocation(j, 1, k, RECV_TYPE, NO_SUBTYPE, 1, - (size_t)logsize, i, tag); + mpiuse_log_allocation(i, 1, k, SEND_TYPE, NO_SUBTYPE, 1, logsize, j, + tag); + mpiuse_log_allocation(j, 1, k, RECV_TYPE, NO_SUBTYPE, 1, logsize, i, + tag); } } } diff --git a/mpiuse.h b/mpiuse.h index 71d8982f0cbb020dd6a563d2f8cb756ccacf3dd5..7ae96d6a16fdc436f783f33513b32e99f71dbb75 100644 --- a/mpiuse.h +++ b/mpiuse.h @@ -88,7 +88,7 @@ struct mpiuse_log_entry { #ifndef SEND_TYPE #define SEND_TYPE 25 #define RECV_TYPE 26 -#define NO_SUBTYPE 0 +#define NO_SUBTYPE 1 #endif /* API. */ @@ -99,7 +99,7 @@ struct mpiuse_log_entry *mpiuse_get_log(int ind); void mpiuse_log_restore(const char *filename); int mpiuse_nr_logs(void); int mpiuse_nr_ranks(void); -void mpiuse_dump_logs(int nranks, const char *logfile); +void mpiuse_dump_logs(int nranks, int standard, const char *logfile); void mpiuse_log_generate(int nr_nodes, int nr_logs, int size, int random, long int seed, int uniform, const char *cdf, diff --git a/post-process.py b/post-process.py index 458fae25a26a1a2a6be033449dc3c7ef94e294fd..e6d88241d1f4dca0c2b8d767b9237af32b2d7152 100755 --- a/post-process.py +++ b/post-process.py @@ -92,7 +92,7 @@ with open(infile, "r") as fp: if line[0] == '#': continue words = line.split() - if words[itypecol] == "22": + if words[itypecol] == "25": key = words[otherrankcol] + "/" + \ words[rankcol] + "/" + \ words[isubtypecol] + "/" + \ @@ -105,7 +105,7 @@ with open(infile, "r") as fp: sends.append(words) nsends = nsends + 1 - elif words[itypecol] == "23": + elif words[itypecol] == "26": key = words[rankcol] + "/" + \ words[otherrankcol] + "/" + \ words[isubtypecol] + "/" + \ diff --git a/swiftmpifakestepsim.c b/swiftmpifakestepsim.c index d14d8538de4f2cd51d599881b6dcd4e9ef6678c8..c1f3fa65524eac41681cf48853d738a47bad7d2d 100644 --- a/swiftmpifakestepsim.c +++ b/swiftmpifakestepsim.c @@ -127,9 +127,9 @@ static void *inject_thread(void *arg) { } else { /* Ready to receive. */ - log->data = calloc(log->size, 1); - err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag, - node_comms[log->otherrank], &log->req); + log->data = calloc(abs(log->size), 1); + err = MPI_Irecv(log->data, abs(log->size), MPI_BYTE, log->otherrank, + log->tag, node_comms[log->otherrank], &log->req); /* Add a new recv request. */ int ind = atomic_inc(&nr_recvs); @@ -356,7 +356,8 @@ static void pick_logs(int random) { int nrecv = 0; for (int k = 0; k < nlogs; k++) { struct mpiuse_log_entry *log = mpiuse_get_log(k); - if (log->rank == myrank && log->activation && log->type == RECV_TYPE) { + if (log->rank == myrank && log->activation == 1 && + log->type == RECV_TYPE) { log->data = NULL; reqs_queue[nr_reqs] = log; nr_reqs++; @@ -371,7 +372,8 @@ static void pick_logs(int random) { int nsend = 0; for (int k = 0; k < nlogs; k++) { struct mpiuse_log_entry *log = mpiuse_get_log(k); - if (log->rank == myrank && log->activation && log->type == SEND_TYPE) { + if (log->rank == myrank && log->activation == 1 && + log->type == SEND_TYPE) { log->data = NULL; reqs_queue[nr_reqs] = log; nr_reqs++; @@ -388,7 +390,7 @@ static void pick_logs(int random) { * @brief usage help. */ static void usage(char *argv[]) { - fprintf(stderr, "Usage: %s [options] nr_messages logfile.dat\n", argv[0]); + fprintf(stderr, "Usage: %s [vds:rgx:c:o:f:z] nr_messages logfile.dat\n", argv[0]); fprintf(stderr, " options: -v verbose, -d data check, -s size (bytes/scale), \n" "\t -f <1|2> randomize injection order, 1 == just sends, " @@ -398,7 +400,8 @@ static void usage(char *argv[]) { "\t-r -c <file> use cdf from file, size is a scale factor., |\n" "\t-r -o <file> use occurence sample of values in a file, size is a " "scale factor.,] \n" - "\t-x random seed\n"); + "\t-x random seed\n" + "\t-z output log in standard format, i.e. can be used as input"); fflush(stderr); } @@ -424,15 +427,16 @@ int main(int argc, char *argv[]) { /* Handle the command-line, we expect the number of messages to exchange per * rank an output log and some options, the interesting ones are a size and * whether to use a random selections of various kinds. */ - int size = 1024; - int random = 0; - int randomorder = 0; - int uniform = 1; char *cdf = NULL; char *odata = NULL; int opt; + int random = 0; + int randomorder = 0; + int size = 1024; + int standard = 0; + int uniform = 1; unsigned int seed = default_seed; - while ((opt = getopt(argc, argv, "vds:rgx:c:o:f:")) != -1) { + while ((opt = getopt(argc, argv, "vds:rgx:c:o:f:z")) != -1) { switch (opt) { case 'd': datacheck = 1; @@ -458,6 +462,9 @@ int main(int argc, char *argv[]) { case 'v': verbose = 1; break; + case 'z': + standard = 1; + break; case 'x': seed = atol(optarg); break; @@ -559,7 +566,7 @@ int main(int argc, char *argv[]) { MPI_Barrier(MPI_COMM_WORLD); fflush(stdout); if (myrank == 0) message("Dumping updated log"); - mpiuse_dump_logs(nranks, logfile); + mpiuse_dump_logs(nranks, standard, logfile); /* Shutdown MPI. */ res = MPI_Finalize(); diff --git a/swiftmpistepsim.c b/swiftmpistepsim.c index bca2efcfcb2414ea42e0aaf4ecc8e1e3e8cd76e4..a7597a168f5232ebc67e5508ee1eb98f0a2c788c 100644 --- a/swiftmpistepsim.c +++ b/swiftmpistepsim.c @@ -66,15 +66,13 @@ static int volatile ind_req[NITHREADS] = {0}; static int volatile nr_reqs[NITHREADS] = {0}; static int volatile injecting = NITHREADS; -static struct mpiuse_log_entry **volatile recvs_queue; -static int volatile nr_recvs = 0; -static int volatile ind_recv = 0; -static int volatile todo_recv = 0; +static struct mpiuse_log_entry **volatile recvs_queue[NITHREADS]; +static int volatile nr_recvs[NITHREADS] = {0}; +static int volatile todo_recv[NITHREADS] = {0}; -static struct mpiuse_log_entry **volatile sends_queue; -static int volatile nr_sends = 0; -static int volatile ind_send = 0; -static int volatile todo_send = 0; +static struct mpiuse_log_entry **volatile sends_queue[NITHREADS]; +static int volatile nr_sends[NITHREADS] = {0}; +static int volatile todo_send[NITHREADS] = {0}; /* CPU frequency of the machine that created the MPI log. */ // XXX need to store this in the data file. @@ -109,11 +107,13 @@ static int datacheck_test(size_t size, void *data) { return 1; } -static void injection_runner(int qid) { +static void injection_runner(int tid) { - if (verbose) message("%d: injection thread starts", qid); + if (verbose) message("%d: injection thread starts", tid); ticks starttics = getticks(); - struct mpiuse_log_entry **reqs = reqs_queue[qid]; + struct mpiuse_log_entry **reqs = reqs_queue[tid]; + struct mpiuse_log_entry **sends = sends_queue[tid]; + struct mpiuse_log_entry **recvs = recvs_queue[tid]; /* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */ ticks basetic = reqs[0]->tic; @@ -122,8 +122,8 @@ static void injection_runner(int qid) { struct timespec sleep; sleep.tv_sec = 0; - while (ind_req[qid] < nr_reqs[qid]) { - struct mpiuse_log_entry *log = reqs[ind_req[qid]]; + while (ind_req[tid] < nr_reqs[tid]) { + struct mpiuse_log_entry *log = reqs[ind_req[tid]]; if (usetics && waitns == 0) { /* Expect time between this request and the previous one. */ @@ -164,7 +164,7 @@ static void injection_runner(int qid) { log->nr_tests = 0; log->tsum = 0.0; log->tmax = 0; - log->tmin = INT_MAX; + log->tmin = LONG_MAX; log->endtic = 0; log->injtic = getticks(); @@ -181,9 +181,9 @@ static void injection_runner(int qid) { subtypeMPI_comms[log->subtype], &log->req); /* Add a new send request. */ - int ind = atomic_inc(&nr_sends); - sends_queue[ind] = log; - atomic_inc(&todo_send); + int ind = atomic_inc(&nr_sends[tid]); + sends[ind] = log; + atomic_inc(&todo_send[tid]); } else { @@ -193,13 +193,13 @@ static void injection_runner(int qid) { subtypeMPI_comms[log->subtype], &log->req); /* Add a new recv request. */ - int ind = atomic_inc(&nr_recvs); - recvs_queue[ind] = log; - atomic_inc(&todo_recv); + int ind = atomic_inc(&nr_recvs[tid]); + recvs[ind] = log; + atomic_inc(&todo_recv[tid]); } if (err != MPI_SUCCESS) error("Failed to activate send or recv"); - ind_req[qid]++; + ind_req[tid]++; /* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are * equally timed. Note we include a nanosleep, they are slow. */ @@ -215,12 +215,12 @@ static void injection_runner(int qid) { /* All done, thread exiting. */ if (verbose) { - message("%d injections completed, sends = %d, recvs = %d", ind_req[qid], - nr_sends, nr_recvs); - message("remaining sends = %d, recvs = %d", todo_send, todo_recv); + message("%d injections completed, sends = %d, recvs = %d", ind_req[tid], + nr_sends[tid], nr_recvs[tid]); + message("remaining sends = %d, recvs = %d", todo_send[tid], todo_recv[tid]); if (usetics || waitns > 0) message("deadtime %.3f ms", deadtime / 1.0e6); } - message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + message("%d took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics), clocks_getunit()); atomic_dec(&injecting); } @@ -259,7 +259,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs, /* Global MPI_Test statistics. */ int lncalls = 0; double lsum = 0.0; - ticks lmint = INT_MAX; + ticks lmint = LONG_MAX; ticks lmaxt = 0; /* We loop while new requests are being injected and we still have requests @@ -320,22 +320,25 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs, */ static void *send_thread(void *arg) { - if (verbose) message("%d: send thread starts", *((int *)arg)); + int tid = *((int *)arg); + if (verbose) message("%d: send thread starts", tid); ticks starttics = getticks(); int ncalls; double sum; ticks mint; ticks maxt; - queue_runner(sends_queue, &nr_sends, &todo_send, &sum, &ncalls, &mint, &maxt); + queue_runner(sends_queue[tid], &nr_sends[tid], &todo_send[tid], &sum, &ncalls, + &mint, &maxt); message( - "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time " + "%d %d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max " + "time " "%.3f (%s)", - ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), + tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit()); if (verbose) - message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + message("%d took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics), clocks_getunit()); /* Thread exits. */ @@ -347,22 +350,25 @@ static void *send_thread(void *arg) { */ static void *recv_thread(void *arg) { - if (verbose) message("%d: recv thread starts", *((int *)arg)); + int tid = *((int *)arg); + if (verbose) message("%d: recv thread starts", tid); ticks starttics = getticks(); int ncalls; double sum; ticks mint; ticks maxt; - queue_runner(recvs_queue, &nr_recvs, &todo_recv, &sum, &ncalls, &mint, &maxt); + queue_runner(recvs_queue[tid], &nr_recvs[tid], &todo_recv[tid], &sum, &ncalls, + &mint, &maxt); message( - "%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time " + "%d %d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max " + "time " "%.3f (%s)", - ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), + tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls), clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit()); if (verbose) - message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), + message("%d took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics), clocks_getunit()); /* Thread exits. */ @@ -394,12 +400,6 @@ static void pick_logs(void) { struct mpiuse_log_entry **reqs = (struct mpiuse_log_entry **)calloc( nlogs, sizeof(struct mpiuse_log_entry *)); int nreqs = 0; - sends_queue = (struct mpiuse_log_entry **)calloc( - nlogs, sizeof(struct mpiuse_log_entry *)); - nr_sends = 0; - recvs_queue = (struct mpiuse_log_entry **)calloc( - nlogs, sizeof(struct mpiuse_log_entry *)); - nr_recvs = 0; for (int k = 0; k < nlogs; k++) { struct mpiuse_log_entry *log = mpiuse_get_log(k); @@ -410,7 +410,7 @@ static void pick_logs(void) { if (messagesize > 0) log->size = messagesize; /* Scale size. */ - log->size *= messagescale ; + log->size *= messagescale; /* And keep this log. */ log->data = NULL; @@ -437,17 +437,28 @@ static void pick_logs(void) { reqs_queue[k] = (struct mpiuse_log_entry **)malloc( sizeof(struct mpiuse_log_entry *) * nlogs); nr_reqs[k] = 0; + + sends_queue[k] = (struct mpiuse_log_entry **)calloc( + nlogs, sizeof(struct mpiuse_log_entry *)); + nr_sends[k] = 0; + + recvs_queue[k] = (struct mpiuse_log_entry **)calloc( + nlogs, sizeof(struct mpiuse_log_entry *)); + nr_recvs[k] = 0; + ind_req[k] = 0; } for (int k = 0; k < nreqs; k++) { - int qid = k % NITHREADS; - reqs_queue[qid][nr_reqs[qid]] = reqs[k]; - nr_reqs[qid]++; + int tid = k % NITHREADS; + reqs_queue[tid][nr_reqs[tid]] = reqs[k]; + nr_reqs[tid]++; } - for (int k = 0; k < NITHREADS; k++) { - message("nr_reqs[%d] = %d", k, nr_reqs[k]); + if (verbose) { + for (int k = 0; k < NITHREADS; k++) { + message("nr_reqs[%d] = %d", k, nr_reqs[k]); + } } free(reqs); } @@ -456,9 +467,11 @@ static void pick_logs(void) { * @brief usage help. */ static void usage(char *argv[]) { - fprintf(stderr, "Usage: %s [-vfdc:s:n:] SWIFT_mpiuse-log-file.dat logfile.dat\n", + fprintf(stderr, + "Usage: %s [-vfdc:s:n:] SWIFT_mpiuse-log-file.dat logfile.dat\n", argv[0]); - fprintf(stderr, " options: -v verbose, -f fast injections, " + fprintf(stderr, + " options: -v verbose, -f fast injections, " "-d fill messages and check values on receive, " "-s <value> use fixed message of this size (bytes), " "-c <value> scale factor for all messages, " @@ -488,7 +501,7 @@ int main(int argc, char *argv[]) { /* Handle the command-line, we expect a mpiuse data file to read and various * options. */ int opt; - while ((opt = getopt(argc, argv, "vfdn:s:n:")) != -1) { + while ((opt = getopt(argc, argv, "vfdc:s:n:")) != -1) { switch (opt) { case 'd': datacheck = 1; @@ -535,7 +548,7 @@ int main(int argc, char *argv[]) { MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]); } - /* Each rank requires its own queue, so extract them. */ + /* Each rank and thread requires its own queue, so extract them. */ pick_logs(); /* Time to start time. Try to make it synchronous across the ranks. */ @@ -544,51 +557,46 @@ int main(int argc, char *argv[]) { if (myrank == 0) { message("Start of MPI tests"); message("=================="); + message("Using %d threads each for injection, send and receive", NITHREADS); if (waitns > 0) message("Adding fixed waits of %lld ns", waitns); - if (messagesize > 0) { - message(" "); - message(" Using fixed message size of %zd", messagesize); - } - message("=================="); - if (messagescale != 1.0f) { - message(" "); - message(" Using message scale of %f", messagescale); - } - if (verbose) { - if (!usetics) message("using fast untimed injections"); - if (datacheck) - message("checking data pattern on send and recv completion"); - } + if (messagesize > 0) + message("Using fixed message size of %zd", messagesize); + if (messagescale != 1.0f) + message("Using message scale of %g", messagescale); + if (!usetics) message("Using fast untimed injections"); + if (datacheck) message("Checking data pattern on send and recv completion"); + message(""); } - /* Make three threads, one for injecting tasks and two to check for + /* Make three types of thread, one for injecting tasks and two to check for * completions of the sends and recv independently. */ static int ks[NITHREADS]; - pthread_t injectthread[NITHREADS]; + pthread_t injectthreads[NITHREADS]; + pthread_t sendthreads[NITHREADS]; + pthread_t recvthreads[NITHREADS]; for (int k = 0; k < NITHREADS; k++) { ks[k] = k; - if (pthread_create(&injectthread[k], NULL, &inject_thread, &ks[k]) != 0) + if (pthread_create(&injectthreads[k], NULL, &inject_thread, &ks[k]) != 0) error("Failed to create injection thread."); + if (pthread_create(&sendthreads[k], NULL, &send_thread, &ks[k]) != 0) + error("Failed to create send thread."); + if (pthread_create(&recvthreads[k], NULL, &recv_thread, &ks[k]) != 0) + error("Failed to create recv thread."); } - pthread_t sendthread; - if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0) - error("Failed to create send thread."); - pthread_t recvthread; - if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0) - error("Failed to create recv thread."); - /* Wait until all threads have exited and all MPI requests have completed. */ - for (int k = 0; k < NITHREADS; k++) pthread_join(injectthread[k], NULL); - pthread_join(sendthread, NULL); - pthread_join(recvthread, NULL); + for (int k = 0; k < NITHREADS; k++) { + pthread_join(injectthreads[k], NULL); + pthread_join(sendthreads[k], NULL); + pthread_join(recvthreads[k], NULL); + } /* Dump the updated MPI logs. */ MPI_Barrier(MPI_COMM_WORLD); fflush(stdout); if (myrank == 0) message("Dumping updated log"); - mpiuse_dump_logs(nranks, logfile); + mpiuse_dump_logs(nranks, 0, logfile); /* Shutdown MPI. */ res = MPI_Finalize();