From e33097723c7539fb227f32bac6d9c9ac2210f57c Mon Sep 17 00:00:00 2001 From: "Peter W. Draper" <p.w.draper@durham.ac.uk> Date: Fri, 28 Jul 2017 17:20:04 +0100 Subject: [PATCH] Revert "Merge branch 'threadpool_task_plots' into 'master' " This reverts merge request !375 --- configure.ac | 22 +-- examples/main.c | 37 +---- examples/plot_threadpool.py | 271 ------------------------------- examples/process_plot_threadpool | 96 ----------- src/engine.c | 26 ++- src/engine.h | 2 +- src/gravity.c | 2 +- src/queue.h | 2 +- src/scheduler.c | 6 +- src/space.c | 16 +- src/statistics.c | 4 +- src/swift.h | 1 - src/threadpool.c | 254 +++++++---------------------- src/threadpool.h | 49 +----- tests/testThreadpool.c | 6 - 15 files changed, 106 insertions(+), 688 deletions(-) delete mode 100755 examples/plot_threadpool.py delete mode 100755 examples/process_plot_threadpool diff --git a/configure.ac b/configure.ac index 448224a0ad..91ee913c03 100644 --- a/configure.ac +++ b/configure.ac @@ -189,19 +189,6 @@ if test "$enable_task_debugging" = "yes"; then AC_DEFINE([SWIFT_DEBUG_TASKS],1,[Enable task debugging]) fi -# Check if threadpool debugging is on. -AC_ARG_ENABLE([threadpool-debugging], - [AS_HELP_STRING([--enable-threadpool-debugging], - [Store threadpool mapper timing information and generate threadpool dump files @<:@yes/no@:>@] - )], - [enable_threadpool_debugging="$enableval"], - [enable_threadpool_debugging="no"] -) -if test "$enable_threadpool_debugging" = "yes"; then - AC_DEFINE([SWIFT_DEBUG_THREADPOOL],1,[Enable threadpool debugging]) - LDFLAGS="$LDFLAGS -rdynamic" -fi - # Check if the general timers are switched on. AC_ARG_ENABLE([timers], [AS_HELP_STRING([--enable-timers], @@ -910,10 +897,9 @@ AC_MSG_RESULT([ Multipole order : $with_multipole_order No gravity below ID : $no_gravity_below_id - Individual timers : $enable_timers - Task debugging : $enable_task_debugging - Threadpool debugging : $enable_threadpool_debugging - Debugging checks : $enable_debugging_checks - Gravity checks : $gravity_force_checks + Individual timers : $enable_timers + Task debugging : $enable_task_debugging + Debugging checks : $enable_debugging_checks + Gravity checks : $gravity_force_checks ------------------------]) diff --git a/examples/main.c b/examples/main.c index 65a048526b..583f05af82 100644 --- a/examples/main.c +++ b/examples/main.c @@ -153,8 +153,7 @@ int main(int argc, char *argv[]) { /* Let's pin the main thread */ #if defined(HAVE_SETAFFINITY) && defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE) - if (((ENGINE_POLICY) & engine_policy_setaffinity) == - engine_policy_setaffinity) + if (((ENGINE_POLICY)&engine_policy_setaffinity) == engine_policy_setaffinity) engine_pin(); #endif @@ -164,7 +163,6 @@ int main(int argc, char *argv[]) { int with_aff = 0; int dry_run = 0; int dump_tasks = 0; - int dump_threadpool = 0; int nsteps = -2; int with_cosmology = 0; int with_external_gravity = 0; @@ -186,7 +184,7 @@ int main(int argc, char *argv[]) { /* Parse the parameters */ int c; - while ((c = getopt(argc, argv, "acCdDef:FgGhMn:P:sSt:Tv:y:Y:")) != -1) + while ((c = getopt(argc, argv, "acCdDef:FgGhMn:P:sSt:Tv:y:")) != -1) switch (c) { case 'a': with_aff = 1; @@ -275,21 +273,6 @@ int main(int argc, char *argv[]) { "Task dumping is only possible if SWIFT was configured with the " "--enable-task-debugging option."); } -#endif - break; - case 'Y': - if (sscanf(optarg, "%d", &dump_threadpool) != 1) { - if (myrank == 0) printf("Error parsing dump_threadpool (-Y). \n"); - if (myrank == 0) print_help_message(); - return 1; - } -#ifndef SWIFT_DEBUG_THREADPOOL - if (dump_threadpool) { - error( - "Threadpool dumping is only possible if SWIFT was configured " - "with the " - "--enable-threadpool-debugging option."); - } #endif break; case '?': @@ -770,22 +753,6 @@ int main(int argc, char *argv[]) { #endif // WITH_MPI } #endif // SWIFT_DEBUG_TASKS - -#ifdef SWIFT_DEBUG_THREADPOOL - /* Dump the task data using the given frequency. */ - if (dump_threadpool && (dump_threadpool == 1 || j % dump_threadpool == 1)) { - char dumpfile[40]; -#ifdef WITH_MPI - snprintf(dumpfile, 30, "threadpool_info-rank%d-step%d.dat", engine_rank, - j + 1); -#else - snprintf(dumpfile, 30, "threadpool_info-step%d.dat", j + 1); -#endif // WITH_MPI - threadpool_dump_log(&e.threadpool, dumpfile, 1); - } else { - threadpool_reset_log(&e.threadpool); - } -#endif // SWIFT_DEBUG_THREADPOOL } /* Print the values of the runner histogram. */ diff --git a/examples/plot_threadpool.py b/examples/plot_threadpool.py deleted file mode 100755 index 68b038ed40..0000000000 --- a/examples/plot_threadpool.py +++ /dev/null @@ -1,271 +0,0 @@ -#!/usr/bin/env python -""" -Usage: - plot_threadpool.py [options] input.dat output.png - -where input.dat is a threadpool info file for a step. Use the '-Y interval' -flag of the swift command to create these. The output plot will be called -'output.png'. The --limit option can be used to produce plots with the same -time span and the --expand option to expand each thread line into '*expand' -lines, so that adjacent tasks of the same type can be distinguished. Other -options can be seen using the --help flag. - -This file is part of SWIFT. -Copyright (c) 2015 Pedro Gonnet (pedro.gonnet@durham.ac.uk), - Bert Vandenbroucke (bert.vandenbroucke@ugent.be) - Matthieu Schaller (matthieu.schaller@durham.ac.uk) - (c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk) - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU Lesser General Public License as published -by the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU Lesser General Public License -along with this program. If not, see <http://www.gnu.org/licenses/>. -""" - -import matplotlib -matplotlib.use("Agg") -import matplotlib.collections as collections -import matplotlib.ticker as plticker -import pylab as pl -import sys -import argparse - -# Handle the command line. -parser = argparse.ArgumentParser(description="Plot threadpool function graphs") - -parser.add_argument("input", help="Threadpool data file (-Y output)") -parser.add_argument("outpng", help="Name for output graphic file (PNG)") -parser.add_argument("-l", "--limit", dest="limit", - help="Upper time limit in millisecs (def: depends on data)", - default=0, type=int) -parser.add_argument("-e", "--expand", dest="expand", - help="Thread expansion factor (def: 1)", - default=1, type=int) -parser.add_argument("--height", dest="height", - help="Height of plot in inches (def: 4)", - default=4., type=float) -parser.add_argument("--width", dest="width", - help="Width of plot in inches (def: 16)", - default=16., type=float) -parser.add_argument("--nolegend", dest="nolegend", - help="Whether to show the legend (def: False)", - default=False, action="store_true") -parser.add_argument("-v", "--verbose", dest="verbose", - help="Show colour assignments and other details (def: False)", - default=False, action="store_true") - -args = parser.parse_args() -infile = args.input -outpng = args.outpng -delta_t = args.limit -expand = args.expand - -# Basic plot configuration. -PLOT_PARAMS = {"axes.labelsize": 10, - "axes.titlesize": 10, - "font.size": 12, - "legend.fontsize": 12, - "xtick.labelsize": 10, - "ytick.labelsize": 10, - "figure.figsize" : (args.width, args.height), - "figure.subplot.left" : 0.03, - "figure.subplot.right" : 0.995, - "figure.subplot.bottom" : 0.09, - "figure.subplot.top" : 0.99, - "figure.subplot.wspace" : 0., - "figure.subplot.hspace" : 0., - "lines.markersize" : 6, - "lines.linewidth" : 3. - } -pl.rcParams.update(PLOT_PARAMS) - -# A number of colours for the various types. Recycled when there are -# more task types than colours... -colours = ["cyan", "lightgray", "darkblue", "yellow", "tan", "dodgerblue", - "sienna", "aquamarine", "bisque", "blue", "green", "lightgreen", - "brown", "purple", "moccasin", "olivedrab", "chartreuse", - "darksage", "darkgreen", "green", "mediumseagreen", - "mediumaquamarine", "darkslategrey", "mediumturquoise", - "black", "cadetblue", "skyblue", "red", "slategray", "gold", - "slateblue", "blueviolet", "mediumorchid", "firebrick", - "magenta", "hotpink", "pink", "orange", "lightgreen"] -maxcolours = len(colours) - -# Read header. First two lines. -with open(infile) as infid: - head = [next(infid) for x in xrange(2)] -header = head[1][2:].strip() -header = eval(header) -nthread = int(header['num_threads']) + 1 -CPU_CLOCK = float(header['cpufreq']) / 1000.0 -print "Number of threads: ", nthread -if args.verbose: - print "CPU frequency:", CPU_CLOCK * 1000.0 - -# Read input. -data = pl.genfromtxt(infile, dtype=None, delimiter=" ") - -# Mixed types, so need to separate. -tics = [] -tocs = [] -funcs = [] -threads = [] -chunks = [] -for i in data: - if i[0] != "#": - funcs.append(i[0]) - if i[1] < 0: - threads.append(nthread-1) - else: - threads.append(i[1]) - chunks.append(i[2]) - tics.append(i[3]) - tocs.append(i[4]) -tics = pl.array(tics) -tocs = pl.array(tocs) -funcs = pl.array(funcs) -threads = pl.array(threads) -chunks = pl.array(chunks) - - -# Recover the start and end time -tic_step = min(tics) -toc_step = max(tocs) - -# Not known. - -# Calculate the time range, if not given. -delta_t = delta_t * CPU_CLOCK -if delta_t == 0: - dt = toc_step - tic_step - if dt > delta_t: - delta_t = dt - print "Data range: ", delta_t / CPU_CLOCK, "ms" - -# Once more doing the real gather and plots this time. -start_t = float(tic_step) -tics -= tic_step -tocs -= tic_step -end_t = (toc_step - start_t) / CPU_CLOCK - -# Get all "task" names and assign colours. -TASKTYPES = pl.unique(funcs) -print TASKTYPES - -# Set colours of task/subtype. -TASKCOLOURS = {} -ncolours = 0 -for task in TASKTYPES: - TASKCOLOURS[task] = colours[ncolours] - ncolours = (ncolours + 1) % maxcolours - -# For fiddling with colours... -if args.verbose: - print "#Selected colours:" - for task in sorted(TASKCOLOURS.keys()): - print "# " + task + ": " + TASKCOLOURS[task] - for task in sorted(SUBCOLOURS.keys()): - print "# " + task + ": " + SUBCOLOURS[task] - -tasks = {} -tasks[-1] = [] -for i in range(nthread*expand): - tasks[i] = [] - -# Counters for each thread when expanding. -ecounter = [] -for i in range(nthread): - ecounter.append(0) - -for i in range(len(threads)): - thread = threads[i] - - # Expand to cover extra lines if expanding. - ethread = thread * expand + (ecounter[thread] % expand) - ecounter[thread] = ecounter[thread] + 1 - thread = ethread - - tasks[thread].append({}) - tasks[thread][-1]["type"] = funcs[i] - tic = tics[i] / CPU_CLOCK - toc = tocs[i] / CPU_CLOCK - tasks[thread][-1]["tic"] = tic - tasks[thread][-1]["toc"] = toc - tasks[thread][-1]["colour"] = TASKCOLOURS[funcs[i]] - -# Use expanded threads from now on. -nthread = nthread * expand - -typesseen = [] -fig = pl.figure() -ax = fig.add_subplot(1,1,1) -ax.set_xlim(-delta_t * 0.01 / CPU_CLOCK, delta_t * 1.01 / CPU_CLOCK) -ax.set_ylim(0, nthread) - -# Fake thread is used to colour the whole range, do that first. -tictocs = [] -colours = [] -j = 0 -for task in tasks[nthread - expand]: - tictocs.append((task["tic"], task["toc"] - task["tic"])) - colours.append(task["colour"]) -ax.broken_barh(tictocs, [0,(nthread-1)], facecolors = colours, linewidth=0, alpha=0.15) - -# And we don't plot the fake thread. -nthread = nthread - expand -for i in range(nthread): - - # Collect ranges and colours into arrays. - tictocs = [] - colours = [] - j = 0 - for task in tasks[i]: - tictocs.append((task["tic"], task["toc"] - task["tic"])) - colours.append(task["colour"]) - - # Legend support, collections don't add to this. - qtask = task["type"] - if qtask not in typesseen: - pl.plot([], [], color=task["colour"], label=qtask) - typesseen.append(qtask) - - # Now plot. - ax.broken_barh(tictocs, [i+0.05,0.90], facecolors = colours, linewidth=0) - -# Legend and room for it. -nrow = len(typesseen) / 5 -if not args.nolegend: - ax.fill_between([0, 0], nthread+0.5, nthread + nrow + 0.5, facecolor="white") - ax.set_ylim(0, nthread + 0.5) - ax.legend(loc=1, shadow=True, bbox_to_anchor=(0., 1.05 ,1., 0.2), mode="expand", ncol=5) - box = ax.get_position() - ax.set_position([box.x0, box.y0, box.width, box.height*0.8]) - -# Start and end of time-step -ax.plot([0, 0], [0, nthread + nrow + 1], 'k--', linewidth=1) -ax.plot([end_t, end_t], [0, nthread + nrow + 1], 'k--', linewidth=1) - -ax.set_xlabel("Wall clock time [ms]", labelpad=0.) -if expand == 1: - ax.set_ylabel("Thread ID", labelpad=0 ) -else: - ax.set_ylabel("Thread ID * " + str(expand), labelpad=0 ) -ax.set_yticks(pl.array(range(nthread)), True) - -loc = plticker.MultipleLocator(base=expand) -ax.yaxis.set_major_locator(loc) -ax.grid(True, which='major', axis="y", linestyle="-") - -pl.show() -pl.savefig(outpng) -print "Graphics done, output written to", outpng - -sys.exit(0) diff --git a/examples/process_plot_threadpool b/examples/process_plot_threadpool deleted file mode 100755 index d7bef48d31..0000000000 --- a/examples/process_plot_threadpool +++ /dev/null @@ -1,96 +0,0 @@ -#!/bin/bash -# -# Usage: -# process_plot_threadpool nprocess time-range-ms -# -# Description: -# Process all the threadpool info files in the current directory -# creating function graphs for steps and threads. -# -# The input files are created by a run using the "-Y interval" flag and -# should be named "threadpool_info-step<n>.dat" in the current directory. -# All located files will be processed using "nprocess" concurrent -# processes and all plots will have the given time range. An output -# HTML file "index.html" will be created to view all the plots. -# -# -# This file is part of SWIFT: -# -# Copyright (C) 2017 Peter W. Draper (p.w.draper@durham.ac.uk) -# All Rights Reserved. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License as published -# by the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -# Handle command-line -if test "$2" == ""; then - echo "Usage: $0 nprocess time-range-ms" - exit 1 -fi -NPROCS=$1 -TIMERANGE=$2 - -# Find all thread info files. Use version sort to get into correct order. -files=$(ls -v threadpool_info-step*.dat) -if test $? != 0; then - echo "Failed to find any threadpool info files" - exit 1 -fi - -# Construct list of names, the step no and names for the graphics. -list="" -for f in $files; do - s=$(echo $f| sed 's,threadpool_info-step\(.*\).dat,\1,') - list="$list $f $s poolstep${s}r" -done - -# And process them, -echo "Processing threadpool info files..." -echo $list | xargs -P $NPROCS -n 3 /bin/bash -c "./plot_threadpool.py --expand 3 --limit $TIMERANGE --width 16 --height 4 \$0 \$2 " - -echo "Writing output index.html file" -# Construct document - serial. -cat <<EOF > index.html - <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> -<html> - <head> - <title>SWIFT task graphs</title> - </head> - <body> - <h1>SWIFT task graphs</h1> -EOF - -echo $list | xargs -n 3 | while read f s g; do - cat <<EOF >> index.html -<h2>Step $s</h2> -EOF - cat <<EOF >> index.html -<a href="poolstep${s}r${i}.html"><img src="poolstep${s}r${i}.png" width=400px/></a> -EOF - cat <<EOF > poolstep${s}r${i}.html - <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> -<html> -<body> -<img src="poolstep${s}r${i}.png"> -<pre> -EOF -done - -cat <<EOF >> index.html - </body> -</html> -EOF - -echo "Finished" - -exit diff --git a/src/engine.c b/src/engine.c index 6bd2fb5ade..ee48d6565d 100644 --- a/src/engine.c +++ b/src/engine.c @@ -2402,6 +2402,21 @@ void engine_make_gravityrecursive_tasks(struct engine *e) { /* } */ } +void engine_check_sort_tasks(struct engine *e, struct cell *c) { + + /* Find the parent sort task, if any, and copy its flags. */ + if (c->sorts != NULL) { + struct cell *parent = c->parent; + while (parent != NULL && parent->sorts == NULL) parent = parent->parent; + if (parent != NULL) c->sorts->flags |= parent->sorts->flags; + } + + /* Recurse? */ + if (c->split) + for (int k = 0; k < 8; k++) + if (c->progeny[k] != NULL) engine_check_sort_tasks(e, c->progeny[k]); +} + /** * @brief Fill the #space's task list. * @@ -2480,6 +2495,9 @@ void engine_maketasks(struct engine *e) { for (int k = 0; k < nr_cells; k++) engine_make_hierarchical_tasks(e, &cells[k]); + /* Append hierarchical tasks to each cell. */ + for (int k = 0; k < nr_cells; k++) engine_check_sort_tasks(e, &cells[k]); + /* Run through the tasks and make force tasks for each density task. Each force task depends on the cell ghosts and unlocks the kick task of its super-cell. */ @@ -2774,7 +2792,7 @@ int engine_marktasks(struct engine *e) { /* Run through the tasks and mark as skip or not. */ size_t extra_data[3] = {(size_t)e, rebuild_space, (size_t)&e->sched}; threadpool_map(&e->threadpool, engine_marktasks_mapper, s->tasks, s->nr_tasks, - sizeof(struct task), 0, extra_data); + sizeof(struct task), 10000, extra_data); rebuild_space = extra_data[1]; if (e->verbose) @@ -3678,7 +3696,7 @@ void engine_drift_all(struct engine *e) { #endif threadpool_map(&e->threadpool, engine_do_drift_all_mapper, e->s->cells_top, - e->s->nr_cells, sizeof(struct cell), 0, e); + e->s->nr_cells, sizeof(struct cell), 1, e); /* Synchronize particle positions */ space_synchronize_particle_positions(e->s); @@ -3730,7 +3748,7 @@ void engine_drift_top_multipoles(struct engine *e) { const ticks tic = getticks(); threadpool_map(&e->threadpool, engine_do_drift_top_multipoles_mapper, - e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 0, e); + e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 10, e); #ifdef SWIFT_DEBUG_CHECKS /* Check that all cells have been drifted to the current time. */ @@ -3768,7 +3786,7 @@ void engine_reconstruct_multipoles(struct engine *e) { const ticks tic = getticks(); threadpool_map(&e->threadpool, engine_do_reconstruct_multipoles_mapper, - e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 0, e); + e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 10, e); if (e->verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), diff --git a/src/engine.h b/src/engine.h index 79ff45a69a..c7aaa08b57 100644 --- a/src/engine.h +++ b/src/engine.h @@ -77,7 +77,7 @@ extern const char *engine_policy_names[]; #define engine_queue_scale 1.2 #define engine_maxtaskspercell 96 #define engine_maxproxies 64 -#define engine_tasksreweight 1 +#define engine_tasksreweight 10 #define engine_parts_size_grow 1.05 #define engine_redistribute_alloc_margin 1.2 #define engine_default_energy_file_name "energy" diff --git a/src/gravity.c b/src/gravity.c index 49bbaca39b..97b2955b32 100644 --- a/src/gravity.c +++ b/src/gravity.c @@ -207,7 +207,7 @@ void gravity_exact_force_compute(struct space *s, const struct engine *e) { data.const_G = e->physical_constants->const_newton_G; threadpool_map(&s->e->threadpool, gravity_exact_force_compute_mapper, - s->gparts, s->nr_gparts, sizeof(struct gpart), 0, &data); + s->gparts, s->nr_gparts, sizeof(struct gpart), 1000, &data); message("Computed exact gravity for %d gparts (took %.3f %s). ", data.counter_global, clocks_from_ticks(getticks() - tic), diff --git a/src/queue.h b/src/queue.h index c85cf0cabe..951a3e5a05 100644 --- a/src/queue.h +++ b/src/queue.h @@ -29,7 +29,7 @@ #define queue_sizeinit 100 #define queue_sizegrow 2 #define queue_search_window 8 -#define queue_incoming_size 10240 +#define queue_incoming_size 1024 #define queue_struct_align 64 /* Counters. */ diff --git a/src/scheduler.c b/src/scheduler.c index 2e64595125..e14fc017d3 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -759,7 +759,7 @@ void scheduler_splittasks(struct scheduler *s) { /* Call the mapper on each current task. */ threadpool_map(s->threadpool, scheduler_splittasks_mapper, s->tasks, - s->nr_tasks, sizeof(struct task), 0, s); + s->nr_tasks, sizeof(struct task), 1000, s); } /** @@ -1174,7 +1174,7 @@ void scheduler_start(struct scheduler *s) { /* Re-wait the tasks. */ if (s->active_count > 1000) { threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tid_active, - s->active_count, sizeof(int), 0, s); + s->active_count, sizeof(int), 1000, s); } else { scheduler_rewait_mapper(s->tid_active, s->active_count, s); } @@ -1250,7 +1250,7 @@ void scheduler_start(struct scheduler *s) { /* Loop over the tasks and enqueue whoever is ready. */ if (s->active_count > 1000) { threadpool_map(s->threadpool, scheduler_enqueue_mapper, s->tid_active, - s->active_count, sizeof(int), 0, s); + s->active_count, sizeof(int), 1000, s); } else { scheduler_enqueue_mapper(s->tid_active, s->active_count, s); } diff --git a/src/space.c b/src/space.c index 8ad571be38..23902a3750 100644 --- a/src/space.c +++ b/src/space.c @@ -378,7 +378,7 @@ void space_regrid(struct space *s, int verbose) { /* Free the old cells, if they were allocated. */ if (s->cells_top != NULL) { threadpool_map(&s->e->threadpool, space_rebuild_recycle_mapper, - s->cells_top, s->nr_cells, sizeof(struct cell), 0, s); + s->cells_top, s->nr_cells, sizeof(struct cell), 100, s); free(s->cells_top); free(s->multipoles_top); s->maxdepth = 0; @@ -491,7 +491,7 @@ void space_regrid(struct space *s, int verbose) { /* Free the old cells, if they were allocated. */ threadpool_map(&s->e->threadpool, space_rebuild_recycle_mapper, - s->cells_top, s->nr_cells, sizeof(struct cell), 0, s); + s->cells_top, s->nr_cells, sizeof(struct cell), 100, s); s->maxdepth = 0; } @@ -970,7 +970,7 @@ void space_split(struct space *s, struct cell *cells, int nr_cells, const ticks tic = getticks(); threadpool_map(&s->e->threadpool, space_split_mapper, cells, nr_cells, - sizeof(struct cell), 0, s); + sizeof(struct cell), 1, s); if (verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -1004,7 +1004,7 @@ void space_sanitize(struct space *s) { if (s->e->nodeID == 0) message("Cleaning up unreasonable values of h"); threadpool_map(&s->e->threadpool, space_sanitize_mapper, s->cells_top, - s->nr_cells, sizeof(struct cell), 0, NULL); + s->nr_cells, sizeof(struct cell), 1, NULL); } /** @@ -1187,7 +1187,7 @@ void space_parts_get_cell_index(struct space *s, int *ind, struct cell *cells, data.ind = ind; threadpool_map(&s->e->threadpool, space_parts_get_cell_index_mapper, s->parts, - s->nr_parts, sizeof(struct part), 0, &data); + s->nr_parts, sizeof(struct part), 1000, &data); if (verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -1214,7 +1214,7 @@ void space_gparts_get_cell_index(struct space *s, int *gind, struct cell *cells, data.ind = gind; threadpool_map(&s->e->threadpool, space_gparts_get_cell_index_mapper, - s->gparts, s->nr_gparts, sizeof(struct gpart), 0, &data); + s->gparts, s->nr_gparts, sizeof(struct gpart), 1000, &data); if (verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -1241,7 +1241,7 @@ void space_sparts_get_cell_index(struct space *s, int *sind, struct cell *cells, data.ind = sind; threadpool_map(&s->e->threadpool, space_sparts_get_cell_index_mapper, - s->sparts, s->nr_sparts, sizeof(struct spart), 0, &data); + s->sparts, s->nr_sparts, sizeof(struct spart), 1000, &data); if (verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -2501,7 +2501,7 @@ void space_synchronize_particle_positions(struct space *s) { (s->nr_gparts > 0 && s->nr_sparts > 0)) threadpool_map(&s->e->threadpool, space_synchronize_particle_positions_mapper, s->gparts, - s->nr_gparts, sizeof(struct gpart), 0, (void *)s); + s->nr_gparts, sizeof(struct gpart), 1000, (void *)s); } /** diff --git a/src/statistics.c b/src/statistics.c index 5a3f1ff4f9..57d60bcb1b 100644 --- a/src/statistics.c +++ b/src/statistics.c @@ -271,12 +271,12 @@ void stats_collect(const struct space *s, struct statistics *stats) { /* Run parallel collection of statistics for parts */ if (s->nr_parts > 0) threadpool_map(&s->e->threadpool, stats_collect_part_mapper, s->parts, - s->nr_parts, sizeof(struct part), 0, &extra_data); + s->nr_parts, sizeof(struct part), 10000, &extra_data); /* Run parallel collection of statistics for gparts */ if (s->nr_gparts > 0) threadpool_map(&s->e->threadpool, stats_collect_gpart_mapper, s->gparts, - s->nr_gparts, sizeof(struct gpart), 0, &extra_data); + s->nr_gparts, sizeof(struct gpart), 10000, &extra_data); } /** diff --git a/src/swift.h b/src/swift.h index 1d1a7c7d04..20397eb24d 100644 --- a/src/swift.h +++ b/src/swift.h @@ -57,7 +57,6 @@ #include "sourceterms.h" #include "space.h" #include "task.h" -#include "threadpool.h" #include "timeline.h" #include "timers.h" #include "tools.h" diff --git a/src/threadpool.c b/src/threadpool.c index 79bd586cd9..c11fd8121b 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -26,139 +26,13 @@ #include <math.h> #include <stdlib.h> #include <string.h> -#ifdef SWIFT_DEBUG_THREADPOOL -#include <dlfcn.h> -#endif /* This object's header. */ #include "threadpool.h" /* Local headers. */ #include "atomic.h" -#include "clocks.h" #include "error.h" -#include "minmax.h" - -#ifdef SWIFT_DEBUG_THREADPOOL -/** - * @breif Store a log entry of the given chunk. - */ -void threadpool_log(struct threadpool *tp, int tid, size_t chunk_size, - ticks tic, ticks toc) { - struct mapper_log *log = &tp->logs[tid > 0 ? tid : 0]; - - /* Check if we need to re-allocate the log buffer. */ - if (log->count == log->size) { - log->size *= 2; - struct mapper_log_entry *new_log; - if ((new_log = (struct mapper_log_entry *)malloc( - sizeof(struct mapper_log_entry) * log->size)) == NULL) - error("Failed to re-allocate mapper log."); - memcpy(new_log, log->log, sizeof(struct mapper_log_entry) * log->count); - free(log->log); - log->log = new_log; - } - - /* Store the new entry. */ - struct mapper_log_entry *entry = &log->log[log->count]; - entry->tid = tid; - entry->chunk_size = chunk_size; - entry->tic = tic; - entry->toc = toc; - entry->map_function = tp->map_function; - log->count++; -} - -void threadpool_dump_log(struct threadpool *tp, const char *filename, - int reset) { - - /* Open the output file. */ - FILE *fd; - if ((fd = fopen(filename, "w")) == NULL) - error("Failed to create log file '%s'.", filename); - - /* Create a buffer of function names. */ - const int max_names = 100; - struct name_entry { - threadpool_map_function map_function; - const char *name; - }; - struct name_entry names[max_names]; - bzero(names, sizeof(struct name_entry) * max_names); - - /* Write a header. */ - fprintf(fd, "# map_function thread_id chunk_size tic toc\n"); - fprintf(fd, "# {'num_threads': %i, 'cpufreq': %lli}\n", tp->num_threads, - clocks_get_cpufreq()); - - /* Loop over the per-tid logs and dump them. */ - for (int k = 0; k < tp->num_threads; k++) { - struct mapper_log *log = &tp->logs[k]; - - /* Loop over the log entries and dump them. */ - for (int i = 0; i < log->count; i++) { - - struct mapper_log_entry *entry = &log->log[i]; - - /* Look for the function pointer in the buffer. */ - int nid = 0; - while (nid < max_names && names[nid].map_function != entry->map_function) - nid++; - - /* If the name was not found, make a new entry. */ - if (nid == max_names) { - for (int j = 1; j < max_names; j++) names[j - 1] = names[j]; - names[0].map_function = entry->map_function; - Dl_info dl_info; - dladdr(entry->map_function, &dl_info); - names[0].name = dl_info.dli_sname; - nid = 0; - } - - /* Log a line to the file. */ - fprintf(fd, "%s %i %i %lli %lli\n", names[nid].name, entry->tid, - entry->chunk_size, entry->tic, entry->toc); - } - - /* Clear the log if requested. */ - if (reset) log->count = 0; - } - - /* Close the file. */ - fclose(fd); -} -#endif // SWIFT_DEBUG_THREADPOOL - -/** - * @brief Runner main loop, get a chunk and call the mapper function. - */ -void threadpool_chomp(struct threadpool *tp, int tid) { - - /* Loop until we can't get a chunk. */ - while (1) { - /* Desired chunk size. */ - size_t chunk_size = - (tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads); - if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk; - if (chunk_size < 1) chunk_size = 1; - - /* Get a chunk and check its size. */ - size_t task_ind = atomic_add(&tp->map_data_count, chunk_size); - if (task_ind >= tp->map_data_size) break; - if (task_ind + chunk_size > tp->map_data_size) - chunk_size = tp->map_data_size - task_ind; - -/* Call the mapper function. */ -#ifdef SWIFT_DEBUG_THREADPOOL - ticks tic = getticks(); -#endif - tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind), - chunk_size, tp->map_extra_data); -#ifdef SWIFT_DEBUG_THREADPOOL - threadpool_log(tp, tid, chunk_size, tic, getticks()); -#endif - } -} void *threadpool_runner(void *data) { @@ -169,13 +43,39 @@ void *threadpool_runner(void *data) { while (1) { /* Let the controller know that this thread is waiting. */ - pthread_barrier_wait(&tp->wait_barrier); + pthread_mutex_lock(&tp->thread_mutex); + tp->num_threads_waiting += 1; + if (tp->num_threads_waiting == tp->num_threads) { + pthread_cond_signal(&tp->control_cond); + } /* Wait for the controller. */ - pthread_barrier_wait(&tp->run_barrier); - - /* Do actual work. */ - threadpool_chomp(tp, atomic_inc(&tp->num_threads_running)); + pthread_cond_wait(&tp->thread_cond, &tp->thread_mutex); + tp->num_threads_waiting -= 1; + tp->num_threads_running += 1; + if (tp->num_threads_running == tp->num_threads) { + pthread_cond_signal(&tp->control_cond); + } + pthread_mutex_unlock(&tp->thread_mutex); + + /* The index of the mapping task we will work on next. */ + while (1) { + /* Desired chunk size. */ + size_t chunk_size = + (tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads); + if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk; + if (chunk_size < 1) chunk_size = 1; + + /* Get a chunk and check its size. */ + size_t task_ind = atomic_add(&tp->map_data_count, chunk_size); + if (task_ind >= tp->map_data_size) break; + if (task_ind + chunk_size > tp->map_data_size) + chunk_size = tp->map_data_size - task_ind; + + /* Call the mapper function. */ + tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind), + chunk_size, tp->map_extra_data); + } } } @@ -189,28 +89,18 @@ void threadpool_init(struct threadpool *tp, int num_threads) { /* Initialize the thread counters. */ tp->num_threads = num_threads; - -#ifdef SWIFT_DEBUG_THREADPOOL - if ((tp->logs = (struct mapper_log *)malloc(sizeof(struct mapper_log) * - num_threads)) == NULL) - error("Failed to allocate mapper logs."); - for (int k = 0; k < num_threads; k++) { - tp->logs[k].size = threadpool_log_initial_size; - tp->logs[k].count = 0; - if ((tp->logs[k].log = (struct mapper_log_entry *)malloc( - sizeof(struct mapper_log_entry) * tp->logs[k].size)) == NULL) - error("Failed to allocate mapper log."); - } -#endif + tp->num_threads_waiting = 0; /* If there is only a single thread, do nothing more as of here as we will just do work in the (blocked) calling thread. */ if (num_threads == 1) return; - /* Init the barriers. */ - if (pthread_barrier_init(&tp->wait_barrier, NULL, num_threads) != 0 || - pthread_barrier_init(&tp->run_barrier, NULL, num_threads) != 0) - error("Failed to initialize barriers."); + /* Init the threadpool mutexes. */ + if (pthread_mutex_init(&tp->thread_mutex, NULL) != 0) + error("Failed to initialize mutexex."); + if (pthread_cond_init(&tp->control_cond, NULL) != 0 || + pthread_cond_init(&tp->thread_cond, NULL) != 0) + error("Failed to initialize condition variables."); /* Set the task counter to zero. */ tp->map_data_size = 0; @@ -219,21 +109,24 @@ void threadpool_init(struct threadpool *tp, int num_threads) { tp->map_data_chunk = 0; tp->map_function = NULL; - /* Allocate the threads, one less than requested since the calling thread - works as well. */ - if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * - (num_threads - 1))) == NULL) { + /* Allocate the threads. */ + if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads)) == + NULL) { error("Failed to allocate thread array."); } /* Create and start the threads. */ - for (int k = 0; k < num_threads - 1; k++) { + pthread_mutex_lock(&tp->thread_mutex); + for (int k = 0; k < num_threads; k++) { if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0) error("Failed to create threadpool runner thread."); } /* Wait for all the threads to be up and running. */ - pthread_barrier_wait(&tp->wait_barrier); + while (tp->num_threads_waiting < tp->num_threads) { + pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); + } + pthread_mutex_unlock(&tp->thread_mutex); } /** @@ -247,8 +140,7 @@ void threadpool_init(struct threadpool *tp, int num_threads) { * @param map_data The data on which the mapping function will be called. * @param N Number of elements in @c map_data. * @param stride Size, in bytes, of each element of @c map_data. - * @param chunk Number of map data elements to pass to the function at a time, - * or zero to choose the number automatically. + * @param chunk Number of map data elements to pass to the function at a time. * @param extra_data Addtitional pointer that will be passed to the mapping * function, may contain additional data. */ @@ -256,65 +148,37 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function, void *map_data, size_t N, int stride, int chunk, void *extra_data) { -#ifdef SWIFT_DEBUG_THREADPOOL - ticks tic = getticks(); -#endif - /* If we just have a single thread, call the map function directly. */ if (tp->num_threads == 1) { map_function(map_data, N, extra_data); -#ifdef SWIFT_DEBUG_THREADPOOL - threadpool_log(tp, 0, N, tic, getticks()); -#endif return; } /* Set the map data and signal the threads. */ + pthread_mutex_lock(&tp->thread_mutex); tp->map_data_stride = stride; tp->map_data_size = N; tp->map_data_count = 0; - tp->map_data_chunk = - chunk ? chunk - : max((int)(N / (tp->num_threads * threadpool_default_chunk_ratio)), - 1); + tp->map_data_chunk = chunk; tp->map_function = map_function; tp->map_data = map_data; tp->map_extra_data = extra_data; tp->num_threads_running = 0; + pthread_cond_broadcast(&tp->thread_cond); /* Wait for all the threads to be up and running. */ - pthread_barrier_wait(&tp->run_barrier); - - /* Do some work while I'm at it. */ - threadpool_chomp(tp, tp->num_threads - 1); + while (tp->num_threads_running < tp->num_threads) { + pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); + } /* Wait for all threads to be done. */ - pthread_barrier_wait(&tp->wait_barrier); - -#ifdef SWIFT_DEBUG_THREADPOOL - /* Log the total call time to thread id -1. */ - threadpool_log(tp, -1, N, tic, getticks()); -#endif -} - -/** - * @brief Re-sets the log for this #threadpool. - */ -#ifdef SWIFT_DEBUG_THREADPOOL -void threadpool_reset_log(struct threadpool *tp) { - for (int k = 0; k < tp->num_threads; k++) tp->logs[k].count = 0; + while (tp->num_threads_waiting < tp->num_threads) { + pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); + } + pthread_mutex_unlock(&tp->thread_mutex); } -#endif /** * @brief Frees up the memory allocated for this #threadpool. */ -void threadpool_clean(struct threadpool *tp) { - free(tp->threads); -#ifdef SWIFT_DEBUG_THREADPOOL - for (int k = 0; k < tp->num_threads; k++) { - free(tp->logs[k].log); - } - free(tp->logs); -#endif -} +void threadpool_clean(struct threadpool *tp) { free(tp->threads); } diff --git a/src/threadpool.h b/src/threadpool.h index 019403f658..f9c7eeffb7 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -25,44 +25,10 @@ /* Some standard headers. */ #include <pthread.h> -/* Local includes. */ -#include "cycle.h" - -/* Local defines. */ -#define threadpool_log_initial_size 1000 -#define threadpool_default_chunk_ratio 7 - /* Function type for mappings. */ typedef void (*threadpool_map_function)(void *map_data, int num_elements, void *extra_data); -/* Data for threadpool logging. */ -struct mapper_log_entry { - - /* ID of the thread executing the chunk. */ - int tid; - - /* Size of the chunk processed. */ - int chunk_size; - - /* Pointer to the mapper function. */ - threadpool_map_function map_function; - - /*! Start and end time of this task */ - ticks tic, toc; -}; - -struct mapper_log { - /* Log of threadpool mapper calls. */ - struct mapper_log_entry *log; - - /* Size of the allocated log. */ - int size; - - /* Number of entries in the log. */ - int count; -}; - /* Data of a threadpool. */ struct threadpool { @@ -70,8 +36,8 @@ struct threadpool { pthread_t *threads; /* This is where threads go to rest. */ - pthread_barrier_t wait_barrier; - pthread_barrier_t run_barrier; + pthread_mutex_t thread_mutex; + pthread_cond_t control_cond, thread_cond; /* Current map data and count. */ void *map_data, *map_extra_data; @@ -83,11 +49,7 @@ struct threadpool { int num_threads; /* Counter for the number of threads that are done. */ - volatile int num_threads_running; - -#ifdef SWIFT_DEBUG_THREADPOOL - struct mapper_log *logs; -#endif + volatile int num_threads_waiting, num_threads_running; }; /* Function prototypes. */ @@ -96,10 +58,5 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function, void *map_data, size_t N, int stride, int chunk, void *extra_data); void threadpool_clean(struct threadpool *tp); -#ifdef SWIFT_DEBUG_THREADPOOL -void threadpool_reset_log(struct threadpool *tp); -void threadpool_dump_log(struct threadpool *tp, const char *filename, - int reset); -#endif #endif /* SWIFT_THREADPOOL_H */ diff --git a/tests/testThreadpool.c b/tests/testThreadpool.c index 2a9e98c5ca..aa65d533a2 100644 --- a/tests/testThreadpool.c +++ b/tests/testThreadpool.c @@ -23,7 +23,6 @@ #include <unistd.h> // Local includes. -#include "../config.h" #include "../src/atomic.h" #include "../src/threadpool.h" @@ -79,11 +78,6 @@ int main(int argc, char *argv[]) { threadpool_map(&tp, map_function_first, data, N, sizeof(int), 2, NULL); } -/* If logging was enabled, dump the log. */ -#ifdef SWIFT_DEBUG_THREADPOOL - threadpool_dump_log(&tp, "threadpool_log.txt", 1); -#endif - /* Be clean */ threadpool_clean(&tp); -- GitLab