Skip to content
Snippets Groups Projects
Commit 449e37d0 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Use a lock to protect requests list from being accessed mid-update

Probably don't need the atomics now, but left in as harmless annotations
parent 5fa068ab
Branches
No related tags found
1 merge request!894WIP: Use MPI_Testsome to control the activation of recv tasks
......@@ -3190,9 +3190,8 @@ void engine_maketasks(struct engine *e) {
}
/* Create the testsome task. */
sched->testsome = scheduler_addtask(sched, task_type_recv,
task_subtype_testsome,
0, 0, NULL, NULL);
sched->testsome = scheduler_addtask(
sched, task_type_recv, task_subtype_testsome, 0, 0, NULL, NULL);
threadpool_map(&e->threadpool, engine_addtasks_recv_mapper,
recv_cell_type_pairs, num_recv_cells,
......
......@@ -302,7 +302,6 @@ void queue_clean(struct queue *q) {
free(q->tid_incoming);
}
/**
* @brief Dump a formatted list of tasks in the queue to the given file stream.
*
......
......@@ -56,7 +56,7 @@
#include "version.h"
/* XXX hack reference to the scheduler instance. */
struct scheduler *myscheduler;
struct scheduler *scheduler_scheduler;
/**
* @brief Re-set the list of active tasks.
......@@ -1508,7 +1508,7 @@ void scheduler_reweight(struct scheduler *s, int verbose) {
break;
case task_type_recv:
if (t->subtype == task_subtype_testsome) {
cost = 1.0f;
cost = 1.0f;
} else {
if (count_i < 1e5)
cost = 5.f * (wscale * count_i) * count_i;
......@@ -1603,8 +1603,8 @@ void scheduler_enqueue_mapper(void *map_data, int num_elements,
void scheduler_start(struct scheduler *s) {
#ifdef WITH_MPI
s->nr_recv_tasks = 0;
s->nr_requests = 0;
s->nr_recv_tasks = 0;
s->nr_requests = 0;
#endif
/* Reset all task timers. */
......@@ -1620,7 +1620,8 @@ void scheduler_start(struct scheduler *s) {
s->tasks[i].recv_ready = 0;
s->tasks[i].recv_started = 0;
if (!s->tasks[i].skip && s->tasks[i].type == task_type_recv &&
s->tasks[i].subtype != task_subtype_testsome) s->nr_recv_tasks++;
s->tasks[i].subtype != task_subtype_testsome)
s->nr_recv_tasks++;
#endif
}
......@@ -1629,13 +1630,13 @@ void scheduler_start(struct scheduler *s) {
/* Initialise the requests storage. */
if (s->nr_size_requests < s->nr_recv_tasks) {
swift_free("requests", s->requests);
swift_free("ind_requests", s->tasks_requests);
s->requests =
(MPI_Request *)swift_malloc("requests", sizeof(MPI_Request) * s->nr_recv_tasks);
s->tasks_requests = (struct task **)
swift_malloc("tasks_requests", sizeof(struct task *) * s->nr_recv_tasks);
s->nr_size_requests = s->nr_recv_tasks;
swift_free("requests", s->requests);
swift_free("ind_requests", s->tasks_requests);
s->requests = (MPI_Request *)swift_malloc(
"requests", sizeof(MPI_Request) * s->nr_recv_tasks);
s->tasks_requests = (struct task **)swift_malloc(
"tasks_requests", sizeof(struct task *) * s->nr_recv_tasks);
s->nr_size_requests = s->nr_recv_tasks;
}
#endif
......@@ -1678,18 +1679,19 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
if (t->skip) return;
#ifdef WITH_MPI
/* Recv tasks are not enqueued immediately, we need to make sure that they
* are ready first (using the testsome task). We do, however, need to start
* the MPI recv for them for that to be possible, so do that if needed. */
//if (t->type == task_type_recv && t->subtype != task_subtype_testsome) {
// if (!t->recv_started) {
// message("Trying to enqueue recv task %s/%s %d", taskID_names[t->type],
// subtaskID_names[t->subtype], t->recv_ready);
// scheduler_start_recv(s, t);
// return;
// }
// message("Ready to run");
//}
/* Recv tasks are not enqueued immediately, we need to make sure that they
* are ready first (using the testsome task). We do, however, need to start
* the MPI recv for them for that to be possible, so do that if needed. */
// if (t->type == task_type_recv && t->subtype != task_subtype_testsome) {
// if (!t->recv_started) {
// message("Trying to enqueue recv task %s/%s %d",
// taskID_names[t->type],
// subtaskID_names[t->subtype], t->recv_ready);
// scheduler_start_recv(s, t);
// return;
// }
// message("Ready to run");
//}
#endif
/* If this is an implicit task, just pretend it's done. */
......@@ -2024,21 +2026,21 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) {
#ifdef WITH_MPI
if (t->subtype == task_subtype_testsome) {
/* The testsome task may have more work to do. We compare the number of
* processed recv's to the total non-skipped ones in the task lists. */
if (nr_recv_tasks > 0) {
t->skip = 0;
scheduler_enqueue(s, t);
} else {
message("testsome task complete this step (%d/%d)", nr_recv_tasks,
s->nr_recv_tasks);
}
/* The testsome task may have more work to do. We compare the number of
* processed recv's to the total non-skipped ones in the task lists. */
if (nr_recv_tasks > 0) {
t->skip = 0;
scheduler_enqueue(s, t);
} else {
message("testsome task complete this step (%d/%d)", nr_recv_tasks,
s->nr_recv_tasks);
}
/* Now remove the old waiting count. */
pthread_mutex_lock(&s->sleep_mutex);
atomic_dec(&s->waiting);
pthread_cond_broadcast(&s->sleep_cond);
pthread_mutex_unlock(&s->sleep_mutex);
/* Now remove the old waiting count. */
pthread_mutex_lock(&s->sleep_mutex);
atomic_dec(&s->waiting);
pthread_cond_broadcast(&s->sleep_cond);
pthread_mutex_unlock(&s->sleep_mutex);
}
#endif
......@@ -2098,7 +2100,7 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
const struct task *prev) {
struct task *res = NULL;
const int nr_queues = s->nr_queues;
//unsigned int seed = qid;
// unsigned int seed = qid;
/* Check qid. */
if (qid >= nr_queues || qid < 0) error("Bad queue ID.");
......@@ -2118,7 +2120,7 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
}
/* If unsuccessful, try stealing from the other queues. */
//if (s->flags & scheduler_flag_steal) {
// if (s->flags & scheduler_flag_steal) {
// int count = 0, qids[nr_queues];
// for (int k = 0; k < nr_queues; k++)
// if (s->queues[k].count > 0 || s->queues[k].count_incoming > 0) {
......@@ -2180,8 +2182,14 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
int nr_queues, unsigned int flags, int nodeID,
struct threadpool *tp) {
/* Init the lock. */
/* Init the locks. */
lock_init(&s->lock);
#ifdef WITH_MPI
lock_init(&s->lock_requests);
#endif
/* Globally accessible pointer. XXX yes we did this... */
scheduler_scheduler = s;
/* Allocate the queues. */
if (swift_memalign("queues", (void **)&s->queues, queue_struct_align,
......@@ -2219,14 +2227,6 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
s->tasks_ind = NULL;
pthread_key_create(&s->local_seed_pointer, NULL);
scheduler_reset(s, nr_tasks);
#ifdef WITH_MPI
/* Init the requests lock. */
lock_init(&s->lock_requests);
#endif
/* XXX yes we did this... */
myscheduler = s;
}
/**
......@@ -2431,7 +2431,8 @@ void scheduler_start_recv(struct scheduler *s, struct task *t) {
if (t->subtype != task_subtype_testsome) {
// XXX debugging.
if (t->req != MPI_REQUEST_NULL) error("MPI request is not MPI_REQUEST_NULL");
if (t->req != MPI_REQUEST_NULL)
error("MPI request is not MPI_REQUEST_NULL");
int err = MPI_Irecv(buff, count, type, t->ci->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
......@@ -2439,13 +2440,16 @@ void scheduler_start_recv(struct scheduler *s, struct task *t) {
mpi_error(err, "Failed to emit irecv for particle data.");
}
/* Record request and associated task. */
/* Record request and associated task. Need to lock this down so we don't
* have an invalid extra request for a while. */
if (lock_lock(&s->lock_requests) != 0) error("Failed to lock requests");
int ind = atomic_inc(&s->nr_requests);
//message("index = %d for %s/%s size: %zd, from: %d, tag: %lld", ind,
// message("index = %d for %s/%s size: %zd, from: %d, tag: %lld", ind,
// taskID_names[t->type], subtaskID_names[t->subtype],
// size, t->ci->nodeID, t->flags);
s->requests[ind] = t->req;
s->tasks_requests[ind] = t;
if (lock_unlock(&s->lock_requests) != 0) error("Failed to unlock requests");
}
#endif
......@@ -2474,7 +2478,6 @@ void scheduler_dump_queues(struct engine *e) {
}
MPI_Barrier(MPI_COMM_WORLD);
for (int i = 0; i < e->nr_nodes; i++) {
/* Rank 0 decides the index of the writing node, this happens
......
......@@ -55,8 +55,7 @@
#define scheduler_flag_steal (1 << 1)
/* XXX hack reference to a scheduler... */
extern struct scheduler *myscheduler;
extern struct scheduler *scheduler_scheduler;
/* Data of a scheduler. */
struct scheduler {
......
......@@ -544,34 +544,38 @@ int task_lock(struct task *t) {
#ifdef WITH_MPI
if (t->subtype == task_subtype_testsome) {
/* Check for any messages that could be received. */
struct scheduler *s = myscheduler;
/* Don't want this to change when MPI is using it. */
int nr_requests = s->nr_requests;
int outcount = 0;
int indices[nr_requests];
err = MPI_Testsome(nr_requests, s->requests, &outcount, indices,
MPI_STATUSES_IGNORE);
if (err != MPI_SUCCESS) {
/* Check for any messages that could be received.
* Don't want any of this to change when MPI is using it,
* but lets not block. */
struct scheduler *s = scheduler_scheduler;
if (lock_trylock(&s->lock_requests) == 0) {
int nr_requests = s->nr_requests;
int outcount = 0;
int indices[nr_requests];
err = MPI_Testsome(nr_requests, s->requests, &outcount, indices,
MPI_STATUSES_IGNORE);
if (err != MPI_SUCCESS) {
mpi_error(err, "Failed to test for recv messages");
}
}
/* Mark any released tasks as ready. */
for (int k = 0; k < outcount; k++) {
s->tasks_requests[indices[k]]->req = MPI_REQUEST_NULL;
s->tasks_requests[indices[k]]->recv_ready = 1;
}
/* Mark any released tasks as ready. */
for (int k = 0; k < outcount; k++) {
s->tasks_requests[indices[k]]->req = MPI_REQUEST_NULL;
s->tasks_requests[indices[k]]->recv_ready = 1;
}
/* XXX could remove from requests list? Would need a lock. */
/* XXX could remove from requests list?. */
/* Decrement total recvs we've seen. Careful with special values. */
if (outcount > 0) {
/* Decrement total recvs we've seen. Careful with special values. */
if (outcount > 0) {
atomic_sub(&s->nr_recv_tasks, outcount);
return 1;
res = 1;
}
if (lock_unlock(&s->lock_requests) != 0)
error("Failed to lock requests");
}
return 0;
return res;
}
/* Actual recv that is ready to run. */
......@@ -593,7 +597,7 @@ int task_lock(struct task *t) {
"%s).",
taskID_names[t->type], subtaskID_names[t->subtype], t->flags, buff);
}
//if (res)
// if (res)
// message("sent: %s/%s to %d", taskID_names[t->type],
// subtaskID_names[t->subtype], t->cj->nodeID);
return res;
......@@ -1251,7 +1255,7 @@ void task_dump_active(struct engine *e) {
/* Get destination rank of MPI requests. */
int paired = (t->cj != NULL);
int otherrank = (t->ci == NULL) ? -1: t->ci->nodeID;
int otherrank = (t->ci == NULL) ? -1 : t->ci->nodeID;
if (paired) otherrank = t->cj->nodeID;
fprintf(file_thread, "%i %i %s %s %i %i %lli %lli %i %i %i %i %lli\n",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment