diff --git a/src/threadpool.c b/src/threadpool.c index cba8d78e882552b610ffd6b2424af1eb3a1bc876..005be0ae4a9e7c2221d05dffb5028ca76b039851 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -129,6 +129,36 @@ void threadpool_dump_log(struct threadpool *tp, const char *filename, } #endif // SWIFT_DEBUG_THREADPOOL +/** + * @brief Runner main loop, get a chunk and call the mapper function. + */ +void threadpool_chomp(struct threadpool *tp, int tid) { + /* Loop until we can't get a chunk. */ + while (1) { + /* Desired chunk size. */ + size_t chunk_size = + (tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads); + if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk; + if (chunk_size < 1) chunk_size = 1; + + /* Get a chunk and check its size. */ + size_t task_ind = atomic_add(&tp->map_data_count, chunk_size); + if (task_ind >= tp->map_data_size) break; + if (task_ind + chunk_size > tp->map_data_size) + chunk_size = tp->map_data_size - task_ind; + +/* Call the mapper function. */ +#ifdef SWIFT_DEBUG_THREADPOOL + ticks tic = getticks(); +#endif + tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind), + chunk_size, tp->map_extra_data); +#ifdef SWIFT_DEBUG_THREADPOOL + threadpool_log(tp, tid, chunk_size, tic, getticks()); +#endif + } +} + void *threadpool_runner(void *data) { /* Our threadpool. */ @@ -140,7 +170,7 @@ void *threadpool_runner(void *data) { /* Let the controller know that this thread is waiting. */ pthread_mutex_lock(&tp->thread_mutex); tp->num_threads_waiting += 1; - if (tp->num_threads_waiting == tp->num_threads) { + if (tp->num_threads_waiting == tp->num_threads - 1) { pthread_cond_signal(&tp->control_cond); } @@ -151,35 +181,13 @@ void *threadpool_runner(void *data) { const int tid = tp->num_threads_running; #endif tp->num_threads_running += 1; - if (tp->num_threads_running == tp->num_threads) { + if (tp->num_threads_running == tp->num_threads - 1) { pthread_cond_signal(&tp->control_cond); } pthread_mutex_unlock(&tp->thread_mutex); - /* The index of the mapping task we will work on next. */ - while (1) { - /* Desired chunk size. */ - size_t chunk_size = - (tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads); - if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk; - if (chunk_size < 1) chunk_size = 1; - - /* Get a chunk and check its size. */ - size_t task_ind = atomic_add(&tp->map_data_count, chunk_size); - if (task_ind >= tp->map_data_size) break; - if (task_ind + chunk_size > tp->map_data_size) - chunk_size = tp->map_data_size - task_ind; - -/* Call the mapper function. */ -#ifdef SWIFT_DEBUG_THREADPOOL - ticks tic = getticks(); -#endif - tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind), - chunk_size, tp->map_extra_data); -#ifdef SWIFT_DEBUG_THREADPOOL - threadpool_log(tp, tid, chunk_size, tic, getticks()); -#endif - } + /* Do actual work. */ + threadpool_chomp(tp, tid); } } @@ -226,21 +234,22 @@ void threadpool_init(struct threadpool *tp, int num_threads) { tp->map_data_chunk = 0; tp->map_function = NULL; - /* Allocate the threads. */ - if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads)) == - NULL) { + /* Allocate the threads, one less than requested since the calling thread + works as well. */ + if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * + (num_threads - 1))) == NULL) { error("Failed to allocate thread array."); } /* Create and start the threads. */ pthread_mutex_lock(&tp->thread_mutex); - for (int k = 0; k < num_threads; k++) { + for (int k = 0; k < num_threads - 1; k++) { if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0) error("Failed to create threadpool runner thread."); } /* Wait for all the threads to be up and running. */ - while (tp->num_threads_waiting < tp->num_threads) { + while (tp->num_threads_waiting < tp->num_threads - 1) { pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); } pthread_mutex_unlock(&tp->thread_mutex); @@ -295,12 +304,15 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function, pthread_cond_broadcast(&tp->thread_cond); /* Wait for all the threads to be up and running. */ - while (tp->num_threads_running < tp->num_threads) { + while (tp->num_threads_running < tp->num_threads - 1) { pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); } + /* Do some work while I'm at it. */ + threadpool_chomp(tp, tp->num_threads - 1); + /* Wait for all threads to be done. */ - while (tp->num_threads_waiting < tp->num_threads) { + while (tp->num_threads_waiting < tp->num_threads - 1) { pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); } pthread_mutex_unlock(&tp->thread_mutex);