Skip to content
Snippets Groups Projects
threadpool.c 5.57 KiB
/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2016 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <float.h>
#include <limits.h>
#include <math.h>
#include <stdlib.h>
#include <string.h>

/* This object's header. */
#include "threadpool.h"

/* Local headers. */
#include "atomic.h"
#include "error.h"

/**
 * @brief Entry point of every worker thread of a #threadpool.
 *
 * A worker registers itself as waiting, sleeps on the pool's @c thread_cond
 * until it is woken, and then claims chunks of the current map data until the
 * shared counter has passed the end of the data. The outer loop has no exit,
 * so this function never returns.
 *
 * @param data Pointer to the #threadpool this worker belongs to.
 */
void *threadpool_runner(void *data) {

  struct threadpool *pool = (struct threadpool *)data;

  for (;;) {

    /* Register as idle. The worker that brings the idle count up to the pool
       size is the one that notifies the controller. */
    pthread_mutex_lock(&pool->thread_mutex);
    pool->num_threads_waiting++;
    if (pool->num_threads_waiting == pool->num_threads) {
      pthread_cond_signal(&pool->control_cond);
    }

    /* Sleep until woken through thread_cond.
       NOTE(review): this wait is not guarded by a predicate loop, so a
       spurious wakeup (permitted by POSIX) would make the worker proceed
       without a new map having been issued. A proper fix needs extra shared
       state in struct threadpool. */
    pthread_cond_wait(&pool->thread_cond, &pool->thread_mutex);

    /* Move from the idle set to the running set; the worker that completes
       the running set notifies the controller. */
    pool->num_threads_waiting--;
    pool->num_threads_running++;
    if (pool->num_threads_running == pool->num_threads) {
      pthread_cond_signal(&pool->control_cond);
    }
    pthread_mutex_unlock(&pool->thread_mutex);

    /* Claim chunks until the shared counter has run past the data. The value
       returned by atomic_add is used as the index of the first element of the
       claimed chunk (presumably the counter's value before the addition). */
    for (;;) {
      const size_t first =
          atomic_add(&pool->map_data_count, pool->map_data_chunk);
      if (first >= pool->map_data_size) break;

      /* The last chunk may be shorter than the nominal chunk size. */
      int count;
      if (first + pool->map_data_chunk > pool->map_data_size) {
        count = pool->map_data_size - first;
      } else {
        count = pool->map_data_chunk;
      }

      /* The stride is in bytes, hence the char pointer arithmetic. */
      char *chunk_start =
          (char *)pool->map_data + (pool->map_data_stride * first);
      pool->map_function(chunk_start, count, pool->map_extra_data);
    }
  }
}

/**
 * @brief Initialises the #threadpool with a given number of threads.
 *
 * @param tp The #threadpool.
 * @param num_threads The number of threads.
 */
void threadpool_init(struct threadpool *tp, int num_threads) {

  /* Initialize the thread counters. */
  tp->num_threads = num_threads;
  tp->num_threads_waiting = 0;

  /* Init the threadpool mutexes. */
  if (pthread_mutex_init(&tp->thread_mutex, NULL) != 0)
    error("Failed to initialize mutexex.");
  if (pthread_cond_init(&tp->control_cond, NULL) != 0 ||
      pthread_cond_init(&tp->thread_cond, NULL) != 0)
    error("Failed to initialize condition variables.");

  /* Set the task counter to zero. */
  tp->map_data_size = 0;
  tp->map_data_count = 0;
  tp->map_data_stride = 0;
  tp->map_data_chunk = 0;
  tp->map_function = NULL;

  /* Allocate the threads. */
  if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads)) ==
      NULL) {
    error("Failed to allocate thread array.");
  }

  /* Create and start the threads. */
  pthread_mutex_lock(&tp->thread_mutex);
  for (int k = 0; k < num_threads; k++) {
    if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0)
      error("Failed to create threadpool runner thread.");
  }

  /* Wait for all the threads to be up and running. */
  while (tp->num_threads_waiting < tp->num_threads) {
    pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
  }
  pthread_mutex_unlock(&tp->thread_mutex);
}

/**
 * @brief Map a function to an array of data in parallel using a #threadpool.
 *
 * The function @c map_function is called on chunks of up to @c chunk
 * consecutive elements of @c map_data in parallel. The call returns once all
 * worker threads have gone back to waiting.
 *
 * If @c N is zero the call returns immediately without waking the workers.
 *
 * @param tp The #threadpool on which to run.
 * @param map_function The function that will be applied to the map data,
 *        must not be @c NULL.
 * @param map_data The data on which the mapping function will be called.
 * @param N Number of elements in @c map_data.
 * @param stride Size, in bytes, of each element of @c map_data.
 * @param chunk Number of map data elements to pass to the function at a time,
 *        must not be zero.
 * @param extra_data Additional pointer that will be passed to the mapping
 *        function, may contain additional data.
 */
void threadpool_map(struct threadpool *tp, threadpool_map_function map_function,
                    void *map_data, size_t N, int stride, int chunk,
                    void *extra_data) {

  /* Nothing to map: the mapping function would never be called, so do not
     wake the whole pool for it. */
  if (N == 0) return;

  /* The workers call the function unconditionally. */
  if (map_function == NULL) error("No mapping function provided.");

  /* The workers advance the shared element counter by the chunk size; with a
     chunk of zero the counter would never move and they would loop forever. */
  if (chunk == 0) error("Chunk size must not be zero.");

  /* Set the map data and signal the threads. */
  pthread_mutex_lock(&tp->thread_mutex);
  tp->map_data_stride = stride;
  tp->map_data_size = N;
  tp->map_data_count = 0;
  tp->map_data_chunk = chunk;
  tp->map_function = map_function;
  tp->map_data = map_data;
  tp->map_extra_data = extra_data;
  tp->num_threads_running = 0;
  pthread_cond_broadcast(&tp->thread_cond);

  /* Wait for all the threads to be up and running. This has to come first:
     a worker only decrements num_threads_waiting after it has re-acquired the
     mutex, so right after the broadcast the waiting count still equals the
     pool size and the completion test below would pass immediately. */
  while (tp->num_threads_running < tp->num_threads) {
    pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
  }

  /* Wait for all threads to be done. */
  while (tp->num_threads_waiting < tp->num_threads) {
    pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
  }
  pthread_mutex_unlock(&tp->thread_mutex);
}

/**
 * @brief Frees up the memory allocated for this #threadpool.
 *
 * Only the array of thread handles is released. The worker threads are
 * neither stopped nor joined, and the mutex and condition variables are not
 * destroyed.
 *
 * @param tp The #threadpool.
 */
void threadpool_clean(struct threadpool *tp) {
  free(tp->threads);

  /* Do not leave a dangling pointer behind; a repeated call then reduces to
     a harmless free(NULL). */
  tp->threads = NULL;
}