Skip to content
Snippets Groups Projects
Commit 597645ec authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Change to use an arbitrary number of injection threads

Also fix a race condition
parent e89ae239
No related branches found
No related tags found
2 merge requests!9Draft: Multiple threads for inject, send and recv.,!3Draft: Multi injectors many
......@@ -3,7 +3,7 @@ CFLAGS = -g -O0 -Wall
all: swiftmpistepsim
swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c
swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c Makefile
$(CC) $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c -I/usr/include/mpi -lmpi -lpthread
clean:
......
......@@ -50,10 +50,11 @@ static const int task_subtype_count = 30; // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[30];
/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue[2];
static int volatile ind_req[2] = {0,0};
static int volatile nr_reqs[2] = {0,0};
static int volatile injecting[2] = {1,1};
#define NITHREADS 4
static struct mpiuse_log_entry **volatile reqs_queue[NITHREADS];
static int volatile ind_req[NITHREADS] = {0};
static int volatile nr_reqs[NITHREADS] = {0};
static int volatile injecting = NITHREADS;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0;
......@@ -168,11 +169,11 @@ static void injection_runner(int qid) {
message("%d injections completed, sends = %d, recvs = %d", ind_req[qid],
nr_sends, nr_recvs);
message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
if (usetics|| waitns > 0) message("deadtime %.3f ms", deadtime / 1.0e6);
if (usetics || waitns > 0) message("deadtime %.3f ms", deadtime / 1.0e6);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
atomic_dec(&injecting[qid]);
atomic_dec(&injecting);
}
/**
......@@ -182,22 +183,10 @@ static void injection_runner(int qid) {
* attempt to start these with the same interval gap is made if usetics is
* set, otherwise we just do them as quickly as possible.
*/
static void *inject_thread1(void *arg) {
injection_runner(0);
static void *inject_thread(void *arg) {
injection_runner(*(int *)arg);
return NULL;
}
/**
* @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
*
* The requests are initiated in the time order of the original log and an
* attempt to start these with the same interval gap is made if usetics is
* set, otherwise we just do them as quickly as possible.
*/
static void *inject_thread2(void *arg) {
injection_runner(1);
return NULL;
}
/**
* @brief main loop to run over a queue of MPI requests and test for when they
......@@ -226,7 +215,7 @@ static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
/* We loop while new requests are being injected and we still have requests
* to complete. */
while ((injecting[0] || injecting[1]) || (!injecting[0] && !injecting[1] && *todos > 0)) {
while (injecting || (!injecting && *todos > 0)) {
int nlogs = *nr_logs;
for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = logs[k];
......@@ -347,13 +336,14 @@ static void pick_logs(void) {
size_t nlogs = mpiuse_nr_logs();
/* Duplicate of logs. */
struct mpiuse_log_entry **reqs = (struct mpiuse_log_entry **)malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
int nreqs = 0;
sends_queue = (struct mpiuse_log_entry **)malloc(
struct mpiuse_log_entry **reqs = (struct mpiuse_log_entry **)malloc(
sizeof(struct mpiuse_log_entry *) * nlogs);
int nreqs = 0;
sends_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_sends = 0;
recvs_queue = (struct mpiuse_log_entry **)malloc(
sizeof(struct mpiuse_log_entry *) * nlogs);
recvs_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recvs = 0;
for (int k = 0; k < nlogs; k++) {
......@@ -379,27 +369,28 @@ static void pick_logs(void) {
/* Check. */
for (int k = 0; k < nreqs - 1; k++) {
if (reqs[k]->tic > reqs[k+1]->tic)
message("reqs_queue: %lld > %lld", reqs[k]->tic, reqs[k+1]->tic);
if (reqs[k]->tic > reqs[k + 1]->tic)
message("reqs_queue: %lld > %lld", reqs[k]->tic, reqs[k + 1]->tic);
}
/* And partition into queues for injection. Use interleave pick so that
* close in time injects are on different queues. */
reqs_queue[0] = (struct mpiuse_log_entry **)malloc(
sizeof(struct mpiuse_log_entry *) * nlogs);
nr_reqs[0] = 0;
reqs_queue[1] = (struct mpiuse_log_entry **)malloc(
sizeof(struct mpiuse_log_entry *) * nlogs);
nr_reqs[1] = 0;
for (int k = 0; k < NITHREADS; k++) {
reqs_queue[k] = (struct mpiuse_log_entry **)malloc(
sizeof(struct mpiuse_log_entry *) * nlogs);
nr_reqs[k] = 0;
ind_req[k] = 0;
}
for (int k = 0; k < nreqs; k++) {
int qid = k % 2;
int qid = k % NITHREADS;
reqs_queue[qid][nr_reqs[qid]] = reqs[k];
nr_reqs[qid]++;
}
message("nr_reqs = %d, %d", nr_reqs[0], nr_reqs[1]);
for (int k = 0; k < NITHREADS; k++) {
message("nr_reqs[%d] = %d", k, nr_reqs[k]);
}
free(reqs);
}
......@@ -407,9 +398,12 @@ static void pick_logs(void) {
* @brief usage help.
*/
static void usage(char *argv[]) {
fprintf(stderr, "Usage: %s [-vf] [-n nanosec] SWIFT_mpiuse-log-file.dat "
"logfile.dat\n", argv[0]);
fprintf(stderr, " options: -v verbose, -f fast injections, "
fprintf(stderr,
"Usage: %s [-vf] [-n nanosec] SWIFT_mpiuse-log-file.dat "
"logfile.dat\n",
argv[0]);
fprintf(stderr,
" options: -v verbose, -f fast injections, "
"-n nanoseconds to wait\n");
fflush(stderr);
}
......@@ -444,7 +438,7 @@ int main(int argc, char *argv[]) {
case 'v':
verbose = 1;
break;
case 'n':
case 'n':
waitns = atoll(optarg);
break;
default:
......@@ -490,12 +484,14 @@ int main(int argc, char *argv[]) {
/* Make three threads, one for injecting tasks and two to check for
* completions of the sends and recv independently. */
pthread_t injectthread1;
if (pthread_create(&injectthread1, NULL, &inject_thread1, &myrank) != 0)
error("Failed to create injection thread.");
pthread_t injectthread2;
if (pthread_create(&injectthread2, NULL, &inject_thread2, &myrank) != 0)
error("Failed to create injection thread.");
static int ks[NITHREADS];
pthread_t injectthread[NITHREADS];
for (int k = 0; k < NITHREADS; k++) {
ks[k] = k;
if (pthread_create(&injectthread[k], NULL, &inject_thread, &ks[k]) != 0)
error("Failed to create injection thread.");
}
pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread.");
......@@ -504,8 +500,7 @@ int main(int argc, char *argv[]) {
error("Failed to create recv thread.");
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join(injectthread1, NULL);
pthread_join(injectthread2, NULL);
for (int k = 0; k < NITHREADS; k++) pthread_join(injectthread[k], NULL);
pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment