Commit 345d9c4a authored by Pedro Gonnet's avatar Pedro Gonnet
Browse files

Make the thread that calls threadpool_map do work as well, as if it were the Nth thread in the pool.

parent 0bac5a1b
......@@ -129,6 +129,36 @@ void threadpool_dump_log(struct threadpool *tp, const char *filename,
}
#endif // SWIFT_DEBUG_THREADPOOL
/**
 * @brief Work loop shared by the pool threads and the thread calling
 *        threadpool_map: keep claiming chunks of the mapped data and pass
 *        each one to the mapper function until nothing is left to claim.
 *
 * @param tp The #threadpool whose map_* fields describe the current job.
 * @param tid Index of the calling thread; only read when
 *        SWIFT_DEBUG_THREADPOOL is defined, where it is handed to
 *        threadpool_log.
 */
void threadpool_chomp(struct threadpool *tp, int tid) {

  for (;;) {

    /* Target size for the next chunk: the elements not yet claimed divided
       by twice the number of threads, capped at map_data_chunk and never
       smaller than one element. */
    size_t count =
        (tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads);
    if (count > tp->map_data_chunk) count = tp->map_data_chunk;
    if (count < 1) count = 1;

    /* Claim the chunk by advancing the shared counter. The value returned
       by atomic_add is used as the index of the first claimed element
       (presumably the counter's value before the addition -- confirm
       against the atomic_add definition). */
    const size_t first = atomic_add(&tp->map_data_count, count);

    /* Nothing left to claim: we are done. */
    if (first >= tp->map_data_size) return;

    /* Trim the final chunk so it does not run past the end of the data. */
    if (first + count > tp->map_data_size) count = tp->map_data_size - first;

    /* Run the mapper on the claimed chunk, timing it in debug builds. */
#ifdef SWIFT_DEBUG_THREADPOOL
    const ticks start = getticks();
#endif
    tp->map_function((char *)tp->map_data + (first * tp->map_data_stride),
                     count, tp->map_extra_data);
#ifdef SWIFT_DEBUG_THREADPOOL
    threadpool_log(tp, tid, count, start, getticks());
#endif
  }
}
void *threadpool_runner(void *data) {
/* Our threadpool. */
......@@ -140,7 +170,7 @@ void *threadpool_runner(void *data) {
/* Let the controller know that this thread is waiting. */
pthread_mutex_lock(&tp->thread_mutex);
tp->num_threads_waiting += 1;
if (tp->num_threads_waiting == tp->num_threads) {
if (tp->num_threads_waiting == tp->num_threads - 1) {
pthread_cond_signal(&tp->control_cond);
}
......@@ -151,35 +181,13 @@ void *threadpool_runner(void *data) {
const int tid = tp->num_threads_running;
#endif
tp->num_threads_running += 1;
if (tp->num_threads_running == tp->num_threads) {
if (tp->num_threads_running == tp->num_threads - 1) {
pthread_cond_signal(&tp->control_cond);
}
pthread_mutex_unlock(&tp->thread_mutex);
/* The index of the mapping task we will work on next. */
while (1) {
/* Desired chunk size. */
size_t chunk_size =
(tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads);
if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk;
if (chunk_size < 1) chunk_size = 1;
/* Get a chunk and check its size. */
size_t task_ind = atomic_add(&tp->map_data_count, chunk_size);
if (task_ind >= tp->map_data_size) break;
if (task_ind + chunk_size > tp->map_data_size)
chunk_size = tp->map_data_size - task_ind;
/* Call the mapper function. */
#ifdef SWIFT_DEBUG_THREADPOOL
ticks tic = getticks();
#endif
tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind),
chunk_size, tp->map_extra_data);
#ifdef SWIFT_DEBUG_THREADPOOL
threadpool_log(tp, tid, chunk_size, tic, getticks());
#endif
}
/* Do actual work. */
threadpool_chomp(tp, tid);
}
}
......@@ -226,21 +234,22 @@ void threadpool_init(struct threadpool *tp, int num_threads) {
tp->map_data_chunk = 0;
tp->map_function = NULL;
/* Allocate the threads. */
if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads)) ==
NULL) {
/* Allocate the threads, one less than requested since the calling thread
works as well. */
if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) *
(num_threads - 1))) == NULL) {
error("Failed to allocate thread array.");
}
/* Create and start the threads. */
pthread_mutex_lock(&tp->thread_mutex);
for (int k = 0; k < num_threads; k++) {
for (int k = 0; k < num_threads - 1; k++) {
if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0)
error("Failed to create threadpool runner thread.");
}
/* Wait for all the threads to be up and running. */
while (tp->num_threads_waiting < tp->num_threads) {
while (tp->num_threads_waiting < tp->num_threads - 1) {
pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
}
pthread_mutex_unlock(&tp->thread_mutex);
......@@ -295,12 +304,15 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function,
pthread_cond_broadcast(&tp->thread_cond);
/* Wait for all the threads to be up and running. */
while (tp->num_threads_running < tp->num_threads) {
while (tp->num_threads_running < tp->num_threads - 1) {
pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
}
/* Do some work while I'm at it. */
threadpool_chomp(tp, tp->num_threads - 1);
/* Wait for all threads to be done. */
while (tp->num_threads_waiting < tp->num_threads) {
while (tp->num_threads_waiting < tp->num_threads - 1) {
pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
}
pthread_mutex_unlock(&tp->thread_mutex);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment