diff --git a/src/Makefile.am b/src/Makefile.am index 4d6a67c7569464486a017d8306c3e54730ebb3b2..ace9b6ff553b25d305060ba2e40affd7bdb277e9 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -42,7 +42,8 @@ endif include_HEADERS = space.h runner.h queue.h task.h lock.h cell.h part.h const.h \ engine.h swift.h serial_io.h timers.h debug.h scheduler.h proxy.h parallel_io.h \ common_io.h single_io.h multipole.h map.h tools.h partition.h clocks.h parser.h \ - physical_constants.h physical_constants_cgs.h potentials.h version.h hydro_properties.h + physical_constants.h physical_constants_cgs.h potentials.h version.h \ + hydro_properties.h threadpool.h # Common source files AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \ @@ -50,7 +51,7 @@ AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \ units.c common_io.c single_io.c multipole.c version.c map.c \ kernel_hydro.c tools.c part.c partition.c clocks.c parser.c \ physical_constants.c potentials.c hydro_properties.c \ - runner_doiact_fft.c + runner_doiact_fft.c threadpool.c # Include files for distribution, not installation. nobase_noinst_HEADERS = approx_math.h atomic.h cycle.h error.h inline.h kernel_hydro.h kernel_gravity.h \ diff --git a/src/atomic.h b/src/atomic.h index 0b87a0f77e17bafc64a2a59b3c70bda782fc14d4..be24f96e5a9d2e955132f0d6d34bdfa58bc1649c 100644 --- a/src/atomic.h +++ b/src/atomic.h @@ -23,6 +23,7 @@ #include "inline.h" #define atomic_add(v, i) __sync_fetch_and_add(v, i) +#define atomic_or(v, i) __sync_fetch_and_or(v, i) #define atomic_inc(v) atomic_add(v, 1) #define atomic_dec(v) atomic_add(v, -1) #define atomic_cas(v, o, n) __sync_val_compare_and_swap(v, o, n) diff --git a/src/cell.h b/src/cell.h index d794579e64854ef80f1310809cad7270fa4dcbd2..43f542254ecb692b5c4d86beddd892a4e242cdc9 100644 --- a/src/cell.h +++ b/src/cell.h @@ -122,7 +122,7 @@ struct cell { int nr_density, nr_gradient, nr_force, nr_grav; /* The hierarchical tasks. */ - struct task *extra_ghost, *ghost, *init, *drift, *kick; + struct task *extra_ghost, *ghost, *init, *kick; /* Task receiving data. */ struct task *recv_xv, *recv_rho, *recv_gradient, *recv_ti; diff --git a/src/debug.c b/src/debug.c index ca77745ec9f905d85403445ff3d8ef2de16034e7..dd8e50bb0f3ad8be8a4746d33f226a0b527fa75a 100644 --- a/src/debug.c +++ b/src/debug.c @@ -24,17 +24,20 @@ #include "../config.h" /* Some standard headers. */ +#include <float.h> #include <stdio.h> /* This object's header. */ #include "debug.h" /* Local includes. */ -#include "config.h" +#include "cell.h" #include "const.h" +#include "engine.h" #include "hydro.h" #include "inline.h" #include "part.h" +#include "space.h" /* Import the right hydro definition */ #if defined(MINIMAL_SPH) @@ -139,6 +142,54 @@ void printgParticle_single(struct gpart *gp) { printf("\n"); } +/** + * @brief Check that the cells and particles of a space have consistent h_max + * values. + * + * @param s the space. + * @result 1 or 0 + */ +int checkSpacehmax(struct space *s) { + + /* Loop over local cells. */ + float cell_h_max = 0.0f; + for (int k = 0; k < s->nr_cells; k++) { + if (s->cells[k].nodeID == s->e->nodeID && s->cells[k].h_max > cell_h_max) { + cell_h_max = s->cells[k].h_max; + } + } + + /* Now all particles. */ + float part_h_max = 0.0f; + for (int k = 0; k < s->nr_parts; k++) { + if (s->parts[k].h > part_h_max) { + part_h_max = s->parts[k].h; + } + } + + /* If within some epsilon we are OK. */ + if (abs(cell_h_max - part_h_max) <= FLT_EPSILON) return 1; + + /* There is a problem. Hunt it down. 
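A float-safe form of the h_max tolerance test above, as a standalone sketch:
note that C's abs() operates on int and truncates its argument, while fabsf()
from <math.h> is the single-precision absolute value.

    #include <float.h>
    #include <math.h>

    /* Return 1 if the two maxima agree to within one float epsilon.
       fabsf() is used because abs() would truncate the float difference. */
    static int h_max_consistent(float cell_h_max, float part_h_max) {
      return fabsf(cell_h_max - part_h_max) <= FLT_EPSILON;
    }
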
*/ + for (int k = 0; k < s->nr_cells; k++) { + if (s->cells[k].nodeID == s->e->nodeID) { + if (s->cells[k].h_max > part_h_max) { + message("cell %d is inconsistent (%f > %f)", k, s->cells[k].h_max, + part_h_max); + } + } + } + + for (int k = 0; k < s->nr_parts; k++) { + if (s->parts[k].h > cell_h_max) { + message("part %lld is inconsistent (%f > %f)", s->parts[k].id, + s->parts[k].h, cell_h_max); + } + } + + return 0; +} + #ifdef HAVE_METIS /** diff --git a/src/debug.h b/src/debug.h index 367241201977d9b79a8c2913dbae5d08f1148529..22b63820745ca7282b7699f0be09e493238d83c2 100644 --- a/src/debug.h +++ b/src/debug.h @@ -22,6 +22,7 @@ /* Includes. */ #include "cell.h" #include "part.h" +#include "space.h" void printParticle(const struct part *parts, struct xpart *xparts, long long int id, size_t N); @@ -30,6 +31,8 @@ void printgParticle(const struct gpart *gparts, const struct part *parts, void printParticle_single(const struct part *p, const struct xpart *xp); void printgParticle_single(struct gpart *gp); +int checkSpacehmax(struct space *s); + #ifdef HAVE_METIS #include "metis.h" void dumpMETISGraph(const char *prefix, idx_t nvtxs, idx_t ncon, idx_t *xadj, diff --git a/src/engine.c b/src/engine.c index 5f62a4faa4564425f73f507d03f788240716aa40..7394859e4c9e18040be85cf7a3ac170fa0b16afd 100644 --- a/src/engine.c +++ b/src/engine.c @@ -94,21 +94,24 @@ static cpu_set_t entry_affinity; * @brief Link a density/force task to a cell. * * @param e The #engine. - * @param l The #link. + * @param l A pointer to the #link, will be modified atomically. * @param t The #task. * * @return The new #link pointer. */ -struct link *engine_addlink(struct engine *e, struct link *l, struct task *t) { +void engine_addlink(struct engine *e, struct link **l, struct task *t) { + + /* Get the next free link. */ const int ind = atomic_inc(&e->nr_links); if (ind >= e->size_links) { error("Link table overflow."); } struct link *res = &e->links[ind]; - res->next = l; + + /* Set it atomically. */ res->t = t; - return res; + res->next = atomic_swap(l, res); } /** @@ -144,11 +147,6 @@ void engine_make_gravity_hierarchical_tasks(struct engine *e, struct cell *c, c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0, c, NULL, 0); - /* Add the drift task. */ - if (c->drift == NULL) - c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0, - 0, c, NULL, 0); - /* Add the kick task that matches the policy. */ if (is_fixdt) { if (c->kick == NULL) @@ -206,11 +204,6 @@ void engine_make_hydro_hierarchical_tasks(struct engine *e, struct cell *c, c->init = scheduler_addtask(s, task_type_init, task_subtype_none, 0, 0, c, NULL, 0); - /* Add the drift task. */ - if (c->drift == NULL) - c->drift = scheduler_addtask(s, task_type_drift, task_subtype_none, 0, - 0, c, NULL, 0); - /* Add the kick task that matches the policy. */ if (is_fixdt) { if (c->kick == NULL) @@ -754,12 +747,12 @@ void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj, } /* Add them to the local cell. */ - ci->send_xv = engine_addlink(e, ci->send_xv, t_xv); - ci->send_rho = engine_addlink(e, ci->send_rho, t_rho); + engine_addlink(e, &ci->send_xv, t_xv); + engine_addlink(e, &ci->send_rho, t_rho); #ifdef EXTRA_HYDRO_LOOP - ci->send_gradient = engine_addlink(e, ci->send_gradient, t_gradient); + engine_addlink(e, &ci->send_gradient, t_gradient); #endif - if (t_ti != NULL) ci->send_ti = engine_addlink(e, ci->send_ti, t_ti); + if (t_ti != NULL) engine_addlink(e, &ci->send_ti, t_ti); } /* Recurse? 
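engine_addlink() now publishes new links with an atomic pointer swap instead
of returning the new head through the caller. A minimal sketch of that idiom,
assuming atomic_swap() wraps GCC's __sync_lock_test_and_set() builtin (the
wrapper itself is not part of this patch):

    struct task;
    struct link { struct task *t; struct link *next; };

    /* Prepend res to the list at *head; safe against concurrent pushers. */
    static void link_push(struct link **head, struct link *res,
                          struct task *t) {
      res->t = t;
      /* The swap returns the previous head, which becomes our tail. */
      res->next = __sync_lock_test_and_set(head, res);
    }

Two threads can race on the swap, but each still receives a unique previous
head, so no link is ever lost.
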
*/ @@ -784,6 +777,7 @@ void engine_addtasks_send(struct engine *e, struct cell *ci, struct cell *cj, * @param t_gradient The recv_gradient #task, if it has already been created. * @param t_ti The recv_ti #task, if required and has already been created. */ + void engine_addtasks_recv(struct engine *e, struct cell *c, struct task *t_xv, struct task *t_rho, struct task *t_gradient, struct task *t_ti) { @@ -1103,7 +1097,7 @@ void engine_exchange_strays(struct engine *e, size_t offset_parts, struct xpart *xparts_new = NULL; if (posix_memalign((void **)&parts_new, part_align, sizeof(struct part) * s->size_parts) != 0 || - posix_memalign((void **)&xparts_new, part_align, + posix_memalign((void **)&xparts_new, xpart_align, sizeof(struct xpart) * s->size_parts) != 0) error("Failed to allocate new part data."); memcpy(parts_new, s->parts, sizeof(struct part) * offset_parts); @@ -1372,12 +1366,11 @@ void engine_make_hydroloop_tasks(struct engine *e) { void engine_count_and_link_tasks(struct engine *e) { struct scheduler *sched = &e->sched; - const int nr_tasks = sched->nr_tasks; - for (int k = 0; k < nr_tasks; k++) { + for (int ind = 0; ind < sched->nr_tasks; ind++) { + + struct task *t = &sched->tasks[ind]; - /* Get the current task. */ - struct task *t = &sched->tasks[k]; if (t->skip) continue; /* Link sort tasks together. */ @@ -1392,31 +1385,31 @@ void engine_count_and_link_tasks(struct engine *e) { if (t->type == task_type_self) { atomic_inc(&t->ci->nr_tasks); if (t->subtype == task_subtype_density) { - t->ci->density = engine_addlink(e, t->ci->density, t); + engine_addlink(e, &t->ci->density, t); atomic_inc(&t->ci->nr_density); } } else if (t->type == task_type_pair) { atomic_inc(&t->ci->nr_tasks); atomic_inc(&t->cj->nr_tasks); if (t->subtype == task_subtype_density) { - t->ci->density = engine_addlink(e, t->ci->density, t); + engine_addlink(e, &t->ci->density, t); atomic_inc(&t->ci->nr_density); - t->cj->density = engine_addlink(e, t->cj->density, t); + engine_addlink(e, &t->cj->density, t); atomic_inc(&t->cj->nr_density); } } else if (t->type == task_type_sub_self) { atomic_inc(&t->ci->nr_tasks); if (t->subtype == task_subtype_density) { - t->ci->density = engine_addlink(e, t->ci->density, t); + engine_addlink(e, &t->ci->density, t); atomic_inc(&t->ci->nr_density); } } else if (t->type == task_type_sub_pair) { atomic_inc(&t->ci->nr_tasks); atomic_inc(&t->cj->nr_tasks); if (t->subtype == task_subtype_density) { - t->ci->density = engine_addlink(e, t->ci->density, t); + engine_addlink(e, &t->ci->density, t); atomic_inc(&t->ci->nr_density); - t->cj->density = engine_addlink(e, t->cj->density, t); + engine_addlink(e, &t->cj->density, t); atomic_inc(&t->cj->nr_density); } } @@ -1595,13 +1588,11 @@ static inline void engine_make_hydro_loops_dependencies(struct scheduler *sched, void engine_make_extra_hydroloop_tasks(struct engine *e) { struct scheduler *sched = &e->sched; + int nr_tasks = sched->nr_tasks; const int nodeID = e->nodeID; - const int nr_tasks = sched->nr_tasks; - - for (int k = 0; k < nr_tasks; k++) { - /* Get a pointer to the task. */ - struct task *t = &sched->tasks[k]; + for (int ind = 0; ind < nr_tasks; ind++) { + struct task *t = &sched->tasks[ind]; /* Skip? 
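The per-cell counters in engine_count_and_link_tasks() are bumped through the
wrappers from atomic.h, so several threads can link tasks into the same cell
concurrently. A self-contained sketch of the pattern, using the same GCC
builtin the wrappers expand to (worker and counter names are hypothetical):

    #include <pthread.h>

    static volatile int nr_density = 0;

    /* Hypothetical worker: each thread counts the links it adds. */
    static void *count_worker(void *arg) {
      (void)arg;
      for (int i = 0; i < 1000; i++)
        __sync_fetch_and_add(&nr_density, 1); /* what atomic_inc() expands to */
      return NULL;
    }
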
*/ if (t->skip) continue; @@ -1632,7 +1623,7 @@ void engine_make_extra_hydroloop_tasks(struct engine *e) { sched, task_type_self, task_subtype_force, 0, 0, t->ci, NULL, 0); /* Add the link between the new loop and the cell */ - t->ci->force = engine_addlink(e, t->ci->force, t2); + engine_addlink(e, &t->ci->force, t2); atomic_inc(&t->ci->nr_force); /* Now, build all the dependencies for the hydro */ @@ -1676,9 +1667,9 @@ void engine_make_extra_hydroloop_tasks(struct engine *e) { sched, task_type_pair, task_subtype_force, 0, 0, t->ci, t->cj, 0); /* Add the link between the new loop and both cells */ - t->ci->force = engine_addlink(e, t->ci->force, t2); + engine_addlink(e, &t->ci->force, t2); atomic_inc(&t->ci->nr_force); - t->cj->force = engine_addlink(e, t->cj->force, t2); + engine_addlink(e, &t->cj->force, t2); atomic_inc(&t->cj->nr_force); /* Now, build all the dependencies for the hydro for the cells */ @@ -1727,7 +1718,7 @@ void engine_make_extra_hydroloop_tasks(struct engine *e) { t->flags, 0, t->ci, t->cj, 0); /* Add the link between the new loop and the cell */ - t->ci->force = engine_addlink(e, t->ci->force, t2); + engine_addlink(e, &t->ci->force, t2); atomic_inc(&t->ci->nr_force); /* Now, build all the dependencies for the hydro for the cells */ @@ -1778,9 +1769,9 @@ void engine_make_extra_hydroloop_tasks(struct engine *e) { t->flags, 0, t->ci, t->cj, 0); /* Add the link between the new loop and both cells */ - t->ci->force = engine_addlink(e, t->ci->force, t2); + engine_addlink(e, &t->ci->force, t2); atomic_inc(&t->ci->nr_force); - t->cj->force = engine_addlink(e, t->cj->force, t2); + engine_addlink(e, &t->cj->force, t2); atomic_inc(&t->cj->nr_force); /* Now, build all the dependencies for the hydro for the cells */ @@ -1855,14 +1846,6 @@ void engine_maketasks(struct engine *e) { /* Re-set the scheduler. */ scheduler_reset(sched, s->tot_cells * engine_maxtaskspercell); - /* Add the space sorting tasks. */ - for (int i = 0; i < e->nr_threads; i++) { - scheduler_addtask(sched, task_type_part_sort, task_subtype_none, i, 0, NULL, - NULL, 0); - scheduler_addtask(sched, task_type_gpart_sort, task_subtype_none, i, 0, - NULL, NULL, 0); - } - /* Construct the firt hydro loop over neighbours */ if (e->policy & engine_policy_hydro) engine_make_hydroloop_tasks(e); @@ -1892,7 +1875,7 @@ void engine_maketasks(struct engine *e) { /* Count the number of tasks associated with each cell and store the density tasks in each cell, and make each sort depend on the sorts of its progeny. */ - if (e->policy & engine_policy_hydro) engine_count_and_link_tasks(e); + engine_count_and_link_tasks(e); /* Append hierarchical tasks to each cells */ if (e->policy & engine_policy_hydro) @@ -1907,7 +1890,7 @@ void engine_maketasks(struct engine *e) { /* Run through the tasks and make force tasks for each density task. Each force task depends on the cell ghosts and unlocks the kick task of its super-cell. */ - if (e->policy & engine_policy_hydro) engine_make_extra_hydroloop_tasks(e); + engine_make_extra_hydroloop_tasks(e); /* Add the dependencies for the self-gravity stuff */ if (e->policy & engine_policy_self_gravity) engine_link_gravity_tasks(e); @@ -1956,219 +1939,241 @@ void engine_maketasks(struct engine *e) { /** * @brief Mark tasks to be skipped and set the sort flags accordingly. + * Threadpool mapper function for fixdt version. * - * @return 1 if the space has to be rebuilt, 0 otherwise. 
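The marking loops below are recast as threadpool mappers: threadpool_map()
hands each worker a contiguous chunk of the array, and the mapper visits only
its chunk. A minimal sketch of writing and invoking one (the mapper name and
counter are hypothetical; the threadpool_map() signature is the one used
throughout this patch):

    /* Count how many tasks are currently skipped, in parallel. */
    void count_skipped_mapper(void *map_data, int num_elements,
                              void *extra_data) {
      struct task *tasks = (struct task *)map_data;
      int *nr_skipped = (int *)extra_data;
      for (int ind = 0; ind < num_elements; ind++)
        if (tasks[ind].skip) atomic_inc(nr_skipped);
    }

    /* Chunks of 1000 tasks are handed out until the array is exhausted. */
    int nr_skipped = 0;
    threadpool_map(&e->threadpool, count_skipped_mapper, s->tasks, s->nr_tasks,
                   sizeof(struct task), 1000, &nr_skipped);
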
+ * @param map_data pointer to the tasks + * @param num_elements number of tasks + * @param extra_data pointer to int that will define if a rebuild is needed. */ -int engine_marktasks(struct engine *e) { - - struct scheduler *s = &e->sched; - const int ti_end = e->ti_current; - const int nr_tasks = s->nr_tasks; - const int *const ind = s->tasks_ind; - struct task *tasks = s->tasks; - const ticks tic = getticks(); - - /* Much less to do here if we're on a fixed time-step. */ - if (e->policy & engine_policy_fixdt) { +void engine_marktasks_fixdt_mapper(void *map_data, int num_elements, + void *extra_data) { + /* Unpack the arguments. */ + struct task *tasks = (struct task *)map_data; + int *rebuild_space = (int *)extra_data; - /* Run through the tasks and mark as skip or not. */ - for (int k = 0; k < nr_tasks; k++) { + for (int ind = 0; ind < num_elements; ind++) { + struct task *t = &tasks[ind]; - /* Get a handle on the kth task. */ - struct task *t = &tasks[ind[k]]; + /* Pair? */ + if (t->type == task_type_pair || t->type == task_type_sub_pair) { - /* Pair? */ - if (t->type == task_type_pair || t->type == task_type_sub_pair) { + /* Local pointers. */ + const struct cell *ci = t->ci; + const struct cell *cj = t->cj; - /* Local pointers. */ - const struct cell *ci = t->ci; - const struct cell *cj = t->cj; + /* Too much particle movement? */ + if (t->tight && + (fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin || + ci->dx_max > space_maxreldx * ci->h_max || + cj->dx_max > space_maxreldx * cj->h_max)) + *rebuild_space = 1; - /* Too much particle movement? */ - if (t->tight && - (fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin || - ci->dx_max > space_maxreldx * ci->h_max || - cj->dx_max > space_maxreldx * cj->h_max)) - return 1; + } - } + /* Sort? */ + else if (t->type == task_type_sort) { - /* Sort? */ - else if (t->type == task_type_sort) { + /* If all the sorts have been done, make this task implicit. */ + if (!(t->flags & (t->flags ^ t->ci->sorted))) t->implicit = 1; + } + } +} - /* If all the sorts have been done, make this task implicit. */ - if (!(t->flags & (t->flags ^ t->ci->sorted))) t->implicit = 1; - } +/** + * @brief Mark any sort tasks as initially skipped. + * Threadpool mapper function. + * + * @param map_data pointer to the tasks + * @param num_elements number of tasks + * @param extra_data unused + */ +void engine_marktasks_sorts_mapper(void *map_data, int num_elements, + void *extra_data) { + /* Unpack the arguments. */ + struct task *tasks = (struct task *)map_data; + for (int ind = 0; ind < num_elements; ind++) { + struct task *t = &tasks[ind]; + if (t->type == task_type_sort) { + t->flags = 0; + t->skip = 1; } + } +} - /* Multiple-timestep case */ - } else { +/** + * @brief Mark tasks to be skipped and set the sort flags accordingly. + * Threadpool mapper function. + * + * @param map_data pointer to the tasks + * @param num_elements number of tasks + * @param extra_data pointer to int that will define if a rebuild is needed. + */ +void engine_marktasks_mapper(void *map_data, int num_elements, + void *extra_data) { + /* Unpack the arguments. */ + struct task *tasks = (struct task *)map_data; + const int ti_end = ((int *)extra_data)[0]; + int *rebuild_space = &((int *)extra_data)[1]; + + for (int ind = 0; ind < num_elements; ind++) { + struct task *t = &tasks[ind]; + + /* Single-cell task? */ + if (t->type == task_type_self || t->type == task_type_ghost || + t->type == task_type_sub_self) { + + /* Set this task's skip. 
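The movement test used by the mappers encodes when a tight pair task stops
being trustworthy: once the largest kernel support plus the two cells'
accumulated drifts no longer fits inside the cell spacing dmin, or either
cell has drifted by more than space_maxreldx of its own h_max. The same test
as a standalone predicate:

    #include <math.h>

    /* Return non-zero if a tight pair task over ci/cj requires a rebuild. */
    static int pair_needs_rebuild(const struct cell *ci,
                                  const struct cell *cj) {
      return fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin ||
             ci->dx_max > space_maxreldx * ci->h_max ||
             cj->dx_max > space_maxreldx * cj->h_max;
    }
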
*/ + t->skip = (t->ci->ti_end_min > ti_end); + } - /* Run through the tasks and mark as skip or not. */ - for (int k = 0; k < nr_tasks; k++) { + /* Pair? */ + else if (t->type == task_type_pair || t->type == task_type_sub_pair) { - /* Get a handle on the kth task. */ - struct task *t = &tasks[k]; + /* Local pointers. */ + const struct cell *ci = t->ci; + const struct cell *cj = t->cj; - /* Sort-task? */ - if (t->type == task_type_sort) { + /* Too much particle movement? */ + if (t->tight && + (fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin || + ci->dx_max > space_maxreldx * ci->h_max || + cj->dx_max > space_maxreldx * cj->h_max)) + *rebuild_space = 1; - /* Re-set the flags. */ - t->flags = 0; - t->skip = 1; + /* Set this task's skip. */ + if ((t->skip = (ci->ti_end_min > ti_end && cj->ti_end_min > ti_end)) == 1) + continue; + /* Set the sort flags. */ + if (t->type == task_type_pair && t->subtype != task_subtype_grav) { + if (!(ci->sorted & (1 << t->flags))) { + atomic_or(&ci->sorts->flags, (1 << t->flags)); + ci->sorts->skip = 0; + } + if (!(cj->sorted & (1 << t->flags))) { + atomic_or(&cj->sorts->flags, (1 << t->flags)); + cj->sorts->skip = 0; + } } - /* Send/recv-task? */ - else if (t->type == task_type_send || t->type == task_type_recv) { - t->skip = 1; + /* Activate the send/recv flags. */ + if (ci->nodeID != engine_rank) { + + /* Activate the tasks to recv foreign cell ci's data. */ + ci->recv_xv->skip = 0; + ci->recv_rho->skip = 0; + ci->recv_ti->skip = 0; + + /* Look for the local cell cj's send tasks. */ + struct link *l = NULL; + for (l = cj->send_xv; l != NULL && l->t->cj->nodeID != ci->nodeID; + l = l->next) + ; + if (l == NULL) error("Missing link to send_xv task."); + l->t->skip = 0; + + for (l = cj->send_rho; l != NULL && l->t->cj->nodeID != ci->nodeID; + l = l->next) + ; + if (l == NULL) error("Missing link to send_rho task."); + l->t->skip = 0; + + for (l = cj->send_ti; l != NULL && l->t->cj->nodeID != ci->nodeID; + l = l->next) + ; + if (l == NULL) error("Missing link to send_ti task."); + l->t->skip = 0; + + } else if (cj->nodeID != engine_rank) { + + /* Activate the tasks to recv foreign cell cj's data. */ + cj->recv_xv->skip = 0; + cj->recv_rho->skip = 0; + cj->recv_ti->skip = 0; + /* Look for the local cell ci's send tasks. */ + struct link *l = NULL; + for (l = ci->send_xv; l != NULL && l->t->cj->nodeID != cj->nodeID; + l = l->next) + ; + if (l == NULL) error("Missing link to send_xv task."); + l->t->skip = 0; + + for (l = ci->send_rho; l != NULL && l->t->cj->nodeID != cj->nodeID; + l = l->next) + ; + if (l == NULL) error("Missing link to send_rho task."); + l->t->skip = 0; + + for (l = ci->send_ti; l != NULL && l->t->cj->nodeID != cj->nodeID; + l = l->next) + ; + if (l == NULL) error("Missing link to send_ti task."); + l->t->skip = 0; } } - /* Run through the tasks and mark as skip or not. */ - for (int k = 0; k < nr_tasks; k++) { + /* Kick? */ + else if (t->type == task_type_kick) { + t->skip = (t->ci->ti_end_min > ti_end); + t->ci->updated = 0; + t->ci->g_updated = 0; + } - /* Get a handle on the kth task. */ - struct task *t = &tasks[k]; + /* Init? */ + else if (t->type == task_type_init) { + /* Set this task's skip. */ + t->skip = (t->ci->ti_end_min > ti_end); + } - /* Skip sorts, sends, and recvs. */ - if (t->type == task_type_sort || t->type == task_type_send || - t->type == task_type_recv) { - continue; - } + /* None? */ + else if (t->type == task_type_none) + t->skip = 1; + } +} - /* Single-cell task? 
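The sort flags form a 13-bit mask, one bit per pair direction (sid). Since
several pair tasks may now request sorts on the same cell from different
threads, the flag update goes through the new atomic_or() macro. The idiom in
isolation:

    /* Request a sort along direction sid (0..12) if not already available. */
    if (!(ci->sorted & (1 << sid))) {
      atomic_or(&ci->sorts->flags, (1 << sid)); /* __sync_fetch_and_or */
      ci->sorts->skip = 0;                      /* re-activate the sort task */
    }
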
*/ - else if (t->type == task_type_self || t->type == task_type_ghost || - t->type == task_type_sub_self) { +/** + * @brief Mark tasks to be skipped and set the sort flags accordingly. + * + * @return 1 if the space has to be rebuilt, 0 otherwise. + */ +int engine_marktasks(struct engine *e) { - /* Set this task's skip. */ - t->skip = (t->ci->ti_end_min > ti_end); - } + struct scheduler *s = &e->sched; + const ticks tic = getticks(); + int rebuild_space = 0; - /* Pair? */ - else if (t->type == task_type_pair || t->type == task_type_sub_pair) { - - /* Local pointers. */ - const struct cell *ci = t->ci; - const struct cell *cj = t->cj; - - /* Too much particle movement? */ - if (t->tight && - (fmaxf(ci->h_max, cj->h_max) + ci->dx_max + cj->dx_max > cj->dmin || - ci->dx_max > space_maxreldx * ci->h_max || - cj->dx_max > space_maxreldx * cj->h_max)) - return 1; - - /* Set this task's skip. */ - if ((t->skip = (ci->ti_end_min > ti_end && cj->ti_end_min > ti_end)) == - 1) - continue; - - /* Set the sort flags. */ - if (t->type == task_type_pair && t->subtype != task_subtype_grav) { - if (!(ci->sorted & (1 << t->flags))) { - ci->sorts->flags |= (1 << t->flags); - ci->sorts->skip = 0; - } - if (!(cj->sorted & (1 << t->flags))) { - cj->sorts->flags |= (1 << t->flags); - cj->sorts->skip = 0; - } - } + /* Much less to do here if we're on a fixed time-step. */ + if (e->policy & engine_policy_fixdt) { - /* Activate the send/recv flags. */ - if (ci->nodeID != e->nodeID) { - - /* Activate the tasks to recv foreign cell ci's data. */ - ci->recv_xv->skip = 0; - ci->recv_rho->skip = 0; - ci->recv_gradient->skip = 0; - ci->recv_ti->skip = 0; - - /* Look for the local cell cj's send tasks. */ - struct link *l = NULL; - for (l = cj->send_xv; l != NULL && l->t->cj->nodeID != ci->nodeID; - l = l->next) - ; - if (l == NULL) error("Missing link to send_xv task."); - l->t->skip = 0; - - for (l = cj->send_rho; l != NULL && l->t->cj->nodeID != ci->nodeID; - l = l->next) - ; - if (l == NULL) error("Missing link to send_rho task."); - l->t->skip = 0; - - for (l = cj->send_gradient; - l != NULL && l->t->cj->nodeID != ci->nodeID; l = l->next) - ; - if (l == NULL) error("Missing link to send_gradient task."); - l->t->skip = 0; - - for (l = cj->send_ti; l != NULL && l->t->cj->nodeID != ci->nodeID; - l = l->next) - ; - if (l == NULL) error("Missing link to send_ti task."); - l->t->skip = 0; - - } else if (cj->nodeID != e->nodeID) { - - /* Activate the tasks to recv foreign cell cj's data. */ - cj->recv_xv->skip = 0; - cj->recv_rho->skip = 0; - cj->recv_gradient->skip = 0; - cj->recv_ti->skip = 0; - - /* Look for the local cell ci's send tasks. */ - struct link *l = NULL; - for (l = ci->send_xv; l != NULL && l->t->cj->nodeID != cj->nodeID; - l = l->next) - ; - if (l == NULL) error("Missing link to send_xv task."); - l->t->skip = 0; - - for (l = ci->send_rho; l != NULL && l->t->cj->nodeID != cj->nodeID; - l = l->next) - ; - if (l == NULL) error("Missing link to send_rho task."); - l->t->skip = 0; - - for (l = ci->send_gradient; - l != NULL && l->t->cj->nodeID != cj->nodeID; l = l->next) - ; - if (l == NULL) error("Missing link to send_gradient task."); - l->t->skip = 0; - - for (l = ci->send_ti; l != NULL && l->t->cj->nodeID != cj->nodeID; - l = l->next) - ; - if (l == NULL) error("Missing link to send_ti task."); - l->t->skip = 0; - } + /* Run through the tasks and mark as skip or not. 
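Activating the right send task on the local side means walking the cell's
link list until the entry whose target cell lives on the foreign node. A
small helper capturing that walk (hypothetical; the patch inlines it at each
call site):

    /* Find the task in list l whose pair-cell lives on nodeID, and wake it. */
    static void activate_send(struct link *l, int nodeID, const char *name) {
      while (l != NULL && l->t->cj->nodeID != nodeID) l = l->next;
      if (l == NULL) error("Missing link to %s task.", name);
      l->t->skip = 0;
    }

With this, the three walks per direction would collapse to calls such as
activate_send(cj->send_xv, ci->nodeID, "send_xv").
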
*/ + threadpool_map(&e->threadpool, engine_marktasks_fixdt_mapper, s->tasks, + s->nr_tasks, sizeof(struct task), 1000, &rebuild_space); + return rebuild_space; - } + /* Multiple-timestep case */ + } else { - /* Kick? */ - else if (t->type == task_type_kick) { - t->skip = (t->ci->ti_end_min > ti_end); - t->ci->updated = 0; - t->ci->g_updated = 0; - } + /* Run through the tasks and mark as skip or not. */ + int extra_data[2] = {e->ti_current, rebuild_space}; + threadpool_map(&e->threadpool, engine_marktasks_sorts_mapper, s->tasks, + s->nr_tasks, sizeof(struct task), 10000, NULL); - /* Drift? */ - else if (t->type == task_type_drift) - t->skip = 0; +#ifdef WITH_MPI + if (e->policy & engine_policy_mpi) { - /* Init? */ - else if (t->type == task_type_init) { - /* Set this task's skip. */ - t->skip = (t->ci->ti_end_min > ti_end); + /* Skip all sends and recvs, we will unmark if needed. */ + for (int k = 0; k < s->nr_tasks; k++) { + struct task *t = &s->tasks[k]; + if (t->type == task_type_send || t->type == task_type_recv) { + t->skip = 1; + } } - - /* None? */ - else if (t->type == task_type_none) - t->skip = 1; } +#endif + + threadpool_map(&e->threadpool, engine_marktasks_mapper, s->tasks, + s->nr_tasks, sizeof(struct task), 10000, extra_data); + rebuild_space = extra_data[1]; } if (e->verbose) @@ -2176,7 +2181,7 @@ int engine_marktasks(struct engine *e) { clocks_getunit()); /* All is well... */ - return 0; + return rebuild_space; } /** @@ -2216,6 +2221,7 @@ void engine_print_task_counts(struct engine *e) { * * @param e The #engine. */ + void engine_rebuild(struct engine *e) { const ticks tic = getticks(); @@ -2420,64 +2426,6 @@ void engine_collect_timestep(struct engine *e) { e->g_updates = g_updates; } -/** - * @brief Mapping function to collect the data from the drift. - * - * @param c A super-cell. - */ -void engine_collect_drift(struct cell *c) { - - /* Skip super-cells (Their values are already set) */ - if (c->drift != NULL) return; - - /* Counters for the different quantities. */ - double e_kin = 0.0, e_int = 0.0, e_pot = 0.0, entropy = 0.0, mass = 0.0; - double mom[3] = {0.0, 0.0, 0.0}, ang_mom[3] = {0.0, 0.0, 0.0}; - - /* Only do something is the cell is non-empty */ - if (c->count != 0 || c->gcount != 0) { - - /* If this cell is not split, I'm in trouble. */ - if (!c->split) error("Cell has no super-cell."); - - /* Collect the values from the progeny. */ - for (int k = 0; k < 8; k++) { - struct cell *cp = c->progeny[k]; - if (cp != NULL) { - - /* Recurse */ - engine_collect_drift(cp); - - /* And update */ - mass += cp->mass; - e_kin += cp->e_kin; - e_int += cp->e_int; - e_pot += cp->e_pot; - entropy += cp->entropy; - mom[0] += cp->mom[0]; - mom[1] += cp->mom[1]; - mom[2] += cp->mom[2]; - ang_mom[0] += cp->ang_mom[0]; - ang_mom[1] += cp->ang_mom[1]; - ang_mom[2] += cp->ang_mom[2]; - } - } - } - - /* Store the collected values in the cell. 
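engine_marktasks() squeezes both an input (the current time) and an output
(the rebuild flag) through the single extra_data pointer by packing them into
a two-int array; the mapper unpacks them by index. In isolation:

    int extra_data[2] = {e->ti_current, 0 /* rebuild flag, set by mapper */};
    threadpool_map(&e->threadpool, engine_marktasks_mapper, s->tasks,
                   s->nr_tasks, sizeof(struct task), 10000, extra_data);
    const int rebuild_space = extra_data[1];
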
*/ - c->mass = mass; - c->e_kin = e_kin; - c->e_int = e_int; - c->e_pot = e_pot; - c->entropy = entropy; - c->mom[0] = mom[0]; - c->mom[1] = mom[1]; - c->mom[2] = mom[2]; - c->ang_mom[0] = ang_mom[0]; - c->ang_mom[1] = ang_mom[1]; - c->ang_mom[2] = ang_mom[2]; -} - /** * @brief Print the conserved quantities statistics to a log file * @@ -2494,11 +2442,6 @@ void engine_print_stats(struct engine *e) { for (int k = 0; k < s->nr_cells; k++) if (s->cells[k].nodeID == e->nodeID) { struct cell *c = &s->cells[k]; - - /* Make the top-cells recurse */ - engine_collect_drift(c); - - /* And aggregate */ mass += c->mass; e_kin += c->e_kin; e_int += c->e_int; @@ -2678,6 +2621,8 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs) { /* Ready to go */ e->step = -1; e->wallclock_time = (float)clocks_diff(&time1, &time2); + + if (e->verbose) message("took %.3f %s.", e->wallclock_time, clocks_getunit()); } /** @@ -2710,7 +2655,8 @@ void engine_step(struct engine *e) { snapshot_drift_time = e->timeStep; /* Drift everybody to the snapshot position */ - engine_launch(e, e->nr_threads, 1 << task_type_drift, 0); + threadpool_map(&e->threadpool, runner_do_drift_mapper, e->s->cells, + e->s->nr_cells, sizeof(struct cell), 1, e); /* Dump... */ engine_dump_snapshot(e); @@ -2728,7 +2674,8 @@ void engine_step(struct engine *e) { e->timeStep = (e->ti_current - e->ti_old) * e->timeBase + snapshot_drift_time; /* Drift everybody */ - engine_launch(e, e->nr_threads, 1 << task_type_drift, 0); + threadpool_map(&e->threadpool, runner_do_drift_mapper, e->s->cells, + e->s->nr_cells, sizeof(struct cell), 1, e); if (e->nodeID == 0) { @@ -2992,6 +2939,7 @@ void engine_split(struct engine *e, struct partition *initial_partition) { part_relink_parts(s->gparts, s->nr_gparts, s->parts); #ifdef SWIFT_DEBUG_CHECKS + /* Verify that the links are correct */ for (size_t k = 0; k < s->nr_gparts; ++k) { @@ -3011,6 +2959,7 @@ void engine_split(struct engine *e, struct partition *initial_partition) { if (s->parts[k].gpart != NULL && s->parts[k].gpart->id_or_neg_offset != -k) error("Linking problem !"); } + #endif #else @@ -3442,6 +3391,9 @@ void engine_init(struct engine *e, struct space *s, part_create_mpi_types(); #endif + /* Initialize the threadpool. */ + threadpool_init(&e->threadpool, e->nr_threads); + /* First of all, init the barrier and lock it. */ if (pthread_mutex_init(&e->barrier_mutex, NULL) != 0) error("Failed to initialize barrier mutex."); @@ -3456,18 +3408,7 @@ void engine_init(struct engine *e, struct space *s, /* Init the scheduler with enough tasks for the initial sorting tasks. */ const int nr_tasks = 2 * s->tot_cells + 2 * e->nr_threads; scheduler_init(&e->sched, e->s, nr_tasks, nr_queues, scheduler_flag_steal, - e->nodeID); - - /* Create the sorting tasks. */ - for (int i = 0; i < e->nr_threads; i++) { - scheduler_addtask(&e->sched, task_type_part_sort, task_subtype_none, i, 0, - NULL, NULL, 0); - - scheduler_addtask(&e->sched, task_type_gpart_sort, task_subtype_none, i, 0, - NULL, NULL, 0); - } - - scheduler_ranktasks(&e->sched); + e->nodeID, &e->threadpool); /* Allocate and init the threads. */ if ((e->runners = (struct runner *)malloc(sizeof(struct runner) * diff --git a/src/engine.h b/src/engine.h index d708198c32b67c5118bbd7f4676f1ea0fe821c7d..e0c0f6a92e98a3c9a59f7db6a8ae930442ea5cac 100644 --- a/src/engine.h +++ b/src/engine.h @@ -109,6 +109,9 @@ struct engine { /* The task scheduler. */ struct scheduler sched; + /* Common threadpool for all the engine's tasks. 
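One pool is created once in engine_init() and then reused for every mapping
phase (drifting, task marking, and so on), so thread start-up costs are paid
a single time. A usage sketch built from the calls introduced by this patch:

    /* At start-up: one pool, sized like the runner count. */
    threadpool_init(&e->threadpool, e->nr_threads);

    /* Later, e.g. to drift: one top-level cell per chunk, engine as payload. */
    threadpool_map(&e->threadpool, runner_do_drift_mapper, e->s->cells,
                   e->s->nr_cells, sizeof(struct cell), 1, e);
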
*/ + struct threadpool threadpool; + /* The minimum and maximum allowed dt */ double dt_min, dt_max; @@ -234,7 +237,6 @@ void engine_rebuild(struct engine *e); void engine_repartition(struct engine *e); void engine_makeproxies(struct engine *e); void engine_redistribute(struct engine *e); -struct link *engine_addlink(struct engine *e, struct link *l, struct task *t); void engine_print_policy(struct engine *e); int engine_is_done(struct engine *e); void engine_pin(); diff --git a/src/gravity/Default/gravity.h b/src/gravity/Default/gravity.h index 755d4be527e4a81b2b4d2b3b829ea16a74ccc7c5..f9b67c96331e7572b0200093f0c32eee5d2391cd 100644 --- a/src/gravity/Default/gravity.h +++ b/src/gravity/Default/gravity.h @@ -17,6 +17,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ +#ifndef SWIFT_DEFAULT_GRAVITY_H +#define SWIFT_DEFAULT_GRAVITY_H #include <float.h> #include "potentials.h" @@ -151,3 +153,5 @@ __attribute__((always_inline)) INLINE static void external_gravity( */ __attribute__((always_inline)) INLINE static void gravity_kick_extra( struct gpart* gp, float dt, float half_dt) {} + +#endif /* SWIFT_DEFAULT_GRAVITY_H */ diff --git a/src/gravity/Default/gravity_debug.h b/src/gravity/Default/gravity_debug.h index 7cf375a1fdf7bccc4131dc415ab2d4acbbf2d3bc..c284f543b3be06297600c010e302423eb683adc9 100644 --- a/src/gravity/Default/gravity_debug.h +++ b/src/gravity/Default/gravity_debug.h @@ -16,6 +16,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ +#ifndef SWIFT_DEFAULT_GRAVITY_DEBUG_H +#define SWIFT_DEFAULT_GRAVITY_DEBUG_H __attribute__((always_inline)) INLINE static void gravity_debug_particle( const struct gpart* p) { @@ -27,3 +29,5 @@ __attribute__((always_inline)) INLINE static void gravity_debug_particle( p->a_grav[0], p->a_grav[1], p->a_grav[2], p->mass, p->ti_begin, p->ti_end); } + +#endif /* SWIFT_DEFAULT_GRAVITY_DEBUG_H */ diff --git a/src/gravity/Default/gravity_iact.h b/src/gravity/Default/gravity_iact.h index aa285ce3245aeac608aa022924a613a7a7b00375..7c7c5b9d244534ef3f6b5f509062019bfcd5f9fb 100644 --- a/src/gravity/Default/gravity_iact.h +++ b/src/gravity/Default/gravity_iact.h @@ -17,8 +17,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ -#ifndef SWIFT_RUNNER_IACT_GRAV_H -#define SWIFT_RUNNER_IACT_GRAV_H +#ifndef SWIFT_DEFAULT_GRAVITY_IACT_H +#define SWIFT_DEFAULT_GRAVITY_IACT_H /* Includes. */ #include "const.h" @@ -188,4 +188,4 @@ __attribute__((always_inline)) INLINE static void runner_iact_grav_pm( #endif } -#endif /* SWIFT_RUNNER_IACT_GRAV_H */ +#endif /* SWIFT_DEFAULT_GRAVITY_IACT_H */ diff --git a/src/gravity/Default/gravity_io.h b/src/gravity/Default/gravity_io.h index e0fd15a91b6e20048f6844b03ea1dde40114d7ec..26eed8a0a680e13130dd257d8b31e6cd00392f6d 100644 --- a/src/gravity/Default/gravity_io.h +++ b/src/gravity/Default/gravity_io.h @@ -16,6 +16,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. 
* ******************************************************************************/ +#ifndef SWIFT_DEFAULT_GRAVITY_IO_H +#define SWIFT_DEFAULT_GRAVITY_IO_H #include "io_properties.h" @@ -68,3 +70,5 @@ void darkmatter_write_particles(struct gpart* gparts, struct io_props* list, list[4] = io_make_output_field("Acceleration", FLOAT, 3, UNIT_CONV_ACCELERATION, gparts, a_grav); } + +#endif /* SWIFT_DEFAULT_GRAVITY_IO_H */ diff --git a/src/gravity/Default/gravity_part.h b/src/gravity/Default/gravity_part.h index 77e6543429993bea355c04dd28fcb5b04eee5519..1850ff0a1644d3593f78f150646eae8b2f074e1e 100644 --- a/src/gravity/Default/gravity_part.h +++ b/src/gravity/Default/gravity_part.h @@ -16,6 +16,9 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ +#ifndef SWIFT_DEFAULT_GRAVITY_PART_H +#define SWIFT_DEFAULT_GRAVITY_PART_H + /* Some standard headers. */ #include <stdlib.h> @@ -51,3 +54,5 @@ struct gpart { long long id_or_neg_offset; } __attribute__((aligned(gpart_align))); + +#endif /* SWIFT_DEFAULT_GRAVITY_PART_H */ diff --git a/src/hydro/Default/hydro.h b/src/hydro/Default/hydro.h index 51e09a5d943a7f3782e484289faddece2567f15d..021599cd2daf61ff35e5f29e3f13b2ad61c8947a 100644 --- a/src/hydro/Default/hydro.h +++ b/src/hydro/Default/hydro.h @@ -16,6 +16,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ +#ifndef SWIFT_DEFAULT_HYDRO_H +#define SWIFT_DEFAULT_HYDRO_H #include "adiabatic_index.h" #include "approx_math.h" @@ -341,3 +343,5 @@ __attribute__((always_inline)) INLINE static void hydro_kick_extra( */ __attribute__((always_inline)) INLINE static void hydro_convert_quantities( struct part *restrict p) {} + +#endif /* SWIFT_DEFAULT_HYDRO_H */ diff --git a/src/hydro/Default/hydro_debug.h b/src/hydro/Default/hydro_debug.h index 9b8136e8349398e4e9e804459a5de23d21043564..d02d3ef82c1b3d751731f49850c06df4b146b164 100644 --- a/src/hydro/Default/hydro_debug.h +++ b/src/hydro/Default/hydro_debug.h @@ -16,6 +16,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ +#ifndef SWIFT_DEFAULT_HYDRO_DEBUG_H +#define SWIFT_DEFAULT_HYDRO_DEBUG_H __attribute__((always_inline)) INLINE static void hydro_debug_particle( const struct part* p, const struct xpart* xp) { @@ -29,3 +31,5 @@ __attribute__((always_inline)) INLINE static void hydro_debug_particle( p->h, (int)p->density.wcount, p->mass, p->rho_dh, p->rho, p->ti_begin, p->ti_end); } + +#endif /* SWIFT_DEFAULT_HYDRO_DEBUG_H */ diff --git a/src/hydro/Default/hydro_iact.h b/src/hydro/Default/hydro_iact.h index 4975cac35aaa3d4b4b9f99319dfd061998014453..51fa7d07229f86918ef2d7019a9708110cef02e3 100644 --- a/src/hydro/Default/hydro_iact.h +++ b/src/hydro/Default/hydro_iact.h @@ -17,8 +17,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. 
* ******************************************************************************/ -#ifndef SWIFT_RUNNER_IACT_H -#define SWIFT_RUNNER_IACT_H +#ifndef SWIFT_DEFAULT_HYDRO_IACT_H +#define SWIFT_DEFAULT_HYDRO_IACT_H #include "adiabatic_index.h" @@ -932,4 +932,4 @@ __attribute__((always_inline)) INLINE static void runner_iact_nonsym_vec_force( #endif } -#endif /* SWIFT_RUNNER_IACT_H */ +#endif /* SWIFT_DEFAULT_HYDRO_IACT_H */ diff --git a/src/hydro/Default/hydro_io.h b/src/hydro/Default/hydro_io.h index 1c3e70753d417eb59d0fda5a7acfd1c24ab93045..bb35c914bcab8787f609b4dd49acd0cc883b4263 100644 --- a/src/hydro/Default/hydro_io.h +++ b/src/hydro/Default/hydro_io.h @@ -16,6 +16,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ +#ifndef SWIFT_DEFAULT_HYDRO_IO_H +#define SWIFT_DEFAULT_HYDRO_IO_H #include "io_properties.h" #include "kernel_hydro.h" @@ -112,3 +114,5 @@ void writeSPHflavour(hid_t h_grpsph) { * @return 1 if entropy is in 'internal energy', 0 otherwise. */ int writeEntropyFlag() { return 0; } + +#endif /* SWIFT_DEFAULT_HYDRO_IO_H */ diff --git a/src/hydro/Default/hydro_part.h b/src/hydro/Default/hydro_part.h index a3972dc7ebd1b04a18ccdb6f815d24e36aff270c..a2f4453dc69ed06ca4f315b6be29844c177d0435 100644 --- a/src/hydro/Default/hydro_part.h +++ b/src/hydro/Default/hydro_part.h @@ -16,6 +16,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ +#ifndef SWIFT_DEFAULT_HYDRO_PART_H +#define SWIFT_DEFAULT_HYDRO_PART_H /* Extra particle data not needed during the SPH loops over neighbours. */ struct xpart { @@ -117,3 +119,5 @@ struct part { struct gpart* gpart; } __attribute__((aligned(part_align))); + +#endif /* SWIFT_DEFAULT_HYDRO_PART_H */ diff --git a/src/hydro/Gadget2/hydro.h b/src/hydro/Gadget2/hydro.h index d2d4450fa12374a8a8dec624c5e54ba3d47b99aa..e9d626cb8c147c0cf4fa8d27f8bab31d2471beae 100644 --- a/src/hydro/Gadget2/hydro.h +++ b/src/hydro/Gadget2/hydro.h @@ -16,6 +16,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ +#ifndef SWIFT_GADGET2_HYDRO_H +#define SWIFT_GADGET2_HYDRO_H #include "adiabatic_index.h" #include "dimension.h" @@ -361,3 +363,5 @@ __attribute__((always_inline)) INLINE static void hydro_convert_quantities( /* We read u in the entropy field. We now get S from u */ p->entropy = gas_entropy_from_internal_energy(p->rho, p->entropy); } + +#endif /* SWIFT_GADGET2_HYDRO_H */ diff --git a/src/hydro/Gadget2/hydro_debug.h b/src/hydro/Gadget2/hydro_debug.h index 0280ae7d7a0cdac4567b8e9d4c3a50faf20e93f6..7c8a6eaba96929b01f1901393d7d0498d58badf4 100644 --- a/src/hydro/Gadget2/hydro_debug.h +++ b/src/hydro/Gadget2/hydro_debug.h @@ -16,6 +16,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. 
* ******************************************************************************/ +#ifndef SWIFT_GADGET2_HYDRO_DEBUG_H +#define SWIFT_GADGET2_HYDRO_DEBUG_H __attribute__((always_inline)) INLINE static void hydro_debug_particle( const struct part* p, const struct xpart* xp) { @@ -34,3 +36,5 @@ __attribute__((always_inline)) INLINE static void hydro_debug_particle( p->density.rot_v[1], p->density.rot_v[2], p->force.balsara, p->force.v_sig, p->force.h_dt, p->ti_begin, p->ti_end); } + +#endif /* SWIFT_GADGET2_HYDRO_DEBUG_H */ diff --git a/src/hydro/Gadget2/hydro_iact.h b/src/hydro/Gadget2/hydro_iact.h index 6766e98e6a6ecda12372bee7354b1cd4ca090885..b9807b6332e08012d47ad63652377f4fe5337bf9 100644 --- a/src/hydro/Gadget2/hydro_iact.h +++ b/src/hydro/Gadget2/hydro_iact.h @@ -17,8 +17,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ -#ifndef SWIFT_RUNNER_IACT_LEGACY_H -#define SWIFT_RUNNER_IACT_LEGACY_H +#ifndef SWIFT_GADGET2_HYDRO_IACT_H +#define SWIFT_GADGET2_HYDRO_IACT_H /** * @brief SPH interaction functions following the Gadget-2 version of SPH. @@ -913,4 +913,4 @@ __attribute__((always_inline)) INLINE static void runner_iact_nonsym_vec_force( #endif } -#endif /* SWIFT_RUNNER_IACT_LEGACY_H */ +#endif /* SWIFT_GADGET2_HYDRO_IACT_H */ diff --git a/src/hydro/Gadget2/hydro_io.h b/src/hydro/Gadget2/hydro_io.h index 96543c933048ed9404d5602f1a1986b7cc34ed28..433aef64c388c8bc4989e883f10a8f0d3eeb30e9 100644 --- a/src/hydro/Gadget2/hydro_io.h +++ b/src/hydro/Gadget2/hydro_io.h @@ -16,6 +16,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ +#ifndef SWIFT_GADGET2_HYDRO_IO_H +#define SWIFT_GADGET2_HYDRO_IO_H #include "adiabatic_index.h" #include "hydro.h" @@ -121,3 +123,5 @@ void writeSPHflavour(hid_t h_grpsph) { * @return 1 if entropy is in 'internal energy', 0 otherwise. */ int writeEntropyFlag() { return 0; } + +#endif /* SWIFT_GADGET2_HYDRO_IO_H */ diff --git a/src/hydro/Gadget2/hydro_part.h b/src/hydro/Gadget2/hydro_part.h index 0cd830c51a1836c0a56e7b4097990bf4d07a101f..484792438d2717413c1ca8d4f429eac2e6d21b20 100644 --- a/src/hydro/Gadget2/hydro_part.h +++ b/src/hydro/Gadget2/hydro_part.h @@ -16,6 +16,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. * ******************************************************************************/ +#ifndef SWIFT_GADGET2_HYDRO_PART_H +#define SWIFT_GADGET2_HYDRO_PART_H /* Extra particle data not needed during the SPH loops over neighbours. 
*/ struct xpart { @@ -109,3 +111,5 @@ struct part { struct gpart* gpart; } __attribute__((aligned(part_align))); + +#endif /* SWIFT_GADGET2_HYDRO_PART_H */ diff --git a/src/lock.h b/src/lock.h index ca7f01ee029cd1c57ed8fd0f3237ea54cb43e9a7..b2dd2eac9d0ca5d7807907e31cf3fa31894f9aed 100644 --- a/src/lock.h +++ b/src/lock.h @@ -34,6 +34,7 @@ #define lock_trylock(l) (pthread_spin_lock(l) != 0) #define lock_unlock(l) (pthread_spin_unlock(l) != 0) #define lock_unlock_blind(l) pthread_spin_unlock(l) + #elif defined(PTHREAD_LOCK) #include <pthread.h> #define swift_lock_type pthread_mutex_t @@ -43,6 +44,7 @@ #define lock_trylock(l) (pthread_mutex_trylock(l) != 0) #define lock_unlock(l) (pthread_mutex_unlock(l) != 0) #define lock_unlock_blind(l) pthread_mutex_unlock(l) + #else #define swift_lock_type volatile int #define lock_init(l) (*(l) = 0) diff --git a/src/partition.c b/src/partition.c index 11de3a62be65179bb128ff89d9fbf48861a2175b..e216e12a5a23457b39b53070de3d84a2f257b927 100644 --- a/src/partition.c +++ b/src/partition.c @@ -455,8 +455,8 @@ static void repart_edge_metis(int partweights, int bothweights, int nodeID, /* Skip un-interesting tasks. */ if (t->type != task_type_self && t->type != task_type_pair && t->type != task_type_sub_self && t->type != task_type_sub_self && - t->type != task_type_ghost && t->type != task_type_drift && - t->type != task_type_kick && t->type != task_type_init) + t->type != task_type_ghost && t->type != task_type_kick && + t->type != task_type_init) continue; /* Get the task weight. This can be slightly negative on multiple board @@ -494,8 +494,7 @@ static void repart_edge_metis(int partweights, int bothweights, int nodeID, int cid = ci - cells; /* Different weights for different tasks. */ - if (t->type == task_type_ghost || t->type == task_type_drift || - t->type == task_type_kick) { + if (t->type == task_type_ghost || t->type == task_type_kick) { /* Particle updates add only to vertex weight. */ if (taskvweights) weights_v[cid] += w; diff --git a/src/runner.c b/src/runner.c index d8c42920675aa662106a23d754dd11e790844914..b851ea84ddcd3f4bc28028105a65546ff0760b10 100644 --- a/src/runner.c +++ b/src/runner.c @@ -626,33 +626,27 @@ void runner_do_ghost(struct runner *r, struct cell *c) { } /** - * @brief Drift particles and g-particles forward in time + * @brief Drift particles and g-particles in a cell forward in time * - * @param r The runner thread. * @param c The cell. - * @param timer Are we timing this ? + * @param e The engine. */ -void runner_do_drift(struct runner *r, struct cell *c, int timer) { +static void runner_do_drift(struct cell *c, struct engine *e) { - const double timeBase = r->e->timeBase; - const double dt = (r->e->ti_current - r->e->ti_old) * timeBase; - const int ti_old = r->e->ti_old; - const int ti_current = r->e->ti_current; - struct part *restrict parts = c->parts; - struct xpart *restrict xparts = c->xparts; - struct gpart *restrict gparts = c->gparts; + const double timeBase = e->timeBase; + const double dt = (e->ti_current - e->ti_old) * timeBase; + const int ti_old = e->ti_old; + const int ti_current = e->ti_current; + + struct part *const parts = c->parts; + struct xpart *const xparts = c->xparts; + struct gpart *const gparts = c->gparts; float dx_max = 0.f, dx2_max = 0.f, h_max = 0.f; double e_kin = 0.0, e_int = 0.0, e_pot = 0.0, entropy = 0.0, mass = 0.0; double mom[3] = {0.0, 0.0, 0.0}; double ang_mom[3] = {0.0, 0.0, 0.0}; - TIMER_TIC - -#ifdef TASK_VERBOSE - OUT; -#endif - /* No children? 
*/ if (!c->split) { @@ -661,7 +655,7 @@ void runner_do_drift(struct runner *r, struct cell *c, int timer) { for (size_t k = 0; k < nr_gparts; k++) { /* Get a handle on the gpart. */ - struct gpart *restrict gp = &gparts[k]; + struct gpart *const gp = &gparts[k]; /* Drift... */ drift_gpart(gp, dt, timeBase, ti_old, ti_current); @@ -678,8 +672,8 @@ void runner_do_drift(struct runner *r, struct cell *c, int timer) { for (size_t k = 0; k < nr_parts; k++) { /* Get a handle on the part. */ - struct part *restrict p = &parts[k]; - struct xpart *restrict xp = &xparts[k]; + struct part *const p = &parts[k]; + struct xpart *const xp = &xparts[k]; /* Drift... */ drift_part(p, xp, dt, timeBase, ti_old, ti_current); @@ -732,15 +726,14 @@ void runner_do_drift(struct runner *r, struct cell *c, int timer) { /* Otherwise, aggregate data from children. */ else { - /* Loop over the progeny. */ + /* Loop over the progeny and collect their data. */ for (int k = 0; k < 8; k++) if (c->progeny[k] != NULL) { + struct cell *cp = c->progeny[k]; - /* Recurse */ - struct cell *restrict cp = c->progeny[k]; - runner_do_drift(r, cp, 0); + /* Recurse. */ + runner_do_drift(cp, e); - /* Collect */ dx_max = fmaxf(dx_max, cp->dx_max); h_max = fmaxf(h_max, cp->h_max); mass += cp->mass; @@ -771,8 +764,28 @@ void runner_do_drift(struct runner *r, struct cell *c, int timer) { c->ang_mom[0] = ang_mom[0]; c->ang_mom[1] = ang_mom[1]; c->ang_mom[2] = ang_mom[2]; +} + +/** + * @brief Mapper function to drift particles and g-particles forward in time. + * + * @param map_data An array of #cell%s. + * @param num_elements Chunk size. + * @param extra_data Pointer to an #engine. + */ + +void runner_do_drift_mapper(void *map_data, int num_elements, + void *extra_data) { - if (timer) TIMER_TOC(timer_drift); + struct engine *e = (struct engine *)extra_data; + struct cell *cells = (struct cell *)map_data; + + for (int ind = 0; ind < num_elements; ind++) { + struct cell *c = &cells[ind]; + + /* Only drift local particles. 
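While drifting, each leaf cell also accumulates the conserved-quantity sums
that engine_print_stats() now reads directly off the top-level cells, which
is why engine_collect_drift() could be deleted. The per-particle
contributions, in a simplified hypothetical form (m is the particle mass,
v[3] its velocity, x[3] its position; the full code uses xp->v_full):

    mass += m;
    e_kin += 0.5 * m * (v[0] * v[0] + v[1] * v[1] + v[2] * v[2]);
    mom[0] += m * v[0];                            /* likewise mom[1], mom[2] */
    ang_mom[0] += m * (x[1] * v[2] - x[2] * v[1]); /* and cyclic permutations */
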
*/ + if (c != NULL && c->nodeID == e->nodeID) runner_do_drift(c, e); + } } /** @@ -1182,9 +1195,6 @@ void *runner_main(void *data) { case task_type_extra_ghost: runner_do_extra_ghost(r, ci); break; - case task_type_drift: - runner_do_drift(r, ci, 1); - break; case task_type_kick: runner_do_kick(r, ci, 1); break; @@ -1218,19 +1228,6 @@ void *runner_main(void *data) { case task_type_grav_external: runner_do_grav_external(r, t->ci, 1); break; - case task_type_part_sort: - space_do_parts_sort(); - break; - case task_type_gpart_sort: - space_do_gparts_sort(); - break; - case task_type_split_cell: - space_do_split(e->s, t->ci); - break; - case task_type_rewait: - scheduler_do_rewait((struct task *)t->ci, (struct task *)t->cj, - t->flags, t->rank); - break; default: error("Unknown task type."); } diff --git a/src/runner.h b/src/runner.h index 6838b959955c4e54e208b8d2d16339e7fdb1740f..71e849d30233a07dbfadeb945a2b7c8ba56ab4a6 100644 --- a/src/runner.h +++ b/src/runner.h @@ -51,8 +51,8 @@ void runner_do_sort(struct runner *r, struct cell *c, int flag, int clock); void runner_do_gsort(struct runner *r, struct cell *c, int flag, int clock); void runner_do_kick(struct runner *r, struct cell *c, int timer); void runner_do_kick_fixdt(struct runner *r, struct cell *c, int timer); -void runner_do_drift(struct runner *r, struct cell *c, int timer); void runner_do_init(struct runner *r, struct cell *c, int timer); void *runner_main(void *data); +void runner_do_drift_mapper(void *map_data, int num_elements, void *extra_data); #endif /* SWIFT_RUNNER_H */ diff --git a/src/scheduler.c b/src/scheduler.c index 6a0d886bd5458028c5c05812f10c204bc8946a1a..adc63e55092ff9bc69e6a2f98d6dfd5c399857f4 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -2,6 +2,7 @@ * This file is part of SWIFT. * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk) * Matthieu Schaller (matthieu.schaller@durham.ac.uk) + * 2016 Peter W. Draper (p.w.draper@durham.ac.uk) * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published @@ -59,102 +60,80 @@ void scheduler_addunlock(struct scheduler *s, struct task *ta, struct task *tb) { - - /* Lock the scheduler since re-allocating the unlocks is not - thread-safe. */ - if (lock_lock(&s->lock) != 0) error("Unable to lock scheduler."); + /* Get an index at which to store this unlock. */ + const int ind = atomic_inc(&s->nr_unlocks); /* Does the buffer need to be grown? */ - if (s->nr_unlocks == s->size_unlocks) { + if (ind == s->size_unlocks) { + /* Allocate the new buffer. */ struct task **unlocks_new; int *unlock_ind_new; - s->size_unlocks *= 2; + const int size_unlocks_new = s->size_unlocks * 2; if ((unlocks_new = (struct task **)malloc(sizeof(struct task *) * - s->size_unlocks)) == NULL || - (unlock_ind_new = (int *)malloc(sizeof(int) * s->size_unlocks)) == NULL) + size_unlocks_new)) == NULL || + (unlock_ind_new = (int *)malloc(sizeof(int) * size_unlocks_new)) == + NULL) error("Failed to re-allocate unlocks."); - memcpy(unlocks_new, s->unlocks, sizeof(struct task *) * s->nr_unlocks); - memcpy(unlock_ind_new, s->unlock_ind, sizeof(int) * s->nr_unlocks); + + /* Wait for all writes to the old buffer to complete. */ + while (s->completed_unlock_writes < ind) + ; + + /* Copy the buffers. 
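scheduler_addunlock() replaces the scheduler lock with a reserve-then-publish
protocol: each thread claims a slot index with atomic_inc(), the single
thread that draws an index equal to the current size becomes the grower, and
everyone else spins until there is room before publishing. The skeleton of
the protocol:

    const int ind = atomic_inc(&s->nr_unlocks);  /* claim a slot (old count) */
    if (ind == s->size_unlocks) {
      /* Sole grower: wait for in-flight writes, then swap in bigger buffers. */
      while (s->completed_unlock_writes < ind)
        ;
      /* ... reallocate, copy ind entries, then publish the new size ... */
    }
    while (ind > s->size_unlocks)                /* wait for space to exist */
      ;
    s->unlocks[ind] = tb;                        /* publish the payload */
    atomic_inc(&s->completed_unlock_writes);
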
*/ + memcpy(unlocks_new, s->unlocks, sizeof(struct task *) * ind); + memcpy(unlock_ind_new, s->unlock_ind, sizeof(int) * ind); free(s->unlocks); free(s->unlock_ind); s->unlocks = unlocks_new; s->unlock_ind = unlock_ind_new; + + /* Publish the new buffer size. */ + s->size_unlocks = size_unlocks_new; } + /* Wait for there to actually be space at my index. */ + while (ind > s->size_unlocks) + ; + /* Write the unlock to the scheduler. */ - const int ind = atomic_inc(&s->nr_unlocks); s->unlocks[ind] = tb; s->unlock_ind[ind] = ta - s->tasks; - - /* Release the scheduler. */ - if (lock_unlock(&s->lock) != 0) error("Unable to unlock scheduler."); + atomic_inc(&s->completed_unlock_writes); } /** - * @brief Split tasks that may be too large. + * @brief Split a task if too large. * + * @param t The #task * @param s The #scheduler we are working in. */ -void scheduler_splittasks(struct scheduler *s) { +static void scheduler_splittask(struct task *t, struct scheduler *s) { - const int pts[7][8] = { + /* Static constants. */ + const static int pts[7][8] = { {-1, 12, 10, 9, 4, 3, 1, 0}, {-1, -1, 11, 10, 5, 4, 2, 1}, {-1, -1, -1, 12, 7, 6, 4, 3}, {-1, -1, -1, -1, 8, 7, 5, 4}, {-1, -1, -1, -1, -1, 12, 10, 9}, {-1, -1, -1, -1, -1, -1, 11, 10}, {-1, -1, -1, -1, -1, -1, -1, 12}}; - const float sid_scale[13] = {0.1897, 0.4025, 0.1897, 0.4025, 0.5788, - 0.4025, 0.1897, 0.4025, 0.1897, 0.4025, - 0.5788, 0.4025, 0.5788}; - - /* Loop through the tasks... */ - int tid = 0, redo = 0; - struct task *t_old = NULL; - while (1) { - - /* Get a pointer on the task. */ - struct task *t = t_old; - if (redo) { - redo = 0; - } else { - const int ind = atomic_inc(&tid); - if (ind < s->nr_tasks) - t_old = t = &s->tasks[s->tasks_ind[ind]]; - else - break; - } + const static float sid_scale[13] = {0.1897, 0.4025, 0.1897, 0.4025, 0.5788, + 0.4025, 0.1897, 0.4025, 0.1897, 0.4025, + 0.5788, 0.4025, 0.5788}; - /* Skip sorting tasks. */ - if (t->type == task_type_part_sort) continue; + /* Iterate on this task until we're done with it. */ + int redo = 1; + while (redo) { - if (t->type == task_type_gpart_sort) continue; + /* Reset the redo flag. */ + redo = 0; - /* Empty task? */ - if (t->ci == NULL || (t->type == task_type_pair && t->cj == NULL)) { + /* Non-splittable task? */ + if ((t->ci == NULL || (t->type == task_type_pair && t->cj == NULL)) || + ((t->type == task_type_kick) && t->ci->nodeID != s->nodeID) || + ((t->type == task_type_init) && t->ci->nodeID != s->nodeID)) { t->type = task_type_none; t->skip = 1; - continue; - } - - /* Non-local kick task? */ - if ((t->type == task_type_kick) && t->ci->nodeID != s->nodeID) { - t->type = task_type_none; - t->skip = 1; - continue; - } - - /* Non-local drift task? */ - if ((t->type == task_type_drift) && t->ci->nodeID != s->nodeID) { - t->type = task_type_none; - t->skip = 1; - continue; - } - - /* Non-local init task? */ - if ((t->type == task_type_init) && t->ci->nodeID != s->nodeID) { - t->type = task_type_none; - t->skip = 1; - continue; + break; } /* Self-interaction? */ @@ -166,7 +145,7 @@ void scheduler_splittasks(struct scheduler *s) { /* Foreign task? */ if (ci->nodeID != s->nodeID) { t->skip = 1; - continue; + break; } /* Is this cell even split? */ @@ -179,36 +158,38 @@ void scheduler_splittasks(struct scheduler *s) { /* convert to a self-subtask. */ t->type = task_type_sub_self; - } - - /* Otherwise, make tasks explicitly. */ - else { + /* Otherwise, make tasks explicitly. */ + } else { /* Take a step back (we're going to recycle the current task)... 
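The pts table maps an ordered pair of progeny indices (j < k) to the
sort-direction id (sid, 0..12) of the pair task connecting them, and
sid_scale[sid] is the relative cost weight of that direction. For example:

    /* Progeny 0 and 7 sit on the main diagonal of the parent cell: */
    const int sid = pts[0][7];          /* == 0, the (1,1,1) axis */
    const float weight = sid_scale[0];  /* == 0.1897, corner pairs are cheap */
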
*/ redo = 1; - /* Add the self task. */ + /* Add the self tasks. */ int first_child = 0; while (ci->progeny[first_child] == NULL) first_child++; t->ci = ci->progeny[first_child]; for (int k = first_child + 1; k < 8; k++) if (ci->progeny[k] != NULL) - scheduler_addtask(s, task_type_self, t->subtype, 0, 0, - ci->progeny[k], NULL, 0); + scheduler_splittask( + scheduler_addtask(s, task_type_self, t->subtype, 0, 0, + ci->progeny[k], NULL, 0), + s); /* Make a task for each pair of progeny. */ for (int j = 0; j < 8; j++) if (ci->progeny[j] != NULL) for (int k = j + 1; k < 8; k++) if (ci->progeny[k] != NULL) - scheduler_addtask(s, task_type_pair, t->subtype, pts[j][k], 0, - ci->progeny[j], ci->progeny[k], 0); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, + pts[j][k], 0, ci->progeny[j], + ci->progeny[k], 0), + s); } } - } - /* Hydro Pair interaction? */ - else if (t->type == task_type_pair && t->subtype != task_subtype_grav) { + /* Pair interaction? */ + } else if (t->type == task_type_pair && t->subtype != task_subtype_grav) { /* Get a handle on the cells involved. */ struct cell *ci = t->ci; @@ -219,7 +200,7 @@ void scheduler_splittasks(struct scheduler *s) { /* Foreign task? */ if (ci->nodeID != s->nodeID && cj->nodeID != s->nodeID) { t->skip = 1; - continue; + break; } /* Get the sort ID, use space_getsid and not t->flags @@ -234,16 +215,14 @@ void scheduler_splittasks(struct scheduler *s) { /* Replace by a single sub-task? */ if (scheduler_dosub && - ci->count * cj->count * sid_scale[sid] < space_subsize && + ci->count * sid_scale[sid] < space_subsize / cj->count && sid != 0 && sid != 2 && sid != 6 && sid != 8) { /* Make this task a sub task. */ t->type = task_type_sub_pair; - } - - /* Otherwise, split it. */ - else { + /* Otherwise, split it. */ + } else { /* Take a step back (we're going to recycle the current task)... 
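The sub-task threshold below is rearranged from the earlier form
ci->count * cj->count * sid_scale[sid] < space_subsize into a division, so
the two particle counts are never multiplied together; the product of two
well-populated cells can overflow a 32-bit int long before the comparison is
meaningful. For instance:

    /* Two cells of ~50k particles: the product 2.5e9 exceeds INT_MAX
       (~2.1e9), so space_subsize is divided down instead. */
    if (ci->count * sid_scale[sid] < space_subsize / cj->count) {
      t->type = task_type_sub_pair;
    }
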
*/ redo = 1; @@ -262,12 +241,18 @@ void scheduler_splittasks(struct scheduler *s) { t->cj = cj->progeny[0]; t->flags = 1; t->tight = 1; - t = scheduler_addtask(s, task_type_pair, t->subtype, 1, 0, - ci->progeny[7], cj->progeny[1], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, - ci->progeny[6], cj->progeny[1], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, - ci->progeny[7], cj->progeny[0], 1); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 1, 0, + ci->progeny[7], cj->progeny[1], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, + ci->progeny[6], cj->progeny[1], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, + ci->progeny[7], cj->progeny[0], 1), + s); break; case 2: /* ( 1 , 1 , -1 ) */ @@ -282,12 +267,18 @@ void scheduler_splittasks(struct scheduler *s) { t->cj = cj->progeny[0]; t->flags = 3; t->tight = 1; - t = scheduler_addtask(s, task_type_pair, t->subtype, 3, 0, - ci->progeny[7], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, - ci->progeny[5], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 6, 0, - ci->progeny[7], cj->progeny[0], 1); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 3, 0, + ci->progeny[7], cj->progeny[2], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, + ci->progeny[5], cj->progeny[2], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 6, 0, + ci->progeny[7], cj->progeny[0], 1), + s); break; case 4: /* ( 1 , 0 , 0 ) */ @@ -295,36 +286,66 @@ void scheduler_splittasks(struct scheduler *s) { t->cj = cj->progeny[0]; t->flags = 4; t->tight = 1; - t = scheduler_addtask(s, task_type_pair, t->subtype, 5, 0, - ci->progeny[5], cj->progeny[0], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, - ci->progeny[6], cj->progeny[0], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, - ci->progeny[7], cj->progeny[0], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 3, 0, - ci->progeny[4], cj->progeny[1], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 4, 0, - ci->progeny[5], cj->progeny[1], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 6, 0, - ci->progeny[6], cj->progeny[1], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, - ci->progeny[7], cj->progeny[1], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 1, 0, - ci->progeny[4], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, - ci->progeny[5], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 4, 0, - ci->progeny[6], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 5, 0, - ci->progeny[7], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, - ci->progeny[4], cj->progeny[3], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 1, 0, - ci->progeny[5], cj->progeny[3], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 3, 0, - ci->progeny[6], cj->progeny[3], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 4, 0, - ci->progeny[7], cj->progeny[3], 1); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 5, 0, + ci->progeny[5], cj->progeny[0], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, + ci->progeny[6], cj->progeny[0], 1), + s); + scheduler_splittask( + 
scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, + ci->progeny[7], cj->progeny[0], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 3, 0, + ci->progeny[4], cj->progeny[1], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 4, 0, + ci->progeny[5], cj->progeny[1], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 6, 0, + ci->progeny[6], cj->progeny[1], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, + ci->progeny[7], cj->progeny[1], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 1, 0, + ci->progeny[4], cj->progeny[2], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, + ci->progeny[5], cj->progeny[2], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 4, 0, + ci->progeny[6], cj->progeny[2], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 5, 0, + ci->progeny[7], cj->progeny[2], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, + ci->progeny[4], cj->progeny[3], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 1, 0, + ci->progeny[5], cj->progeny[3], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 3, 0, + ci->progeny[6], cj->progeny[3], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 4, 0, + ci->progeny[7], cj->progeny[3], 1), + s); break; case 5: /* ( 1 , 0 , -1 ) */ @@ -332,12 +353,18 @@ void scheduler_splittasks(struct scheduler *s) { t->cj = cj->progeny[1]; t->flags = 5; t->tight = 1; - t = scheduler_addtask(s, task_type_pair, t->subtype, 5, 0, - ci->progeny[6], cj->progeny[3], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, - ci->progeny[4], cj->progeny[3], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, - ci->progeny[6], cj->progeny[1], 1); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 5, 0, + ci->progeny[6], cj->progeny[3], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, + ci->progeny[4], cj->progeny[3], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, + ci->progeny[6], cj->progeny[1], 1), + s); break; case 6: /* ( 1 , -1 , 1 ) */ @@ -352,12 +379,18 @@ void scheduler_splittasks(struct scheduler *s) { t->cj = cj->progeny[3]; t->flags = 6; t->tight = 1; - t = scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, - ci->progeny[5], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, - ci->progeny[4], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, - ci->progeny[5], cj->progeny[3], 1); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, + ci->progeny[5], cj->progeny[2], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, + ci->progeny[4], cj->progeny[2], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, + ci->progeny[5], cj->progeny[3], 1), + s); break; case 8: /* ( 1 , -1 , -1 ) */ @@ -372,12 +405,18 @@ void scheduler_splittasks(struct scheduler *s) { t->cj = cj->progeny[0]; t->flags = 9; t->tight = 1; - t = scheduler_addtask(s, task_type_pair, t->subtype, 9, 0, - ci->progeny[7], cj->progeny[4], 1); - t = scheduler_addtask(s, task_type_pair, 
t->subtype, 0, 0, - ci->progeny[3], cj->progeny[4], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, - ci->progeny[7], cj->progeny[0], 1); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 9, 0, + ci->progeny[7], cj->progeny[4], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, + ci->progeny[3], cj->progeny[4], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, + ci->progeny[7], cj->progeny[0], 1), + s); break; case 10: /* ( 0 , 1 , 0 ) */ @@ -385,36 +424,66 @@ void scheduler_splittasks(struct scheduler *s) { t->cj = cj->progeny[0]; t->flags = 10; t->tight = 1; - t = scheduler_addtask(s, task_type_pair, t->subtype, 11, 0, - ci->progeny[3], cj->progeny[0], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, - ci->progeny[6], cj->progeny[0], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 6, 0, - ci->progeny[7], cj->progeny[0], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 9, 0, - ci->progeny[2], cj->progeny[1], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 10, 0, - ci->progeny[3], cj->progeny[1], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, - ci->progeny[6], cj->progeny[1], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, - ci->progeny[7], cj->progeny[1], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 1, 0, - ci->progeny[2], cj->progeny[4], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, - ci->progeny[3], cj->progeny[4], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 10, 0, - ci->progeny[6], cj->progeny[4], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 11, 0, - ci->progeny[7], cj->progeny[4], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, - ci->progeny[2], cj->progeny[5], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 1, 0, - ci->progeny[3], cj->progeny[5], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 9, 0, - ci->progeny[6], cj->progeny[5], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 10, 0, - ci->progeny[7], cj->progeny[5], 1); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 11, 0, + ci->progeny[3], cj->progeny[0], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, + ci->progeny[6], cj->progeny[0], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 6, 0, + ci->progeny[7], cj->progeny[0], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 9, 0, + ci->progeny[2], cj->progeny[1], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 10, 0, + ci->progeny[3], cj->progeny[1], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, + ci->progeny[6], cj->progeny[1], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 7, 0, + ci->progeny[7], cj->progeny[1], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 1, 0, + ci->progeny[2], cj->progeny[4], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, + ci->progeny[3], cj->progeny[4], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 10, 0, + ci->progeny[6], cj->progeny[4], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 11, 0, + ci->progeny[7], cj->progeny[4], 1), + s); + 
scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, + ci->progeny[2], cj->progeny[5], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 1, 0, + ci->progeny[3], cj->progeny[5], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 9, 0, + ci->progeny[6], cj->progeny[5], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 10, 0, + ci->progeny[7], cj->progeny[5], 1), + s); break; case 11: /* ( 0 , 1 , -1 ) */ @@ -422,12 +491,18 @@ void scheduler_splittasks(struct scheduler *s) { t->cj = cj->progeny[1]; t->flags = 11; t->tight = 1; - t = scheduler_addtask(s, task_type_pair, t->subtype, 11, 0, - ci->progeny[6], cj->progeny[5], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, - ci->progeny[2], cj->progeny[5], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 6, 0, - ci->progeny[6], cj->progeny[1], 1); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 11, 0, + ci->progeny[6], cj->progeny[5], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, + ci->progeny[2], cj->progeny[5], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 6, 0, + ci->progeny[6], cj->progeny[1], 1), + s); break; case 12: /* ( 0 , 0 , 1 ) */ @@ -435,45 +510,73 @@ void scheduler_splittasks(struct scheduler *s) { t->cj = cj->progeny[0]; t->flags = 12; t->tight = 1; - t = scheduler_addtask(s, task_type_pair, t->subtype, 11, 0, - ci->progeny[3], cj->progeny[0], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 5, 0, - ci->progeny[5], cj->progeny[0], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, - ci->progeny[7], cj->progeny[0], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 9, 0, - ci->progeny[1], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 12, 0, - ci->progeny[3], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 8, 0, - ci->progeny[5], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 5, 0, - ci->progeny[7], cj->progeny[2], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 3, 0, - ci->progeny[1], cj->progeny[4], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 6, 0, - ci->progeny[3], cj->progeny[4], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 12, 0, - ci->progeny[5], cj->progeny[4], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 11, 0, - ci->progeny[7], cj->progeny[4], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, - ci->progeny[1], cj->progeny[6], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 3, 0, - ci->progeny[3], cj->progeny[6], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 9, 0, - ci->progeny[5], cj->progeny[6], 1); - t = scheduler_addtask(s, task_type_pair, t->subtype, 12, 0, - ci->progeny[7], cj->progeny[6], 1); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 11, 0, + ci->progeny[3], cj->progeny[0], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 5, 0, + ci->progeny[5], cj->progeny[0], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 2, 0, + ci->progeny[7], cj->progeny[0], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 9, 0, + ci->progeny[1], cj->progeny[2], 1), + s); + scheduler_splittask( + scheduler_addtask(s, task_type_pair, t->subtype, 12, 0, 
+                                  ci->progeny[3], cj->progeny[2], 1),
+                s);
+            scheduler_splittask(
+                scheduler_addtask(s, task_type_pair, t->subtype, 8, 0,
+                                  ci->progeny[5], cj->progeny[2], 1),
+                s);
+            scheduler_splittask(
+                scheduler_addtask(s, task_type_pair, t->subtype, 5, 0,
+                                  ci->progeny[7], cj->progeny[2], 1),
+                s);
+            scheduler_splittask(
+                scheduler_addtask(s, task_type_pair, t->subtype, 3, 0,
+                                  ci->progeny[1], cj->progeny[4], 1),
+                s);
+            scheduler_splittask(
+                scheduler_addtask(s, task_type_pair, t->subtype, 6, 0,
+                                  ci->progeny[3], cj->progeny[4], 1),
+                s);
+            scheduler_splittask(
+                scheduler_addtask(s, task_type_pair, t->subtype, 12, 0,
+                                  ci->progeny[5], cj->progeny[4], 1),
+                s);
+            scheduler_splittask(
+                scheduler_addtask(s, task_type_pair, t->subtype, 11, 0,
+                                  ci->progeny[7], cj->progeny[4], 1),
+                s);
+            scheduler_splittask(
+                scheduler_addtask(s, task_type_pair, t->subtype, 0, 0,
+                                  ci->progeny[1], cj->progeny[6], 1),
+                s);
+            scheduler_splittask(
+                scheduler_addtask(s, task_type_pair, t->subtype, 3, 0,
+                                  ci->progeny[3], cj->progeny[6], 1),
+                s);
+            scheduler_splittask(
+                scheduler_addtask(s, task_type_pair, t->subtype, 9, 0,
+                                  ci->progeny[5], cj->progeny[6], 1),
+                s);
+            scheduler_splittask(
+                scheduler_addtask(s, task_type_pair, t->subtype, 12, 0,
+                                  ci->progeny[7], cj->progeny[6], 1),
+                s);
             break;
-      }
+        } /* switch(sid) */
       }

-    } /* split this task? */
-
-    /* Otherwise, break it up if it is too large? */
-    else if (scheduler_doforcesplit && ci->split && cj->split &&
-             (ci->count > space_maxsize / cj->count)) {
+      /* Otherwise, break it up if it is too large? */
+    } else if (scheduler_doforcesplit && ci->split && cj->split &&
+               (ci->count > space_maxsize / cj->count)) {

       // message( "force splitting pair with %i and %i parts." , ci->count ,
       // cj->count );
@@ -485,34 +588,34 @@
         if (ci->progeny[j] != NULL)
           for (int k = 0; k < 8; k++)
             if (cj->progeny[k] != NULL) {
-              t = scheduler_addtask(s, task_type_pair, t->subtype, 0, 0,
+              struct task *tl =
+                  scheduler_addtask(s, task_type_pair, t->subtype, 0, 0,
                                     ci->progeny[j], cj->progeny[k], 0);
-              t->flags = space_getsid(s->space, &t->ci, &t->cj, shift);
+              scheduler_splittask(tl, s);
+              tl->flags = space_getsid(s->space, &tl->ci, &tl->cj, shift);
             }

-    }
-
-    /* Otherwise, if not spilt, stitch-up the sorting. */
-    else {
+      /* Otherwise, if not split, stitch up the sorting. */
+    } else {

       /* Create the sort for ci. */
-      // lock_lock( &ci->lock );
+      lock_lock(&ci->lock);
       if (ci->sorts == NULL)
         ci->sorts = scheduler_addtask(s, task_type_sort, task_subtype_none,
                                       1 << sid, 0, ci, NULL, 0);
       else
         ci->sorts->flags |= (1 << sid);
-      // lock_unlock_blind( &ci->lock );
+      lock_unlock_blind(&ci->lock);
       scheduler_addunlock(s, ci->sorts, t);

       /* Create the sort for cj. */
-      // lock_lock( &cj->lock );
+      lock_lock(&cj->lock);
       if (cj->sorts == NULL)
         cj->sorts = scheduler_addtask(s, task_type_sort, task_subtype_none,
                                       1 << sid, 0, cj, NULL, 0);
       else
         cj->sorts->flags |= (1 << sid);
-      // lock_unlock_blind( &cj->lock );
+      lock_unlock_blind(&cj->lock);
       scheduler_addunlock(s, cj->sorts, t);
     }

@@ -528,8 +631,35 @@
     if (ci->gcount == 0) t->type = task_type_none;

   } /* gravity interaction? */
+  } /* iterate over the current task. */
+}
+
+/**
+ * @brief Mapper function to split tasks that may be too large.
+ *
+ * @param map_data The tasks to process.
+ * @param num_elements The number of tasks.
+ * @param extra_data The #scheduler we are working in.
+ */
-  } /* loop over all tasks.
*/
+void scheduler_splittasks_mapper(void *map_data, int num_elements,
+                                 void *extra_data) {
+
+  /* Extract the parameters. */
+  struct scheduler *s = (struct scheduler *)extra_data;
+  struct task *tasks = (struct task *)map_data;
+
+  for (int ind = 0; ind < num_elements; ind++) {
+    struct task *t = &tasks[ind];
+    scheduler_splittask(t, s);
+  }
+}
+
+void scheduler_splittasks(struct scheduler *s) {
+
+  /* Call the mapper on each current task. */
+  threadpool_map(s->threadpool, scheduler_splittasks_mapper, s->tasks,
+                 s->nr_tasks, sizeof(struct task), 1000, s);
 }

 /**
@@ -626,17 +756,25 @@ void scheduler_set_unlocks(struct scheduler *s) {
   offsets[0] = 0;
   for (int k = 1; k < s->nr_tasks; k++)
     offsets[k] = offsets[k - 1] + counts[k - 1];
-  for (int k = 0; k < s->nr_tasks; k++)
-    for (int j = offsets[k]; j < offsets[k + 1]; j++) s->unlock_ind[j] = k;

   /* Set the unlocks in the tasks. */
   for (int k = 0; k < s->nr_tasks; k++) {
     struct task *t = &s->tasks[k];
     t->nr_unlock_tasks = counts[k];
     t->unlock_tasks = &s->unlocks[offsets[k]];
-    for (int j = offsets[k]; j < offsets[k + 1]; j++) s->unlock_ind[j] = k;
   }

+  /* Verify that there are no duplicate unlocks. */
+  /* for (int k = 0; k < s->nr_tasks; k++) {
+    struct task *t = &s->tasks[k];
+    for (int i = 0; i < t->nr_unlock_tasks; i++) {
+      for (int j = i + 1; j < t->nr_unlock_tasks; j++) {
+        if (t->unlock_tasks[i] == t->unlock_tasks[j])
+          error("duplicate unlock!");
+      }
+    }
+  } */
+
   /* Clean up. */
   free(counts);
   free(offsets);
@@ -655,42 +793,48 @@ void scheduler_ranktasks(struct scheduler *s) {
   const int nr_tasks = s->nr_tasks;

   /* Run through the tasks and get all the waits right. */
-  for (int k = 0; k < nr_tasks; k++) {
-    tid[k] = k;
-    for (int j = 0; j < tasks[k].nr_unlock_tasks; j++)
-      tasks[k].unlock_tasks[j]->wait += 1;
+  for (int i = 0; i < nr_tasks; i++) {
+    struct task *t = &tasks[i];
+
+    // Increment the waits of the dependencies
+    for (int k = 0; k < t->nr_unlock_tasks; k++) {
+      t->unlock_tasks[k]->wait++;
+    }
   }

+  /* Load the tids of tasks with no waits. */
+  int left = 0;
+  for (int k = 0; k < nr_tasks; k++)
+    if (tasks[k].wait == 0) {
+      tid[left] = k;
+      left += 1;
+    }
+
   /* Main loop. */
-  for (int j = 0, rank = 0, left = 0; left < nr_tasks; rank++) {
-
-    /* Load the tids of tasks with no waits. */
-    for (int k = left; k < nr_tasks; k++)
-      if (tasks[tid[k]].wait == 0) {
-        int temp = tid[j];
-        tid[j] = tid[k];
-        tid[k] = temp;
-        j += 1;
-      }
+  for (int j = 0, rank = 0; left < nr_tasks; rank++) {

     /* Did we get anything? */
     if (j == left) error("Unsatisfiable task dependencies detected.");

+    const int left_old = left;

     /* Unlock the next layer of tasks. */
-    for (int i = left; i < j; i++) {
-      struct task *t = &tasks[tid[i]];
+    for (; j < left_old; j++) {
+      struct task *t = &tasks[tid[j]];
       t->rank = rank;
-      tid[i] = t - tasks;
-      if (tid[i] >= nr_tasks) error("Task index overshoot.");
       /* message( "task %i of type %s has rank %i." , i ,
          (t->type == task_type_self) ? "self" : (t->type == task_type_pair) ?
         "pair" : "sort" , rank ); */
-      for (int k = 0; k < t->nr_unlock_tasks; k++)
-        t->unlock_tasks[k]->wait -= 1;
+      for (int k = 0; k < t->nr_unlock_tasks; k++) {
+        struct task *u = t->unlock_tasks[k];
+        if (--u->wait == 0) {
+          tid[left] = u - tasks;
+          left += 1;
+        }
+      }
     }

-    /* The new left (no, not tony). */
-    left = j;
+    /* Move back to the old left (like Sanders). */
+    j = left_old;
   }

 #ifdef SWIFT_DEBUG_CHECKS
@@ -724,9 +868,6 @@ void scheduler_reset(struct scheduler *s, int size) {
     error("Failed to allocate task lists.");
   }

-  /* Reset the task data.
*/
-  bzero(s->tasks, sizeof(struct task) * size);
-
   /* Reset the counters. */
   s->size = size;
   s->nr_tasks = 0;
@@ -735,6 +876,7 @@
   s->mask = 0;
   s->submask = 0;
   s->nr_unlocks = 0;
+  s->completed_unlock_writes = 0;

   /* Set the task pointers in the queues. */
   for (int k = 0; k < s->nr_queues; k++) s->queues[k].tasks = s->tasks;
@@ -810,9 +952,6 @@ void scheduler_reweight(struct scheduler *s) {
       case task_type_kick:
         t->weight += wscale * t->ci->count;
         break;
-      case task_type_drift:
-        t->weight += wscale * t->ci->count;
-        break;
       case task_type_init:
         t->weight += wscale * t->ci->count;
         break;
@@ -832,6 +971,52 @@
      message( "task weights are in [ %i , %i ]." , min , max ); */
 }

+/**
+ * @brief #threadpool_map function which runs through the task
+ *        graph and re-computes the task wait counters.
+ */
+
+void scheduler_rewait_mapper(void *map_data, int num_elements,
+                             void *extra_data) {
+
+  struct scheduler *s = (struct scheduler *)extra_data;
+  struct task *tasks = (struct task *)map_data;
+
+  for (int ind = 0; ind < num_elements; ind++) {
+    struct task *t = &tasks[ind];
+
+    if (t->skip || !((1 << t->type) & s->mask) ||
+        !((1 << t->subtype) & s->submask))
+      continue;
+
+    /* Empty sort tasks should have been cleaned out before this point. */
+    if (t->type == task_type_sort && t->flags == 0) {
+      error("Empty sort task encountered.");
+    }
+
+    /* Set the waits of the dependencies. */
+    for (int k = 0; k < t->nr_unlock_tasks; k++) {
+      struct task *u = t->unlock_tasks[k];
+      atomic_inc(&u->wait);
+    }
+  }
+}
+
+void scheduler_enqueue_mapper(void *map_data, int num_elements,
+                              void *extra_data) {
+  struct scheduler *s = (struct scheduler *)extra_data;
+  const int *tid = (int *)map_data;
+  struct task *tasks = s->tasks;
+  for (int ind = 0; ind < num_elements; ind++) {
+    struct task *t = &tasks[tid[ind]];
+    if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & s->mask) &&
+        ((1 << t->subtype) & s->submask)) {
+      scheduler_enqueue(s, t);
+    }
+  }
+  pthread_cond_broadcast(&s->sleep_cond);
+}
+
 /**
  * @brief Start the scheduler, i.e. fill the queues with ready tasks.
  *
@@ -843,88 +1028,33 @@
 void scheduler_start(struct scheduler *s, unsigned int mask,
                      unsigned int submask) {

-  const int nr_tasks = s->nr_tasks;
-  int *tid = s->tasks_ind;
-  struct task *tasks = s->tasks;
-  // ticks tic;
+  // ticks tic = getticks();

   /* Store the masks */
-  s->mask = mask | (1 << task_type_rewait);
+  s->mask = mask;
   s->submask = submask | (1 << task_subtype_none);

   /* Clear all the waits and rids. */
-  // ticks tic = getticks();
   for (int k = 0; k < s->nr_tasks; k++) {
     s->tasks[k].wait = 1;
     s->tasks[k].rid = -1;
   }
-  // message( "waiting tasks took %.3f %s." ,
-  // clocks_from_ticks(getticks() - tic), clocks_getunit() );
-
-  /* Enqueue a set of extraenous tasks to set the task waits. */
-  struct task *rewait_tasks = &s->tasks[s->nr_tasks];
-  const int num_rewait_tasks = s->nr_queues > s->size - s->nr_tasks
-                                   ? s->size - s->nr_tasks
-                                   : s->nr_queues;
-
-  /* Remember that engine_launch may fiddle with this value. */
-  const int waiting_old = s->waiting;
-
-  /* We are going to use the task structure in a modified way to pass
-     information to the task. Don't do this at home !
-     - ci and cj will give the range of tasks to which the waits will be
-       applied
-     - the flags will be used to transfer the mask
-     - the rank will be used to transfer the submask
-     - the rest is unused.
- */ - for (int k = 0; k < num_rewait_tasks; k++) { - rewait_tasks[k].type = task_type_rewait; - rewait_tasks[k].ci = (struct cell *)&s->tasks[k * nr_tasks / s->nr_queues]; - rewait_tasks[k].cj = - (struct cell *)&s->tasks[(k + 1) * nr_tasks / s->nr_queues]; - rewait_tasks[k].flags = s->mask; - rewait_tasks[k].rank = s->submask; - rewait_tasks[k].skip = 0; - rewait_tasks[k].wait = 0; - rewait_tasks[k].rid = -1; - rewait_tasks[k].weight = 1; - rewait_tasks[k].implicit = 0; - rewait_tasks[k].nr_unlock_tasks = 0; - scheduler_enqueue(s, &rewait_tasks[k]); - pthread_cond_broadcast(&s->sleep_cond); - } - - /* Wait for the rewait tasks to have executed. */ - pthread_mutex_lock(&s->sleep_mutex); - pthread_cond_broadcast(&s->sleep_cond); - while (s->waiting > waiting_old) { - pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex); - } - pthread_mutex_unlock(&s->sleep_mutex); - /* message("waiting tasks took %.3f %s.", - clocks_from_ticks(getticks() - tic), clocks_getunit());*/ - s->mask = mask; - s->submask = submask | (1 << task_subtype_none); + /* Re-wait the tasks. */ + threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tasks, s->nr_tasks, + sizeof(struct task), 1000, s); /* Loop over the tasks and enqueue whoever is ready. */ - // tic = getticks(); - for (int k = 0; k < s->nr_tasks; k++) { - struct task *t = &tasks[tid[k]]; - if (atomic_dec(&t->wait) == 1 && ((1 << t->type) & s->mask) && - ((1 << t->subtype) & s->submask) && !t->skip) { - scheduler_enqueue(s, t); - pthread_cond_broadcast(&s->sleep_cond); - } - } + threadpool_map(s->threadpool, scheduler_enqueue_mapper, s->tasks_ind, + s->nr_tasks, sizeof(int), 1000, s); /* To be safe, fire of one last sleep_cond in a safe way. */ pthread_mutex_lock(&s->sleep_mutex); pthread_cond_broadcast(&s->sleep_cond); pthread_mutex_unlock(&s->sleep_mutex); - // message( "enqueueing tasks took %.3f %s." , - // clocks_from_ticks( getticks() - tic ), clocks_getunit()); + /* message("enqueueing tasks took %.3f %s." , + clocks_from_ticks( getticks() - tic ), clocks_getunit()); */ } /** @@ -952,7 +1082,6 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) { if (t->implicit) { for (int j = 0; j < t->nr_unlock_tasks; j++) { struct task *t2 = t->unlock_tasks[j]; - if (atomic_dec(&t2->wait) == 1) scheduler_enqueue(s, t2); } } @@ -971,7 +1100,6 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) { case task_type_sort: case task_type_ghost: case task_type_kick: - case task_type_drift: case task_type_init: qid = t->ci->super->owner; break; @@ -1214,10 +1342,12 @@ struct task *scheduler_gettask(struct scheduler *s, int qid, * @param nr_queues The number of queues in this scheduler. * @param flags The #scheduler flags. * @param nodeID The MPI rank + * @param tp Parallel processing threadpool. */ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks, - int nr_queues, unsigned int flags, int nodeID) { + int nr_queues, unsigned int flags, int nodeID, + struct threadpool *tp) { /* Init the lock. */ lock_init(&s->lock); @@ -1249,6 +1379,7 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks, s->flags = flags; s->space = space; s->nodeID = nodeID; + s->threadpool = tp; /* Init the tasks array. 
*/ s->size = 0; @@ -1282,34 +1413,6 @@ void scheduler_print_tasks(const struct scheduler *s, const char *fileName) { fclose(file); } -/** - * @brief Sets the waits of the dependants of a range of task - * - * @param t_begin Beginning of the #task range - * @param t_end End of the #task range - * @param mask The scheduler task mask - * @param submask The scheduler subtask mask - */ -void scheduler_do_rewait(struct task *t_begin, struct task *t_end, - unsigned int mask, unsigned int submask) { - for (struct task *t2 = t_begin; t2 != t_end; t2++) { - - if (t2->skip) continue; - - /* Skip tasks not in the mask */ - if (!((1 << t2->type) & mask) || !((1 << t2->subtype) & submask)) continue; - - /* Skip sort tasks that have already been performed */ - if (t2->type == task_type_sort && t2->flags == 0) continue; - - /* Sets the waits of the dependances */ - for (int k = 0; k < t2->nr_unlock_tasks; k++) { - struct task *t3 = t2->unlock_tasks[k]; - atomic_inc(&t3->wait); - } - } -} - /** * @brief Frees up the memory allocated for this #scheduler */ diff --git a/src/scheduler.h b/src/scheduler.h index fcff27abfe7eaddead3e7c0f67ae544907ce6ce6..3e51197de148ee8f0bf3090551568715d3000e98 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -36,6 +36,7 @@ #include "lock.h" #include "queue.h" #include "task.h" +#include "threadpool.h" /* Some constants. */ #define scheduler_maxwait 3 @@ -83,9 +84,9 @@ struct scheduler { int *tasks_ind; /* The task unlocks. */ - struct task **unlocks; - int *unlock_ind; - int nr_unlocks, size_unlocks; + struct task **volatile unlocks; + int *volatile unlock_ind; + volatile int nr_unlocks, size_unlocks, completed_unlock_writes; /* Lock for this scheduler. */ swift_lock_type lock; @@ -97,13 +98,17 @@ struct scheduler { /* The space associated with this scheduler. */ struct space *space; + /* Threadpool to use internally for mundane parallel work. */ + struct threadpool *threadpool; + /* The node we are working on. */ int nodeID; }; /* Function prototypes. */ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks, - int nr_queues, unsigned int flags, int nodeID); + int nr_queues, unsigned int flags, int nodeID, + struct threadpool *tp); struct task *scheduler_gettask(struct scheduler *s, int qid, const struct task *prev); void scheduler_enqueue(struct scheduler *s, struct task *t); @@ -122,8 +127,6 @@ void scheduler_addunlock(struct scheduler *s, struct task *ta, struct task *tb); void scheduler_set_unlocks(struct scheduler *s); void scheduler_dump_queue(struct scheduler *s); void scheduler_print_tasks(const struct scheduler *s, const char *fileName); -void scheduler_do_rewait(struct task *t_begin, struct task *t_end, - unsigned int mask, unsigned int submask); void scheduler_clean(struct scheduler *s); #endif /* SWIFT_SCHEDULER_H */ diff --git a/src/space.c b/src/space.c index f4cb21e3ca6f82f04dedf8c39f9fde6d6364fc32..8b23a4e4abc6c9d97953d7fa866ba12bba2de4f4 100644 --- a/src/space.c +++ b/src/space.c @@ -50,11 +50,9 @@ #include "lock.h" #include "minmax.h" #include "runner.h" +#include "threadpool.h" #include "tools.h" -/* Shared sort structure. */ -struct parallel_sort space_sort_struct; - /* Split size. */ int space_splitsize = space_splitsize_default; int space_subsize = space_subsize_default; @@ -167,19 +165,20 @@ void space_regrid(struct space *s, double cell_max, int verbose) { struct cell *restrict c; ticks tic = getticks(); - /* Run through the parts and get the current h_max. */ + /* Run through the cells and get the current h_max. 
*/ // tic = getticks(); float h_max = s->cell_min / kernel_gamma / space_stretch; if (nr_parts > 0) { if (s->cells != NULL) { for (int k = 0; k < s->nr_cells; k++) { - if (s->cells[k].h_max > h_max) h_max = s->cells[k].h_max; + if (s->cells[k].nodeID == engine_rank && s->cells[k].h_max > h_max) { + h_max = s->cells[k].h_max; + } } } else { for (size_t k = 0; k < nr_parts; k++) { if (s->parts[k].h > h_max) h_max = s->parts[k].h; } - s->h_max = h_max; } } @@ -363,7 +362,6 @@ void space_regrid(struct space *s, double cell_max, int verbose) { s->cells[k].init = NULL; s->cells[k].extra_ghost = NULL; s->cells[k].ghost = NULL; - s->cells[k].drift = NULL; s->cells[k].kick = NULL; s->cells[k].super = &s->cells[k]; } @@ -678,10 +676,8 @@ void space_split(struct space *s, struct cell *cells, int verbose) { const ticks tic = getticks(); - for (int k = 0; k < s->nr_cells; k++) - scheduler_addtask(&s->e->sched, task_type_split_cell, task_subtype_none, k, - 0, &cells[k], NULL, 0); - engine_launch(s->e, s->e->nr_threads, 1 << task_type_split_cell, 0); + threadpool_map(&s->e->threadpool, space_split_mapper, cells, s->nr_cells, + sizeof(struct cell), 1, s); if (verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -705,29 +701,32 @@ void space_parts_sort(struct space *s, int *ind, size_t N, int min, int max, const ticks tic = getticks(); - /*Populate the global parallel_sort structure with the input data */ - space_sort_struct.parts = s->parts; - space_sort_struct.xparts = s->xparts; - space_sort_struct.ind = ind; - space_sort_struct.stack_size = 2 * (max - min + 1) + 10 + s->e->nr_threads; - if ((space_sort_struct.stack = malloc(sizeof(struct qstack) * - space_sort_struct.stack_size)) == NULL) + /* Populate a parallel_sort structure with the input data */ + struct parallel_sort sort_struct; + sort_struct.parts = s->parts; + sort_struct.xparts = s->xparts; + sort_struct.ind = ind; + sort_struct.stack_size = 2 * (max - min + 1) + 10 + s->e->nr_threads; + if ((sort_struct.stack = + malloc(sizeof(struct qstack) * sort_struct.stack_size)) == NULL) error("Failed to allocate sorting stack."); - for (int i = 0; i < space_sort_struct.stack_size; i++) - space_sort_struct.stack[i].ready = 0; + for (int i = 0; i < sort_struct.stack_size; i++) + sort_struct.stack[i].ready = 0; /* Add the first interval. */ - space_sort_struct.stack[0].i = 0; - space_sort_struct.stack[0].j = N - 1; - space_sort_struct.stack[0].min = min; - space_sort_struct.stack[0].max = max; - space_sort_struct.stack[0].ready = 1; - space_sort_struct.first = 0; - space_sort_struct.last = 1; - space_sort_struct.waiting = 1; - - /* Launch the sorting tasks. */ - engine_launch(s->e, s->e->nr_threads, (1 << task_type_part_sort), 0); + sort_struct.stack[0].i = 0; + sort_struct.stack[0].j = N - 1; + sort_struct.stack[0].min = min; + sort_struct.stack[0].max = max; + sort_struct.stack[0].ready = 1; + sort_struct.first = 0; + sort_struct.last = 1; + sort_struct.waiting = 1; + + /* Launch the sorting tasks with a stride of zero such that the same + map data is passed to each thread. */ + threadpool_map(&s->e->threadpool, space_parts_sort_mapper, &sort_struct, + s->e->threadpool.num_threads, 0, 1, NULL); #ifdef SWIFT_DEBUG_CHECKS /* Verify space_sort_struct. */ @@ -739,37 +738,40 @@ void space_parts_sort(struct space *s, int *ind, size_t N, int min, int max, #endif /* Clean up. 
*/ - free(space_sort_struct.stack); + free(sort_struct.stack); if (verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), clocks_getunit()); } -void space_do_parts_sort() { +void space_parts_sort_mapper(void *map_data, int num_elements, + void *extra_data) { + + /* Unpack the mapping data. */ + struct parallel_sort *sort_struct = (struct parallel_sort *)map_data; /* Pointers to the sorting data. */ - int *ind = space_sort_struct.ind; - struct part *parts = space_sort_struct.parts; - struct xpart *xparts = space_sort_struct.xparts; + int *ind = sort_struct->ind; + struct part *parts = sort_struct->parts; + struct xpart *xparts = sort_struct->xparts; /* Main loop. */ - while (space_sort_struct.waiting) { + while (sort_struct->waiting) { /* Grab an interval off the queue. */ - int qid = - atomic_inc(&space_sort_struct.first) % space_sort_struct.stack_size; + int qid = atomic_inc(&sort_struct->first) % sort_struct->stack_size; /* Wait for the entry to be ready, or for the sorting do be done. */ - while (!space_sort_struct.stack[qid].ready) - if (!space_sort_struct.waiting) return; + while (!sort_struct->stack[qid].ready) + if (!sort_struct->waiting) return; /* Get the stack entry. */ - ptrdiff_t i = space_sort_struct.stack[qid].i; - ptrdiff_t j = space_sort_struct.stack[qid].j; - int min = space_sort_struct.stack[qid].min; - int max = space_sort_struct.stack[qid].max; - space_sort_struct.stack[qid].ready = 0; + ptrdiff_t i = sort_struct->stack[qid].i; + ptrdiff_t j = sort_struct->stack[qid].j; + int min = sort_struct->stack[qid].min; + int max = sort_struct->stack[qid].max; + sort_struct->stack[qid].ready = 0; /* Loop over sub-intervals. */ while (1) { @@ -819,18 +821,16 @@ void space_do_parts_sort() { /* Recurse on the left? */ if (jj > i && pivot > min) { - qid = atomic_inc(&space_sort_struct.last) % - space_sort_struct.stack_size; - while (space_sort_struct.stack[qid].ready) + qid = atomic_inc(&sort_struct->last) % sort_struct->stack_size; + while (sort_struct->stack[qid].ready) ; - space_sort_struct.stack[qid].i = i; - space_sort_struct.stack[qid].j = jj; - space_sort_struct.stack[qid].min = min; - space_sort_struct.stack[qid].max = pivot; - if (atomic_inc(&space_sort_struct.waiting) >= - space_sort_struct.stack_size) + sort_struct->stack[qid].i = i; + sort_struct->stack[qid].j = jj; + sort_struct->stack[qid].min = min; + sort_struct->stack[qid].max = pivot; + if (atomic_inc(&sort_struct->waiting) >= sort_struct->stack_size) error("Qstack overflow."); - space_sort_struct.stack[qid].ready = 1; + sort_struct->stack[qid].ready = 1; } /* Recurse on the right? */ @@ -844,18 +844,16 @@ void space_do_parts_sort() { /* Recurse on the right? 
*/
       if (pivot + 1 < max) {
-        qid = atomic_inc(&space_sort_struct.last) %
-              space_sort_struct.stack_size;
-        while (space_sort_struct.stack[qid].ready)
+        qid = atomic_inc(&sort_struct->last) % sort_struct->stack_size;
+        while (sort_struct->stack[qid].ready)
           ;
-        space_sort_struct.stack[qid].i = jj + 1;
-        space_sort_struct.stack[qid].j = j;
-        space_sort_struct.stack[qid].min = pivot + 1;
-        space_sort_struct.stack[qid].max = max;
-        if (atomic_inc(&space_sort_struct.waiting) >=
-            space_sort_struct.stack_size)
+        sort_struct->stack[qid].i = jj + 1;
+        sort_struct->stack[qid].j = j;
+        sort_struct->stack[qid].min = pivot + 1;
+        sort_struct->stack[qid].max = max;
+        if (atomic_inc(&sort_struct->waiting) >= sort_struct->stack_size)
           error("Qstack overflow.");
-        space_sort_struct.stack[qid].ready = 1;
+        sort_struct->stack[qid].ready = 1;
       }

       /* Recurse on the left? */
@@ -868,7 +866,7 @@

     } /* loop over sub-intervals. */

-    atomic_dec(&space_sort_struct.waiting);
+    atomic_dec(&sort_struct->waiting);

   } /* main loop. */
}

@@ -889,28 +887,31 @@
   const ticks tic = getticks();

-  /*Populate the global parallel_sort structure with the input data */
-  space_sort_struct.gparts = s->gparts;
-  space_sort_struct.ind = ind;
-  space_sort_struct.stack_size = 2 * (max - min + 1) + 10 + s->e->nr_threads;
-  if ((space_sort_struct.stack = malloc(sizeof(struct qstack) *
-                                        space_sort_struct.stack_size)) == NULL)
+  /* Populate a local parallel_sort structure with the input data. */
+  struct parallel_sort sort_struct;
+  sort_struct.gparts = s->gparts;
+  sort_struct.ind = ind;
+  sort_struct.stack_size = 2 * (max - min + 1) + 10 + s->e->nr_threads;
+  if ((sort_struct.stack =
+           malloc(sizeof(struct qstack) * sort_struct.stack_size)) == NULL)
     error("Failed to allocate sorting stack.");
-  for (int i = 0; i < space_sort_struct.stack_size; i++)
-    space_sort_struct.stack[i].ready = 0;
+  for (int i = 0; i < sort_struct.stack_size; i++)
+    sort_struct.stack[i].ready = 0;

   /* Add the first interval. */
-  space_sort_struct.stack[0].i = 0;
-  space_sort_struct.stack[0].j = N - 1;
-  space_sort_struct.stack[0].min = min;
-  space_sort_struct.stack[0].max = max;
-  space_sort_struct.stack[0].ready = 1;
-  space_sort_struct.first = 0;
-  space_sort_struct.last = 1;
-  space_sort_struct.waiting = 1;
-
-  /* Launch the sorting tasks. */
-  engine_launch(s->e, s->e->nr_threads, (1 << task_type_gpart_sort), 0);
+  sort_struct.stack[0].i = 0;
+  sort_struct.stack[0].j = N - 1;
+  sort_struct.stack[0].min = min;
+  sort_struct.stack[0].max = max;
+  sort_struct.stack[0].ready = 1;
+  sort_struct.first = 0;
+  sort_struct.last = 1;
+  sort_struct.waiting = 1;
+
+  /* Launch the sorting tasks with a stride of zero such that the same
+     map data is passed to each thread. */
+  threadpool_map(&s->e->threadpool, space_gparts_sort_mapper, &sort_struct,
+                 s->e->threadpool.num_threads, 0, 1, NULL);

 #ifdef SWIFT_DEBUG_CHECKS
   /* Verify space_sort_struct. */
@@ -922,36 +923,39 @@
 #endif

   /* Clean up. */
-  free(space_sort_struct.stack);
+  free(sort_struct.stack);

   if (verbose)
     message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
}

-void space_do_gparts_sort() {
+void space_gparts_sort_mapper(void *map_data, int num_elements,
+                              void *extra_data) {
+
+  /* Unpack the mapping data.
*/ + struct parallel_sort *sort_struct = (struct parallel_sort *)map_data; /* Pointers to the sorting data. */ - int *ind = space_sort_struct.ind; - struct gpart *gparts = space_sort_struct.gparts; + int *ind = sort_struct->ind; + struct gpart *gparts = sort_struct->gparts; /* Main loop. */ - while (space_sort_struct.waiting) { + while (sort_struct->waiting) { /* Grab an interval off the queue. */ - int qid = - atomic_inc(&space_sort_struct.first) % space_sort_struct.stack_size; + int qid = atomic_inc(&sort_struct->first) % sort_struct->stack_size; /* Wait for the entry to be ready, or for the sorting do be done. */ - while (!space_sort_struct.stack[qid].ready) - if (!space_sort_struct.waiting) return; + while (!sort_struct->stack[qid].ready) + if (!sort_struct->waiting) return; /* Get the stack entry. */ - ptrdiff_t i = space_sort_struct.stack[qid].i; - ptrdiff_t j = space_sort_struct.stack[qid].j; - int min = space_sort_struct.stack[qid].min; - int max = space_sort_struct.stack[qid].max; - space_sort_struct.stack[qid].ready = 0; + ptrdiff_t i = sort_struct->stack[qid].i; + ptrdiff_t j = sort_struct->stack[qid].j; + int min = sort_struct->stack[qid].min; + int max = sort_struct->stack[qid].max; + sort_struct->stack[qid].ready = 0; /* Loop over sub-intervals. */ while (1) { @@ -998,18 +1002,16 @@ void space_do_gparts_sort() { /* Recurse on the left? */ if (jj > i && pivot > min) { - qid = atomic_inc(&space_sort_struct.last) % - space_sort_struct.stack_size; - while (space_sort_struct.stack[qid].ready) + qid = atomic_inc(&sort_struct->last) % sort_struct->stack_size; + while (sort_struct->stack[qid].ready) ; - space_sort_struct.stack[qid].i = i; - space_sort_struct.stack[qid].j = jj; - space_sort_struct.stack[qid].min = min; - space_sort_struct.stack[qid].max = pivot; - if (atomic_inc(&space_sort_struct.waiting) >= - space_sort_struct.stack_size) + sort_struct->stack[qid].i = i; + sort_struct->stack[qid].j = jj; + sort_struct->stack[qid].min = min; + sort_struct->stack[qid].max = pivot; + if (atomic_inc(&sort_struct->waiting) >= sort_struct->stack_size) error("Qstack overflow."); - space_sort_struct.stack[qid].ready = 1; + sort_struct->stack[qid].ready = 1; } /* Recurse on the right? */ @@ -1023,18 +1025,16 @@ void space_do_gparts_sort() { /* Recurse on the right? */ if (pivot + 1 < max) { - qid = atomic_inc(&space_sort_struct.last) % - space_sort_struct.stack_size; - while (space_sort_struct.stack[qid].ready) + qid = atomic_inc(&sort_struct->last) % sort_struct->stack_size; + while (sort_struct->stack[qid].ready) ; - space_sort_struct.stack[qid].i = jj + 1; - space_sort_struct.stack[qid].j = j; - space_sort_struct.stack[qid].min = pivot + 1; - space_sort_struct.stack[qid].max = max; - if (atomic_inc(&space_sort_struct.waiting) >= - space_sort_struct.stack_size) + sort_struct->stack[qid].i = jj + 1; + sort_struct->stack[qid].j = j; + sort_struct->stack[qid].min = pivot + 1; + sort_struct->stack[qid].max = max; + if (atomic_inc(&sort_struct->waiting) >= sort_struct->stack_size) error("Qstack overflow."); - space_sort_struct.stack[qid].ready = 1; + sort_struct->stack[qid].ready = 1; } /* Recurse on the left? */ @@ -1047,7 +1047,7 @@ void space_do_gparts_sort() { } /* loop over sub-intervals. */ - atomic_dec(&space_sort_struct.waiting); + atomic_dec(&sort_struct->waiting); } /* main loop. */ } @@ -1228,73 +1228,113 @@ void space_map_cells_pre(struct space *s, int full, } /** - * @brief Split cells that contain too many particles. - * - * @param s The #space we are working in. 
- * @param c The #cell under consideration. + * @brief #threadpool mapper function to split cells if they contain + * too many particles. */ -void space_do_split(struct space *s, struct cell *c) { - - const int count = c->count; - const int gcount = c->gcount; - int maxdepth = 0; - float h_max = 0.0f; - int ti_end_min = max_nr_timesteps, ti_end_max = 0; - struct cell *temp; - struct part *parts = c->parts; - struct gpart *gparts = c->gparts; - struct xpart *xparts = c->xparts; - - /* Check the depth. */ - if (c->depth > s->maxdepth) s->maxdepth = c->depth; - - /* Split or let it be? */ - if (count > space_splitsize || gcount > space_splitsize) { - - /* No longer just a leaf. */ - c->split = 1; - - /* Create the cell's progeny. */ - for (int k = 0; k < 8; k++) { - temp = space_getcell(s); - temp->count = 0; - temp->gcount = 0; - temp->loc[0] = c->loc[0]; - temp->loc[1] = c->loc[1]; - temp->loc[2] = c->loc[2]; - temp->width[0] = c->width[0] / 2; - temp->width[1] = c->width[1] / 2; - temp->width[2] = c->width[2] / 2; - temp->dmin = c->dmin / 2; - if (k & 4) temp->loc[0] += temp->width[0]; - if (k & 2) temp->loc[1] += temp->width[1]; - if (k & 1) temp->loc[2] += temp->width[2]; - temp->depth = c->depth + 1; - temp->split = 0; - temp->h_max = 0.0; - temp->dx_max = 0.f; - temp->nodeID = c->nodeID; - temp->parent = c; - c->progeny[k] = temp; - } +void space_split_mapper(void *map_data, int num_elements, void *extra_data) { + + /* Unpack the inputs. */ + struct space *s = (struct space *)extra_data; + struct cell *cells = (struct cell *)map_data; + + for (int ind = 0; ind < num_elements; ind++) { + + struct cell *c = &cells[ind]; + + const int count = c->count; + const int gcount = c->gcount; + int maxdepth = 0; + float h_max = 0.0f; + int ti_end_min = max_nr_timesteps, ti_end_max = 0; + struct cell *temp; + struct part *parts = c->parts; + struct gpart *gparts = c->gparts; + struct xpart *xparts = c->xparts; + + /* Check the depth. */ + if (c->depth > s->maxdepth) s->maxdepth = c->depth; + + /* Split or let it be? */ + if (count > space_splitsize || gcount > space_splitsize) { + + /* No longer just a leaf. */ + c->split = 1; + + /* Create the cell's progeny. */ + for (int k = 0; k < 8; k++) { + temp = space_getcell(s); + temp->count = 0; + temp->gcount = 0; + temp->loc[0] = c->loc[0]; + temp->loc[1] = c->loc[1]; + temp->loc[2] = c->loc[2]; + temp->width[0] = c->width[0] / 2; + temp->width[1] = c->width[1] / 2; + temp->width[2] = c->width[2] / 2; + temp->dmin = c->dmin / 2; + if (k & 4) temp->loc[0] += temp->width[0]; + if (k & 2) temp->loc[1] += temp->width[1]; + if (k & 1) temp->loc[2] += temp->width[2]; + temp->depth = c->depth + 1; + temp->split = 0; + temp->h_max = 0.0; + temp->dx_max = 0.f; + temp->nodeID = c->nodeID; + temp->parent = c; + c->progeny[k] = temp; + } - /* Split the cell data. */ - cell_split(c, c->parts - s->parts); + /* Split the cell data. */ + cell_split(c, c->parts - s->parts); + + /* Remove any progeny with zero parts. */ + for (int k = 0; k < 8; k++) + if (c->progeny[k]->count == 0 && c->progeny[k]->gcount == 0) { + space_recycle(s, c->progeny[k]); + c->progeny[k] = NULL; + } else { + space_split_mapper(c->progeny[k], 1, s); + h_max = fmaxf(h_max, c->progeny[k]->h_max); + ti_end_min = min(ti_end_min, c->progeny[k]->ti_end_min); + ti_end_max = max(ti_end_max, c->progeny[k]->ti_end_max); + if (c->progeny[k]->maxdepth > maxdepth) + maxdepth = c->progeny[k]->maxdepth; + } - /* Remove any progeny with zero parts. 
*/ - for (int k = 0; k < 8; k++) - if (c->progeny[k]->count == 0 && c->progeny[k]->gcount == 0) { - space_recycle(s, c->progeny[k]); - c->progeny[k] = NULL; - } else { - space_do_split(s, c->progeny[k]); - h_max = fmaxf(h_max, c->progeny[k]->h_max); - ti_end_min = min(ti_end_min, c->progeny[k]->ti_end_min); - ti_end_max = max(ti_end_max, c->progeny[k]->ti_end_max); - if (c->progeny[k]->maxdepth > maxdepth) - maxdepth = c->progeny[k]->maxdepth; + } + + /* Otherwise, collect the data for this cell. */ + else { + + /* Clear the progeny. */ + bzero(c->progeny, sizeof(struct cell *) * 8); + c->split = 0; + maxdepth = c->depth; + + /* Get dt_min/dt_max. */ + for (int k = 0; k < count; k++) { + struct part *p = &parts[k]; + struct xpart *xp = &xparts[k]; + const float h = p->h; + const int ti_end = p->ti_end; + xp->x_diff[0] = 0.f; + xp->x_diff[1] = 0.f; + xp->x_diff[2] = 0.f; + if (h > h_max) h_max = h; + if (ti_end < ti_end_min) ti_end_min = ti_end; + if (ti_end > ti_end_max) ti_end_max = ti_end; } + for (int k = 0; k < gcount; k++) { + struct gpart *gp = &gparts[k]; + const int ti_end = gp->ti_end; + gp->x_diff[0] = 0.f; + gp->x_diff[1] = 0.f; + gp->x_diff[2] = 0.f; + if (ti_end < ti_end_min) ti_end_min = ti_end; + if (ti_end > ti_end_max) ti_end_max = ti_end; + } + } /* Set the values for this cell. */ c->h_max = h_max; @@ -1302,52 +1342,16 @@ void space_do_split(struct space *s, struct cell *c) { c->ti_end_max = ti_end_max; c->maxdepth = maxdepth; + /* Set ownership according to the start of the parts array. */ + if (s->nr_parts > 0) + c->owner = + ((c->parts - s->parts) % s->nr_parts) * s->nr_queues / s->nr_parts; + else if (s->nr_gparts > 0) + c->owner = ((c->gparts - s->gparts) % s->nr_gparts) * s->nr_queues / + s->nr_gparts; + else + c->owner = 0; /* Ok, there is really nothing on this rank... */ } - - /* Otherwise, collect the data for this cell. */ - else { - - /* Clear the progeny. */ - bzero(c->progeny, sizeof(struct cell *) * 8); - c->split = 0; - c->maxdepth = c->depth; - - /* Get dt_min/dt_max. */ - for (int k = 0; k < count; k++) { - struct part *p = &parts[k]; - struct xpart *xp = &xparts[k]; - const float h = p->h; - const int ti_end = p->ti_end; - xp->x_diff[0] = 0.f; - xp->x_diff[1] = 0.f; - xp->x_diff[2] = 0.f; - if (h > h_max) h_max = h; - if (ti_end < ti_end_min) ti_end_min = ti_end; - if (ti_end > ti_end_max) ti_end_max = ti_end; - } - for (int k = 0; k < gcount; k++) { - struct gpart *gp = &gparts[k]; - const int ti_end = gp->ti_end; - gp->x_diff[0] = 0.f; - gp->x_diff[1] = 0.f; - gp->x_diff[2] = 0.f; - if (ti_end < ti_end_min) ti_end_min = ti_end; - if (ti_end > ti_end_max) ti_end_max = ti_end; - } - c->h_max = h_max; - c->ti_end_min = ti_end_min; - c->ti_end_max = ti_end_max; - } - - /* Set ownership according to the start of the parts array. */ - if (s->nr_parts > 0) - c->owner = - ((c->parts - s->parts) % s->nr_parts) * s->nr_queues / s->nr_parts; - else if (s->nr_gparts > 0) - c->owner = - ((c->gparts - s->gparts) % s->nr_gparts) * s->nr_queues / s->nr_gparts; - else - c->owner = 0; /* Ok, there is really nothing on this rank... */ } /** diff --git a/src/space.h b/src/space.h index 6fe3681c85068979a555ff1d78e32ba7577cf3f0..90313be8dbe817d65fbd0e6a8c30c156747594b1 100644 --- a/src/space.h +++ b/src/space.h @@ -68,8 +68,8 @@ struct space { /* Cell widths. */ double width[3], iwidth[3]; - /* The minimum and maximum cutoff radii. */ - double h_max, cell_min; + /* The minimum cell width. */ + double cell_min; /* Current maximum displacement for particles. 
*/ float dx_max; @@ -132,7 +132,6 @@ struct parallel_sort { unsigned int stack_size; volatile unsigned int first, last, waiting; }; -extern struct parallel_sort space_sort_struct; /* function prototypes. */ void space_parts_sort(struct space *s, int *ind, size_t N, int min, int max, @@ -156,10 +155,14 @@ void space_map_parts_xparts(struct space *s, struct cell *c)); void space_map_cells_post(struct space *s, int full, void (*fun)(struct cell *c, void *data), void *data); +void space_parts_sort_mapper(void *map_data, int num_elements, + void *extra_data); +void space_gparts_sort_mapper(void *map_data, int num_elements, + void *extra_data); void space_rebuild(struct space *s, double h_max, int verbose); void space_recycle(struct space *s, struct cell *c); void space_split(struct space *s, struct cell *cells, int verbose); -void space_do_split(struct space *s, struct cell *c); +void space_split_mapper(void *map_data, int num_elements, void *extra_data); void space_do_parts_sort(); void space_do_gparts_sort(); void space_init_parts(struct space *s); diff --git a/src/task.c b/src/task.c index dc602ea79282459f792183db879b07072c79a246..37d6623e81d7f4585f9906aad03e1dcf0ee2e4c2 100644 --- a/src/task.c +++ b/src/task.c @@ -49,10 +49,9 @@ /* Task type names. */ const char *taskID_names[task_type_count] = { "none", "sort", "self", "pair", "sub_self", - "sub_pair", "init", "ghost", "extra_ghost", "drift", + "sub_pair", "init", "ghost", "extra_ghost", "kick", "kick_fixdt", "send", "recv", "grav_gather_m", - "grav_fft", "grav_mm", "grav_up", "grav_external", "part_sort", - "gpart_sort", "split_cell", "rewait"}; + "grav_fft", "grav_mm", "grav_up", "grav_external"}; const char *subtaskID_names[task_subtype_count] = { "none", "density", "gradient", "force", "grav", "tend"}; @@ -139,7 +138,6 @@ __attribute__((always_inline)) INLINE static enum task_actions task_acts_on( break; case task_type_init: - case task_type_drift: case task_type_kick: case task_type_kick_fixdt: case task_type_send: @@ -158,15 +156,8 @@ __attribute__((always_inline)) INLINE static enum task_actions task_acts_on( return task_action_gpart; break; - case task_type_part_sort: - case task_type_gpart_sort: - case task_type_split_cell: - case task_type_rewait: - return task_action_none; - break; - default: - error("Unknow task_action for task"); + error("Unknown task_action for task"); return task_action_none; break; } diff --git a/src/task.h b/src/task.h index 58192cd15ed8a74112396faf9037c3e503b0c4de..d174bad4fac62332b3d2c43997be4ddbf794ea34 100644 --- a/src/task.h +++ b/src/task.h @@ -42,7 +42,6 @@ enum task_types { task_type_init, task_type_ghost, task_type_extra_ghost, - task_type_drift, task_type_kick, task_type_kick_fixdt, task_type_send, @@ -52,10 +51,6 @@ enum task_types { task_type_grav_mm, task_type_grav_up, task_type_grav_external, - task_type_part_sort, - task_type_gpart_sort, - task_type_split_cell, - task_type_rewait, task_type_count }; diff --git a/src/threadpool.c b/src/threadpool.c new file mode 100644 index 0000000000000000000000000000000000000000..795fa4d557e4576e7cd71fc7ef554cee880f3974 --- /dev/null +++ b/src/threadpool.c @@ -0,0 +1,156 @@ +/******************************************************************************* + * This file is part of SWIFT. 
* Copyright (c) 2016 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ ******************************************************************************/
+
+/* Config parameters. */
+#include "../config.h"
+
+/* Some standard headers. */
+#include <float.h>
+#include <limits.h>
+#include <math.h>
+#include <stdlib.h>
+#include <string.h>
+
+/* This object's header. */
+#include "threadpool.h"
+
+/* Local headers. */
+#include "atomic.h"
+#include "error.h"
+
+void *threadpool_runner(void *data) {
+
+  /* Our threadpool. */
+  struct threadpool *tp = (struct threadpool *)data;
+
+  /* Main loop. */
+  while (1) {
+
+    /* Let the controller know that this thread is waiting. */
+    pthread_mutex_lock(&tp->thread_mutex);
+    tp->num_threads_waiting += 1;
+    if (tp->num_threads_waiting == tp->num_threads) {
+      pthread_cond_signal(&tp->control_cond);
+    }
+
+    /* Wait for the controller. */
+    pthread_cond_wait(&tp->thread_cond, &tp->thread_mutex);
+    tp->num_threads_waiting -= 1;
+    tp->num_threads_running += 1;
+    if (tp->num_threads_running == tp->num_threads) {
+      pthread_cond_signal(&tp->control_cond);
+    }
+    pthread_mutex_unlock(&tp->thread_mutex);
+
+    /* The index of the mapping task we will work on next. */
+    size_t task_ind;
+    while ((task_ind = atomic_add(&tp->map_data_count, tp->map_data_chunk)) <
+           tp->map_data_size) {
+      const int num_elements = task_ind + tp->map_data_chunk > tp->map_data_size
+                                   ? tp->map_data_size - task_ind
+                                   : tp->map_data_chunk;
+      tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind),
+                       num_elements, tp->map_extra_data);
+    }
+  }
+}
+
+void threadpool_init(struct threadpool *tp, int num_threads) {
+
+  /* Initialize the thread counters. */
+  tp->num_threads = num_threads;
+  tp->num_threads_waiting = 0;
+
+  /* Init the threadpool mutexes. */
+  if (pthread_mutex_init(&tp->thread_mutex, NULL) != 0)
+    error("Failed to initialize mutexes.");
+  if (pthread_cond_init(&tp->control_cond, NULL) != 0 ||
+      pthread_cond_init(&tp->thread_cond, NULL) != 0)
+    error("Failed to initialize condition variables.");
+
+  /* Set the task counter to zero. */
+  tp->map_data_size = 0;
+  tp->map_data_count = 0;
+  tp->map_data_stride = 0;
+  tp->map_data_chunk = 0;
+  tp->map_function = NULL;
+
+  /* Allocate the threads. */
+  if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads)) ==
+      NULL) {
+    error("Failed to allocate thread array.");
+  }
+
+  /* Create and start the threads. */
+  pthread_mutex_lock(&tp->thread_mutex);
+  for (int k = 0; k < num_threads; k++) {
+    if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0)
+      error("Failed to create threadpool runner thread.");
+  }
+
+  /* Wait for all the threads to be up and running.
*/
+  while (tp->num_threads_waiting < tp->num_threads) {
+    pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
+  }
+  pthread_mutex_unlock(&tp->thread_mutex);
+}
+
+/**
+ * @brief Map a function to an array of data in parallel using a #threadpool.
+ *
+ * The function @c map_function is called on each element of @c map_data
+ * in parallel.
+ *
+ * @param tp The #threadpool on which to run.
+ * @param map_function The function that will be applied to the map data.
+ * @param map_data The data on which the mapping function will be called.
+ * @param N Number of elements in @c map_data.
+ * @param stride Size, in bytes, of each element of @c map_data.
+ * @param chunk Number of map data elements to pass to the function at a time.
+ * @param extra_data Additional pointer that will be passed to the mapping
+ *        function; may contain additional data.
+ */
+
+void threadpool_map(struct threadpool *tp, threadpool_map_function map_function,
+                    void *map_data, size_t N, int stride, int chunk,
+                    void *extra_data) {
+
+  /* Set the map data and signal the threads. */
+  pthread_mutex_lock(&tp->thread_mutex);
+  tp->map_data_stride = stride;
+  tp->map_data_size = N;
+  tp->map_data_count = 0;
+  tp->map_data_chunk = chunk;
+  tp->map_function = map_function;
+  tp->map_data = map_data;
+  tp->map_extra_data = extra_data;
+  tp->num_threads_running = 0;
+  pthread_cond_broadcast(&tp->thread_cond);
+
+  /* Wait for all the threads to be up and running. */
+  while (tp->num_threads_running < tp->num_threads) {
+    pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
+  }
+
+  /* Wait for all threads to be done. */
+  while (tp->num_threads_waiting < tp->num_threads) {
+    pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
+  }
+  pthread_mutex_unlock(&tp->thread_mutex);
+}
diff --git a/src/threadpool.h b/src/threadpool.h
new file mode 100644
index 0000000000000000000000000000000000000000..e07a0613c0595bc3c280f97cd8f28e5705bf313a
--- /dev/null
+++ b/src/threadpool.h
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * This file is part of SWIFT.
+ * Copyright (c) 2016 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ ******************************************************************************/
+#ifndef SWIFT_THREADPOOL_H
+#define SWIFT_THREADPOOL_H
+
+/* Config parameters. */
+#include "../config.h"
+
+/* Some standard headers. */
+#include <pthread.h>
+
+/* Function type for mappings. */
+typedef void (*threadpool_map_function)(void *map_data, int num_elements,
+                                        void *extra_data);
+
+/* Data of a threadpool. */
+struct threadpool {
+
+  /* Number of threads in this pool. */
+  int num_threads;
+
+  /* The threads themselves. */
+  pthread_t *threads;
+
+  /* This is where threads go to rest. */
+  pthread_mutex_t thread_mutex;
+  pthread_cond_t control_cond, thread_cond;
+
+  /* Current map data and count.
diff --git a/tests/Makefile.am b/tests/Makefile.am
index ac99573af98da665be254defd11beac91920e865..a0db53bbfdda3715362f55deb5e4c4aa4c3dd880
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -25,12 +25,13 @@ TESTS = testGreetings testMaths testReading.sh testSingle testKernel testSymmetr
 	testPair.sh testPairPerturbed.sh test27cells.sh test27cellsPerturbed.sh \
 	testParser.sh testSPHStep test125cells.sh testKernelGrav testFFT \
 	testAdiabaticIndex testRiemannExact testRiemannTRRS testRiemannHLLC \
-	testMatrixInversion
+	testMatrixInversion threadpool_test
 
 # List of test programs to compile
 check_PROGRAMS = testGreetings testReading testSingle testTimeIntegration \
 	testSPHStep testPair test27cells test125cells testParser \
-	testKernel testKernelGrav testFFT testInteractions testMaths testSymmetry \
+	testKernel testKernelGrav testFFT testInteractions testMaths \
+	testSymmetry threadpool_test \
 	testAdiabaticIndex testRiemannExact testRiemannTRRS \
 	testRiemannHLLC testMatrixInversion
 
@@ -75,6 +76,8 @@ testRiemannHLLC_SOURCES = testRiemannHLLC.c
 
 testMatrixInversion_SOURCES = testMatrixInversion.c
 
+threadpool_test_SOURCES = threadpool_test.c
+
 # Files necessary for distribution
 EXTRA_DIST = testReading.sh makeInput.py testPair.sh testPairPerturbed.sh \
 	test27cells.sh test27cellsPerturbed.sh tolerance.dat testParser.sh \
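The test added below always passes NULL as extra_data. As a sketch of what that parameter is for, a map function can carry shared state through it; the reduction below is hypothetical (sum_ints is not part of this patch) and assumes the atomic_add macro from src/atomic.h:

    /* Hypothetical reduction via extra_data: each chunk adds its local sum
     * to a shared counter with a single atomic update. */
    #include "../src/atomic.h"
    #include "../src/threadpool.h"

    void sum_ints(void *map_data, int num_elements, void *extra_data) {
      const int *inputs = (const int *)map_data;
      volatile int *sum = (volatile int *)extra_data;
      int local = 0;
      for (int k = 0; k < num_elements; k++) local += inputs[k];
      atomic_add(sum, local);
    }

    /* Usage: volatile int sum = 0;
     *        threadpool_map(&tp, sum_ints, data, N, sizeof(int), 4,
     *                       (void *)&sum); */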
+#include "../src/atomic.h" +#include "../src/threadpool.h" + +void map_function_first(void *map_data, int num_elements, void *extra_data) { + const int *inputs = (int *)map_data; + for (int ind = 0; ind < num_elements; ind++) { + int input = inputs[ind]; + usleep(rand() % 1000000); + printf("map_function_first: got input %i.\n", input); + fflush(stdout); + } +} + +void map_function_second(void *map_data, int num_elements, void *extra_data) { + const int *inputs = (int *)map_data; + for (int ind = 0; ind < num_elements; ind++) { + int input = inputs[ind]; + usleep(rand() % 1000000); + printf("map_function_second: got input %i.\n", input); + fflush(stdout); + } +} + +int main(int argc, char *argv[]) { + + // Some constants for this test. + const int num_threads = 16; + const int N = 20; + const int num_runs = 2; + + // Create a threadpool with 8 threads. + struct threadpool tp; + threadpool_init(&tp, num_threads); + + // Main loop. + for (int run = 0; run < num_runs; run++) { + + // Run over a set of integers and print them. + int data[N]; + for (int k = 0; k < N; k++) data[k] = k; + printf("processing integers from 0..%i.\n", N); + fflush(stdout); + threadpool_map(&tp, map_function_first, data, N, sizeof(int), 1, NULL); + + // Do the same thing again, with less jobs than threads. + printf("processing integers from 0..%i.\n", N / 2); + fflush(stdout); + threadpool_map(&tp, map_function_second, data, N / 2, sizeof(int), 1, NULL); + + // Do the same thing again, with a chunk size of two. + printf("processing integers from 0..%i.\n", N); + fflush(stdout); + threadpool_map(&tp, map_function_first, data, N, sizeof(int), 2, NULL); + } +}