diff --git a/src/scheduler.h b/src/scheduler.h
index 6bb0165136c36c85a4512e4276b3824967aa2127..cea49555502dd6ed607cf063ab09a4716593f5e7 100644
--- a/src/scheduler.h
+++ b/src/scheduler.h
@@ -135,7 +135,7 @@ struct scheduler {
 #ifdef WITH_MPI
   /* MPI windows for one-sided messages. We have one per task subtype. */
   MPI_Win osmpi_window[task_subtype_count];
-  scheduler_osmpi_blocktype *osmpi_ptr[task_subtype_count];
+  volatile scheduler_osmpi_blocktype *osmpi_ptr[task_subtype_count];
   size_t osmpi_max_size[task_subtype_count];
   /* Lock for one at a time sends and recvs (should be one per subtype per
      node). */
diff --git a/src/task.c b/src/task.c
index 64b8ced2cc720cbc3e5916770ce690edf0cec60f..2132e33c144b823f03e5aad09dd1aaceff6d661d 100644
--- a/src/task.c
+++ b/src/task.c
@@ -672,9 +672,6 @@ int task_lock(struct scheduler *s, struct task *t) {
                 " (cf %lld %zd)",
                 cj->nodeID, subtype, dataptr[2], dataptr[1], t->flags, t->size);
 
-      // XXX test test test
-      err = MPI_Win_flush(cj->nodeID, s->osmpi_window[subtype]);
-
       /* Now we change the first element to unlocked so that the remote end
        * can find out that the data has arrived. */
       scheduler_osmpi_blocktype newval[1];
@@ -756,7 +753,7 @@ int task_lock(struct scheduler *s, struct task *t) {
       /* Check for a message waiting for this subtype, tag and size from our
        * expected node. If so accept. XXX need the scheduler and these
        * max_sizes need synchronizing... */
-      scheduler_osmpi_blocktype *dataptr =
+      volatile scheduler_osmpi_blocktype *dataptr =
         &s->osmpi_ptr[subtype][s->osmpi_max_size[subtype] * ci->nodeID];
       if (dataptr[0] == (scheduler_osmpi_blocktype) scheduler_osmpi_unlocked) {
 
@@ -771,13 +768,13 @@ int task_lock(struct scheduler *s, struct task *t) {
           mpiuse_log_allocation(type, subtype, &t->buff, 0, 0, 0, 0);
 
           /* Ready to process. So copy to local buffers. */
-          memcpy(t->buff, &dataptr[scheduler_osmpi_header_size], t->size);
+          memcpy(t->buff, (void *)&dataptr[scheduler_osmpi_header_size], t->size);
 
           /* Ready for next recv. */
           dataptr[0] = scheduler_osmpi_locked;
 
           //if (s->space->e->verbose) // XXX no no no
-          message("recv message from %d/%d)", ci->nodeID, subtype);
+          //message("recv message from %d/%d)", ci->nodeID, subtype);
           if (lock_unlock(&s->recv_lock) != 0) {
             error("Unlocking the MPI recv lock failed.\n");
           }
@@ -790,7 +787,7 @@ int task_lock(struct scheduler *s, struct task *t) {
           /* Cache this message. */
           int result = 0;
           void *buff = calloc(1, dataptr[1]);
-          memcpy(buff, &dataptr[scheduler_osmpi_header_size], dataptr[1]);
+          memcpy(buff, (void *)&dataptr[scheduler_osmpi_header_size], dataptr[1]);
           mpicache_add(s->mpicache, ci->nodeID, subtype, dataptr[2], dataptr[1],
                        buff);
 
@@ -821,7 +818,7 @@ int task_lock(struct scheduler *s, struct task *t) {
           dataptr[0] = scheduler_osmpi_locked;
 
           //if (s->space->e->verbose) // XXX no no no
-          message("recv message from %d/%d)", ci->nodeID, subtype);
+          //message("recv message from %d/%d)", ci->nodeID, subtype);
           if (lock_unlock(&s->recv_lock) != 0) {
             error("Unlocking the MPI recv lock failed.\n");
           }
@@ -831,7 +828,6 @@ int task_lock(struct scheduler *s, struct task *t) {
       } else {
 
         /* Check the cache. */
-        int result = 0;
         void *buff = NULL;
         size_t size;
         mpicache_fetch(s->mpicache, ci->nodeID, subtype, t->flags,
@@ -842,23 +838,21 @@ int task_lock(struct scheduler *s, struct task *t) {
 
           /* Here we go. */
           memcpy(t->buff, buff, t->size);
-          result = 1;
           free(buff);
 
           /* And log deactivation, if logging enabled.
            */
           mpiuse_log_allocation(type, subtype, &t->buff, 0, 0, 0, 0);
+          /* XXX code re-use XXX Release the lock so another task can have a go. */
+          if (lock_unlock(&s->recv_lock) != 0) {
+            error("Unlocking the MPI recv lock failed.\n");
+          }
+          return 1;
         }
         //else {
         //  message("Cache miss from %d, subtype: %d, tag: %lld, size %zd",
         //          ci->nodeID, subtype, t->flags, t->size);
         //}
-
-        /* XXX code re-use XXX Release the lock so another task can have a go. */
-        if (lock_unlock(&s->recv_lock) != 0) {
-          error("Unlocking the MPI recv lock failed.\n");
-        }
-        return result;
       }
 
       /* While we have the lock, look for any cachable messages from all
@@ -866,16 +860,23 @@ int task_lock(struct scheduler *s, struct task *t) {
        * currently queued. (XXX active subtypes) */
       for (int k = 0; k < task_subtype_count; k++) {
         for (int j = 0; j < s->space->e->nr_nodes; j++) {
-          dataptr = &s->osmpi_ptr[k][s->osmpi_max_size[k] * j];
-          if (dataptr[0] == (scheduler_osmpi_blocktype) scheduler_osmpi_unlocked) {
+          if (j != engine_rank) {
+            dataptr = &s->osmpi_ptr[k][s->osmpi_max_size[k] * j];
+            if (dataptr[0] == (scheduler_osmpi_blocktype) scheduler_osmpi_unlocked) {
+
+              message("Anonymous caching from %d, subtype: %d, tag: %zd, size %zd",
+                      j, k, dataptr[2], dataptr[1]); fflush(stdout);
+
+              /* Cache this message. */
+              void *buff = calloc(1, dataptr[1]);
+              memcpy(buff, (void *)&dataptr[scheduler_osmpi_header_size], dataptr[1]);
+              mpicache_add(s->mpicache, j, k, dataptr[2], dataptr[1], buff);
 
-            message("Caching from %d, subtype: %d, tag: %zd, size %zd",
-                    j, k, dataptr[2], dataptr[1]);
+              /* Ready for next recv. */
+              dataptr[0] = scheduler_osmpi_locked;
 
-            /* Cache this message. */
-            void *buff = calloc(1, dataptr[1]);
-            memcpy(buff, &dataptr[scheduler_osmpi_header_size], dataptr[1]);
-            mpicache_add(s->mpicache, j, k, dataptr[2], dataptr[1], buff);
+              // Should we break?
+            }
           }
         }
       }
diff --git a/tools/match_mpireports.py b/tools/match_mpireports.py
index 3541506c41cbc8ca7f7ce67b30f42bb013adf35c..66b9857ff2effb6c4e099f31146706b0f174bda5 100755
--- a/tools/match_mpireports.py
+++ b/tools/match_mpireports.py
@@ -102,6 +102,12 @@ for f in infiles:
             recvs[key].append(line[:-1])
 
 # Now output. Note we could have unmatched recv keys, we don't check for that.
+print "# send_stic send_etic send_dtic send_step send_rank send_otherrank " + \
+      "send_type send_itype send_subtype send_isubtype send_activation " + \
+      "send_tag send_size send_sum " + \
+      "recv_stic recv_etic recv_dtic recv_step recv_rank recv_otherrank " + \
+      "recv_type recv_itype recv_subtype recv_isubtype recv_activation " + \
+      "recv_tag recv_size recv_sum "
 for key in sends:
     if key in recvs:
         if len(sends[key]) == 1 and len(recvs[key]) == 1: