Skip to content
Snippets Groups Projects
Commit 2decfdc3 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Add timers back

parent 64fa3245
No related branches found
No related tags found
1 merge request!8Draft: RDMA version with wrapped infinity calls
......@@ -202,6 +202,7 @@ static void *send_thread(void *arg) {
BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK);
log->data = dataptr;
log->injtic = getticks();
/* Fill data with pattern. */
if (datacheck)
......@@ -214,7 +215,7 @@ static void *send_thread(void *arg) {
dataptr[2] = log->size;
dataptr[3] = log->tag;
/* Need to assign to a buffer. */
/* Need to assign to a buffer to register memory. */
auto *sendBuffer = new infinity::memory::Buffer(context, dataptr,
tobytes(datasize));
......@@ -222,8 +223,9 @@ static void *send_thread(void *arg) {
infinity::requests::RequestToken requestToken(context);
qp->send(sendBuffer, &requestToken);
requestToken.waitUntilCompleted();
log->endtic = getticks();
//delete sendBuffer;// XXX Can we reuse ?
delete sendBuffer;// XXX Can we reuse ?
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
......@@ -283,29 +285,36 @@ static void *recv_thread(void *arg) {
size_t size = dataptr[2];
int tag = dataptr[3];
/* Now find the associated log. */
/* Now find the associated log. XXX speed this up, local queue. */
int found = 0;
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
if (log != NULL && !log->done && log->otherrank == rank &&
log->subtype == subtype && log->size == size && log->tag == tag) {
found = 1;
if (verbose)
message("receive message subtype %d from %d on %d", log->subtype,
rank, myrank);
/* Check data sent data is unchanged and received data is as
* expected. */
if (datacheck && !datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], log->tag)) {
message("Data mismatch on completion");
if (log != NULL && !log->done) {
/* On the first attempt we start listening for this receive. */
if (log->injtic == 0) log->injtic = getticks();
if (log->otherrank == rank && log->subtype == subtype &&
log->size == size && log->tag == tag) {
found = 1;
if (verbose)
message("receive message subtype %d from %d on %d", log->subtype,
rank, myrank);
/* Check data sent data is unchanged and received data is as
* expected. */
if (datacheck && !datacheck_test(toblocks(log->size),
&dataptr[HEADER_SIZE], log->tag)) {
message("Data mismatch on completion");
}
/* Done, clean up. */
log->done = 1;
// free(log->data); // XXX should really offload the data to be fair.
log->endtic = getticks();
todo_recv--;
}
/* Done, clean up. */
log->done = 1;
// free(log->data); // XXX should really offload the data to be fair.
todo_recv--;
}
}
if (!found) {
......@@ -339,8 +348,8 @@ static int cmp_logs(const void *p1, const void *p2) {
struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
if (l1->tag > l2->tag) return 1;
if (l1->tag < l2->tag) return -1;
if (l1->tic > l2->tic) return 1;
if (l1->tic < l2->tic) return -1;
return 0;
}
......@@ -365,6 +374,8 @@ static size_t pick_logs() {
if (log->activation) {
if (log->rank == myrank) {
log->done = 0;
log->injtic = 0;
log->endtic = 0;
log->data = NULL;
if (log->type == task_type_send) {
send_queue[nr_send] = log;
......@@ -535,7 +546,7 @@ int main(int argc, char *argv[]) {
message("All servers are started");
// And make sure all remotes are also ready.
MPI_Barrier(MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD); // Vital...
message("All synchronized");
/* Now we have a thread per rank to send the messages. */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment