Commit 76a66528 authored by Pedro Gonnet's avatar Pedro Gonnet Committed by Matthieu Schaller

Add threadpool_fixed_chunk_size, which when passed to threadpool_map, splits the…

parent b4e619f7
......@@ -136,10 +136,16 @@ void threadpool_chomp(struct threadpool *tp, int tid) {
/* Loop until we can't get a chunk. */
while (1) {
/* Desired chunk size. */
size_t chunk_size =
(tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads);
if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk;
/* Compute the desired chunk size. */
ptrdiff_t chunk_size;
if (tp->map_data_chunk == threadpool_uniform_chunk_size) {
chunk_size = (int)((tid + 1) * tp->map_data_size / tp->num_threads) -
(int)(tid * tp->map_data_size / tp->num_threads);
} else {
chunk_size =
(tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads);
if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk;
}
if (chunk_size < 1) chunk_size = 1;
/* Get a chunk and check its size. */
......@@ -252,7 +258,10 @@ void threadpool_init(struct threadpool *tp, int num_threads) {
* @param N Number of elements in @c map_data.
* @param stride Size, in bytes, of each element of @c map_data.
* @param chunk Number of map data elements to pass to the function at a time,
* or #threadpool_auto_chunk_size to choose the number automatically.
* or #threadpool_auto_chunk_size to choose the number dynamically
* depending on the number of threads and tasks (recommended), or
* #threadpool_uniform_chunk_size to spread the tasks evenly over the
* threads in one go.
* @param extra_data Addtitional pointer that will be passed to the mapping
* function, may contain additional data.
*/
......@@ -278,11 +287,14 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function,
tp->map_data_stride = stride;
tp->map_data_size = N;
tp->map_data_count = 0;
tp->map_data_chunk =
(chunk == threadpool_auto_chunk_size)
? max((int)(N / (tp->num_threads * threadpool_default_chunk_ratio)),
1)
: chunk;
if (chunk == threadpool_auto_chunk_size) {
tp->map_data_chunk =
max((int)(N / (tp->num_threads * threadpool_default_chunk_ratio)), 1);
} else if (chunk == threadpool_uniform_chunk_size) {
tp->map_data_chunk = threadpool_uniform_chunk_size;
} else {
tp->map_data_chunk = chunk;
}
tp->map_function = map_function;
tp->map_data = map_data;
tp->map_extra_data = extra_data;
......
......@@ -24,6 +24,7 @@
/* Standard headers */
#include <pthread.h>
#include <stddef.h>
/* Local includes. */
#include "barrier.h"
......@@ -33,6 +34,7 @@
#define threadpool_log_initial_size 1000
#define threadpool_default_chunk_ratio 7
#define threadpool_auto_chunk_size 0
#define threadpool_uniform_chunk_size -1
/* Function type for mappings. */
typedef void (*threadpool_map_function)(void *map_data, int num_elements,
......@@ -77,8 +79,8 @@ struct threadpool {
/* Current map data and count. */
void *map_data, *map_extra_data;
volatile size_t map_data_count, map_data_size, map_data_stride,
map_data_chunk;
volatile size_t map_data_count, map_data_size, map_data_stride;
volatile ptrdiff_t map_data_chunk;
volatile threadpool_map_function map_function;
/* Number of threads in this pool. */
......
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (C) 2016 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
* Copyright (C) 2020 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
......@@ -48,6 +49,27 @@ void map_function_second(void *map_data, int num_elements, void *extra_data) {
}
}
void map_function_check_uniform(void *map_data, int num_elements,
void *extra_data) {
const int *inputs = (int *)map_data;
int count = inputs[0];
if (num_elements == 1) {
/* Single element. Sum this in the extra_data counter. Should
* be the sum of counts when threadpool is completed. */
atomic_add((int *)extra_data, count);
} else {
for (int ind = 1; ind < num_elements; ind++) {
if (inputs[ind] != count + 1) {
printf(" uniform chunking not correct, out of sequence\n");
fflush(stdout);
exit(1);
}
count = inputs[ind];
}
}
printf(" map_function_check_uniform handled %d elements\n", num_elements);
}
int main(int argc, char *argv[]) {
// Some constants for this test.
......@@ -95,5 +117,57 @@ int main(int argc, char *argv[]) {
printf("\n");
}
printf("# threadpool_uniform_chunk_size checks\n");
/* Check the spread of threads with threadpool_uniform_chunk_size */
int counts[23];
for (int i = 0; i < 23; i++) counts[i] = i;
struct threadpool utp;
int unum_thread = 7;
threadpool_init(&utp, unum_thread);
/* Under provision of threads. */
int dummy;
printf("# under provision\n");
threadpool_map(&utp, map_function_check_uniform, counts, 23, sizeof(int),
threadpool_uniform_chunk_size, &dummy);
/* Over provision of threads. */
int sum = 0;
for (int i = 0; i < 5; i++) sum += i;
static int lsum = 0;
printf("# over provision\n");
threadpool_map(&utp, map_function_check_uniform, counts, 5, sizeof(int),
threadpool_uniform_chunk_size, &lsum);
if (lsum != sum) {
printf(
" uniform chunking not correct, sum of tids failed "
"(%d != %d).\n",
sum, lsum);
fflush(stdout);
exit(1);
}
/* Exact provision of threads. */
sum = 0;
for (int i = 0; i < unum_thread; i++) sum += i;
lsum = 0;
printf("# exact provision\n");
threadpool_map(&utp, map_function_check_uniform, counts, unum_thread,
sizeof(int), threadpool_uniform_chunk_size, &lsum);
if (lsum != sum) {
printf(
" uniform chunking not correct, sum of tids failed "
"(%d != %d).\n",
sum, lsum);
fflush(stdout);
exit(1);
}
threadpool_clean(&utp);
printf("# passed uniform checks\n");
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment