/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2016 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Config parameters. */
#include
/* Some standard headers. */
#include
#include
#include
#include
#include
#include
#ifdef SWIFT_DEBUG_THREADPOOL
#include
#endif
/* This object's header. */
#include "threadpool.h"
/* Local headers. */
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "minmax.h"
/* Keys for thread specific data. */
/* The value stored under this key is a pointer to an int holding the
   threadpool ID of the thread: it is written by threadpool_init() and
   threadpool_chomp() and read back by threadpool_gettid(). */
static pthread_key_t threadpool_tid;
/* Affinity mask shared by all threads, and if set. */
/* thread_affinity is filled in by threadpool_set_affinity_mask(), which also
   raises thread_affinity_set; threadpool_apply_affinity_mask() only applies
   the mask when that flag is non-zero. */
#ifdef HAVE_SETAFFINITY
static cpu_set_t thread_affinity;
static int thread_affinity_set = 0;
#endif
/* Local declarations. */
/* Forward declaration, needed because threadpool_runner() calls this function
   before its definition further down. */
static void threadpool_apply_affinity_mask(void);
#ifdef SWIFT_DEBUG_THREADPOOL
/**
 * @brief Append a record describing one processed chunk to a per-thread log.
 *
 * The mapper function currently stored in the #threadpool is recorded along
 * with the arguments. The log buffer doubles in capacity when it is full.
 *
 * @param tp The #threadpool owning the logs.
 * @param tid ID of the thread the record belongs to. IDs below one are all
 *        written to the first log, but the ID itself is stored unchanged.
 * @param chunk_size Number of elements covered by the record.
 * @param tic Tick count at the start of the logged interval.
 * @param toc Tick count at the end of the logged interval.
 */
static void threadpool_log(struct threadpool *tp, int tid, size_t chunk_size,
                           ticks tic, ticks toc) {

  /* Pick the log to write to. */
  const int slot = (tid > 0) ? tid : 0;
  struct mapper_log *const mlog = &tp->logs[slot];

  /* Grow the buffer to twice its capacity once it has filled up. */
  if (mlog->size == mlog->count) {
    mlog->size *= 2;

    struct mapper_log_entry *const grown = (struct mapper_log_entry *)malloc(
        sizeof(struct mapper_log_entry) * mlog->size);
    if (grown == NULL) error("Failed to re-allocate mapper log.");

    /* Carry the existing records over and drop the old buffer. */
    memcpy(grown, mlog->log, sizeof(struct mapper_log_entry) * mlog->count);
    free(mlog->log);
    mlog->log = grown;
  }

  /* Claim the next free record and fill it in. */
  struct mapper_log_entry *const rec = &mlog->log[mlog->count++];
  rec->tid = tid;
  rec->chunk_size = chunk_size;
  rec->tic = tic;
  rec->toc = toc;
  rec->map_function = tp->map_function;
}
/**
 * @brief Write all the per-thread mapper logs of a #threadpool to a text file.
 *
 * The file starts with two header lines, followed by one line per log entry
 * of the form "map_function thread_id chunk_size tic toc". Mapper function
 * names are looked up with dladdr() and cached so that each function only
 * needs to be resolved once while it stays in the cache. If no symbol name is
 * available the text "(null)" is written in its place.
 *
 * @param tp The #threadpool whose logs are dumped.
 * @param filename Name of the file to write; it is opened in "w" mode.
 * @param reset If non-zero, every log is emptied after it has been written.
 */
void threadpool_dump_log(struct threadpool *tp, const char *filename,
                         int reset) {

  /* Open the output file. */
  FILE *fd;
  if ((fd = fopen(filename, "w")) == NULL)
    error("Failed to create log file '%s'.", filename);

  /* Create a buffer of function names, most recently added entry first.
     Only the first num_names entries are valid. */
  enum { max_names = 100 };
  struct name_entry {
    threadpool_map_function map_function;
    const char *name;
  };
  struct name_entry names[max_names];
  memset(names, 0, sizeof(names));
  int num_names = 0;

  /* Write a header. */
  fprintf(fd, "# map_function thread_id chunk_size tic toc\n");
  fprintf(fd, "# {'num_threads': %i, 'cpufreq': %lli}\n", tp->num_threads,
          clocks_get_cpufreq());

  /* Loop over the per-tid logs and dump them. */
  for (int k = 0; k < tp->num_threads; k++) {
    struct mapper_log *log = &tp->logs[k];

    /* Loop over the log entries and dump them. */
    for (int i = 0; i < log->count; i++) {
      struct mapper_log_entry *entry = &log->log[i];

      /* Look for the function pointer among the valid cache entries. */
      int nid = 0;
      while (nid < num_names &&
             names[nid].map_function != entry->map_function)
        nid++;

      /* If the name was not found, make a new entry at the front. */
      if (nid == num_names) {

        /* Move the cached entries one slot towards the end to free index 0.
           When the cache is already full the last entry is dropped. */
        if (num_names < max_names) num_names++;
        for (int j = num_names - 1; j > 0; j--) names[j] = names[j - 1];

        /* dladdr() returns 0 when the address cannot be matched, and may
           leave dli_sname NULL when no symbol is found; never hand a NULL
           or uninitialised pointer to "%s" below. */
        Dl_info dl_info;
        const char *symbol = NULL;
        if (dladdr(entry->map_function, &dl_info) != 0)
          symbol = dl_info.dli_sname;

        names[0].map_function = entry->map_function;
        names[0].name = (symbol != NULL) ? symbol : "(null)";
        nid = 0;
      }

      /* Log a line to the file. */
      fprintf(fd, "%s %i %i %lli %lli\n", names[nid].name, entry->tid,
              entry->chunk_size, entry->tic, entry->toc);
    }

    /* Clear the log if requested. */
    if (reset) log->count = 0;
  }

  /* Close the file. */
  fclose(fd);
}
#endif // SWIFT_DEBUG_THREADPOOL
/**
 * @brief Runner main loop, get a chunk and call the mapper function.
 *
 * Repeatedly claims a range of elements by atomically advancing the shared
 * element counter of the #threadpool and passes that range to the mapper
 * function, until the counter has moved past the end of the data.
 *
 * While this function runs, threadpool_gettid() returns @c tid on the calling
 * thread. The value that was stored before the call is put back on return.
 *
 * @param tp The #threadpool holding the current map request.
 * @param tid ID of the calling thread for this map call. In uniform-chunk
 *        mode it also determines the size of the chunk requested.
 */
static void threadpool_chomp(struct threadpool *tp, int tid) {

  /* Store the thread ID as thread specific data. localtid lives in this
     stack frame, so remember what was stored before and restore it on the
     way out: otherwise the key would keep pointing at a dead variable. */
  int localtid = tid;
  void *const previous_tid = pthread_getspecific(threadpool_tid);
  pthread_setspecific(threadpool_tid, &localtid);

  /* Loop until we can't get a chunk. */
  while (1) {

    /* Compute the desired chunk size. */
    ptrdiff_t chunk_size;
    if (tp->map_data_chunk == threadpool_uniform_chunk_size) {
      chunk_size = ((tid + 1) * tp->map_data_size / tp->num_threads) -
                   (tid * tp->map_data_size / tp->num_threads);
    } else {
      chunk_size =
          (tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads);
      if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk;
    }
    if (chunk_size < 1) chunk_size = 1;

    /* A chunk cannot exceed INT_MAX, as we use int elements in map_function. */
    if (chunk_size > INT_MAX) chunk_size = INT_MAX;

    /* Get a chunk and check its size. */
    size_t task_ind = atomic_add(&tp->map_data_count, chunk_size);
    if (task_ind >= tp->map_data_size) break;
    if (task_ind + chunk_size > tp->map_data_size)
      chunk_size = tp->map_data_size - task_ind;

      /* Call the mapper function. */
#ifdef SWIFT_DEBUG_THREADPOOL
    ticks tic = getticks();
#endif
    tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind),
                     chunk_size, tp->map_extra_data);
#ifdef SWIFT_DEBUG_THREADPOOL
    threadpool_log(tp, tid, chunk_size, tic, getticks());
#endif
  }

  /* Put back whatever was stored for this thread before we started. */
  pthread_setspecific(threadpool_tid, previous_tid);
}
/**
 * @brief Start routine of the pool's worker threads.
 *
 * Applies the affinity mask (if any), then alternates between waiting at the
 * two barriers of the #threadpool and processing chunks. The thread ends via
 * pthread_exit() when it is released with no mapper function set.
 *
 * @param data the threadpool we are part of.
 */
static void *threadpool_runner(void *data) {

  struct threadpool *const pool = (struct threadpool *)data;

  /* Apply the shared affinity mask before doing anything else. */
  threadpool_apply_affinity_mask();

  for (;;) {

    /* Signal that this thread is idle, then wait to be released. */
    swift_barrier_wait(&pool->wait_barrier);
    swift_barrier_wait(&pool->run_barrier);

    /* Being released without a mapper function is the request to shut
       down; leaving here keeps both barriers in a consistent state. */
    if (pool->map_function == NULL) pthread_exit(NULL);

    /* Claim the next free thread ID and start working. */
    const int my_tid = atomic_inc(&pool->num_threads_running);
    threadpool_chomp(pool, my_tid);
  }
}
/**
* @brief Initialises the #threadpool with a given number of threads.
*
* @param tp The #threadpool.
* @param num_threads The number of threads.
*/
void threadpool_init(struct threadpool *tp, int num_threads) {
/* Initialize the thread counters. */
tp->num_threads = num_threads;
/* Create thread local data areas. Only do this once for all threads. */
pthread_key_create(&threadpool_tid, NULL);
/* Store the main thread ID as thread specific data. */
static int localtid = 0;
pthread_setspecific(threadpool_tid, &localtid);
#ifdef SWIFT_DEBUG_THREADPOOL
if ((tp->logs = (struct mapper_log *)malloc(sizeof(struct mapper_log) *
num_threads)) == NULL)
error("Failed to allocate mapper logs.");
for (int k = 0; k < num_threads; k++) {
tp->logs[k].size = threadpool_log_initial_size;
tp->logs[k].count = 0;
if ((tp->logs[k].log = (struct mapper_log_entry *)malloc(
sizeof(struct mapper_log_entry) * tp->logs[k].size)) == NULL)
error("Failed to allocate mapper log.");
}
#endif
/* If there is only a single thread, do nothing more as of here as
we will just do work in the (blocked) calling thread. */
if (num_threads == 1) return;
/* Init the barriers. */
if (swift_barrier_init(&tp->wait_barrier, NULL, num_threads) != 0 ||
swift_barrier_init(&tp->run_barrier, NULL, num_threads) != 0)
error("Failed to initialize barriers.");
/* Set the task counter to zero. */
tp->map_data_size = 0;
tp->map_data_count = 0;
tp->map_data_stride = 0;
tp->map_data_chunk = 0;
tp->map_function = NULL;
/* Allocate the threads, one less than requested since the calling thread
works as well. */
if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) *
(num_threads - 1))) == NULL) {
error("Failed to allocate thread array.");
}
/* Create and start the threads. */
for (int k = 0; k < num_threads - 1; k++) {
if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0)
error("Failed to create threadpool runner thread.");
}
/* Wait for all the threads to be up and running. */
swift_barrier_wait(&tp->wait_barrier);
}
/**
 * @brief Map a function to an array of data in parallel using a #threadpool.
 *
 * The function @c map_function is called on each element of @c map_data
 * in parallel.
 *
 * If the #threadpool has a single thread, the mapper is called directly in
 * the calling thread, in pieces of at most INT_MAX elements. Otherwise the
 * calling thread takes part in the work and the function only returns once
 * all threads have passed the final barrier.
 *
 * @param tp The #threadpool on which to run.
 * @param map_function The function that will be applied to the map data.
 * @param map_data The data on which the mapping function will be called.
 * @param N Number of elements in @c map_data.
 * @param stride Size, in bytes, of each element of @c map_data.
 * @param chunk Number of map data elements to pass to the function at a time,
 *        or #threadpool_auto_chunk_size to choose the number dynamically
 *        depending on the number of threads and tasks (recommended), or
 *        #threadpool_uniform_chunk_size to spread the tasks evenly over the
 *        threads in one go.
 * @param extra_data Additional pointer that will be passed to the mapping
 *        function, may contain additional data.
 */
void threadpool_map(struct threadpool *tp, threadpool_map_function map_function,
                    void *map_data, size_t N, int stride, int chunk,
                    void *extra_data) {

#ifdef SWIFT_DEBUG_THREADPOOL
  ticks tic_total = getticks();
#endif

  /* If we just have a single thread, call the map function directly. */
  if (tp->num_threads == 1) {
    if (N <= INT_MAX) {
      map_function(map_data, N, extra_data);
#ifdef SWIFT_DEBUG_THREADPOOL
      tp->map_function = map_function;
      threadpool_log(tp, 0, N, tic_total, getticks());
#endif
    } else {
      /* N > INT_MAX, we need to do this in chunks as map_function only takes
       * an int. */
      size_t chunk_size = INT_MAX;
      size_t data_size = N;
      size_t data_count = 0;
      while (1) {

        /* Call the mapper function. */
#ifdef SWIFT_DEBUG_THREADPOOL
        ticks tic = getticks();
#endif
        map_function((char *)map_data + (stride * data_count), chunk_size,
                     extra_data);
#ifdef SWIFT_DEBUG_THREADPOOL
        /* threadpool_log() records tp->map_function, so it has to be set
           on this path as well or the entry would carry a stale pointer. */
        tp->map_function = map_function;
        threadpool_log(tp, 0, chunk_size, tic, getticks());
#endif

        /* Get the next chunk and check its size. */
        data_count += chunk_size;
        if (data_count >= data_size) break;
        if (data_count + chunk_size > data_size)
          chunk_size = data_size - data_count;
      }
    }
    return;
  }

  /* Set the map data and signal the threads. */
  tp->map_data_stride = stride;
  tp->map_data_size = N;
  tp->map_data_count = 0;
  if (chunk == threadpool_auto_chunk_size) {
    tp->map_data_chunk =
        max((N / (tp->num_threads * threadpool_default_chunk_ratio)), 1U);
  } else if (chunk == threadpool_uniform_chunk_size) {
    tp->map_data_chunk = threadpool_uniform_chunk_size;
  } else {
    tp->map_data_chunk = chunk;
  }
  tp->map_function = map_function;
  tp->map_data = map_data;
  tp->map_extra_data = extra_data;
  tp->num_threads_running = 0;

  /* Wait for all the threads to be up and running. */
  swift_barrier_wait(&tp->run_barrier);

  /* Do some work while I'm at it. */
  threadpool_chomp(tp, tp->num_threads - 1);

  /* Wait for all threads to be done. */
  swift_barrier_wait(&tp->wait_barrier);

#ifdef SWIFT_DEBUG_THREADPOOL
  /* Log the total call time to thread id -1. */
  threadpool_log(tp, -1, N, tic_total, getticks());
#endif
}
/**
* @brief Re-sets the log for this #threadpool.
*/
#ifdef SWIFT_DEBUG_THREADPOOL
void threadpool_reset_log(struct threadpool *tp) {
for (int k = 0; k < tp->num_threads; k++) tp->logs[k].count = 0;
}
#endif
/**
* @brief Frees up the memory allocated for this #threadpool.
*/
void threadpool_clean(struct threadpool *tp) {
if (tp->num_threads > 1) {
/* Destroy the runner threads by calling them with a NULL mapper function
* and waiting for all the threads to terminate. This ensures that no
* thread is still waiting at a barrier. */
tp->map_function = NULL;
swift_barrier_wait(&tp->run_barrier);
for (int k = 0; k < tp->num_threads - 1; k++) {
void *retval;
pthread_join(tp->threads[k], &retval);
}
/* Release the barriers. */
if (swift_barrier_destroy(&tp->wait_barrier) != 0 ||
swift_barrier_destroy(&tp->run_barrier) != 0)
error("Failed to destroy threadpool barriers.");
/* Clean up memory. */
free(tp->threads);
}
#ifdef SWIFT_DEBUG_THREADPOOL
for (int k = 0; k < tp->num_threads; k++) {
free(tp->logs[k].log);
}
free(tp->logs);
#endif
}
/**
* @brief return the threadpool id of the current thread.
*/
int threadpool_gettid(void) {
int *tid = (int *)pthread_getspecific(threadpool_tid);
return *tid;
}
#ifdef HAVE_SETAFFINITY
/**
 * @brief Register the affinity mask to be used for all threads.
 *
 * The mask is copied into file-scope storage and flagged as set, which makes
 * threadpool_apply_affinity_mask() apply it from then on.
 *
 * @param affinity the mask to use.
 */
void threadpool_set_affinity_mask(cpu_set_t *affinity) {

  /* Take a private copy of the caller's mask. */
  memcpy(&thread_affinity, affinity, sizeof(thread_affinity));

  /* From now on there is a mask to apply. */
  thread_affinity_set = 1;
}
#endif
/**
 * @brief Apply the shared affinity mask to the calling thread, if one is set.
 *
 * Does nothing when compiled without HAVE_SETAFFINITY, or when no mask has
 * been registered.
 */
static void threadpool_apply_affinity_mask(void) {
#ifdef HAVE_SETAFFINITY
  /* Nothing to apply unless a mask has been registered. */
  if (!thread_affinity_set) return;

  const pthread_t self = pthread_self();
  pthread_setaffinity_np(self, sizeof(cpu_set_t), &thread_affinity);
#endif
}