Commit ddad736c authored by Matthieu Schaller's avatar Matthieu Schaller
Browse files

Merge branch 'engine_barrier' into 'master'

Engine barrier

Following the results in the `threadpool_task_plots` branch, I've replaced the elaborate hand-crafted `engine_barrier` function by two `pthread_barrier`s.

As a result, the runner threads should all start, and synchronize, faster.

See merge request !386
parents ca72103a 2ec5c08f
......@@ -2928,37 +2928,13 @@ void engine_prepare(struct engine *e) {
* @param e The #engine.
* @param tid The thread ID
*/
void engine_barrier(struct engine *e, int tid) {
void engine_barrier(struct engine *e) {
/* First, get the barrier mutex. */
if (pthread_mutex_lock(&e->barrier_mutex) != 0)
error("Failed to get barrier mutex.");
/* Wait at the wait barrier. */
pthread_barrier_wait(&e->wait_barrier);
/* This thread is no longer running. */
e->barrier_running -= 1;
/* If all threads are in, send a signal... */
if (e->barrier_running == 0)
if (pthread_cond_broadcast(&e->barrier_cond) != 0)
error("Failed to broadcast barrier full condition.");
/* Wait for the barrier to open. */
while (e->barrier_launch == 0 || tid >= e->barrier_launchcount)
if (pthread_cond_wait(&e->barrier_cond, &e->barrier_mutex) != 0)
error("Error waiting for barrier to close.");
/* This thread has been launched. */
e->barrier_running += 1;
e->barrier_launch -= 1;
/* If I'm the last one out, signal the condition again. */
if (e->barrier_launch == 0)
if (pthread_cond_broadcast(&e->barrier_cond) != 0)
error("Failed to broadcast empty barrier condition.");
/* Last but not least, release the mutex. */
if (pthread_mutex_unlock(&e->barrier_mutex) != 0)
error("Failed to get unlock the barrier mutex.");
/* Wait at the run barrier. */
pthread_barrier_wait(&e->run_barrier);
}
/**
......@@ -3224,9 +3200,8 @@ void engine_skip_drift(struct engine *e) {
* @brief Launch the runners.
*
* @param e The #engine.
* @param nr_runners The number of #runner to let loose.
*/
void engine_launch(struct engine *e, int nr_runners) {
void engine_launch(struct engine *e) {
const ticks tic = getticks();
......@@ -3239,15 +3214,10 @@ void engine_launch(struct engine *e, int nr_runners) {
atomic_inc(&e->sched.waiting);
/* Cry havoc and let loose the dogs of war. */
e->barrier_launch = nr_runners;
e->barrier_launchcount = nr_runners;
if (pthread_cond_broadcast(&e->barrier_cond) != 0)
error("Failed to broadcast barrier open condition.");
pthread_barrier_wait(&e->run_barrier);
/* Load the tasks. */
pthread_mutex_unlock(&e->barrier_mutex);
scheduler_start(&e->sched);
pthread_mutex_lock(&e->barrier_mutex);
/* Remove the safeguard. */
pthread_mutex_lock(&e->sched.sleep_mutex);
......@@ -3256,9 +3226,7 @@ void engine_launch(struct engine *e, int nr_runners) {
pthread_mutex_unlock(&e->sched.sleep_mutex);
/* Sit back and wait for the runners to come home. */
while (e->barrier_launch || e->barrier_running)
if (pthread_cond_wait(&e->barrier_cond, &e->barrier_mutex) != 0)
error("Error while waiting for barrier.");
pthread_barrier_wait(&e->wait_barrier);
if (e->verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
......@@ -3308,7 +3276,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs,
/* Now, launch the calculation */
TIMER_TIC;
engine_launch(e, e->nr_threads);
engine_launch(e);
TIMER_TOC(timer_runners);
/* Apply some conversions (e.g. internal energy -> entropy) */
......@@ -3324,7 +3292,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs,
if (hydro_need_extra_init_loop) {
engine_marktasks(e);
engine_skip_force_and_kick(e);
engine_launch(e, e->nr_threads);
engine_launch(e);
}
}
......@@ -3366,7 +3334,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs,
#endif
/* Run the 0th time-step */
engine_launch(e, e->nr_threads);
engine_launch(e);
#ifdef SWIFT_GRAVITY_FORCE_CHECKS
/* Check the accuracy of the gravity calculation */
......@@ -3540,7 +3508,7 @@ void engine_step(struct engine *e) {
/* Start all the tasks. */
TIMER_TIC;
engine_launch(e, e->nr_threads);
engine_launch(e);
TIMER_TOC(timer_runners);
#ifdef SWIFT_GRAVITY_FORCE_CHECKS
......@@ -4441,15 +4409,9 @@ void engine_init(struct engine *e, struct space *s,
threadpool_init(&e->threadpool, e->nr_threads);
/* First of all, init the barrier and lock it. */
if (pthread_mutex_init(&e->barrier_mutex, NULL) != 0)
error("Failed to initialize barrier mutex.");
if (pthread_cond_init(&e->barrier_cond, NULL) != 0)
error("Failed to initialize barrier condition variable.");
if (pthread_mutex_lock(&e->barrier_mutex) != 0)
error("Failed to lock barrier mutex.");
e->barrier_running = 0;
e->barrier_launch = 0;
e->barrier_launchcount = 0;
if (pthread_barrier_init(&e->wait_barrier, NULL, e->nr_threads + 1) != 0 ||
pthread_barrier_init(&e->run_barrier, NULL, e->nr_threads + 1) != 0)
error("Failed to initialize barrier.");
/* Init the scheduler with enough tasks for the initial sorting tasks. */
const int nr_tasks = 2 * s->tot_cells + 2 * e->nr_threads;
......@@ -4463,7 +4425,6 @@ void engine_init(struct engine *e, struct space *s,
for (int k = 0; k < e->nr_threads; k++) {
e->runners[k].id = k;
e->runners[k].e = e;
e->barrier_running += 1;
if (pthread_create(&e->runners[k].thread, NULL, &runner_main,
&e->runners[k]) != 0)
error("Failed to create runner thread.");
......@@ -4526,9 +4487,7 @@ void engine_init(struct engine *e, struct space *s,
#endif
/* Wait for the runner threads to be in place. */
while (e->barrier_running || e->barrier_launch)
if (pthread_cond_wait(&e->barrier_cond, &e->barrier_mutex) != 0)
error("Error while waiting for runner threads to get in place.");
pthread_barrier_wait(&e->wait_barrier);
}
/**
......
......@@ -176,9 +176,8 @@ struct engine {
int count_step;
/* Data for the threads' barrier. */
pthread_mutex_t barrier_mutex;
pthread_cond_t barrier_cond;
volatile int barrier_running, barrier_launch, barrier_launchcount;
pthread_barrier_t wait_barrier;
pthread_barrier_t run_barrier;
/* ID of the node this engine lives on. */
int nr_nodes, nodeID;
......@@ -253,7 +252,7 @@ struct engine {
};
/* Function prototypes. */
void engine_barrier(struct engine *e, int tid);
void engine_barrier(struct engine *e);
void engine_compute_next_snapshot_time(struct engine *e);
void engine_unskip(struct engine *e);
void engine_drift_all(struct engine *e);
......@@ -271,7 +270,7 @@ void engine_init(struct engine *e, struct space *s,
const struct external_potential *potential,
const struct cooling_function_data *cooling_func,
struct sourceterms *sourceterms);
void engine_launch(struct engine *e, int nr_runners);
void engine_launch(struct engine *e);
void engine_prepare(struct engine *e);
void engine_init_particles(struct engine *e, int flag_entropy_ICs,
int clean_h_values);
......
......@@ -1754,7 +1754,7 @@ void *runner_main(void *data) {
while (1) {
/* Wait at the barrier. */
engine_barrier(e, r->id);
engine_barrier(e);
/* Re-set the pointer to the previous task, as there is none. */
struct task *t = NULL;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment