Skip to content
Snippets Groups Projects
Commit 5fa068ab authored by Peter W. Draper
Browse files

Fix various races against the requests lists and the queues

Stop changing the number of requests after they have been handed off to MPI, and don't let the waiting count drop to zero when we are about to requeue the testsome task.

There is also a lock, which should be necessary if we switch stealing back on, since we don't want to call MPI_Testsome from multiple threads.
parent 6561362f
No related branches found
No related tags found
1 merge request: !894 "WIP: Use MPI_Testsome to control the activation of recv tasks"
...@@ -1507,10 +1507,14 @@ void scheduler_reweight(struct scheduler *s, int verbose) { ...@@ -1507,10 +1507,14 @@ void scheduler_reweight(struct scheduler *s, int verbose) {
cost = 2e9; cost = 2e9;
break; break;
case task_type_recv: case task_type_recv:
if (count_i < 1e5) if (t->subtype == task_subtype_testsome) {
cost = 5.f * (wscale * count_i) * count_i; cost = 1.0f;
else } else {
cost = 1e9; if (count_i < 1e5)
cost = 5.f * (wscale * count_i) * count_i;
else
cost = 1e9;
}
break; break;
default: default:
cost = 0; cost = 0;
...@@ -1582,8 +1586,6 @@ void scheduler_enqueue_mapper(void *map_data, int num_elements, ...@@ -1582,8 +1586,6 @@ void scheduler_enqueue_mapper(void *map_data, int num_elements,
/* Recv tasks are not enqueued until marked as ready by the testsome /* Recv tasks are not enqueued until marked as ready by the testsome
* tasks, but we do need to start the MPI recv for them for that to be * tasks, but we do need to start the MPI recv for them for that to be
* possible, so do that if needed. */ * possible, so do that if needed. */
//if (t->type == task_type_recv && t->subtype != task_subtype_testsome
// && t->wait == 1) {
if (t->type == task_type_recv && t->subtype != task_subtype_testsome) { if (t->type == task_type_recv && t->subtype != task_subtype_testsome) {
scheduler_start_recv(s, t); scheduler_start_recv(s, t);
} }
...@@ -1968,6 +1970,11 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) { ...@@ -1968,6 +1970,11 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) {
/* Release whatever locks this task held. */ /* Release whatever locks this task held. */
if (!t->implicit) task_unlock(t); if (!t->implicit) task_unlock(t);
#ifdef WITH_MPI
/* Keep against changes between now and use. */
int nr_recv_tasks = s->nr_recv_tasks;
#endif
/* Loop through the dependencies and add them to a queue if /* Loop through the dependencies and add them to a queue if
they are ready. */ they are ready. */
for (int k = 0; k < t->nr_unlock_tasks; k++) { for (int k = 0; k < t->nr_unlock_tasks; k++) {
...@@ -1999,7 +2006,15 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) { ...@@ -1999,7 +2006,15 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) {
if (!t->implicit) { if (!t->implicit) {
t->toc = getticks(); t->toc = getticks();
pthread_mutex_lock(&s->sleep_mutex); pthread_mutex_lock(&s->sleep_mutex);
atomic_dec(&s->waiting);
#ifdef WITH_MPI
/* Need to defer this as we may requeue, which could leave waiting at 0
* for a while... */
if (t->subtype != task_subtype_testsome) {
atomic_dec(&s->waiting);
}
#endif
pthread_cond_broadcast(&s->sleep_cond); pthread_cond_broadcast(&s->sleep_cond);
pthread_mutex_unlock(&s->sleep_mutex); pthread_mutex_unlock(&s->sleep_mutex);
} }
...@@ -2011,12 +2026,19 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) { ...@@ -2011,12 +2026,19 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) {
if (t->subtype == task_subtype_testsome) { if (t->subtype == task_subtype_testsome) {
/* The testsome task may have more work to do. We compare the number of /* The testsome task may have more work to do. We compare the number of
* processed recv's to the total non-skipped ones in the task lists. */ * processed recv's to the total non-skipped ones in the task lists. */
if (s->nr_recv_tasks > 0) { if (nr_recv_tasks > 0) {
t->skip = 0; t->skip = 0;
scheduler_enqueue(s, t); scheduler_enqueue(s, t);
} else { } else {
message("testsome task complete this step"); message("testsome task complete this step (%d/%d)", nr_recv_tasks,
s->nr_recv_tasks);
} }
/* Now remove the old waiting count. */
pthread_mutex_lock(&s->sleep_mutex);
atomic_dec(&s->waiting);
pthread_cond_broadcast(&s->sleep_cond);
pthread_mutex_unlock(&s->sleep_mutex);
} }
#endif #endif
...@@ -2198,6 +2220,11 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks, ...@@ -2198,6 +2220,11 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
pthread_key_create(&s->local_seed_pointer, NULL); pthread_key_create(&s->local_seed_pointer, NULL);
scheduler_reset(s, nr_tasks); scheduler_reset(s, nr_tasks);
#ifdef WITH_MPI
/* Init the requests lock. */
lock_init(&s->lock_requests);
#endif
/* XXX yes we did this... */ /* XXX yes we did this... */
myscheduler = s; myscheduler = s;
} }
......
...@@ -123,6 +123,7 @@ struct scheduler { ...@@ -123,6 +123,7 @@ struct scheduler {
int nr_size_requests; int nr_size_requests;
struct task **tasks_requests; struct task **tasks_requests;
int nr_recv_tasks; int nr_recv_tasks;
swift_lock_type lock_requests;
#endif #endif
}; };
......
...@@ -544,12 +544,15 @@ int task_lock(struct task *t) { ...@@ -544,12 +544,15 @@ int task_lock(struct task *t) {
#ifdef WITH_MPI #ifdef WITH_MPI
if (t->subtype == task_subtype_testsome) { if (t->subtype == task_subtype_testsome) {
/* Check for any messages that could be received. */ /* Check for any messages that could be received. */
struct scheduler *s = myscheduler; struct scheduler *s = myscheduler;
/* Don't want this to change when MPI is using it. */
int nr_requests = s->nr_requests;
int outcount = 0; int outcount = 0;
int indices[s->nr_requests]; int indices[nr_requests];
err = MPI_Testsome(s->nr_requests, s->requests, &outcount, indices, err = MPI_Testsome(nr_requests, s->requests, &outcount, indices,
MPI_STATUSES_IGNORE); MPI_STATUSES_IGNORE);
if (err != MPI_SUCCESS) { if (err != MPI_SUCCESS) {
mpi_error(err, "Failed to test for recv messages"); mpi_error(err, "Failed to test for recv messages");
...@@ -561,11 +564,11 @@ int task_lock(struct task *t) { ...@@ -561,11 +564,11 @@ int task_lock(struct task *t) {
s->tasks_requests[indices[k]]->recv_ready = 1; s->tasks_requests[indices[k]]->recv_ready = 1;
} }
/* XXX could remove from requests list? */ /* XXX could remove from requests list? Would need a lock. */
/* Decrement total recvs we've seen. Careful with special values. */ /* Decrement total recvs we've seen. Careful with special values. */
if (outcount > 0) { if (outcount > 0) {
s->nr_recv_tasks -= outcount; atomic_sub(&s->nr_recv_tasks, outcount);
return 1; return 1;
} }
return 0; return 0;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment