Commit e1db896f authored by Peter W. Draper
Browse files

Merge branch 'no_task_mask' into 'master'

No task mask

Implementation of #235. There is no such thing as a task mask any more. All tasks in the scheduler get enqueued and executed provided they have not had their skip flag set.

For the special case of the 0th time-step (where we compute neither forces nor kicks), there is an extra function in the engine that sets these tasks to be skipped.

See merge request !276
parents 91001c5d 89fc7fd5
......@@ -2503,16 +2503,34 @@ void engine_print_stats(struct engine *e) {
clocks_getunit());
}
/**
 * @brief Flags every force, kick, cooling and source-term task as skipped.
 *
 * Walks the scheduler's task array and sets the skip flag of each task whose
 * sub-type is force, or whose type is kick, cooling or sourceterms. The skip
 * flag of every other task is left untouched.
 *
 * NOTE(review): only task_type_kick is matched; if a separate fixed-time-step
 * kick type (task_type_kick_fixdt) is in use, confirm whether it must be
 * skipped here too.
 *
 * @param e The #engine to act on.
 */
void engine_skip_force_and_kick(struct engine *e) {

  struct task *list = e->sched.tasks;
  const int count = e->sched.nr_tasks;

  for (int k = 0; k < count; k++) {
    struct task *current = &list[k];

    /* Does this task update the particles? Force sub-type first... */
    int updates_particles = (current->subtype == task_subtype_force);

    /* ...then the task types that do so regardless of sub-type. */
    switch (current->type) {
      case task_type_kick:
      case task_type_cooling:
      case task_type_sourceterms:
        updates_particles = 1;
        break;
      default:
        break;
    }

    if (updates_particles) current->skip = 1;
  }
}
/**
* @brief Launch the runners.
*
* @param e The #engine.
* @param nr_runners The number of #runner to let loose.
* @param mask The task mask to launch.
* @param submask The sub-task mask to launch.
*/
void engine_launch(struct engine *e, int nr_runners, unsigned int mask,
unsigned int submask) {
void engine_launch(struct engine *e, int nr_runners) {
const ticks tic = getticks();
......@@ -2527,7 +2545,7 @@ void engine_launch(struct engine *e, int nr_runners, unsigned int mask,
/* Load the tasks. */
pthread_mutex_unlock(&e->barrier_mutex);
scheduler_start(&e->sched, mask, submask);
scheduler_start(&e->sched);
pthread_mutex_lock(&e->barrier_mutex);
/* Remove the safeguard. */
......@@ -2567,61 +2585,12 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs) {
engine_marktasks(e);
/* Build the masks corresponding to the policy */
unsigned int mask = 0;
unsigned int submask = 0;
/* We always have sort tasks */
mask |= 1 << task_type_sort;
mask |= 1 << task_type_init;
/* Add the tasks corresponding to hydro operations to the masks */
if (e->policy & engine_policy_hydro) {
mask |= 1 << task_type_self;
mask |= 1 << task_type_pair;
mask |= 1 << task_type_sub_self;
mask |= 1 << task_type_sub_pair;
mask |= 1 << task_type_ghost;
submask |= 1 << task_subtype_density;
}
/* Add the tasks corresponding to self-gravity to the masks */
if (e->policy & engine_policy_self_gravity) {
mask |= 1 << task_type_grav_up;
mask |= 1 << task_type_grav_mm;
mask |= 1 << task_type_grav_gather_m;
mask |= 1 << task_type_grav_fft;
mask |= 1 << task_type_self;
mask |= 1 << task_type_pair;
mask |= 1 << task_type_sub_self;
mask |= 1 << task_type_sub_pair;
submask |= 1 << task_subtype_grav;
}
/* Add the tasks corresponding to external gravity to the masks */
if (e->policy & engine_policy_external_gravity) {
mask |= 1 << task_type_self;
mask |= 1 << task_type_sub_self;
submask |= 1 << task_subtype_external_grav;
}
/* Add MPI tasks if need be */
if (e->policy & engine_policy_mpi) {
mask |= 1 << task_type_send;
mask |= 1 << task_type_recv;
submask |= 1 << task_subtype_tend;
}
/* No time integration. We just want the density and ghosts */
engine_skip_force_and_kick(e);
/* Now, launch the calculation */
TIMER_TIC;
engine_launch(e, e->nr_threads, mask, submask);
engine_launch(e, e->nr_threads);
TIMER_TOC(timer_runners);
/* Apply some conversions (e.g. internal energy -> entropy) */
......@@ -2631,8 +2600,11 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs) {
space_map_cells_pre(s, 0, cell_convert_hydro, NULL);
/* Correct what we did (e.g. in PE-SPH, need to recompute rho_bar) */
if (hydro_need_extra_init_loop)
engine_launch(e, e->nr_threads, mask, submask);
if (hydro_need_extra_init_loop) {
engine_marktasks(e);
engine_skip_force_and_kick(e);
engine_launch(e, e->nr_threads);
}
}
clocks_gettime(&time2);
......@@ -2710,7 +2682,7 @@ void engine_step(struct engine *e) {
/* Drift only the necessary particles, that means all particles
* if we are about to repartition. */
int repart = (e->forcerepart != REPART_NONE);
const int repart = (e->forcerepart != REPART_NONE);
e->drift_all = repart || e->drift_all;
engine_drift(e);
......@@ -2723,85 +2695,11 @@ void engine_step(struct engine *e) {
/* Restore the default drifting policy */
e->drift_all = (e->policy & engine_policy_drift_all);
/* Build the masks corresponding to the policy */
unsigned int mask = 0, submask = 0;
/* We always have sort tasks and init tasks */
mask |= 1 << task_type_sort;
mask |= 1 << task_type_init;
/* Add the correct kick task */
if (e->policy & engine_policy_fixdt) {
mask |= 1 << task_type_kick_fixdt;
} else {
mask |= 1 << task_type_kick;
}
/* Add the tasks corresponding to hydro operations to the masks */
if (e->policy & engine_policy_hydro) {
mask |= 1 << task_type_self;
mask |= 1 << task_type_pair;
mask |= 1 << task_type_sub_self;
mask |= 1 << task_type_sub_pair;
mask |= 1 << task_type_ghost;
submask |= 1 << task_subtype_density;
submask |= 1 << task_subtype_force;
#ifdef EXTRA_HYDRO_LOOP
mask |= 1 << task_type_extra_ghost;
submask |= 1 << task_subtype_gradient;
#endif
}
/* Add the tasks corresponding to self-gravity to the masks */
if (e->policy & engine_policy_self_gravity) {
mask |= 1 << task_type_grav_up;
mask |= 1 << task_type_grav_mm;
mask |= 1 << task_type_grav_gather_m;
mask |= 1 << task_type_grav_fft;
mask |= 1 << task_type_self;
mask |= 1 << task_type_pair;
mask |= 1 << task_type_sub_self;
mask |= 1 << task_type_sub_pair;
submask |= 1 << task_subtype_grav;
}
/* Add the tasks corresponding to external gravity to the masks */
if (e->policy & engine_policy_external_gravity) {
mask |= 1 << task_type_self;
mask |= 1 << task_type_sub_self;
submask |= 1 << task_subtype_external_grav;
}
/* Add the tasks corresponding to cooling to the masks */
if (e->policy & engine_policy_cooling) {
mask |= 1 << task_type_cooling;
}
/* Add the tasks corresponding to sourceterms to the masks */
if (e->policy & engine_policy_sourceterms) {
mask |= 1 << task_type_sourceterms;
}
/* Add MPI tasks if need be */
if (e->policy & engine_policy_mpi) {
mask |= 1 << task_type_send;
mask |= 1 << task_type_recv;
submask |= 1 << task_subtype_tend;
}
if (e->verbose) engine_print_task_counts(e);
/* Send off the runners. */
TIMER_TIC;
engine_launch(e, e->nr_threads, mask, submask);
engine_launch(e, e->nr_threads);
TIMER_TOC(timer_runners);
/* Save some statistics */
......
......@@ -230,8 +230,7 @@ void engine_init(struct engine *e, struct space *s,
const struct external_potential *potential,
const struct cooling_function_data *cooling,
struct sourceterms *sourceterms);
void engine_launch(struct engine *e, int nr_runners, unsigned int mask,
unsigned int submask);
void engine_launch(struct engine *e, int nr_runners);
void engine_prepare(struct engine *e, int nodrift);
void engine_print(struct engine *e);
void engine_init_particles(struct engine *e, int flag_entropy_ICs);
......
......@@ -42,6 +42,7 @@
#include "atomic.h"
#include "const.h"
#include "cycle.h"
#include "engine.h"
#include "error.h"
#include "intrinsics.h"
#include "kernel_hydro.h"
......@@ -53,7 +54,6 @@
/**
 * @brief Re-set the list of active tasks.
 *
 * Only the counter of active tasks is zeroed; nothing else in the
 * scheduler is modified by this call.
 *
 * @param s The #scheduler whose active-task count is reset.
 */
void scheduler_clear_active(struct scheduler *s) {
  s->active_count = 0;
}
/**
......@@ -63,7 +63,6 @@ void scheduler_clear_active(struct scheduler *s) { s->active_count = 0; }
* @param ta The unlocking #task.
* @param tb The #task that will be unlocked.
*/
void scheduler_addunlock(struct scheduler *s, struct task *ta,
struct task *tb) {
/* Get an index at which to store this unlock. */
......@@ -113,7 +112,6 @@ void scheduler_addunlock(struct scheduler *s, struct task *ta,
* @param t The #task
* @param s The #scheduler we are working in.
*/
static void scheduler_splittask(struct task *t, struct scheduler *s) {
/* Static constants. */
......@@ -652,7 +650,6 @@ static void scheduler_splittask(struct task *t, struct scheduler *s) {
* @param num_elements the number of tasks.
* @param extra_data The #scheduler we are working in.
*/
void scheduler_splittasks_mapper(void *map_data, int num_elements,
void *extra_data) {
......@@ -666,6 +663,11 @@ void scheduler_splittasks_mapper(void *map_data, int num_elements,
}
}
/**
* @brief Splits all the tasks in the scheduler that are too large.
*
* @param s The #scheduler.
*/
void scheduler_splittasks(struct scheduler *s) {
/* Call the mapper on each current task. */
......@@ -680,12 +682,11 @@ void scheduler_splittasks(struct scheduler *s) {
* @param type The type of the task.
* @param subtype The sub-type of the task.
* @param flags The flags of the task.
* @param wait
* @param wait The number of unsatisfied dependencies of this task.
* @param ci The first cell to interact.
* @param cj The second cell to interact.
* @param tight
*/
struct task *scheduler_addtask(struct scheduler *s, enum task_types type,
enum task_subtypes subtype, int flags, int wait,
struct cell *ci, struct cell *cj, int tight) {
......@@ -732,7 +733,6 @@ struct task *scheduler_addtask(struct scheduler *s, enum task_types type,
*
* @param s The #scheduler.
*/
void scheduler_set_unlocks(struct scheduler *s) {
/* Store the counts for each task. */
......@@ -799,7 +799,6 @@ void scheduler_set_unlocks(struct scheduler *s) {
*
* @param s The #scheduler.
*/
void scheduler_ranktasks(struct scheduler *s) {
struct task *tasks = s->tasks;
......@@ -865,7 +864,6 @@ void scheduler_ranktasks(struct scheduler *s) {
* @param s The #scheduler.
* @param size The maximum number of tasks in the #scheduler.
*/
void scheduler_reset(struct scheduler *s, int size) {
/* Do we need to re-allocate? */
......@@ -893,8 +891,6 @@ void scheduler_reset(struct scheduler *s, int size) {
s->nr_tasks = 0;
s->tasks_next = 0;
s->waiting = 0;
s->mask = 0;
s->submask = 0;
s->nr_unlocks = 0;
s->completed_unlock_writes = 0;
s->active_count = 0;
......@@ -909,7 +905,6 @@ void scheduler_reset(struct scheduler *s, int size) {
* @param s The #scheduler.
* @param verbose Are we talkative?
*/
void scheduler_reweight(struct scheduler *s, int verbose) {
const int nr_tasks = s->nr_tasks;
......@@ -998,19 +993,15 @@ void scheduler_reweight(struct scheduler *s, int verbose) {
* @brief #threadpool_map function which runs through the task
* graph and re-computes the task wait counters.
*/
void scheduler_rewait_mapper(void *map_data, int num_elements,
void *extra_data) {
struct scheduler *s = (struct scheduler *)extra_data;
struct task *tasks = (struct task *)map_data;
for (int ind = 0; ind < num_elements; ind++) {
struct task *t = &tasks[ind];
if (t->skip || !((1 << t->type) & s->mask) ||
!((1 << t->subtype) & s->submask))
continue;
if (t->skip) continue;
/* Skip sort tasks that have already been performed */
if (t->type == task_type_sort && t->flags == 0) {
......@@ -1032,8 +1023,7 @@ void scheduler_enqueue_mapper(void *map_data, int num_elements,
struct task *tasks = s->tasks;
for (int ind = 0; ind < num_elements; ind++) {
struct task *t = &tasks[tid[ind]];
if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & s->mask) &&
((1 << t->subtype) & s->submask)) {
if (atomic_dec(&t->wait) == 1 && !t->skip) {
scheduler_enqueue(s, t);
}
}
......@@ -1044,76 +1034,66 @@ void scheduler_enqueue_mapper(void *map_data, int num_elements,
* @brief Start the scheduler, i.e. fill the queues with ready tasks.
*
* @param s The #scheduler.
* @param mask The task types to enqueue.
* @param submask The sub-task types to enqueue.
*/
void scheduler_start(struct scheduler *s) {
void scheduler_start(struct scheduler *s, unsigned int mask,
unsigned int submask) {
/* Store the masks */
s->mask = mask;
s->submask = submask | (1 << task_subtype_none);
/* Clear all the waits, rids, and times. */
for (int k = 0; k < s->nr_tasks; k++) {
s->tasks[k].wait = 1;
if (((1 << s->tasks[k].type) & mask) == 0 ||
((1 << s->tasks[k].subtype) & s->submask) == 0)
s->tasks[k].skip = 1;
}
/* Clear all the waits. */
for (int k = 0; k < s->nr_tasks; k++) s->tasks[k].wait = 1;
/* Re-wait the tasks. */
threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tasks, s->nr_tasks,
sizeof(struct task), 1000, s);
sizeof(struct task), 1000, NULL);
/* Check we have not missed an active task */
#ifdef SWIFT_DEBUG_CHECKS
const int ti_current = s->space->e->ti_current;
for (int k = 0; k < s->nr_tasks; k++) {
struct task *t = &s->tasks[k];
struct cell *ci = t->ci;
struct cell *cj = t->cj;
if (cj == NULL) { /* self */
if (ci->ti_end_min == ti_current && t->skip && t->type != task_type_sort)
error(
"Task (type='%s/%s') should not have been skipped ti_current=%d "
"c->ti_end_min=%d",
taskID_names[t->type], subtaskID_names[t->subtype], ti_current,
ci->ti_end_min);
/* Special treatment for sort tasks */
if (ci->ti_end_min == ti_current && t->skip &&
t->type == task_type_sort && t->flags == 0)
error(
"Task (type='%s/%s') should not have been skipped ti_current=%d "
"c->ti_end_min=%d t->flags=%d",
taskID_names[t->type], subtaskID_names[t->subtype], ti_current,
ci->ti_end_min, t->flags);
} else { /* pair */
if ((ci->ti_end_min == ti_current || cj->ti_end_min == ti_current) &&
t->skip)
error(
"Task (type='%s/%s') should not have been skipped ti_current=%d "
"ci->ti_end_min=%d cj->ti_end_min=%d",
taskID_names[t->type], subtaskID_names[t->subtype], ti_current,
ci->ti_end_min, cj->ti_end_min);
if (ti_current > 0) {
for (int k = 0; k < s->nr_tasks; k++) {
struct task *t = &s->tasks[k];
struct cell *ci = t->ci;
struct cell *cj = t->cj;
if (cj == NULL) { /* self */
if (ci->ti_end_min == ti_current && t->skip &&
t->type != task_type_sort)
error(
"Task (type='%s/%s') should not have been skipped ti_current=%d "
"c->ti_end_min=%d",
taskID_names[t->type], subtaskID_names[t->subtype], ti_current,
ci->ti_end_min);
/* Special treatment for sort tasks */
if (ci->ti_end_min == ti_current && t->skip &&
t->type == task_type_sort && t->flags == 0)
error(
"Task (type='%s/%s') should not have been skipped ti_current=%d "
"c->ti_end_min=%d t->flags=%d",
taskID_names[t->type], subtaskID_names[t->subtype], ti_current,
ci->ti_end_min, t->flags);
} else { /* pair */
if ((ci->ti_end_min == ti_current || cj->ti_end_min == ti_current) &&
t->skip)
error(
"Task (type='%s/%s') should not have been skipped ti_current=%d "
"ci->ti_end_min=%d cj->ti_end_min=%d",
taskID_names[t->type], subtaskID_names[t->subtype], ti_current,
ci->ti_end_min, cj->ti_end_min);
}
}
}
#endif
/* Loop over the tasks and enqueue whoever is ready. */
for (int k = 0; k < s->active_count; k++) {
struct task *t = &s->tasks[s->tid_active[k]];
if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & s->mask) &&
((1 << t->subtype) & s->submask)) {
if (atomic_dec(&t->wait) == 1 && !t->skip) {
scheduler_enqueue(s, t);
pthread_cond_signal(&s->sleep_cond);
}
......@@ -1134,17 +1114,13 @@ void scheduler_start(struct scheduler *s, unsigned int mask,
* @param s The #scheduler.
* @param t The #task.
*/
void scheduler_enqueue(struct scheduler *s, struct task *t) {
/* The target queue for this task. */
int qid = -1;
/* Ignore skipped tasks and tasks not in the masks. */
if (t->skip || (1 << t->type) & ~(s->mask) ||
(1 << t->subtype) & ~(s->submask)) {
return;
}
/* Ignore skipped tasks */
if (t->skip) return;
/* If this is an implicit task, just pretend it's done. */
if (t->implicit) {
......@@ -1247,7 +1223,6 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
* @return A pointer to the next task, if a suitable one has
* been identified.
*/
struct task *scheduler_done(struct scheduler *s, struct task *t) {
/* Release whatever locks this task held. */
......@@ -1295,7 +1270,6 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) {
* @return A pointer to the next task, if a suitable one has
* been identified.
*/
struct task *scheduler_unlock(struct scheduler *s, struct task *t) {
/* Loop through the dependencies and add them to a queue if
......@@ -1336,7 +1310,6 @@ struct task *scheduler_unlock(struct scheduler *s, struct task *t) {
*
* @return A pointer to a #task or @c NULL if there are no available tasks.
*/
struct task *scheduler_gettask(struct scheduler *s, int qid,
const struct task *prev) {
......@@ -1421,7 +1394,6 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
* @param nodeID The MPI rank
* @param tp Parallel processing threadpool.
*/
void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
int nr_queues, unsigned int flags, int nodeID,
struct threadpool *tp) {
......
......@@ -60,12 +60,6 @@ struct scheduler {
/* Scheduler flags. */
unsigned int flags;
/* Scheduler task mask */
unsigned int mask;
/* Scheduler sub-task mask */
unsigned int submask;
/* Number of queues in this scheduler. */
int nr_queues;
......@@ -117,7 +111,6 @@ struct scheduler {
* @param s The #scheduler.
* @param t The task to be added.
*/
__attribute__((always_inline)) INLINE static void scheduler_activate(
struct scheduler *s, struct task *t) {
if (atomic_cas(&t->skip, 1, 0)) {
......@@ -134,8 +127,7 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
struct task *scheduler_gettask(struct scheduler *s, int qid,
const struct task *prev);
void scheduler_enqueue(struct scheduler *s, struct task *t);
void scheduler_start(struct scheduler *s, unsigned int mask,
unsigned int submask);
void scheduler_start(struct scheduler *s);
void scheduler_reset(struct scheduler *s, int nr_tasks);
void scheduler_ranktasks(struct scheduler *s);
void scheduler_reweight(struct scheduler *s, int verbose);
......
......@@ -372,32 +372,6 @@ int task_lock(struct task *t) {
return 1;
}
/**
 * @brief Prints the list of tasks contained in a given mask
 *
 * Writes one " name=yes" or " name=no" entry per task type to stdout, for
 * type indices 1 up to task_type_count - 1. Index 0 is not reported.
 *
 * @param mask The mask to analyse
 */
void task_print_mask(unsigned int mask) {

  printf("task_print_mask: The tasks to run are [");
  for (int k = 1; k < task_type_count; k++) {
    /* Shift an unsigned one so that testing the top bit is well defined. */
    const unsigned int bit = 1u << k;
    printf(" %s=%s", taskID_names[k], (mask & bit) ? "yes" : "no");
  }
  printf(" ]\n");
}
/**
 * @brief Prints the list of subtasks contained in a given submask
 *
 * Writes one " name=yes" or " name=no" entry per task sub-type to stdout, for
 * sub-type indices 1 up to task_subtype_count - 1. Index 0 is not reported.
 *
 * @param submask The submask to analyse
 */
void task_print_submask(unsigned int submask) {

  printf("task_print_submask: The subtasks to run are [");
  for (int k = 1; k < task_subtype_count; k++) {
    /* Shift an unsigned one so that testing the top bit is well defined. */
    const unsigned int bit = 1u << k;
    printf(" %s=%s", subtaskID_names[k], (submask & bit) ? "yes" : "no");
  }
  printf(" ]\n");
}
/**
* @brief Print basic information about a task.
*
......
......@@ -164,8 +164,6 @@ struct task {
void task_unlock(struct task *t);
float task_overlap(const struct task *ta, const struct task *tb);
int task_lock(struct task *t);
void task_print_mask(unsigned int mask);
void task_print_submask(unsigned int submask);
void task_do_rewait(struct task *t);
void task_print(const struct task *t);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment