diff --git a/src/threadpool.c b/src/threadpool.c index c75a3946dfb9ca7b2a6f2fec046752fc9e0de41a..79bd586cd9d32b22a21f05ca66bfcc6b30bdfbbf 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -133,6 +133,7 @@ void threadpool_dump_log(struct threadpool *tp, const char *filename, * @brief Runner main loop, get a chunk and call the mapper function. */ void threadpool_chomp(struct threadpool *tp, int tid) { + /* Loop until we can't get a chunk. */ while (1) { /* Desired chunk size. */ @@ -168,26 +169,13 @@ void *threadpool_runner(void *data) { while (1) { /* Let the controller know that this thread is waiting. */ - pthread_mutex_lock(&tp->thread_mutex); - tp->num_threads_waiting += 1; - if (tp->num_threads_waiting == tp->num_threads - 1) { - pthread_cond_signal(&tp->control_cond); - } + pthread_barrier_wait(&tp->wait_barrier); /* Wait for the controller. */ - pthread_cond_wait(&tp->thread_cond, &tp->thread_mutex); - tp->num_threads_waiting -= 1; -#ifdef SWIFT_DEBUG_THREADPOOL - const int tid = tp->num_threads_running; -#endif - tp->num_threads_running += 1; - if (tp->num_threads_running == tp->num_threads - 1) { - pthread_cond_signal(&tp->control_cond); - } - pthread_mutex_unlock(&tp->thread_mutex); + pthread_barrier_wait(&tp->run_barrier); /* Do actual work. */ - threadpool_chomp(tp, tid); + threadpool_chomp(tp, atomic_inc(&tp->num_threads_running)); } } @@ -201,7 +189,6 @@ void threadpool_init(struct threadpool *tp, int num_threads) { /* Initialize the thread counters. */ tp->num_threads = num_threads; - tp->num_threads_waiting = 0; #ifdef SWIFT_DEBUG_THREADPOOL if ((tp->logs = (struct mapper_log *)malloc(sizeof(struct mapper_log) * @@ -220,12 +207,10 @@ void threadpool_init(struct threadpool *tp, int num_threads) { we will just do work in the (blocked) calling thread. */ if (num_threads == 1) return; - /* Init the threadpool mutexes. */ - if (pthread_mutex_init(&tp->thread_mutex, NULL) != 0) - error("Failed to initialize mutex."); - if (pthread_cond_init(&tp->control_cond, NULL) != 0 || - pthread_cond_init(&tp->thread_cond, NULL) != 0) - error("Failed to initialize condition variables."); + /* Init the barriers. */ + if (pthread_barrier_init(&tp->wait_barrier, NULL, num_threads) != 0 || + pthread_barrier_init(&tp->run_barrier, NULL, num_threads) != 0) + error("Failed to initialize barriers."); /* Set the task counter to zero. */ tp->map_data_size = 0; @@ -242,17 +227,13 @@ void threadpool_init(struct threadpool *tp, int num_threads) { } /* Create and start the threads. */ - pthread_mutex_lock(&tp->thread_mutex); for (int k = 0; k < num_threads - 1; k++) { if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0) error("Failed to create threadpool runner thread."); } /* Wait for all the threads to be up and running. */ - while (tp->num_threads_waiting < tp->num_threads - 1) { - pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); - } - pthread_mutex_unlock(&tp->thread_mutex); + pthread_barrier_wait(&tp->wait_barrier); } /** @@ -289,7 +270,6 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function, } /* Set the map data and signal the threads. */ - pthread_mutex_lock(&tp->thread_mutex); tp->map_data_stride = stride; tp->map_data_size = N; tp->map_data_count = 0; @@ -301,23 +281,15 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function, tp->map_data = map_data; tp->map_extra_data = extra_data; tp->num_threads_running = 0; - pthread_cond_broadcast(&tp->thread_cond); /* Wait for all the threads to be up and running. */ - while (tp->num_threads_running < tp->num_threads - 1) { - pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); - } + pthread_barrier_wait(&tp->run_barrier); /* Do some work while I'm at it. */ - pthread_mutex_unlock(&tp->thread_mutex); threadpool_chomp(tp, tp->num_threads - 1); - pthread_mutex_lock(&tp->thread_mutex); /* Wait for all threads to be done. */ - while (tp->num_threads_waiting < tp->num_threads - 1) { - pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); - } - pthread_mutex_unlock(&tp->thread_mutex); + pthread_barrier_wait(&tp->wait_barrier); #ifdef SWIFT_DEBUG_THREADPOOL /* Log the total call time to thread id -1. */ diff --git a/src/threadpool.h b/src/threadpool.h index 351646ad89bfe09b7c46361f3c9a7c0d65e3af1a..019403f658a22d36c4a6e1ec1ae1fdc47c62658d 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -70,8 +70,8 @@ struct threadpool { pthread_t *threads; /* This is where threads go to rest. */ - pthread_mutex_t thread_mutex; - pthread_cond_t control_cond, thread_cond; + pthread_barrier_t wait_barrier; + pthread_barrier_t run_barrier; /* Current map data and count. */ void *map_data, *map_extra_data; @@ -83,7 +83,7 @@ struct threadpool { int num_threads; /* Counter for the number of threads that are done. */ - volatile int num_threads_waiting, num_threads_running; + volatile int num_threads_running; #ifdef SWIFT_DEBUG_THREADPOOL struct mapper_log *logs;