Commit d9c69983 authored by Pedro Gonnet's avatar Pedro Gonnet
Browse files

use pthread_barrier instead of mutexes and condvars, might be faster.

parent d7419412
...@@ -133,6 +133,7 @@ void threadpool_dump_log(struct threadpool *tp, const char *filename, ...@@ -133,6 +133,7 @@ void threadpool_dump_log(struct threadpool *tp, const char *filename,
* @brief Runner main loop, get a chunk and call the mapper function. * @brief Runner main loop, get a chunk and call the mapper function.
*/ */
void threadpool_chomp(struct threadpool *tp, int tid) { void threadpool_chomp(struct threadpool *tp, int tid) {
/* Loop until we can't get a chunk. */ /* Loop until we can't get a chunk. */
while (1) { while (1) {
/* Desired chunk size. */ /* Desired chunk size. */
...@@ -168,26 +169,13 @@ void *threadpool_runner(void *data) { ...@@ -168,26 +169,13 @@ void *threadpool_runner(void *data) {
while (1) { while (1) {
/* Let the controller know that this thread is waiting. */ /* Let the controller know that this thread is waiting. */
pthread_mutex_lock(&tp->thread_mutex); pthread_barrier_wait(&tp->wait_barrier);
tp->num_threads_waiting += 1;
if (tp->num_threads_waiting == tp->num_threads - 1) {
pthread_cond_signal(&tp->control_cond);
}
/* Wait for the controller. */ /* Wait for the controller. */
pthread_cond_wait(&tp->thread_cond, &tp->thread_mutex); pthread_barrier_wait(&tp->run_barrier);
tp->num_threads_waiting -= 1;
#ifdef SWIFT_DEBUG_THREADPOOL
const int tid = tp->num_threads_running;
#endif
tp->num_threads_running += 1;
if (tp->num_threads_running == tp->num_threads - 1) {
pthread_cond_signal(&tp->control_cond);
}
pthread_mutex_unlock(&tp->thread_mutex);
/* Do actual work. */ /* Do actual work. */
threadpool_chomp(tp, tid); threadpool_chomp(tp, atomic_inc(&tp->num_threads_running));
} }
} }
...@@ -201,7 +189,6 @@ void threadpool_init(struct threadpool *tp, int num_threads) { ...@@ -201,7 +189,6 @@ void threadpool_init(struct threadpool *tp, int num_threads) {
/* Initialize the thread counters. */ /* Initialize the thread counters. */
tp->num_threads = num_threads; tp->num_threads = num_threads;
tp->num_threads_waiting = 0;
#ifdef SWIFT_DEBUG_THREADPOOL #ifdef SWIFT_DEBUG_THREADPOOL
if ((tp->logs = (struct mapper_log *)malloc(sizeof(struct mapper_log) * if ((tp->logs = (struct mapper_log *)malloc(sizeof(struct mapper_log) *
...@@ -220,12 +207,10 @@ void threadpool_init(struct threadpool *tp, int num_threads) { ...@@ -220,12 +207,10 @@ void threadpool_init(struct threadpool *tp, int num_threads) {
we will just do work in the (blocked) calling thread. */ we will just do work in the (blocked) calling thread. */
if (num_threads == 1) return; if (num_threads == 1) return;
/* Init the threadpool mutexes. */ /* Init the barriers. */
if (pthread_mutex_init(&tp->thread_mutex, NULL) != 0) if (pthread_barrier_init(&tp->wait_barrier, NULL, num_threads) != 0 ||
error("Failed to initialize mutex."); pthread_barrier_init(&tp->run_barrier, NULL, num_threads) != 0)
if (pthread_cond_init(&tp->control_cond, NULL) != 0 || error("Failed to initialize barriers.");
pthread_cond_init(&tp->thread_cond, NULL) != 0)
error("Failed to initialize condition variables.");
/* Set the task counter to zero. */ /* Set the task counter to zero. */
tp->map_data_size = 0; tp->map_data_size = 0;
...@@ -242,17 +227,13 @@ void threadpool_init(struct threadpool *tp, int num_threads) { ...@@ -242,17 +227,13 @@ void threadpool_init(struct threadpool *tp, int num_threads) {
} }
/* Create and start the threads. */ /* Create and start the threads. */
pthread_mutex_lock(&tp->thread_mutex);
for (int k = 0; k < num_threads - 1; k++) { for (int k = 0; k < num_threads - 1; k++) {
if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0) if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0)
error("Failed to create threadpool runner thread."); error("Failed to create threadpool runner thread.");
} }
/* Wait for all the threads to be up and running. */ /* Wait for all the threads to be up and running. */
while (tp->num_threads_waiting < tp->num_threads - 1) { pthread_barrier_wait(&tp->wait_barrier);
pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
}
pthread_mutex_unlock(&tp->thread_mutex);
} }
/** /**
...@@ -289,7 +270,6 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function, ...@@ -289,7 +270,6 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function,
} }
/* Set the map data and signal the threads. */ /* Set the map data and signal the threads. */
pthread_mutex_lock(&tp->thread_mutex);
tp->map_data_stride = stride; tp->map_data_stride = stride;
tp->map_data_size = N; tp->map_data_size = N;
tp->map_data_count = 0; tp->map_data_count = 0;
...@@ -301,23 +281,15 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function, ...@@ -301,23 +281,15 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function,
tp->map_data = map_data; tp->map_data = map_data;
tp->map_extra_data = extra_data; tp->map_extra_data = extra_data;
tp->num_threads_running = 0; tp->num_threads_running = 0;
pthread_cond_broadcast(&tp->thread_cond);
/* Wait for all the threads to be up and running. */ /* Wait for all the threads to be up and running. */
while (tp->num_threads_running < tp->num_threads - 1) { pthread_barrier_wait(&tp->run_barrier);
pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
}
/* Do some work while I'm at it. */ /* Do some work while I'm at it. */
pthread_mutex_unlock(&tp->thread_mutex);
threadpool_chomp(tp, tp->num_threads - 1); threadpool_chomp(tp, tp->num_threads - 1);
pthread_mutex_lock(&tp->thread_mutex);
/* Wait for all threads to be done. */ /* Wait for all threads to be done. */
while (tp->num_threads_waiting < tp->num_threads - 1) { pthread_barrier_wait(&tp->wait_barrier);
pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
}
pthread_mutex_unlock(&tp->thread_mutex);
#ifdef SWIFT_DEBUG_THREADPOOL #ifdef SWIFT_DEBUG_THREADPOOL
/* Log the total call time to thread id -1. */ /* Log the total call time to thread id -1. */
......
...@@ -70,8 +70,8 @@ struct threadpool { ...@@ -70,8 +70,8 @@ struct threadpool {
pthread_t *threads; pthread_t *threads;
/* This is where threads go to rest. */ /* This is where threads go to rest. */
pthread_mutex_t thread_mutex; pthread_barrier_t wait_barrier;
pthread_cond_t control_cond, thread_cond; pthread_barrier_t run_barrier;
/* Current map data and count. */ /* Current map data and count. */
void *map_data, *map_extra_data; void *map_data, *map_extra_data;
...@@ -83,7 +83,7 @@ struct threadpool { ...@@ -83,7 +83,7 @@ struct threadpool {
int num_threads; int num_threads;
/* Counter for the number of threads that are done. */ /* Counter for the number of threads that are done. */
volatile int num_threads_waiting, num_threads_running; volatile int num_threads_running;
#ifdef SWIFT_DEBUG_THREADPOOL #ifdef SWIFT_DEBUG_THREADPOOL
struct mapper_log *logs; struct mapper_log *logs;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment