Skip to content
Snippets Groups Projects
Commit 2502a55e authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Split the work of handling the local finalization of sends into two and use...

Split the work of handling the local finalization of sends across two request queues, and use two threads (one per queue) to process them
parent c706c1c8
No related branches found
No related tags found
No related merge requests found
...@@ -125,10 +125,15 @@ static int volatile nr_recv = 0; ...@@ -125,10 +125,15 @@ static int volatile nr_recv = 0;
static int volatile todo_recv = 0; static int volatile todo_recv = 0;
/* The local requests queue. */ /* The local requests queue. */
static struct mpiuse_log_entry **volatile req_queue; static struct mpiuse_log_entry **volatile req_queue1;
static int volatile ind_req = 0; static int volatile ind_req1 = 0;
static int volatile nr_req = 0; static int volatile nr_req1 = 0;
static int volatile todo_req = 0; static int volatile todo_req1 = 0;
static struct mpiuse_log_entry **volatile req_queue2;
static int volatile ind_req2 = 0;
static int volatile nr_req2 = 0;
static int volatile todo_req2 = 0;
/** /**
* @brief Convert two ranks and tag into a single unique value. * @brief Convert two ranks and tag into a single unique value.
...@@ -263,10 +268,16 @@ static void *send_thread(void *arg) { ...@@ -263,10 +268,16 @@ static void *send_thread(void *arg) {
mpi_window[log->subtype], &log->req); mpi_window[log->subtype], &log->req);
if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data"); if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
/* Add to the requests queue. */ /* Add to one of the requests queues. */
int ind = atomic_inc(&nr_req); if (k%2) {
req_queue[ind] = log; int ind = atomic_inc(&nr_req1);
atomic_inc(&todo_req); req_queue1[ind] = log;
atomic_inc(&todo_req1);
} else {
int ind = atomic_inc(&nr_req2);
req_queue2[ind] = log;
atomic_inc(&todo_req2);
}
} }
/* All done. */ /* All done. */
...@@ -283,11 +294,27 @@ static void *send_thread(void *arg) { ...@@ -283,11 +294,27 @@ static void *send_thread(void *arg) {
*/ */
static void *req_thread(void *arg) { static void *req_thread(void *arg) {
message("%d: req thread starts with %d messages", *((int *)arg), nr_req); /* Our thread index for selecting queue. */
int us = *((int *)arg);
struct mpiuse_log_entry **volatile req_queue = NULL;
int volatile *todo_req = NULL;
int volatile *nr_req = NULL;
if (us == 1) {
req_queue = req_queue1;
todo_req = &todo_req1;
nr_req = &nr_req1;
} else {
req_queue = req_queue2;
todo_req = &todo_req2;
nr_req = &nr_req2;
}
message("%d: req thread starts with %d messages", us, *nr_req);
ticks starttics = getticks(); ticks starttics = getticks();
while (injecting || (!injecting && todo_req > 0)) { while (injecting || (!injecting && *todo_req > 0)) {
int nlogs = nr_req; int nlogs = *nr_req;
for (int k = 0; k < nlogs; k++) { for (int k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = req_queue[k]; struct mpiuse_log_entry *log = req_queue[k];
if (log != NULL && !log->done) { if (log != NULL && !log->done) {
...@@ -326,13 +353,13 @@ static void *req_thread(void *arg) { ...@@ -326,13 +353,13 @@ static void *req_thread(void *arg) {
log->done = 1; log->done = 1;
log->endtic = getticks(); log->endtic = getticks();
free(log->data); free(log->data);
atomic_dec(&todo_req); atomic_dec(todo_req);
} }
} }
} }
} }
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics), message("%d: took %.3f %s.", us, clocks_from_ticks(getticks() - starttics),
clocks_getunit()); clocks_getunit());
return NULL; return NULL;
...@@ -445,9 +472,12 @@ static void pick_logs() { ...@@ -445,9 +472,12 @@ static void pick_logs() {
int nlogs = mpiuse_nr_logs(); int nlogs = mpiuse_nr_logs();
/* Duplicate of logs. Bit large... */ /* Duplicate of logs. Bit large... */
req_queue = (struct mpiuse_log_entry **)calloc( req_queue1 = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *)); nlogs, sizeof(struct mpiuse_log_entry *));
nr_req = 0; nr_req1 = 0;
req_queue2 = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_req2 = 0;
send_queue = (struct mpiuse_log_entry **)calloc( send_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *)); nlogs, sizeof(struct mpiuse_log_entry *));
nr_send = 0; nr_send = 0;
...@@ -643,16 +673,24 @@ int main(int argc, char *argv[]) { ...@@ -643,16 +673,24 @@ int main(int argc, char *argv[]) {
pthread_t sendthread; pthread_t sendthread;
if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0) if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
error("Failed to create send thread."); error("Failed to create send thread.");
pthread_t reqthread;
if (pthread_create(&reqthread, NULL, &req_thread, &myrank) != 0) int threadno1 = 1;
pthread_t reqthread1;
if (pthread_create(&reqthread1, NULL, &req_thread, &threadno1) != 0)
error("Failed to create send thread.");
int threadno2 = 2;
pthread_t reqthread2;
if (pthread_create(&reqthread2, NULL, &req_thread, &threadno2) != 0)
error("Failed to create send thread."); error("Failed to create send thread.");
pthread_t recvthread; pthread_t recvthread;
if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0) if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
error("Failed to create recv thread."); error("Failed to create recv thread.");
/* Wait until all threads have exited and all message exchanges have /* Wait until all threads have exited and all message exchanges have
* completed. */ * completed. */
pthread_join(reqthread, NULL); pthread_join(reqthread1, NULL);
pthread_join(reqthread2, NULL);
pthread_join(sendthread, NULL); pthread_join(sendthread, NULL);
pthread_join(recvthread, NULL); pthread_join(recvthread, NULL);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment