Skip to content
Snippets Groups Projects
Commit 988a1b06 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Split recv queues into rank list for speed, no discernable difference...

parent 2decfdc3
No related branches found
No related tags found
1 merge request!8Draft: RDMA version with wrapped infinity calls
......@@ -81,10 +81,9 @@ static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_send = 0;
static int volatile todo_send = 0;
/* The local receive queue. */
static struct mpiuse_log_entry **volatile recv_queue;
static int volatile nr_recv = 0;
static int volatile nr_recvs[16] = {0}; // XXX needs to be dynamic
/* Local receive queues separated by rank. XXX needs to be dynamic. */
static int volatile nr_recvs[16] = {0};
static struct mpiuse_log_entry **volatile recvs_queue[16];
/* Starting up the server ends. */
static int volatile starting[16] = {1}; // XXX needs to be dynamic...
......@@ -267,8 +266,9 @@ static void *recv_thread(void *arg) {
->acceptIncomingConnection(); // We block here for first message.
//message("Accepting incoming connections on %d", BASE_PORT + rank);
/* No. of receives to process. */
/* No. of receives to process and associated queue. */
int todo_recv = nr_recvs[rank];
struct mpiuse_log_entry **recv_queue = recvs_queue[rank];
/* We loop while new requests are being send and we still have messages
* to receive. */
......@@ -287,7 +287,7 @@ static void *recv_thread(void *arg) {
/* Now find the associated log. XXX speed this up, local queue. */
int found = 0;
for (int k = 0; k < nr_recv; k++) {
for (int k = 0; k < nr_recvs[rank]; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done) {
......@@ -361,13 +361,13 @@ static size_t pick_logs() {
size_t nlogs = mpiuse_nr_logs();
size_t maxsize = 0;
/* Duplicate of logs. */
send_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
/* Queues of send and receive logs. */
send_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
nr_send = 0;
recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recv = 0;
struct mpiuse_log_entry **recv_queue = (struct mpiuse_log_entry **)
calloc(nlogs, sizeof(struct mpiuse_log_entry *));
int nr_recv = 0;
for (size_t k = 0; k < nlogs; k++) {
struct mpiuse_log_entry *log = mpiuse_get_log(k);
......@@ -393,16 +393,36 @@ static size_t pick_logs() {
}
}
/* Now we need to count the numbers of messages to send per rank. */
/* Sort into increasing tic. */
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
/* Now we need to count the numbers of messages to send per rank
* and create sub-queues for these.*/
for (int k = 0; k < nr_ranks; k++) nr_recvs[k] = 0;
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
nr_recvs[log->otherrank]++;
}
for (int k = 0; k < nr_ranks; k++) {
if (nr_recvs[k] > 0) {
recvs_queue[k] = (struct mpiuse_log_entry **)
calloc(nr_recvs[k], sizeof(struct mpiuse_log_entry *));
int i = 0;
for (int j = 0; j < nr_recv; j++) {
struct mpiuse_log_entry *log = recv_queue[j];
if (log->otherrank == k) {
recvs_queue[k][i] = recv_queue[j];
i++;
}
}
}
else {
recvs_queue[k] = NULL;
}
}
free(recv_queue);
/* Sort into increasing tag. */
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
if (verbose) {
message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment