Commit 879a5034 authored by Peter W. Draper
Browse files

Merge branch 'tasks_cleanup' into 'master'

Tasks cleanup

Move all tasks that are not part of the main computation to a `threadpool`, which can schedule small tasks with no dependencies more simply and efficiently. See #166.

Peter, can you check that this effectively passes all your tests? Thanks!

See merge request !176
parents 28388f70 9924b6e3
......@@ -42,7 +42,8 @@ endif
include_HEADERS = space.h runner.h queue.h task.h lock.h cell.h part.h const.h \
engine.h swift.h serial_io.h timers.h debug.h scheduler.h proxy.h parallel_io.h \
common_io.h single_io.h multipole.h map.h tools.h partition.h clocks.h parser.h \
physical_constants.h physical_constants_cgs.h potentials.h version.h hydro_properties.h
physical_constants.h physical_constants_cgs.h potentials.h version.h \
hydro_properties.h threadpool.h
# Common source files
AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \
......@@ -50,7 +51,7 @@ AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \
units.c common_io.c single_io.c multipole.c version.c map.c \
kernel_hydro.c tools.c part.c partition.c clocks.c parser.c \
physical_constants.c potentials.c hydro_properties.c \
runner_doiact_fft.c
runner_doiact_fft.c threadpool.c
# Include files for distribution, not installation.
nobase_noinst_HEADERS = approx_math.h atomic.h cycle.h error.h inline.h kernel_hydro.h kernel_gravity.h \
......
......@@ -23,6 +23,7 @@
#include "inline.h"
#define atomic_add(v, i) __sync_fetch_and_add(v, i)
#define atomic_or(v, i) __sync_fetch_and_or(v, i)
#define atomic_inc(v) atomic_add(v, 1)
#define atomic_dec(v) atomic_add(v, -1)
#define atomic_cas(v, o, n) __sync_val_compare_and_swap(v, o, n)
......
......@@ -122,7 +122,7 @@ struct cell {
int nr_density, nr_force, nr_grav;
/* The hierarchical tasks. */
struct task *ghost, *init, *drift, *kick;
struct task *ghost, *init, *kick;
/* Task receiving data. */
struct task *recv_xv, *recv_rho, *recv_ti;
......
......@@ -24,17 +24,20 @@
#include "../config.h"
/* Some standard headers. */
#include <float.h>
#include <math.h>
#include <stdio.h>
/* This object's header. */
#include "debug.h"
/* Local includes. */
#include "config.h"
#include "cell.h"
#include "const.h"
#include "engine.h"
#include "hydro.h"
#include "inline.h"
#include "part.h"
#include "space.h"
/* Import the right hydro definition */
#if defined(MINIMAL_SPH)
......@@ -137,6 +140,54 @@ void printgParticle_single(struct gpart *gp) {
printf("\n");
}
/**
 * @brief Check that the cells and particles of a space have consistent h_max
 *        values.
 *
 * Computes the largest h_max over the cells whose nodeID matches the engine's
 * nodeID, and the largest h over all s->nr_parts particles, then compares the
 * two. If they disagree, every cell whose h_max exceeds the particle maximum
 * and every particle whose h exceeds the cell maximum is reported through
 * message().
 *
 * @param s the space.
 * @result 1 if the two maxima agree to within FLT_EPSILON, 0 otherwise.
 */
int checkSpacehmax(struct space *s) {

  /* Loop over local cells. */
  float cell_h_max = 0.0f;
  for (int k = 0; k < s->nr_cells; k++) {
    if (s->cells[k].nodeID == s->e->nodeID && s->cells[k].h_max > cell_h_max) {
      cell_h_max = s->cells[k].h_max;
    }
  }

  /* Now all particles. */
  float part_h_max = 0.0f;
  for (int k = 0; k < s->nr_parts; k++) {
    if (s->parts[k].h > part_h_max) {
      part_h_max = s->parts[k].h;
    }
  }

  /* If within some epsilon we are OK. This must be the floating-point
   * fabsf(): the integer abs() truncates its argument, so any mismatch
   * smaller than 1 would be silently accepted. */
  if (fabsf(cell_h_max - part_h_max) <= FLT_EPSILON) return 1;

  /* There is a problem. Hunt it down: first the local cells that claim a
   * larger h_max than any particle actually has... */
  for (int k = 0; k < s->nr_cells; k++) {
    if (s->cells[k].nodeID == s->e->nodeID) {
      if (s->cells[k].h_max > part_h_max) {
        message("cell %d is inconsistent (%f > %f)", k, s->cells[k].h_max,
                part_h_max);
      }
    }
  }

  /* ...then the particles whose h exceeds the largest local cell h_max. */
  for (int k = 0; k < s->nr_parts; k++) {
    if (s->parts[k].h > cell_h_max) {
      message("part %lld is inconsistent (%f > %f)", s->parts[k].id,
              s->parts[k].h, cell_h_max);
    }
  }

  return 0;
}
#ifdef HAVE_METIS
/**
......
......@@ -22,6 +22,7 @@
/* Includes. */
#include "cell.h"
#include "part.h"
#include "space.h"
void printParticle(const struct part *parts, struct xpart *xparts,
long long int id, size_t N);
......@@ -30,6 +31,8 @@ void printgParticle(const struct gpart *gparts, const struct part *parts,
void printParticle_single(const struct part *p, const struct xpart *xp);
void printgParticle_single(struct gpart *gp);
int checkSpacehmax(struct space *s);
#ifdef HAVE_METIS
#include "metis.h"
void dumpMETISGraph(const char *prefix, idx_t nvtxs, idx_t ncon, idx_t *xadj,
......
This diff is collapsed.
......@@ -109,6 +109,9 @@ struct engine {
/* The task scheduler. */
struct scheduler sched;
/* Common threadpool for all the engine's tasks. */
struct threadpool threadpool;
/* The minimum and maximum allowed dt */
double dt_min, dt_max;
......@@ -234,7 +237,6 @@ void engine_rebuild(struct engine *e);
void engine_repartition(struct engine *e);
void engine_makeproxies(struct engine *e);
void engine_redistribute(struct engine *e);
struct link *engine_addlink(struct engine *e, struct link *l, struct task *t);
void engine_print_policy(struct engine *e);
int engine_is_done(struct engine *e);
void engine_pin();
......
......@@ -34,6 +34,7 @@
#define lock_trylock(l) (pthread_spin_lock(l) != 0)
#define lock_unlock(l) (pthread_spin_unlock(l) != 0)
#define lock_unlock_blind(l) pthread_spin_unlock(l)
#elif defined(PTHREAD_LOCK)
#include <pthread.h>
#define swift_lock_type pthread_mutex_t
......@@ -43,6 +44,7 @@
#define lock_trylock(l) (pthread_mutex_trylock(l) != 0)
#define lock_unlock(l) (pthread_mutex_unlock(l) != 0)
#define lock_unlock_blind(l) pthread_mutex_unlock(l)
#else
#define swift_lock_type volatile int
#define lock_init(l) (*(l) = 0)
......
......@@ -455,8 +455,8 @@ static void repart_edge_metis(int partweights, int bothweights, int nodeID,
/* Skip un-interesting tasks. */
if (t->type != task_type_self && t->type != task_type_pair &&
t->type != task_type_sub_self && t->type != task_type_sub_self &&
t->type != task_type_ghost && t->type != task_type_drift &&
t->type != task_type_kick && t->type != task_type_init)
t->type != task_type_ghost && t->type != task_type_kick &&
t->type != task_type_init)
continue;
/* Get the task weight. This can be slightly negative on multiple board
......@@ -494,8 +494,7 @@ static void repart_edge_metis(int partweights, int bothweights, int nodeID,
int cid = ci - cells;
/* Different weights for different tasks. */
if (t->type == task_type_ghost || t->type == task_type_drift ||
t->type == task_type_kick) {
if (t->type == task_type_ghost || t->type == task_type_kick) {
/* Particle updates add only to vertex weight. */
if (taskvweights) weights_v[cid] += w;
......
......@@ -584,33 +584,27 @@ void runner_do_ghost(struct runner *r, struct cell *c) {
}
/**
* @brief Drift particles and g-particles forward in time
* @brief Drift particles and g-particles in a cell forward in time
*
* @param r The runner thread.
* @param c The cell.
* @param timer Are we timing this ?
* @param e The engine.
*/
void runner_do_drift(struct runner *r, struct cell *c, int timer) {
static void runner_do_drift(struct cell *c, struct engine *e) {
const double timeBase = r->e->timeBase;
const double dt = (r->e->ti_current - r->e->ti_old) * timeBase;
const int ti_old = r->e->ti_old;
const int ti_current = r->e->ti_current;
struct part *restrict parts = c->parts;
struct xpart *restrict xparts = c->xparts;
struct gpart *restrict gparts = c->gparts;
const double timeBase = e->timeBase;
const double dt = (e->ti_current - e->ti_old) * timeBase;
const int ti_old = e->ti_old;
const int ti_current = e->ti_current;
struct part *const parts = c->parts;
struct xpart *const xparts = c->xparts;
struct gpart *const gparts = c->gparts;
float dx_max = 0.f, dx2_max = 0.f, h_max = 0.f;
double e_kin = 0.0, e_int = 0.0, e_pot = 0.0, entropy = 0.0, mass = 0.0;
double mom[3] = {0.0, 0.0, 0.0};
double ang_mom[3] = {0.0, 0.0, 0.0};
TIMER_TIC
#ifdef TASK_VERBOSE
OUT;
#endif
/* No children? */
if (!c->split) {
......@@ -619,7 +613,7 @@ void runner_do_drift(struct runner *r, struct cell *c, int timer) {
for (size_t k = 0; k < nr_gparts; k++) {
/* Get a handle on the gpart. */
struct gpart *restrict gp = &gparts[k];
struct gpart *const gp = &gparts[k];
/* Drift... */
drift_gpart(gp, dt, timeBase, ti_old, ti_current);
......@@ -636,8 +630,8 @@ void runner_do_drift(struct runner *r, struct cell *c, int timer) {
for (size_t k = 0; k < nr_parts; k++) {
/* Get a handle on the part. */
struct part *restrict p = &parts[k];
struct xpart *restrict xp = &xparts[k];
struct part *const p = &parts[k];
struct xpart *const xp = &xparts[k];
/* Drift... */
drift_part(p, xp, dt, timeBase, ti_old, ti_current);
......@@ -690,15 +684,14 @@ void runner_do_drift(struct runner *r, struct cell *c, int timer) {
/* Otherwise, aggregate data from children. */
else {
/* Loop over the progeny. */
/* Loop over the progeny and collect their data. */
for (int k = 0; k < 8; k++)
if (c->progeny[k] != NULL) {
struct cell *cp = c->progeny[k];
/* Recurse */
struct cell *restrict cp = c->progeny[k];
runner_do_drift(r, cp, 0);
/* Recurse. */
runner_do_drift(cp, e);
/* Collect */
dx_max = fmaxf(dx_max, cp->dx_max);
h_max = fmaxf(h_max, cp->h_max);
mass += cp->mass;
......@@ -729,8 +722,28 @@ void runner_do_drift(struct runner *r, struct cell *c, int timer) {
c->ang_mom[0] = ang_mom[0];
c->ang_mom[1] = ang_mom[1];
c->ang_mom[2] = ang_mom[2];
}
/**
 * @brief Mapper function to drift particles and g-particles forward in time.
 *
 * Walks the chunk of cells it is handed and calls runner_do_drift() on every
 * cell whose nodeID matches the engine's nodeID; other cells are skipped.
 *
 * @param map_data An array of #cell%s.
 * @param num_elements Chunk size.
 * @param extra_data Pointer to an #engine.
 */
void runner_do_drift_mapper(void *map_data, int num_elements,
                            void *extra_data) {

  /* Note: no timer handling here. The former per-task `timer` flag of
   * runner_do_drift() does not exist in this function. */
  struct engine *e = (struct engine *)extra_data;
  struct cell *cells = (struct cell *)map_data;

  for (int ind = 0; ind < num_elements; ind++) {
    struct cell *c = &cells[ind];

    /* Only drift local particles. */
    if (c != NULL && c->nodeID == e->nodeID) runner_do_drift(c, e);
  }
}
/**
......@@ -1122,9 +1135,6 @@ void *runner_main(void *data) {
case task_type_ghost:
runner_do_ghost(r, ci);
break;
case task_type_drift:
runner_do_drift(r, ci, 1);
break;
case task_type_kick:
runner_do_kick(r, ci, 1);
break;
......@@ -1158,19 +1168,6 @@ void *runner_main(void *data) {
case task_type_grav_external:
runner_do_grav_external(r, t->ci, 1);
break;
case task_type_part_sort:
space_do_parts_sort();
break;
case task_type_gpart_sort:
space_do_gparts_sort();
break;
case task_type_split_cell:
space_do_split(e->s, t->ci);
break;
case task_type_rewait:
scheduler_do_rewait((struct task *)t->ci, (struct task *)t->cj,
t->flags, t->rank);
break;
default:
error("Unknown task type.");
}
......
......@@ -51,8 +51,8 @@ void runner_do_sort(struct runner *r, struct cell *c, int flag, int clock);
void runner_do_gsort(struct runner *r, struct cell *c, int flag, int clock);
void runner_do_kick(struct runner *r, struct cell *c, int timer);
void runner_do_kick_fixdt(struct runner *r, struct cell *c, int timer);
void runner_do_drift(struct runner *r, struct cell *c, int timer);
void runner_do_init(struct runner *r, struct cell *c, int timer);
void *runner_main(void *data);
void runner_do_drift_mapper(void *map_data, int num_elements, void *extra_data);
#endif /* SWIFT_RUNNER_H */
This diff is collapsed.
......@@ -36,6 +36,7 @@
#include "lock.h"
#include "queue.h"
#include "task.h"
#include "threadpool.h"
/* Some constants. */
#define scheduler_maxwait 3
......@@ -83,9 +84,9 @@ struct scheduler {
int *tasks_ind;
/* The task unlocks. */
struct task **unlocks;
int *unlock_ind;
int nr_unlocks, size_unlocks;
struct task **volatile unlocks;
int *volatile unlock_ind;
volatile int nr_unlocks, size_unlocks, completed_unlock_writes;
/* Lock for this scheduler. */
swift_lock_type lock;
......@@ -97,13 +98,17 @@ struct scheduler {
/* The space associated with this scheduler. */
struct space *space;
/* Threadpool to use internally for mundane parallel work. */
struct threadpool *threadpool;
/* The node we are working on. */
int nodeID;
};
/* Function prototypes. */
void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
int nr_queues, unsigned int flags, int nodeID);
int nr_queues, unsigned int flags, int nodeID,
struct threadpool *tp);
struct task *scheduler_gettask(struct scheduler *s, int qid,
const struct task *prev);
void scheduler_enqueue(struct scheduler *s, struct task *t);
......@@ -122,8 +127,6 @@ void scheduler_addunlock(struct scheduler *s, struct task *ta, struct task *tb);
void scheduler_set_unlocks(struct scheduler *s);
void scheduler_dump_queue(struct scheduler *s);
void scheduler_print_tasks(const struct scheduler *s, const char *fileName);
void scheduler_do_rewait(struct task *t_begin, struct task *t_end,
unsigned int mask, unsigned int submask);
void scheduler_clean(struct scheduler *s);
#endif /* SWIFT_SCHEDULER_H */
This diff is collapsed.
......@@ -68,8 +68,8 @@ struct space {
/* Cell widths. */
double width[3], iwidth[3];
/* The minimum and maximum cutoff radii. */
double h_max, cell_min;
/* The minimum cell width. */
double cell_min;
/* Current maximum displacement for particles. */
float dx_max;
......@@ -132,7 +132,6 @@ struct parallel_sort {
unsigned int stack_size;
volatile unsigned int first, last, waiting;
};
extern struct parallel_sort space_sort_struct;
/* function prototypes. */
void space_parts_sort(struct space *s, int *ind, size_t N, int min, int max,
......@@ -156,10 +155,14 @@ void space_map_parts_xparts(struct space *s,
struct cell *c));
void space_map_cells_post(struct space *s, int full,
void (*fun)(struct cell *c, void *data), void *data);
void space_parts_sort_mapper(void *map_data, int num_elements,
void *extra_data);
void space_gparts_sort_mapper(void *map_data, int num_elements,
void *extra_data);
void space_rebuild(struct space *s, double h_max, int verbose);
void space_recycle(struct space *s, struct cell *c);
void space_split(struct space *s, struct cell *cells, int verbose);
void space_do_split(struct space *s, struct cell *c);
void space_split_mapper(void *map_data, int num_elements, void *extra_data);
void space_do_parts_sort();
void space_do_gparts_sort();
void space_init_parts(struct space *s);
......
......@@ -48,14 +48,13 @@
/* Task type names. */
const char *taskID_names[task_type_count] = {
"none", "sort", "self", "pair", "sub_self",
"sub_pair", "init", "ghost", "drift", "kick",
"kick_fixdt", "send", "recv", "grav_gather_m", "grav_fft",
"grav_mm", "grav_up", "grav_external", "part_sort", "gpart_sort",
"split_cell", "rewait"};
"none", "sort", "self", "pair", "sub_self",
"sub_pair", "init", "ghost", "kick", "kick_fixdt",
"send", "recv", "grav_gather_m", "grav_fft", "grav_mm",
"grav_up", "grav_external"};
const char *subtaskID_names[task_subtype_count] = {"none", "density", "force",
"grav", "tend"};
const char *subtaskID_names[task_type_count] = {"none", "density", "force",
"grav", "tend"};
/**
* @brief Computes the overlap between the parts array of two given cells.
......@@ -137,7 +136,6 @@ __attribute__((always_inline)) INLINE static enum task_actions task_acts_on(
break;
case task_type_init:
case task_type_drift:
case task_type_kick:
case task_type_kick_fixdt:
case task_type_send:
......@@ -156,15 +154,8 @@ __attribute__((always_inline)) INLINE static enum task_actions task_acts_on(
return task_action_gpart;
break;
case task_type_part_sort:
case task_type_gpart_sort:
case task_type_split_cell:
case task_type_rewait:
return task_action_none;
break;
default:
error("Unknow task_action for task");
error("Unknown task_action for task");
return task_action_none;
break;
}
......
......@@ -41,7 +41,6 @@ enum task_types {
task_type_sub_pair,
task_type_init,
task_type_ghost,
task_type_drift,
task_type_kick,
task_type_kick_fixdt,
task_type_send,
......@@ -51,10 +50,6 @@ enum task_types {
task_type_grav_mm,
task_type_grav_up,
task_type_grav_external,
task_type_part_sort,
task_type_gpart_sort,
task_type_split_cell,
task_type_rewait,
task_type_count
};
......
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2016 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Config parameters. */
#include "../config.h"
/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <math.h>
#include <stdlib.h>
#include <string.h>
/* This object's header. */
#include "threadpool.h"
/* Local headers. */
#include "atomic.h"
#include "error.h"
/**
 * @brief Entry function for the worker threads of a #threadpool.
 *
 * Each pass of the outer loop registers the thread as waiting, blocks on
 * thread_cond, and once woken, repeatedly claims chunks of the current map
 * data and applies the map function to them until no chunks remain.
 * The outer loop has no exit, so this function does not return.
 *
 * @param data Pointer to the #threadpool this thread belongs to.
 */
void *threadpool_runner(void *data) {
/* Our threadpool. */
struct threadpool *tp = (struct threadpool *)data;
/* Main loop. */
while (1) {
/* Let the controller know that this thread is waiting. The thread that
 * brings the waiting count up to num_threads signals control_cond. */
pthread_mutex_lock(&tp->thread_mutex);
tp->num_threads_waiting += 1;
if (tp->num_threads_waiting == tp->num_threads) {
pthread_cond_signal(&tp->control_cond);
}
/* Wait for the controller.
 * NOTE(review): this wait is not wrapped in a predicate loop, so a spurious
 * wake-up (permitted by POSIX) would let the thread proceed as if work had
 * been posted -- confirm this is acceptable. */
pthread_cond_wait(&tp->thread_cond, &tp->thread_mutex);
/* Move this thread from the waiting to the running count; the thread that
 * brings the running count up to num_threads signals control_cond. */
tp->num_threads_waiting -= 1;
tp->num_threads_running += 1;
if (tp->num_threads_running == tp->num_threads) {
pthread_cond_signal(&tp->control_cond);
}
pthread_mutex_unlock(&tp->thread_mutex);
/* The index of the mapping task we will work on next. */
size_t task_ind;
/* Claim the next chunk by atomically advancing the shared counter by the
 * chunk size; stop once the claimed index is at or past the data size. */
while ((task_ind = atomic_add(&tp->map_data_count, tp->map_data_chunk)) <
tp->map_data_size) {
/* The last chunk may be shorter than map_data_chunk. */
const int num_elements = task_ind + tp->map_data_chunk > tp->map_data_size
? tp->map_data_size - task_ind
: tp->map_data_chunk;
/* The element address is computed in bytes: stride times element index. */
tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind),
num_elements, tp->map_extra_data);
}
}
}
void threadpool_init(struct threadpool *tp, int num_threads) {
/* Initialize the thread counters. */
tp->num_threads = num_threads;
tp->num_threads_waiting = 0;
/* Init the threadpool mutexes. */
if (pthread_mutex_init(&tp->thread_mutex, NULL) != 0)
error("Failed to initialize mutexex.");
if (pthread_cond_init(&tp->control_cond, NULL) != 0 ||
pthread_cond_init(&tp->thread_cond, NULL) != 0)
error("Failed to initialize condition variables.");
/* Set the task counter to zero. */
tp->map_data_size = 0;
tp->map_data_count = 0;
tp->map_data_stride = 0;
tp->map_data_chunk = 0;
tp->map_function = NULL;
/* Allocate the threads. */
if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads)) ==
NULL) {
error("Failed to allocate thread array.");
}
/* Create and start the threads. */
pthread_mutex_lock(&tp->thread_mutex);
for (int k = 0; k < num_threads; k++) {
if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0)
error("Failed to create threadpool runner thread.");
}
/* Wait for all the threads to be up and running. */
while (tp->num_threads_waiting < tp->num_threads) {
pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
}
pthread_mutex_unlock(&tp->thread_mutex);
}
/**
* @brief Map a function to an array of data in parallel using a #threadpool.
*
* The function @c map_function is called on each element of @c map_data
* in parallel.
*
* @param tp The #threadpool on which to run.
* @param map_function The function that will be applied to the map data.
* @param map_data The data on which the mapping function will be called.
* @param N Number of elements in @c map_data.
* @param stride Size, in bytes, of each element of @c map_data.
* @param chunk Number of map data elements to pass to the function at a time.
* @param extra_data Additional pointer that will be passed to the mapping
* function, may contain additional data.
*/