Skip to content
Snippets Groups Projects
Commit 16ff100c authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Run 3 ranks...

parent b8b8e541
No related branches found
No related tags found
1 merge request!8Draft: RDMA version with wrapped infinity calls
#CFLAGS = -g -O0 -Wall -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
CFLAGS = -g -O0 -Wall -Iinfinity/include CFLAGS = -g -O0 -Wall -Iinfinity/include
......
...@@ -83,7 +83,8 @@ static int volatile todo_send = 0; ...@@ -83,7 +83,8 @@ static int volatile todo_send = 0;
/* The local receive queue. */ /* The local receive queue. */
static struct mpiuse_log_entry **volatile recv_queue; static struct mpiuse_log_entry **volatile recv_queue;
static int volatile nr_recv[16] = {0}; // XXX needs to be dynamic static int volatile nr_recv = 0;
static int volatile nr_recvs[16] = {0}; // XXX needs to be dynamic
/* Starting up the server ends. */ /* Starting up the server ends. */
static int volatile starting[16] = {1}; // XXX needs to be dynamic... static int volatile starting[16] = {1}; // XXX needs to be dynamic...
...@@ -254,7 +255,7 @@ static void *recv_thread(void *arg) { ...@@ -254,7 +255,7 @@ static void *recv_thread(void *arg) {
message("Accepting incoming connections on %d", BASE_PORT + rank); message("Accepting incoming connections on %d", BASE_PORT + rank);
/* No. of receives to process. */ /* No. of receives to process. */
int todo_recv = nr_recv[rank]; int todo_recv = nr_recvs[rank];
/* We loop while new requests are being send and we still have messages /* We loop while new requests are being send and we still have messages
* to receive. */ * to receive. */
...@@ -273,15 +274,15 @@ static void *recv_thread(void *arg) { ...@@ -273,15 +274,15 @@ static void *recv_thread(void *arg) {
/* Now find the associated log. */ /* Now find the associated log. */
int found = 0; int found = 0;
for (int k = 0; k < nr_recv[rank]; k++) { for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k]; struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done && log->otherrank == rank && if (log != NULL && !log->done && log->otherrank == rank &&
log->subtype == subtype && log->size == size && log->tag == tag) { log->subtype == subtype && log->size == size && log->tag == tag) {
found = 1; found = 1;
// if (verbose) if (verbose)
message("receive message subtype %d from %d on %d", log->subtype, rank, message("receive message subtype %d from %d on %d", log->subtype, rank,
myrank); myrank);
/* Check data sent data is unchanged and received data is as /* Check data sent data is unchanged and received data is as
* expected. */ * expected. */
...@@ -297,8 +298,9 @@ static void *recv_thread(void *arg) { ...@@ -297,8 +298,9 @@ static void *recv_thread(void *arg) {
} }
} }
if (!found) { if (!found) {
error("No matching receive on connections to %d (%d of %d todo)", error("No matching receive on connections to %d (%d of %d todo:"
BASE_PORT + rank, todo_recv, nr_recv[rank]); " rank = %d otherrank = %d subtype = %d size = %zd tag = %d)",
BASE_PORT + rank, todo_recv, nr_recvs[rank], myrank, rank, subtype, size, tag);
} }
// Ready for next use of buffer? // Ready for next use of buffer?
...@@ -338,7 +340,7 @@ static size_t pick_logs() { ...@@ -338,7 +340,7 @@ static size_t pick_logs() {
nr_send = 0; nr_send = 0;
recv_queue = (struct mpiuse_log_entry **)calloc( recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *)); nlogs, sizeof(struct mpiuse_log_entry *));
int all_nr_recv = 0; nr_recv = 0;
for (size_t k = 0; k < nlogs; k++) { for (size_t k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k); struct mpiuse_log_entry *log = mpiuse_get_log(k);
...@@ -350,8 +352,8 @@ static size_t pick_logs() { ...@@ -350,8 +352,8 @@ static size_t pick_logs() {
send_queue[nr_send] = log; send_queue[nr_send] = log;
nr_send++; nr_send++;
} else if (log->type == task_type_recv) { } else if (log->type == task_type_recv) {
recv_queue[all_nr_recv] = log; recv_queue[nr_recv] = log;
all_nr_recv++; nr_recv++;
} else { } else {
error("task type '%d' is not a known send or recv task", log->type); error("task type '%d' is not a known send or recv task", log->type);
} }
...@@ -363,19 +365,19 @@ static size_t pick_logs() { ...@@ -363,19 +365,19 @@ static size_t pick_logs() {
} }
/* Now we need to count the numbers of messages to send per rank. */ /* Now we need to count the numbers of messages to send per rank. */
for (int k = 0; k < nr_ranks; k++) nr_recv[k] = 0; for (int k = 0; k < nr_ranks; k++) nr_recvs[k] = 0;
for (int k = 0; k < all_nr_recv; k++) { for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k]; struct mpiuse_log_entry *log = recv_queue[k];
nr_recv[log->otherrank]++; nr_recvs[log->otherrank]++;
} }
/* Sort into increasing tag. */ /* Sort into increasing tag. */
qsort(recv_queue, all_nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs); qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs); qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
if (verbose) { if (verbose) {
message("maxsize = %zd, nr_send = %d, all_nr_recv = %d", maxsize, nr_send, message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
all_nr_recv); nr_recv);
} }
return maxsize; return maxsize;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment