Commit 37557ef1 authored by Peter W. Draper

Merge branch 'dynamic_unlocks' into 'master'

Dynamic unlocks

Get rid of the fixed-size unlocks array in each task, and the link-task madness that this led to.
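For anyone reviewing: each task used to carry a fixed `unlock_tasks[task_maxunlock + 1]` array, with overflow chained through implicit `task_type_link` tasks. Unlock pairs are now appended to a single growable buffer in the scheduler (under the scheduler lock), and `scheduler_set_unlocks()` then groups them per task with a counting sort, so every task's `unlock_tasks` ends up pointing into one shared array. A minimal standalone sketch of that grouping step, with simplified stand-in types and names rather than the exact code in this branch:

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the real struct task. */
struct task { int id; };

/* Group a flat list of (unlocking task index, unlocked task) pairs so that
 * each task's unlocks form one contiguous block, in the spirit of
 * scheduler_set_unlocks(). On return, unlocks[offsets[k]..offsets[k+1]-1]
 * are the tasks unlocked by task k; offsets needs nr_tasks + 1 entries. */
static void group_unlocks(int nr_tasks, int nr_unlocks, const int *unlock_ind,
                          struct task **unlocks, int *offsets) {
  int *counts = calloc(nr_tasks, sizeof(int));
  int *cursor = malloc(sizeof(int) * nr_tasks);
  struct task **sorted = malloc(sizeof(struct task *) * nr_unlocks);
  if (counts == NULL || cursor == NULL || sorted == NULL) exit(EXIT_FAILURE);

  /* Count the unlocks owned by each task, then prefix-sum into offsets. */
  for (int k = 0; k < nr_unlocks; k++) counts[unlock_ind[k]] += 1;
  offsets[0] = 0;
  for (int k = 0; k < nr_tasks; k++) offsets[k + 1] = offsets[k] + counts[k];

  /* Scatter each unlock into its task's block (counting sort). */
  memcpy(cursor, offsets, sizeof(int) * nr_tasks);
  for (int k = 0; k < nr_unlocks; k++)
    sorted[cursor[unlock_ind[k]]++] = unlocks[k];

  memcpy(unlocks, sorted, sizeof(struct task *) * nr_unlocks);
  free(counts);
  free(cursor);
  free(sorted);
}

int main(void) {
  struct task tasks[4] = {{0}, {1}, {2}, {3}};
  /* Task 0 unlocks tasks 1 and 2, task 1 unlocks task 3, recorded in
   * arbitrary arrival order. */
  int unlock_ind[3] = {1, 0, 0};
  struct task *unlocks[3] = {&tasks[3], &tasks[1], &tasks[2]};
  int offsets[5];
  group_unlocks(4, 3, unlock_ind, unlocks, offsets);
  for (int k = 0; k < 4; k++)
    for (int j = offsets[k]; j < offsets[k + 1]; j++)
      printf("task %d unlocks task %d\n", k, unlocks[j]->id);
  return 0;
}
```

The one-off sort happens in engine_maketasks() once all dependencies are known, which is what lets the per-task fixed arrays (and the link tasks that papered over their overflow) go away.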

Peter, could you give this a thorough run to make sure it doesn't break anything new? Thanks!

See merge request !66
parents 3aee3853 62962a2f
src/engine.c
@@ -74,7 +74,11 @@ int engine_rank;
struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) {
-  struct link *res = &e->links[atomic_inc(&e->nr_links)];
+  const int ind = atomic_inc(&e->nr_links);
+  if (ind >= e->size_links) {
+    error("Link table overflow.");
+  }
+  struct link *res = &e->links[ind];
res->next = l;
res->t = t;
return res;
@@ -314,7 +318,7 @@ void engine_repartition(struct engine *e) {
/* Nothing to do if only using a single node. Also avoids METIS
* bug that doesn't handle this case well. */
-  if ( nr_nodes == 1 ) return;
+  if (nr_nodes == 1) return;
/* Allocate the inds and weights. */
if ((inds = (idx_t *)malloc(sizeof(idx_t) * 26 *nr_cells)) == NULL ||
@@ -585,7 +589,7 @@ void engine_repartition(struct engine *e) {
/* Check that the nodeIDs are ok. */
for (k = 0; k < nr_cells; k++)
if (nodeIDs[k] < 0 || nodeIDs[k] >= nr_nodes)
error("Got bad nodeID %"PRIDX" for cell %i.", nodeIDs[k], k);
error("Got bad nodeID %" PRIDX " for cell %i.", nodeIDs[k], k);
/* Check that the partition is complete and all nodes have some work. */
int present[nr_nodes];
@@ -593,7 +597,7 @@ void engine_repartition(struct engine *e) {
for (i = 0; i < nr_nodes; i++) present[i] = 0;
for (i = 0; i < nr_cells; i++) present[nodeIDs[i]]++;
for (i = 0; i < nr_nodes; i++) {
-    if (! present[i]) {
+    if (!present[i]) {
failed = 1;
message("Node %d is not present after repartition", i);
}
@@ -602,11 +606,11 @@ void engine_repartition(struct engine *e) {
/* If partition failed continue with the current one, but make this
* clear. */
if (failed) {
message("WARNING: METIS repartition has failed, continuing with "
"the current partition, load balance will not be optimal");
message(
"WARNING: METIS repartition has failed, continuing with "
"the current partition, load balance will not be optimal");
for (k = 0; k < nr_cells; k++) nodeIDs[k] = cells[k].nodeID;
}
}
/* Broadcast the result of the partition. */
@@ -1106,7 +1110,8 @@ void engine_maketasks(struct engine *e) {
is the number of cells (s->tot_cells) times the number of neighbours (27)
times the number of interaction types (2, density and force). */
if (e->links != NULL) free(e->links);
-  if ((e->links = malloc(sizeof(struct link) * s->tot_cells * 27 * 2)) == NULL)
+  e->size_links = s->tot_cells * 27 * 2;
+  if ((e->links = malloc(sizeof(struct link) * e->size_links)) == NULL)
error("Failed to allocate cell-task links.");
e->nr_links = 0;
@@ -1282,6 +1287,9 @@ void engine_maketasks(struct engine *e) {
#endif
+  /* Set the unlocks per task. */
+  scheduler_set_unlocks(sched);
/* Rank the tasks. */
scheduler_ranktasks(sched);
@@ -1805,7 +1813,7 @@ void engine_step(struct engine *e) {
if (e->step == 0 || !(e->policy & engine_policy_fixdt)) {
TIMER_TIC
engine_launch(e, (e->nr_threads > 8) ? 8 : e->nr_threads,
-                  (1 << task_type_kick1) | (1 << task_type_link));
+                  (1 << task_type_kick1));
TIMER_TOC(timer_kick1);
}
@@ -1836,8 +1844,7 @@ void engine_step(struct engine *e) {
(1 << task_type_ghost) | (1 << task_type_kick2) |
(1 << task_type_send) | (1 << task_type_recv) |
(1 << task_type_grav_pp) | (1 << task_type_grav_mm) |
-                    (1 << task_type_grav_up) | (1 << task_type_grav_down) |
-                    (1 << task_type_link));
+                    (1 << task_type_grav_up) | (1 << task_type_grav_down));
TIMER_TOC(timer_runners);
@@ -2226,13 +2233,14 @@ void engine_init(struct engine *e, struct space *s, float dt, int nr_threads,
s->cells[k].kick1 =
scheduler_addtask(&e->sched, task_type_kick1, task_subtype_none, 0, 0,
&s->cells[k], NULL, 0);
-  scheduler_ranktasks(&e->sched);
/* Create the sorting tasks. */
for (i = 0; i < e->nr_threads; i++)
scheduler_addtask(&e->sched, task_type_psort, task_subtype_none, i, 0, NULL,
NULL, 0);
+  scheduler_ranktasks(&e->sched);
/* Allocate and init the threads. */
if ((e->runners =
(struct runner *)malloc(sizeof(struct runner) * nr_threads)) == NULL)
src/engine.h
@@ -124,7 +124,7 @@ struct engine {
/* Linked list for cell-task association. */
struct link *links;
-  int nr_links;
+  int nr_links, size_links;
};
/* Function prototypes. */
src/scheduler.c
@@ -57,38 +57,34 @@
void scheduler_addunlock(struct scheduler *s, struct task *ta,
struct task *tb) {
-  /* Main loop. */
-  while (1) {
-
-    /* Follow the links. */
-    while (ta->nr_unlock_tasks == task_maxunlock + 1)
-      ta = ta->unlock_tasks[task_maxunlock];
-
-    /* Get the index of the next free task. */
-    int ind = atomic_inc(&ta->nr_unlock_tasks);
-
-    /* Is there room in this task? */
-    if (ind < task_maxunlock) {
-      ta->unlock_tasks[ind] = tb;
-      break;
-    }
-
-    /* Otherwise, generate a link task. */
-    else {
-
-      /* Only one thread should have to do this. */
-      if (ind == task_maxunlock) {
-        ta->unlock_tasks[task_maxunlock] =
-            scheduler_addtask(s, task_type_link, task_subtype_none, ta->flags,
-                              0, ta->ci, ta->cj, 0);
-        ta->unlock_tasks[task_maxunlock]->implicit = 1;
-      }
-
-      /* Otherwise, reduce the count. */
-      else
-        atomic_dec(&ta->nr_unlock_tasks);
-    }
-  }
+  /* Lock the scheduler since re-allocating the unlocks is not
+     thread-safe. */
+  if (lock_lock(&s->lock) != 0) error("Unable to lock scheduler.");
+
+  /* Does the buffer need to be grown? */
+  if (s->nr_unlocks == s->size_unlocks) {
+    struct task **unlocks_new;
+    int *unlock_ind_new;
+    s->size_unlocks *= 2;
+    if ((unlocks_new = (struct task **)malloc(
+             sizeof(struct task *) *s->size_unlocks)) == NULL ||
+        (unlock_ind_new = (int *)malloc(sizeof(int) * s->size_unlocks)) == NULL)
+      error("Failed to re-allocate unlocks.");
+    memcpy(unlocks_new, s->unlocks, sizeof(struct task *) * s->nr_unlocks);
+    memcpy(unlock_ind_new, s->unlock_ind, sizeof(int) * s->nr_unlocks);
+    free(s->unlocks);
+    free(s->unlock_ind);
+    s->unlocks = unlocks_new;
+    s->unlock_ind = unlock_ind_new;
+  }
+
+  /* Write the unlock to the scheduler. */
+  const int ind = atomic_inc(&s->nr_unlocks);
+  s->unlocks[ind] = tb;
+  s->unlock_ind[ind] = ta - s->tasks;
+
+  /* Release the scheduler. */
+  if (lock_unlock(&s->lock) != 0) error("Unable to unlock scheduler.");
}
/**
@@ -695,6 +691,63 @@ struct task *scheduler_addtask(struct scheduler *s, int type, int subtype,
return t;
}
+/**
+ * @brief Set the unlock pointers in each task.
+ *
+ * @param s The #scheduler.
+ */
+void scheduler_set_unlocks(struct scheduler *s) {
+
+  /* Store the counts for each task. */
+  int *counts;
+  if ((counts = (int *)malloc(sizeof(int) * s->nr_tasks)) == NULL)
+    error("Failed to allocate temporary counts array.");
+  bzero(counts, sizeof(int) * s->nr_tasks);
+  for (int k = 0; k < s->nr_unlocks; k++) counts[s->unlock_ind[k]] += 1;
+
+  /* Compute the offset for each unlock block. */
+  int *offsets;
+  if ((offsets = (int *)malloc(sizeof(int) * (s->nr_tasks + 1))) == NULL)
+    error("Failed to allocate temporary offsets array.");
+  offsets[0] = 0;
+  for (int k = 0; k < s->nr_tasks; k++) offsets[k + 1] = offsets[k] + counts[k];
+
+  /* Create and fill a temporary array with the sorted unlocks. */
+  struct task **unlocks;
+  if ((unlocks = (struct task **)malloc(sizeof(struct task *) *
+                                        s->size_unlocks)) == NULL)
+    error("Failed to allocate temporary unlocks array.");
+  for (int k = 0; k < s->nr_unlocks; k++) {
+    const int ind = s->unlock_ind[k];
+    unlocks[offsets[ind]] = s->unlocks[k];
+    offsets[ind] += 1;
+  }
+
+  /* Swap the unlocks. */
+  free(s->unlocks);
+  s->unlocks = unlocks;
+
+  /* Re-set the offsets. */
+  offsets[0] = 0;
+  for (int k = 1; k < s->nr_tasks; k++)
+    offsets[k] = offsets[k - 1] + counts[k - 1];
+  for (int k = 0; k < s->nr_tasks; k++)
+    for (int j = offsets[k]; j < offsets[k + 1]; j++) s->unlock_ind[j] = k;
+
+  /* Set the unlocks in the tasks. */
+  for (int k = 0; k < s->nr_tasks; k++) {
+    struct task *t = &s->tasks[k];
+    t->nr_unlock_tasks = counts[k];
+    t->unlock_tasks = &s->unlocks[offsets[k]];
+  }
+
+  /* Clean up. */
+  free(counts);
+  free(offsets);
+}
/**
* @brief Sort the tasks in topological order over all queues.
*
@@ -784,6 +837,7 @@ void scheduler_reset(struct scheduler *s, int size) {
s->tasks_next = 0;
s->waiting = 0;
s->mask = 0;
+  s->nr_unlocks = 0;
/* Set the task pointers in the queues. */
for (k = 0; k < s->nr_queues; k++) s->queues[k].tasks = s->tasks;
@@ -894,7 +948,7 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
/* Clear all the waits and rids. */
// ticks tic = getticks();
for (int k = 0; k < s->nr_tasks; k++) {
-    s->tasks[k].wait = 0;
+    s->tasks[k].wait = 1;
s->tasks[k].rid = -1;
}
@@ -929,19 +983,13 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
/* message("waiting tasks took %.3f ms.",
(double)(getticks() - tic) / CPU_TPS * 1000); */
-  /* Don't enqueue link tasks directly. */
-  s->mask &= ~(1 << task_type_link);
/* Loop over the tasks and enqueue whoever is ready. */
// tic = getticks();
-  for (int k = 0; k < nr_tasks; k++) {
+  for (int k = 0; k < s->nr_tasks; k++) {
t = &tasks[tid[k]];
-    if (((1 << t->type) & s->mask) && !t->skip) {
-      if (t->wait == 0) {
-        scheduler_enqueue(s, t);
-        pthread_cond_broadcast(&s->sleep_cond);
-      } else
-        break;
-    }
+    if (atomic_dec(&t->wait) == 1 && ((1 << t->type) & s->mask) && !t->skip) {
+      scheduler_enqueue(s, t);
+      pthread_cond_broadcast(&s->sleep_cond);
+    }
}
}
// message( "enqueueing tasks took %.3f ms." , (double)( getticks() - tic ) /
@@ -962,9 +1010,11 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
int err;
#endif
+  /* Fail if this task has already been enqueued before. */
+  if (t->rid >= 0) error("Task has already been enqueued.");
/* Ignore skipped tasks and tasks not in the mask. */
-  if (t->skip || ((1 << t->type) & ~(s->mask) && t->type != task_type_link) ||
-      atomic_cas(&t->rid, -1, 0) != -1) {
+  if (t->skip || (1 << t->type) & ~(s->mask)) {
return;
}
@@ -1237,6 +1287,15 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_queues,
pthread_mutex_init(&s->sleep_mutex, NULL) != 0)
error("Failed to initialize sleep barrier.");
+  /* Init the unlocks. */
+  if ((s->unlocks = (struct task **)malloc(
+           sizeof(struct task *) *scheduler_init_nr_unlocks)) == NULL ||
+      (s->unlock_ind =
+           (int *)malloc(sizeof(int) * scheduler_init_nr_unlocks)) == NULL)
+    error("Failed to allocate unlocks.");
+  s->nr_unlocks = 0;
+  s->size_unlocks = scheduler_init_nr_unlocks;
/* Set the scheduler variables. */
s->nr_queues = nr_queues;
s->flags = flags;
src/scheduler.h
@@ -32,7 +32,7 @@
/* Some constants. */
#define scheduler_maxwait 3
-#define scheduler_maxunlock 40
+#define scheduler_init_nr_unlocks 10000
#define scheduler_dosub 1
#define scheduler_maxsteal 10
#define scheduler_maxtries 2
@@ -72,6 +72,11 @@ struct scheduler {
/* The task indices. */
int *tasks_ind;
+  /* The task unlocks. */
+  struct task **unlocks;
+  int *unlock_ind;
+  int nr_unlocks, size_unlocks;
/* Lock for this scheduler. */
lock_type lock;
@@ -103,5 +108,6 @@ void scheduler_splittasks(struct scheduler *s);
struct task *scheduler_done(struct scheduler *s, struct task *t);
struct task *scheduler_unlock(struct scheduler *s, struct task *t);
void scheduler_addunlock(struct scheduler *s, struct task *ta, struct task *tb);
+void scheduler_set_unlocks(struct scheduler *s);
#endif /* SWIFT_SCHEDULER_H */
src/task.c
@@ -45,7 +45,7 @@
const char *taskID_names[task_type_count] = {
"none", "sort", "self", "pair", "sub",
"ghost", "kick1", "kick2", "send", "recv",
"link", "grav_pp", "grav_mm", "grav_up", "grav_down",
"grav_pp", "grav_mm", "grav_up", "grav_down",
"psort", "split_cell", "rewait"};
/**
src/task.h
@@ -39,7 +39,6 @@ enum task_types {
task_type_kick2,
task_type_send,
task_type_recv,
-  task_type_link,
task_type_grav_pp,
task_type_grav_mm,
task_type_grav_up,
@@ -83,7 +82,7 @@ struct task {
ticks tic, toc;
int nr_unlock_tasks;
-  struct task *unlock_tasks[task_maxunlock + 1];
+  struct task **unlock_tasks;
};
/* Function prototypes. */