Commit a03bdbd8 authored by Pedro Gonnet

First try to merge in the scheduler_active machinery from the scheduler_skip branch. Currently broken: engine_collect_timestep apparently isn't doing anything.
parent 9d84db69
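The pattern repeated throughout this diff is the pair atomic_cas + scheduler_add_active: a task is activated by atomically flipping its skip flag from 1 to 0, and only the thread that wins that transition appends the task to the scheduler's new tid_active list, so a task reachable from several cells is recorded exactly once. Below is a minimal, self-contained sketch of the idea in C11 atomics rather than SWIFT's atomic.h wrappers (whose atomic_cas returns the value previously stored, so `if (atomic_cas(&t->skip, 1, 0))` fires precisely when the flag was 1); the struct layouts and array size are illustrative, not SWIFT's real ones.

/* Minimal model of the unskip pattern used throughout this commit.
 * All names and sizes are illustrative. */
#include <stdatomic.h>
#include <stdio.h>

struct task {
  atomic_int skip; /* 1 = skipped/inactive, 0 = active */
};

struct scheduler {
  struct task *tasks; /* base of the task array */
  int tid_active[16]; /* indices of tasks activated this step */
  atomic_int active_count;
};

/* Reserve a unique slot with an atomic increment, then store the
 * task's index; safe to call concurrently from many threads. */
void scheduler_add_active(struct scheduler *s, struct task *t) {
  int ind = atomic_fetch_add(&s->active_count, 1);
  s->tid_active[ind] = (int)(t - s->tasks);
}

/* Only the thread that flips skip from 1 to 0 records the task, so
 * repeated unskips of the same task append it exactly once. */
void task_unskip(struct scheduler *s, struct task *t) {
  int expected = 1;
  if (atomic_compare_exchange_strong(&t->skip, &expected, 0))
    scheduler_add_active(s, t);
}

int main(void) {
  struct task tasks[2] = {{1}, {1}};
  struct scheduler s = {.tasks = tasks, .active_count = 0};
  task_unskip(&s, &tasks[0]);
  task_unskip(&s, &tasks[0]); /* loses the CAS: a no-op */
  printf("active tasks: %d\n", atomic_load(&s.active_count)); /* prints 1 */
  return 0;
}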
@@ -52,6 +52,7 @@
 #include "gravity.h"
 #include "hydro.h"
 #include "hydro_properties.h"
+#include "scheduler.h"
 #include "space.h"
 #include "timers.h"
@@ -901,7 +902,7 @@ int cell_is_drift_needed(struct cell *c, int ti_current) {
  *
  * @return 1 If the space needs rebuilding. 0 otherwise.
  */
-int cell_unskip_tasks(struct cell *c) {
+int cell_unskip_tasks(struct cell *c, struct scheduler *s) {

   /* Un-skip the density tasks involved with this cell. */
   for (struct link *l = c->density; l != NULL; l = l->next) {
@@ -914,11 +915,13 @@ int cell_unskip_tasks(struct cell *c) {
     if (t->type == task_type_pair) {
       if (!(ci->sorted & (1 << t->flags))) {
         atomic_or(&ci->sorts->flags, (1 << t->flags));
-        ci->sorts->skip = 0;
+        if (atomic_cas(&ci->sorts->skip, 1, 0))
+          scheduler_add_active(s, ci->sorts);
       }
       if (!(cj->sorted & (1 << t->flags))) {
         atomic_or(&cj->sorts->flags, (1 << t->flags));
-        cj->sorts->skip = 0;
+        if (atomic_cas(&cj->sorts->skip, 1, 0))
+          scheduler_add_active(s, cj->sorts);
       }
     }
@@ -942,19 +945,22 @@ int cell_unskip_tasks(struct cell *c) {
       /* Look for the local cell cj's send tasks. */
       struct link *l = NULL;
       for (l = cj->send_xv; l != NULL && l->t->cj->nodeID != ci->nodeID;
-           l = l->next);
+           l = l->next)
+        ;
       if (l == NULL) error("Missing link to send_xv task.");
-      l->t->skip = 0;
+      if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);

       for (l = cj->send_rho; l != NULL && l->t->cj->nodeID != ci->nodeID;
-           l = l->next);
+           l = l->next)
+        ;
       if (l == NULL) error("Missing link to send_rho task.");
-      l->t->skip = 0;
+      if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);

       for (l = cj->send_ti; l != NULL && l->t->cj->nodeID != ci->nodeID;
-           l = l->next);
+           l = l->next)
+        ;
       if (l == NULL) error("Missing link to send_ti task.");
-      l->t->skip = 0;
+      if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);

     } else if (cj->nodeID != engine_rank) {
@@ -965,35 +971,48 @@ int cell_unskip_tasks(struct cell *c) {
       /* Look for the local cell ci's send tasks. */
       struct link *l = NULL;
       for (l = ci->send_xv; l != NULL && l->t->cj->nodeID != cj->nodeID;
-           l = l->next);
+           l = l->next)
+        ;
       if (l == NULL) error("Missing link to send_xv task.");
-      l->t->skip = 0;
+      if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);

       for (l = ci->send_rho; l != NULL && l->t->cj->nodeID != cj->nodeID;
-           l = l->next);
+           l = l->next)
+        ;
       if (l == NULL) error("Missing link to send_rho task.");
-      l->t->skip = 0;
+      if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);

       for (l = ci->send_ti; l != NULL && l->t->cj->nodeID != cj->nodeID;
-           l = l->next);
+           l = l->next)
+        ;
       if (l == NULL) error("Missing link to send_ti task.");
-      l->t->skip = 0;
+      if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
     }
 #endif
     }
   }

   /* Unskip all the other task types. */
-  for (struct link *l = c->gradient; l != NULL; l = l->next) l->t->skip = 0;
-  for (struct link *l = c->force; l != NULL; l = l->next) l->t->skip = 0;
-  for (struct link *l = c->grav; l != NULL; l = l->next) l->t->skip = 0;
-  if (c->extra_ghost != NULL) c->extra_ghost->skip = 0;
-  if (c->ghost != NULL) c->ghost->skip = 0;
-  if (c->init != NULL) c->init->skip = 0;
-  if (c->kick != NULL) c->kick->skip = 0;
-  if (c->cooling != NULL) c->cooling->skip = 0;
-  if (c->sourceterms != NULL) c->sourceterms->skip = 0;
-  if (c->grav_external != NULL) c->grav_external->skip = 0;
+  for (struct link *l = c->gradient; l != NULL; l = l->next)
+    if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
+  for (struct link *l = c->force; l != NULL; l = l->next)
+    if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
+  for (struct link *l = c->grav; l != NULL; l = l->next)
+    if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
+  if (c->extra_ghost != NULL && atomic_cas(&c->extra_ghost->skip, 1, 0))
+    scheduler_add_active(s, c->extra_ghost);
+  if (c->ghost != NULL && atomic_cas(&c->ghost->skip, 1, 0))
+    scheduler_add_active(s, c->ghost);
+  if (c->init != NULL && atomic_cas(&c->init->skip, 1, 0))
+    scheduler_add_active(s, c->init);
+  if (c->kick != NULL && atomic_cas(&c->kick->skip, 1, 0))
+    scheduler_add_active(s, c->kick);
+  if (c->cooling != NULL && atomic_cas(&c->cooling->skip, 1, 0))
+    scheduler_add_active(s, c->cooling);
+  if (c->sourceterms != NULL && atomic_cas(&c->sourceterms->skip, 1, 0))
+    scheduler_add_active(s, c->sourceterms);
+  if (c->grav_external != NULL && atomic_cas(&c->grav_external->skip, 1, 0))
+    scheduler_add_active(s, c->grav_external);

   return 0;
 }
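The null-body loops above (for (...; l = l->next) ; — the lone semicolon is clang-format's rendering of an empty loop body) walk a cell's linked list of send tasks until they find the entry whose target cell lives on the wanted node. A hedged sketch of that search, factored into a hypothetical find_send_task helper with struct definitions pared down to the fields the loop touches (the real ones live in task.h and cell.h):

#include <stdio.h>
#include <stdlib.h>

struct cell { int nodeID; };
struct task { struct cell *cj; };
struct link { struct task *t; struct link *next; };

/* Same walk as the null-body for-loops in the diff, with the error
 * path expressed via stderr instead of SWIFT's error() macro. */
struct task *find_send_task(struct link *head, int nodeID, const char *what) {
  struct link *l = head;
  while (l != NULL && l->t->cj->nodeID != nodeID) l = l->next;
  if (l == NULL) {
    fprintf(stderr, "Missing link to %s task.\n", what);
    exit(EXIT_FAILURE);
  }
  return l->t;
}

int main(void) {
  struct cell foreign = {7};
  struct task send = {&foreign};
  struct link head = {&send, NULL};
  printf("found send task for node %d\n",
         find_send_task(&head, 7, "send_xv")->cj->nodeID);
  return 0;
}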
@@ -38,6 +38,7 @@
 /* Avoid cyclic inclusions */
 struct space;
+struct scheduler;

 /* Max tag size set to 2^29 to take into account some MPI implementations
  * that use 2^31 as the upper bound on MPI tags and the fact that
@@ -240,6 +241,6 @@ int cell_are_neighbours(const struct cell *restrict ci,
 void cell_check_multipole(struct cell *c, void *data);
 void cell_clean(struct cell *c);
 int cell_is_drift_needed(struct cell *c, int ti_current);
-int cell_unskip_tasks(struct cell *c);
+int cell_unskip_tasks(struct cell *c, struct scheduler *s);

 #endif /* SWIFT_CELL_H */
@@ -1381,6 +1381,7 @@ void engine_count_and_link_tasks(struct engine *e) {
     if (t->type == task_type_sort && t->ci->split)
       for (int j = 0; j < 8; j++)
         if (t->ci->progeny[j] != NULL && t->ci->progeny[j]->sorts != NULL) {
+          t->ci->progeny[j]->sorts->skip = 0;
           scheduler_addunlock(sched, t->ci->progeny[j]->sorts, t);
         }
@@ -2004,8 +2005,9 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
                              void *extra_data) {

   /* Unpack the arguments. */
   struct task *tasks = (struct task *)map_data;
-  const int ti_end = ((int *)extra_data)[0];
-  int *rebuild_space = &((int *)extra_data)[1];
+  const int ti_end = ((size_t *)extra_data)[0];
+  size_t *rebuild_space = &((size_t *)extra_data)[1];
+  struct scheduler *s = (struct scheduler *)(((size_t *)extra_data)[2]);

   for (int ind = 0; ind < num_elements; ind++) {
     struct task *t = &tasks[ind];
@@ -2017,7 +2019,10 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
         t->type == task_type_sourceterms || t->type == task_type_sub_self) {

       /* Set this task's skip. */
-      t->skip = (t->ci->ti_end_min > ti_end);
+      if ((t->skip = (t->ci->ti_end_min > ti_end)))
+        continue;
+      else
+        scheduler_add_active(s, t);
     }

     /* Pair? */
@@ -2037,16 +2042,22 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
       /* Set this task's skip. */
       if ((t->skip = (ci->ti_end_min > ti_end && cj->ti_end_min > ti_end)) == 1)
         continue;
+      else scheduler_add_active(s, t);
+
+      /* If this is not a density task, we don't have to do any of the below. */
+      if (t->subtype != task_subtype_density) continue;

       /* Set the sort flags. */
-      if (t->type == task_type_pair && t->subtype != task_subtype_grav) {
+      if (t->type == task_type_pair) {
         if (!(ci->sorted & (1 << t->flags))) {
           atomic_or(&ci->sorts->flags, (1 << t->flags));
-          ci->sorts->skip = 0;
+          if (atomic_cas(&ci->sorts->skip, 1, 0))
+            scheduler_add_active(s, ci->sorts);
         }
         if (!(cj->sorted & (1 << t->flags))) {
           atomic_or(&cj->sorts->flags, (1 << t->flags));
-          cj->sorts->skip = 0;
+          if (atomic_cas(&cj->sorts->skip, 1, 0))
+            scheduler_add_active(s, cj->sorts);
         }
       }
@@ -2066,19 +2077,19 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
              l = l->next)
           ;
         if (l == NULL) error("Missing link to send_xv task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);

         for (l = cj->send_rho; l != NULL && l->t->cj->nodeID != ci->nodeID;
              l = l->next)
           ;
         if (l == NULL) error("Missing link to send_rho task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);

         for (l = cj->send_ti; l != NULL && l->t->cj->nodeID != ci->nodeID;
              l = l->next)
           ;
         if (l == NULL) error("Missing link to send_ti task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);

       } else if (cj->nodeID != engine_rank) {
@@ -2092,19 +2103,19 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
              l = l->next)
           ;
         if (l == NULL) error("Missing link to send_xv task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);

         for (l = ci->send_rho; l != NULL && l->t->cj->nodeID != cj->nodeID;
              l = l->next)
           ;
         if (l == NULL) error("Missing link to send_rho task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);

         for (l = ci->send_ti; l != NULL && l->t->cj->nodeID != cj->nodeID;
              l = l->next)
           ;
         if (l == NULL) error("Missing link to send_ti task.");
-        l->t->skip = 0;
+        if (atomic_cas(&l->t->skip, 1, 0)) scheduler_add_active(s, l->t);
       }
 #endif
@@ -2112,20 +2123,22 @@ void engine_marktasks_mapper(void *map_data, int num_elements,
     /* Kick? */
     else if (t->type == task_type_kick) {
-      t->skip = (t->ci->ti_end_min > ti_end);
-      t->ci->updated = 0;
-      t->ci->g_updated = 0;
+      if ((t->skip = (t->ci->ti_end_min > ti_end)))
+        continue;
+      else
+        scheduler_add_active(s, t);
     }

     /* Init? */
     else if (t->type == task_type_init) {

       /* Set this task's skip. */
-      t->skip = (t->ci->ti_end_min > ti_end);
+      if ((t->skip = (t->ci->ti_end_min > ti_end)))
+        continue;
+      else
+        scheduler_add_active(s, t);
     }

     /* None? */
     else if (t->type == task_type_none)
       t->skip = 1;
   }
 }
@@ -2152,7 +2165,8 @@ int engine_marktasks(struct engine *e) {
   } else {

     /* Run through the tasks and mark as skip or not. */
-    int extra_data[2] = {e->ti_current, rebuild_space};
+    size_t extra_data[3] = {e->ti_current, rebuild_space, (size_t)&e->sched};
+    scheduler_clear_active(&e->sched);
     threadpool_map(&e->threadpool, engine_marktasks_mapper, s->tasks,
                    s->nr_tasks, sizeof(struct task), 10000, extra_data);
     rebuild_space = extra_data[1];
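engine_marktasks now smuggles three values to the threadpool mapper through one size_t array: the current time, the rebuild flag, and the address of the scheduler cast to size_t; that is why engine_marktasks_mapper above re-reads extra_data as size_t rather than int. A minimal sketch of the pack/unpack round trip; it assumes sizeof(size_t) >= sizeof(void *), which holds on the usual LP64/LLP64 targets (uintptr_t would be the strictly portable spelling), and the struct and names below are illustrative:

#include <stddef.h>
#include <stdio.h>

struct scheduler { int nr_tasks; };

/* Unpack side: mirrors engine_marktasks_mapper's first lines. */
void mapper(void *map_data, int num_elements, void *extra_data) {
  (void)map_data;
  (void)num_elements;
  const size_t ti_end = ((size_t *)extra_data)[0];
  size_t *rebuild_space = &((size_t *)extra_data)[1];
  struct scheduler *s = (struct scheduler *)(((size_t *)extra_data)[2]);
  printf("ti_end=%zu rebuild=%zu nr_tasks=%d\n", ti_end, *rebuild_space,
         s->nr_tasks);
}

int main(void) {
  struct scheduler sched = {42};
  /* Pack side: mirrors engine_marktasks above. */
  size_t extra_data[3] = {100, 0, (size_t)&sched};
  mapper(NULL, 0, extra_data);
  return 0;
}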
@@ -2634,6 +2648,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs) {
   }

   /* Now, launch the calculation */
+  engine_print_task_counts(e);
   TIMER_TIC;
   engine_launch(e, e->nr_threads, mask, submask);
   TIMER_TOC(timer_runners);
......
@@ -758,14 +758,13 @@ static void runner_do_drift(struct cell *c, struct engine *e, int drift) {

   /* Unskip any active tasks. */
   if (c->ti_end_min == e->ti_current) {
-    const int forcerebuild = cell_unskip_tasks(c);
+    const int forcerebuild = cell_unskip_tasks(c, &e->sched);
     if (forcerebuild) atomic_inc(&e->forcerebuild);
   }

   /* Do we really need to drift? */
   if (drift) {
-    if (!e->drift_all && !cell_is_drift_needed(c, ti_current))
-      return;
+    if (!e->drift_all && !cell_is_drift_needed(c, ti_current)) return;
   } else {

     /* Not drifting, but may still need to recurse for task skipping. */
@@ -813,8 +812,8 @@ static void runner_do_drift(struct cell *c, struct engine *e, int drift) {
       /* Compute (square of) motion since last cell construction */
       const float dx2 = gp->x_diff[0] * gp->x_diff[0] +
                         gp->x_diff[1] * gp->x_diff[1] +
                         gp->x_diff[2] * gp->x_diff[2];
       dx2_max = (dx2_max > dx2) ? dx2_max : dx2;
     }
@@ -831,8 +830,8 @@ static void runner_do_drift(struct cell *c, struct engine *e, int drift) {
       /* Compute (square of) motion since last cell construction */
       const float dx2 = xp->x_diff[0] * xp->x_diff[0] +
                         xp->x_diff[1] * xp->x_diff[1] +
                         xp->x_diff[2] * xp->x_diff[2];
       dx2_max = (dx2_max > dx2) ? dx2_max : dx2;

       /* Maximal smoothing length */
@@ -841,7 +840,7 @@ static void runner_do_drift(struct cell *c, struct engine *e, int drift) {
       /* Now collect quantities for statistics */
       const float half_dt =
           (ti_current - (p->ti_begin + p->ti_end) / 2) * timeBase;
       const double x[3] = {p->x[0], p->x[1], p->x[2]};
       const float v[3] = {xp->v_full[0] + p->a_hydro[0] * half_dt,
                           xp->v_full[1] + p->a_hydro[1] * half_dt,
......
@@ -42,7 +42,6 @@
 #include "atomic.h"
 #include "const.h"
 #include "cycle.h"
-#include "engine.h"
 #include "error.h"
 #include "intrinsics.h"
 #include "kernel_hydro.h"
@@ -51,6 +50,24 @@
 #include "task.h"
 #include "timers.h"

+/**
+ * @brief Add a task to the list of active tasks.
+ *
+ * @param s The #scheduler.
+ * @param t The task to be added.
+ */
+void scheduler_add_active(struct scheduler *s, struct task *t) {
+  int ind = atomic_inc(&s->active_count);
+  s->tid_active[ind] = t - s->tasks;
+}
+
+/**
+ * @brief Re-set the list of active tasks.
+ */
+void scheduler_clear_active(struct scheduler *s) { s->active_count = 0; }
+
 /**
  * @brief Add an unlock_task to the given task.
  *
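scheduler_add_active is lock-free because atomic_inc returns the previous value of active_count, handing every caller a private slot in tid_active; the array is sized to the task count in scheduler_reset below, and the atomic_cas guards at the call sites ensure each task index is written at most once, so it cannot overflow. A small stress sketch of that slot-reservation idea, built on C11 atomics and pthreads with illustrative names and sizes:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define N_THREADS 4
#define ADDS_PER_THREAD 1000

static int tid_active[N_THREADS * ADDS_PER_THREAD];
static atomic_int active_count;

static void *worker(void *arg) {
  int id = (int)(long)arg;
  for (int i = 0; i < ADDS_PER_THREAD; i++) {
    int ind = atomic_fetch_add(&active_count, 1); /* unique slot */
    tid_active[ind] = id;                         /* no two threads collide */
  }
  return NULL;
}

int main(void) {
  pthread_t threads[N_THREADS];
  for (long i = 0; i < N_THREADS; i++)
    pthread_create(&threads[i], NULL, worker, (void *)i);
  for (int i = 0; i < N_THREADS; i++) pthread_join(threads[i], NULL);
  printf("entries written: %d\n", atomic_load(&active_count)); /* 4000 */
  return 0;
}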
@@ -864,6 +881,7 @@ void scheduler_reset(struct scheduler *s, int size) {
     /* Free existing task lists if necessary. */
     if (s->tasks != NULL) free(s->tasks);
     if (s->tasks_ind != NULL) free(s->tasks_ind);
+    if (s->tid_active != NULL) free(s->tid_active);

     /* Allocate the new lists. */
     if (posix_memalign((void *)&s->tasks, task_align,
@@ -872,6 +890,9 @@ void scheduler_reset(struct scheduler *s, int size) {
     if ((s->tasks_ind = (int *)malloc(sizeof(int) * size)) == NULL)
       error("Failed to allocate task lists.");
+
+    if ((s->tid_active = (int *)malloc(sizeof(int) * size)) == NULL)
+      error("Failed to allocate active task lists.");
   }

   /* Reset the counters. */
@@ -883,6 +904,7 @@ void scheduler_reset(struct scheduler *s, int size) {
   s->submask = 0;
   s->nr_unlocks = 0;
   s->completed_unlock_writes = 0;
+  s->active_count = 0;

   /* Set the task pointers in the queues. */
   for (int k = 0; k < s->nr_queues; k++) s->queues[k].tasks = s->tasks;
@@ -892,11 +914,11 @@ void scheduler_reset(struct scheduler *s, int size) {
  * @brief Compute the task weights
  *
  * @param s The #scheduler.
- * @param verbose Are we talkative ?
+ * @param verbose Are we talkative?
  */
 void scheduler_reweight(struct scheduler *s, int verbose) {

-  const ticks tic = getticks();
   const int nr_tasks = s->nr_tasks;
   int *tid = s->tasks_ind;
   struct task *tasks = s->tasks;
@@ -905,6 +927,7 @@ void scheduler_reweight(struct scheduler *s, int verbose) {
                            0.4025, 0.1897, 0.4025, 0.1897, 0.4025,
                            0.5788, 0.4025, 0.5788};
   const float wscale = 0.001;
+  const ticks tic = getticks();

   /* Run through the tasks backwards and set their weights. */
   for (int k = nr_tasks - 1; k >= 0; k--) {
@@ -1040,7 +1063,7 @@ void scheduler_start(struct scheduler *s, unsigned int mask,
   s->mask = mask;
   s->submask = submask | (1 << task_subtype_none);

-  /* Clear all the waits, rids and times. */
+  /* Clear all the waits, rids, and times. */
   for (int k = 0; k < s->nr_tasks; k++) {
     s->tasks[k].wait = 1;
     s->tasks[k].rid = -1;
@@ -1049,6 +1072,7 @@ void scheduler_start(struct scheduler *s, unsigned int mask,
   }

   /* Re-wait the tasks. */
+  message("scheduler_rewait_mapper...");
   threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tasks, s->nr_tasks,
                  sizeof(struct task), 1000, s);
@@ -1095,8 +1119,17 @@ void scheduler_start(struct scheduler *s, unsigned int mask,
 #endif

   /* Loop over the tasks and enqueue whoever is ready. */
+  /* message("scheduler_enqueue_mapper...");
   threadpool_map(s->threadpool, scheduler_enqueue_mapper, s->tasks_ind,
-                 s->nr_tasks, sizeof(int), 1000, s);
+                 s->nr_tasks, sizeof(int), 1000, s); */
+  for (int k = 0; k < s->active_count; k++) {
+    struct task *t = &s->tasks[s->tid_active[k]];
+    if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & s->mask) &&
+        ((1 << t->subtype) & s->submask)) {
+      scheduler_enqueue(s, t);
+      pthread_cond_signal(&s->sleep_cond);
+    }
+  }

   /* To be safe, fire off one last sleep_cond in a safe way. */
   pthread_mutex_lock(&s->sleep_mutex);
......
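The new launch loop in scheduler_start visits only the recorded active tasks instead of threadpool-mapping over all of them. Its gate relies on atomic_dec returning the value held before the decrement (SWIFT wraps __sync_fetch_and_sub), so a result of 1 means the sentinel wait of 1 set a few lines earlier has just been consumed and no unresolved dependencies remain; the mask/submask tests then restrict launching to the requested task types. A minimal model of that readiness check, with an illustrative struct and names:

#include <stdatomic.h>
#include <stdbool.h>

struct task { atomic_int wait; int skip; int type; int subtype; };

/* One-shot check with the same side effect as the loop above: it
 * consumes the sentinel wait added in scheduler_start, then tests the
 * skip flag and the type/subtype masks. */
bool task_is_ready(struct task *t, unsigned int mask, unsigned int submask) {
  return atomic_fetch_sub(&t->wait, 1) == 1 && !t->skip &&
         ((1u << t->type) & mask) && ((1u << t->subtype) & submask);
}

int main(void) {
  struct task t = {1, 0, 3, 0}; /* no pending dependencies */
  return task_is_ready(&t, 1u << 3, 1u << 0) ? 0 : 1;
}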
@@ -83,6 +83,10 @@ struct scheduler {
   /* The task indices. */
   int *tasks_ind;

+  /* List of initial tasks. */
+  int *tid_active;
+  int active_count;
+
   /* The task unlocks. */
   struct task **volatile unlocks;
   int *volatile unlock_ind;
@@ -106,6 +110,8 @@ struct scheduler {
 };

 /* Function prototypes. */
+void scheduler_add_active(struct scheduler *s, struct task *t);
+void scheduler_clear_active(struct scheduler *s);
 void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
                     int nr_queues, unsigned int flags, int nodeID,
                     struct threadpool *tp);
......
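Taken together, the new interface is a three-phase cycle: engine_marktasks calls scheduler_clear_active before the mapper runs, every unskip site funnels through scheduler_add_active, and scheduler_start then walks tid_active to enqueue whatever became ready. A single-threaded toy of that cycle, with stand-in structures and illustrative names:

#include <stdio.h>

struct sched { int tid_active[8]; int active_count; };

void scheduler_clear_active(struct sched *s) { s->active_count = 0; }

void scheduler_add_active(struct sched *s, int tid) {
  s->tid_active[s->active_count++] = tid; /* non-atomic: toy is single-threaded */
}

int main(void) {
  struct sched s;
  scheduler_clear_active(&s);  /* engine_marktasks(), before the mapper */
  scheduler_add_active(&s, 3); /* mapper unskips task 3 */
  scheduler_add_active(&s, 7); /* mapper unskips task 7 */
  for (int k = 0; k < s.active_count; k++) /* scheduler_start() */
    printf("enqueue task %d\n", s.tid_active[k]);
  return 0;
}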