Commit 990a8e98 authored by Matthieu Schaller

The scheduler now also takes a submask argument so that only part of the task graph is run

parent 159c9532
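In short: `engine_launch()` and `scheduler_start()` now take a second bitmask over the task *sub-types* on top of the existing mask over task types, so a caller can run just a slice of the task graph. A minimal sketch of the new calling convention; the density-only mask is a trimmed, illustrative variant of the one used in `engine_init_particles()` below:

```c
/* Drift tasks carry no sub-type, so a zero submask is enough here:
 * scheduler_start() always ORs (1 << task_subtype_none) back in,
 * which keeps all sub-type-less tasks eligible. */
engine_launch(e, e->nr_threads, 1 << task_type_drift, 0);

/* Density-only slice of the hydro loop: the type mask selects which
 * task types may run at all, while the submask restricts the sub-typed
 * interaction tasks (self/pair/sub) to their density flavour. */
engine_launch(e, e->nr_threads,
              (1 << task_type_self) | (1 << task_type_pair) |
              (1 << task_type_sub) | (1 << task_type_ghost),
              1 << task_subtype_density);
```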
src/engine.c
@@ -1664,9 +1664,10 @@ void engine_collect_kick(struct cell *c) {
* @param e The #engine.
* @param nr_runners The number of #runner to let loose.
* @param mask The task mask to launch.
* @param submask The sub-task mask to launch.
*/
void engine_launch(struct engine *e, int nr_runners, unsigned int mask) {
void engine_launch(struct engine *e, int nr_runners, unsigned int mask, unsigned int submask) {
/* Prepare the scheduler. */
atomic_inc(&e->sched.waiting);
@@ -1679,7 +1680,7 @@ void engine_launch(struct engine *e, int nr_runners, unsigned int mask) {
/* Load the tasks. */
pthread_mutex_unlock(&e->barrier_mutex);
scheduler_start(&e->sched, mask);
scheduler_start(&e->sched, mask, submask);
pthread_mutex_lock(&e->barrier_mutex);
/* Remove the safeguard. */
@@ -1713,16 +1714,27 @@ void engine_init_particles(struct engine *e) {
message("Initialising particles");
space_map_cells_pre(s, 1, cell_init_parts, NULL);
printParticle(e->s->parts, 1000, e->s->nr_parts);
printParticle(e->s->parts, 515050, e->s->nr_parts);
message("\n0th DENSITY CALC\n");
/* Now do a density calculation */
TIMER_TIC;
engine_launch(e, e->nr_threads,
(1 << task_type_sort) | (1 << task_type_self) |
(1 << task_type_pair) | (1 << task_type_sub) |
(1 << task_type_init) | (1 << task_type_ghost) |
(1 << task_type_send) | (1 << task_type_recv));
(1 << task_type_send) | (1 << task_type_recv),
1 << task_subtype_density);
TIMER_TOC(timer_runners);
printParticle(e->s->parts, 1000, e->s->nr_parts);
printParticle(e->s->parts, 515050, e->s->nr_parts);
abort();
/* Ready to go */
e->step = -1;
}
@@ -1793,16 +1805,32 @@ if ( e->nodeID == 0 )
message( "nr_parts=%i." , nr_parts ); */
#endif
printf("%d %f %f %d\n", e->step, e->time, e->timeStep, updates);
fflush(stdout);
printParticle(e->s->parts, 1000, e->s->nr_parts);
printParticle(e->s->parts, 515050, e->s->nr_parts);
message("\nDRIFT\n");
/* Drift everybody */
engine_launch(e, e->nr_threads, 1 << task_type_drift, 0);
printParticle(e->s->parts, 1000, e->s->nr_parts);
printParticle(e->s->parts, 515050, e->s->nr_parts);
abort();
/* Move forward in time */
e->timeOld = e->time;
e->time = t_end_min;
e->step += 1;
printf("%d %f %f %d", e->step, e->time, t_end_max - t_end_min, updates);
e->timeStep = e->time - e->timeOld;
/* Drift everybody */
engine_launch(e, e->nr_threads, 1 << task_type_drift);
printf("%d %f %f %d\n", e->step, e->time, e->timeStep, updates);
fflush(stdout);
/* Re-distribute the particles amongst the nodes? */
if (e->forcerepart) engine_repartition(e);
@@ -1813,10 +1841,11 @@ if ( e->nodeID == 0 )
TIMER_TIC;
engine_launch(e, e->nr_threads,
(1 << task_type_sort) | (1 << task_type_self) |
(1 << task_type_pair) | (1 << task_type_sub) |
(1 << task_type_init) | (1 << task_type_ghost) |
(1 << task_type_kick) | (1 << task_type_send) |
(1 << task_type_recv));
(1 << task_type_pair) | (1 << task_type_sub) |
(1 << task_type_init) | (1 << task_type_ghost) |
(1 << task_type_kick) | (1 << task_type_send) |
(1 << task_type_recv),
0);
TIMER_TOC(timer_runners);
@@ -2107,6 +2136,7 @@ void engine_init(struct engine *e, struct space *s, float dt, int nr_threads,
e->nr_links = 0;
e->timeBegin = timeBegin;
e->timeEnd = timeEnd;
e->timeStep = 0.;
e->dt_min = dt_min;
e->dt_max = dt_max;
engine_rank = nodeID;
src/engine.h
@@ -155,7 +155,7 @@ void engine_barrier(struct engine *e, int tid);
void engine_init(struct engine *e, struct space *s, float dt, int nr_threads,
int nr_queues, int nr_nodes, int nodeID, int policy,
float timeBegin, float timeEnd, float dt_min, float dt_max);
void engine_launch(struct engine *e, int nr_runners, unsigned int mask);
void engine_launch(struct engine *e, int nr_runners, unsigned int mask, unsigned int submask);
void engine_prepare(struct engine *e);
void engine_print(struct engine *e);
void engine_init_particles(struct engine *e);
src/runner.c
@@ -510,6 +510,9 @@ void runner_doinit(struct runner *r, struct cell *c, int timer) {
/* Get a direct pointer on the part. */
p = &parts[i];
if(p->id == 1000) message("init 1000!");
if(p->id == 515050) message("init 515050!");
if (p->t_end <= t_end) {
@@ -577,6 +580,9 @@ void runner_doghost(struct runner *r, struct cell *c) {
p = &parts[pid[i]];
xp = &xparts[pid[i]];
if(p->id == 1000) message("ghost 1000");
if(p->id == 515050) message("ghost 515050");
/* Is this part within the timestep? */
if (p->t_end <= t_end) {
@@ -1018,7 +1024,7 @@ void *runner_main(void *data) {
if (t->subtype == task_subtype_density)
runner_doself1_density(r, ci);
else if (t->subtype == task_subtype_force)
runner_doself2_force(r, ci);
else
error("Unknown task subtype.");
break;
@@ -1026,7 +1032,7 @@
if (t->subtype == task_subtype_density)
runner_dopair1_density(r, ci, cj);
else if (t->subtype == task_subtype_force)
runner_dopair2_force(r, ci, cj);
else
error("Unknown task subtype.");
break;
@@ -1085,7 +1091,7 @@
space_split(e->s, t->ci);
break;
case task_type_rewait:
scheduler_do_rewait((struct task *)t->ci, (struct task *)t->cj, t->flags);
scheduler_do_rewait((struct task *)t->ci, (struct task *)t->cj, t->flags, t->rank);
break;
default:
error("Unknown task type.");
src/scheduler.c
@@ -46,8 +46,6 @@
#include "kernel.h"
#include "timers.h"
//extern struct task *store;
/**
* @brief Add an unlock_task to the given task.
*
@@ -852,6 +850,7 @@ void scheduler_reset(struct scheduler *s, int size) {
s->tasks_next = 0;
s->waiting = 0;
s->mask = 0;
s->submask = 0;
s->nr_unlocks = 0;
/* Set the task pointers in the queues. */
@@ -955,18 +954,18 @@ void scheduler_reweight(struct scheduler *s) {
*
* @param s The #scheduler.
* @param mask The task types to enqueue.
* @param submask The sub-task types to enqueue.
*/
void scheduler_start(struct scheduler *s, unsigned int mask) {
void scheduler_start(struct scheduler *s, unsigned int mask, unsigned int submask) {
int nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
struct task *t, *tasks = s->tasks;
// ticks tic;
//store = NULL;
/* Store the mask */
/* Store the masks */
s->mask = mask | (1 << task_type_rewait);
s->submask = submask | (1 << task_subtype_none);
/* Clear all the waits and rids. */
// ticks tic = getticks();
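A detail worth flagging in the hunk above: the stored submask is never taken verbatim, the `task_subtype_none` bit is always ORed back in. This is what keeps tasks without a sub-type (sort, kick, drift, the rewait tasks themselves, ...) runnable regardless of what the caller asked for:

```c
/* From the hunk above: an empty caller submask still enables every
 * task whose subtype is task_subtype_none. */
s->submask = submask | (1 << task_subtype_none);
```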
@@ -990,6 +989,7 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
to the task. Don't do this at home !
- ci and cj will give the range of tasks to which the waits will be applied
- the flags will be used to transfer the mask
- the rank will be used to transfer the submask
- the rest is unused.
*/
for (int k = 0; k < num_rewait_tasks; k++) {
@@ -998,6 +998,7 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
rewait_tasks[k].cj =
(struct cell *)&s->tasks[(k + 1) * nr_tasks / s->nr_queues];
rewait_tasks[k].flags = s->mask;
rewait_tasks[k].rank = s->submask;
rewait_tasks[k].skip = 0;
rewait_tasks[k].wait = 0;
rewait_tasks[k].rid = -1;
@@ -1017,15 +1018,20 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
/* message("waiting tasks took %.3f ms.",
(double)(getticks() - tic) / CPU_TPS * 1000); */
//scheduler_print_tasks(s, "tasks_start.dat");
s->mask = mask;
s->submask = submask | (1 << task_subtype_none);
message("mask: %d", s->mask);
message("submask: %d", s->submask);
/* Loop over the tasks and enqueue whoever is ready. */
// tic = getticks();
for (int k = 0; k < s->nr_tasks; k++) {
t = &tasks[tid[k]];
if (atomic_dec(&t->wait) == 1 && ((1 << t->type) & s->mask) && !t->skip) {
if (atomic_dec(&t->wait) == 1 &&
((1 << t->type) & s->mask) &&
((1 << t->subtype) & s->submask) &&
!t->skip) {
scheduler_enqueue(s, t);
pthread_cond_broadcast(&s->sleep_cond);
}
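The enqueue loop above now gates on both masks. Written out as a standalone predicate (a sketch for clarity only; no such helper exists in the code), a task with all its dependencies resolved is enqueued iff:

```c
/* Sketch of the eligibility test in scheduler_start(), with the atomic
 * wait-count handling left out. scheduler_enqueue() applies the same
 * test in complemented form: (1 << t->type) & ~(s->mask), etc., which
 * is equivalent since (1 << t->type) is a single bit. */
static int task_eligible(const struct task *t, const struct scheduler *s) {
  return !t->skip &&
         ((1 << t->type) & s->mask) &&
         ((1 << t->subtype) & s->submask);
}
```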
@@ -1052,8 +1058,8 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
/* Fail if this task has already been enqueued before. */
if (t->rid >= 0) error("Task has already been enqueued.");
/* Ignore skipped tasks and tasks not in the mask. */
if (t->skip || (1 << t->type) & ~(s->mask)) {
/* Ignore skipped tasks and tasks not in the masks. */
if (t->skip || (1 << t->type) & ~(s->mask) || (1 << t->subtype) & ~(s->submask)) {
return;
}
@@ -1062,11 +1068,6 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
for (int j = 0; j < t->nr_unlock_tasks; j++) {
struct task *t2 = t->unlock_tasks[j];
/* if (t2 == store) { */
/* message("Unlocked by task %s-%s address: %p", taskID_names[t->type], */
/* subtaskID_names[t->subtype], t); */
/* } */
if (atomic_dec(&t2->wait) == 1) scheduler_enqueue(s, t2);
}
}
@@ -1161,20 +1162,11 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) {
/* Release whatever locks this task held. */
if (!t->implicit) task_unlock(t);
/* if (t == store) */
/* message("\nChecking task %s-%s address: %p", taskID_names[t->type], */
/* subtaskID_names[t->subtype], t); */
/* Loop through the dependencies and add them to a queue if
they are ready. */
for (int k = 0; k < t->nr_unlock_tasks; k++) {
struct task *t2 = t->unlock_tasks[k];
/* if (t2 == store) { */
/* message("Unlocked by task %s-%s address: %p", taskID_names[t->type], */
/* subtaskID_names[t->subtype], t); */
/* } */
int res = atomic_dec(&t2->wait);
if (res < 1) {
error("Negative wait!");
@@ -1401,16 +1393,17 @@ void scheduler_print_tasks(struct scheduler *s, char *fileName) {
*
* @param t_begin Beginning of the #task range
* @param t_end End of the #task range
* @param mask The scheduler mask
* @param mask The scheduler task mask
* @param submask The scheduler subtask mask
*/
void scheduler_do_rewait(struct task *t_begin, struct task *t_end,
unsigned int mask) {
unsigned int mask, unsigned int submask) {
for (struct task *t2 = t_begin; t2 != t_end; t2++) {
if (t2->skip) continue;
/* Skip tasks not in the mask */
if (!((1 << t2->type) & mask)) continue;
if (!((1 << t2->type) & mask) || !((1 << t2->subtype) & submask)) continue;
/* Skip sort tasks that have already been performed */
if (t2->type == task_type_sort && t2->flags == 0) continue;
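For reference, this is how the submask reaches the rewait tasks: just as the type mask is smuggled through the task's `flags` field, the sub-type mask now rides in the `rank` field (both repurposed, as the comment in `scheduler_start()` warns). The round trip, assembled from the hunks above:

```c
/* Packed in scheduler_start(): */
rewait_tasks[k].flags = s->mask;    /* type mask     */
rewait_tasks[k].rank = s->submask;  /* sub-type mask */

/* Unpacked in runner_main() when a rewait task runs: */
scheduler_do_rewait((struct task *)t->ci, (struct task *)t->cj,
                    t->flags, t->rank);
```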
src/scheduler.h
@@ -59,9 +59,12 @@ struct scheduler {
/* Scheduler flags. */
unsigned int flags;
/* Scheduler mask */
/* Scheduler task mask */
unsigned int mask;
/* Scheduler sub-task mask */
unsigned int submask;
/* Number of queues in this scheduler. */
int nr_queues;
@@ -111,7 +114,7 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
struct task *scheduler_gettask(struct scheduler *s, int qid,
const struct task* prev);
void scheduler_enqueue(struct scheduler *s, struct task *t);
void scheduler_start(struct scheduler *s, unsigned int mask);
void scheduler_start(struct scheduler *s, unsigned int mask, unsigned int submask);
void scheduler_reset(struct scheduler *s, int nr_tasks);
void scheduler_ranktasks(struct scheduler *s);
void scheduler_reweight(struct scheduler *s);
@@ -126,6 +129,6 @@ void scheduler_set_unlocks(struct scheduler *s);
void scheduler_dump_queue(struct scheduler *s);
void scheduler_print_tasks(struct scheduler *s, char *fileName);
void scheduler_do_rewait(struct task *t_begin, struct task *t_end,
unsigned int mask);
unsigned int mask, unsigned int submask);
#endif /* SWIFT_SCHEDULER_H */
src/space.c
@@ -472,7 +472,7 @@ void space_rebuild(struct space *s, double cell_max, int verbose) {
for (k = 0; k < s->nr_cells; k++)
scheduler_addtask(&s->e->sched, task_type_split_cell, task_subtype_none,
k, 0, &cells[k], NULL, 0);
engine_launch(s->e, s->e->nr_threads, 1 << task_type_split_cell);
engine_launch(s->e, s->e->nr_threads, 1 << task_type_split_cell, 0);
// message( "space_split took %.3f ms." , (double)(getticks() - tic) / CPU_TPS
// * 1000 );
@@ -512,7 +512,7 @@ void space_parts_sort(struct space *s, int *ind, int N, int min, int max) {
space_sort_struct.waiting = 1;
// Launch the sorting tasks.
engine_launch(s->e, s->e->nr_threads, (1 << task_type_psort));
engine_launch(s->e, s->e->nr_threads, (1 << task_type_psort), 0);
/* Verify space_sort_struct. */
/* for (int i = 1; i < N; i++)
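The two call sites in `src/space.c` simply pass `0` for the new argument. That is safe because the split-cell tasks are created with `task_subtype_none` (visible in the hunk above), and the psort tasks presumably carry no sub-type either; since `scheduler_start()` re-enables the `none` bit itself, a zero submask means "sub-type-less tasks only", which is exactly what these launches need:

```c
/* With submask == 0, only sub-type-less tasks survive the filter:
 * s->submask == (1 << task_subtype_none) after scheduler_start(). */
engine_launch(s->e, s->e->nr_threads, 1 << task_type_split_cell, 0);
```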