diff --git a/README b/README
index 3425a5952897b14ba0853ead66ce40ca89c24154..c088a94488133ddf53cd8a6eba45d8dcdebfeb72 100644
--- a/README
+++ b/README
@@ -38,6 +38,7 @@ Valid options are:
                        1: MPI-rank 0 writes, 2: All MPI-ranks write.
    -y {int}            Time-step frequency at which task graphs are dumped.
+   -Y {int}            Time-step frequency at which threadpool tasks are dumped.
    -h                  Print this help message and exit.
 
 See the file parameter_example.yml for an example of parameter file.
diff --git a/configure.ac b/configure.ac
index 91ee913c03d5559ed496137c5e3b9422f0808848..448224a0ad571f9d4251d136ff69ebb54cc9fb5b 100644
--- a/configure.ac
+++ b/configure.ac
@@ -189,6 +189,19 @@ if test "$enable_task_debugging" = "yes"; then
    AC_DEFINE([SWIFT_DEBUG_TASKS],1,[Enable task debugging])
 fi
 
+# Check if threadpool debugging is on.
+AC_ARG_ENABLE([threadpool-debugging],
+   [AS_HELP_STRING([--enable-threadpool-debugging],
+     [Store threadpool mapper timing information and generate threadpool dump files @<:@yes/no@:>@]
+   )],
+   [enable_threadpool_debugging="$enableval"],
+   [enable_threadpool_debugging="no"]
+)
+if test "$enable_threadpool_debugging" = "yes"; then
+   AC_DEFINE([SWIFT_DEBUG_THREADPOOL],1,[Enable threadpool debugging])
+   LDFLAGS="$LDFLAGS -rdynamic"
+fi
+
 # Check if the general timers are switched on.
 AC_ARG_ENABLE([timers],
    [AS_HELP_STRING([--enable-timers],
@@ -897,9 +910,10 @@ AC_MSG_RESULT([
    Multipole order      : $with_multipole_order
    No gravity below ID  : $no_gravity_below_id
 
-   Individual timers : $enable_timers
-   Task debugging    : $enable_task_debugging
-   Debugging checks  : $enable_debugging_checks
-   Gravity checks    : $gravity_force_checks
+   Individual timers    : $enable_timers
+   Task debugging       : $enable_task_debugging
+   Threadpool debugging : $enable_threadpool_debugging
+   Debugging checks     : $enable_debugging_checks
+   Gravity checks       : $gravity_force_checks
 
  ------------------------])
diff --git a/examples/analyse_threadpool_tasks.py b/examples/analyse_threadpool_tasks.py
new file mode 100755
index 0000000000000000000000000000000000000000..fda73366d2e49750ff951262ec4dbd021547cf5a
--- /dev/null
+++ b/examples/analyse_threadpool_tasks.py
@@ -0,0 +1,273 @@
+#!/usr/bin/env python
+"""
+Usage:
+    analyse_threadpool_tasks.py [options] input.dat
+
+where input.dat is a threadpool dump for a step. Use the '-Y interval' flag
+of the swift command to create these.
+
+The output is an analysis of the threadpool task timings, including the
+deadtime per thread and step, the total amount of time spent in each task
+type (for the whole step and per thread), and the minimum and maximum times
+spent per task type.
+
+This file is part of SWIFT.
+Copyright (c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk)
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published
+by the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
+"""
+
+import matplotlib
+matplotlib.use("Agg")
+import matplotlib.collections as collections
+import matplotlib.ticker as plticker
+import pylab as pl
+import sys
+import argparse
+
+# Handle the command line.
+parser = argparse.ArgumentParser(description="Analyse task dumps") + +parser.add_argument("input", help="Threadpool data file (-y output)") +parser.add_argument("-v", "--verbose", dest="verbose", + help="Verbose output (default: False)", + default=False, action="store_true") + +args = parser.parse_args() +infile = args.input + +# Read header. First two lines. +with open(infile) as infid: + head = [next(infid) for x in xrange(2)] +header = head[1][2:].strip() +header = eval(header) +nthread = int(header['num_threads']) + 1 +CPU_CLOCK = float(header['cpufreq']) / 1000.0 +print "Number of threads: ", nthread - 1 +if args.verbose: + print "CPU frequency:", CPU_CLOCK * 1000.0 + +# Read input. +data = pl.genfromtxt(infile, dtype=None, delimiter=" ") + +# Mixed types, so need to separate. +tics = [] +tocs = [] +funcs = [] +threads = [] +chunks = [] +for i in data: + if i[0] != "#": + funcs.append(i[0]) + if i[1] < 0: + threads.append(nthread-1) + else: + threads.append(i[1]) + chunks.append(i[2]) + tics.append(i[3]) + tocs.append(i[4]) +tics = pl.array(tics) +tocs = pl.array(tocs) +funcs = pl.array(funcs) +threads = pl.array(threads) +chunks = pl.array(chunks) + +# Recover the start and end time +tic_step = min(tics) +toc_step = max(tocs) + +# Calculate the time range. +total_t = (toc_step - tic_step)/ CPU_CLOCK +print "# Data range: ", total_t, "ms" +print + +# Correct times to relative millisecs. +start_t = float(tic_step) +tics = (tics - start_t) / CPU_CLOCK +tocs = (tocs - start_t) / CPU_CLOCK + +tasks = {} +tasks[-1] = [] +for i in range(nthread): + tasks[i] = [] + +# Gather into by thread data. +for i in range(len(tics)): + tasks[threads[i]].append([tics[i],tocs[i],funcs[i]]) + +# Don't actually process the fake thread. +nthread = nthread - 1 + +# Sort by tic and gather used thread ids. +threadids = [] +for i in range(nthread): + if len(tasks[i]) > 0: + tasks[i] = sorted(tasks[i], key=lambda task: task[0]) + threadids.append(i) + +# Times per task. +print "# Task times:" +print "# -----------" +print "# {0:<31s}: {1:>7s} {2:>9s} {3:>9s} {4:>9s} {5:>9s} {6:>9s}"\ + .format("type/subtype", "count","minimum", "maximum", + "sum", "mean", "percent") +alltasktimes = {} +sidtimes = {} +for i in threadids: + tasktimes = {} + for task in tasks[i]: + key = task[2] + dt = task[1] - task[0] + if not key in tasktimes: + tasktimes[key] = [] + tasktimes[key].append(dt) + + if not key in alltasktimes: + alltasktimes[key] = [] + alltasktimes[key].append(dt) + + print "# Thread : ", i + for key in sorted(tasktimes.keys()): + taskmin = min(tasktimes[key]) + taskmax = max(tasktimes[key]) + tasksum = sum(tasktimes[key]) + print "{0:33s}: {1:7d} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.4f} {6:9.2f}"\ + .format(key, len(tasktimes[key]), taskmin, taskmax, tasksum, + tasksum / len(tasktimes[key]), tasksum / total_t * 100.0) + print + +print "# All threads : " +for key in sorted(alltasktimes.keys()): + taskmin = min(alltasktimes[key]) + taskmax = max(alltasktimes[key]) + tasksum = sum(alltasktimes[key]) + print "{0:33s}: {1:7d} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.4f} {6:9.2f}"\ + .format(key, len(alltasktimes[key]), taskmin, taskmax, tasksum, + tasksum / len(alltasktimes[key]), + tasksum / (len(threadids) * total_t) * 100.0) +print + +# Dead times. +print "# Times not in tasks (deadtimes)" +print "# ------------------------------" +print "# Time before first task:" +print "# no. 
: {0:>9s} {1:>9s}".format("value", "percent") +predeadtimes = [] +for i in threadids: + predeadtime = tasks[i][0][0] + print "thread {0:2d}: {1:9.4f} {2:9.4f}"\ + .format(i, predeadtime, predeadtime / total_t * 100.0) + predeadtimes.append(predeadtime) + +predeadmin = min(predeadtimes) +predeadmax = max(predeadtimes) +predeadsum = sum(predeadtimes) +print "# : {0:>9s} {1:>9s} {2:>9s} {3:>9s} {4:>9s} {5:>9s}"\ + .format("count", "minimum", "maximum", "sum", "mean", "percent") +print "all : {0:9d} {1:9.4f} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.2f}"\ + .format(len(predeadtimes), predeadmin, predeadmax, predeadsum, + predeadsum / len(predeadtimes), + predeadsum / (len(threadids) * total_t ) * 100.0) +print + +print "# Time after last task:" +print "# no. : {0:>9s} {1:>9s}".format("value", "percent") +postdeadtimes = [] +for i in threadids: + postdeadtime = total_t - tasks[i][-1][1] + print "thread {0:2d}: {1:9.4f} {2:9.4f}"\ + .format(i, postdeadtime, postdeadtime / total_t * 100.0) + postdeadtimes.append(postdeadtime) + +postdeadmin = min(postdeadtimes) +postdeadmax = max(postdeadtimes) +postdeadsum = sum(postdeadtimes) +print "# : {0:>9s} {1:>9s} {2:>9s} {3:>9s} {4:>9s} {5:>9s}"\ + .format("count", "minimum", "maximum", "sum", "mean", "percent") +print "all : {0:9d} {1:9.4f} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.2f}"\ + .format(len(postdeadtimes), postdeadmin, postdeadmax, postdeadsum, + postdeadsum / len(postdeadtimes), + postdeadsum / (len(threadids) * total_t ) * 100.0) +print + +# Time in threadpool, i.e. from first to last tasks. +print "# Time between tasks (threadpool deadtime):" +print "# no. : {0:>9s} {1:>9s} {2:>9s} {3:>9s} {4:>9s} {5:>9s}"\ + .format("count", "minimum", "maximum", "sum", "mean", "percent") +threadpooldeadtimes = [] +for i in threadids: + deadtimes = [] + last = tasks[i][0][0] + for task in tasks[i]: + dt = task[0] - last + deadtimes.append(dt) + last = task[1] + + # Drop first value, last value already gone. + if len(deadtimes) > 1: + deadtimes = deadtimes[1:] + else: + # Only one task, so no deadtime by definition. + deadtimes = [0.0] + + deadmin = min(deadtimes) + deadmax = max(deadtimes) + deadsum = sum(deadtimes) + print "thread {0:2d}: {1:9d} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.4f} {6:9.2f}"\ + .format(i, len(deadtimes), deadmin, deadmax, deadsum, + deadsum / len(deadtimes), deadsum / total_t * 100.0) + threadpooldeadtimes.extend(deadtimes) + +deadmin = min(threadpooldeadtimes) +deadmax = max(threadpooldeadtimes) +deadsum = sum(threadpooldeadtimes) +print "all : {0:9d} {1:9.4f} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.2f}"\ + .format(len(threadpooldeadtimes), deadmin, deadmax, deadsum, + deadsum / len(threadpooldeadtimes), + deadsum / (len(threadids) * total_t ) * 100.0) +print + +# All times in step. +print "# All deadtimes:" +print "# no. 
: {0:>9s} {1:>9s} {2:>9s} {3:>9s} {4:>9s} {5:>9s}"\ + .format("count", "minimum", "maximum", "sum", "mean", "percent") +alldeadtimes = [] +for i in threadids: + deadtimes = [] + last = 0 + for task in tasks[i]: + dt = task[0] - last + deadtimes.append(dt) + last = task[1] + dt = total_t - last + deadtimes.append(dt) + + deadmin = min(deadtimes) + deadmax = max(deadtimes) + deadsum = sum(deadtimes) + print "thread {0:2d}: {1:9d} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.4f} {6:9.2f}"\ + .format(i, len(deadtimes), deadmin, deadmax, deadsum, + deadsum / len(deadtimes), deadsum / total_t * 100.0) + alldeadtimes.extend(deadtimes) + +deadmin = min(alldeadtimes) +deadmax = max(alldeadtimes) +deadsum = sum(alldeadtimes) +print "all : {0:9d} {1:9.4f} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.2f}"\ + .format(len(alldeadtimes), deadmin, deadmax, deadsum, + deadsum / len(alldeadtimes), + deadsum / (len(threadids) * total_t ) * 100.0) +print + +sys.exit(0) diff --git a/examples/main.c b/examples/main.c index 583f05af82514f7e12e4d61d1d886082df9ddcf5..89b1317cf855c91c61f8b22dc29a7412a75ef7a0 100644 --- a/examples/main.c +++ b/examples/main.c @@ -103,6 +103,8 @@ void print_help_message() { printf(" %2s %14s %s\n", "", "", "2: All MPI-ranks write."); printf(" %2s %14s %s\n", "-y", "{int}", "Time-step frequency at which task graphs are dumped."); + printf(" %2s %14s %s\n", "-Y", "{int}", + "Time-step frequency at which threadpool tasks are dumped."); printf(" %2s %14s %s\n", "-h", "", "Print this help message and exit."); printf( "\nSee the file parameter_example.yml for an example of " @@ -153,7 +155,8 @@ int main(int argc, char *argv[]) { /* Let's pin the main thread */ #if defined(HAVE_SETAFFINITY) && defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE) - if (((ENGINE_POLICY)&engine_policy_setaffinity) == engine_policy_setaffinity) + if (((ENGINE_POLICY) & engine_policy_setaffinity) == + engine_policy_setaffinity) engine_pin(); #endif @@ -163,6 +166,7 @@ int main(int argc, char *argv[]) { int with_aff = 0; int dry_run = 0; int dump_tasks = 0; + int dump_threadpool = 0; int nsteps = -2; int with_cosmology = 0; int with_external_gravity = 0; @@ -184,7 +188,7 @@ int main(int argc, char *argv[]) { /* Parse the parameters */ int c; - while ((c = getopt(argc, argv, "acCdDef:FgGhMn:P:sSt:Tv:y:")) != -1) + while ((c = getopt(argc, argv, "acCdDef:FgGhMn:P:sSt:Tv:y:Y:")) != -1) switch (c) { case 'a': with_aff = 1; @@ -273,6 +277,21 @@ int main(int argc, char *argv[]) { "Task dumping is only possible if SWIFT was configured with the " "--enable-task-debugging option."); } +#endif + break; + case 'Y': + if (sscanf(optarg, "%d", &dump_threadpool) != 1) { + if (myrank == 0) printf("Error parsing dump_threadpool (-Y). \n"); + if (myrank == 0) print_help_message(); + return 1; + } +#ifndef SWIFT_DEBUG_THREADPOOL + if (dump_threadpool) { + error( + "Threadpool dumping is only possible if SWIFT was configured " + "with the " + "--enable-threadpool-debugging option."); + } #endif break; case '?': @@ -753,6 +772,22 @@ int main(int argc, char *argv[]) { #endif // WITH_MPI } #endif // SWIFT_DEBUG_TASKS + +#ifdef SWIFT_DEBUG_THREADPOOL + /* Dump the task data using the given frequency. 
*/ + if (dump_threadpool && (dump_threadpool == 1 || j % dump_threadpool == 1)) { + char dumpfile[40]; +#ifdef WITH_MPI + snprintf(dumpfile, 30, "threadpool_info-rank%d-step%d.dat", engine_rank, + j + 1); +#else + snprintf(dumpfile, 30, "threadpool_info-step%d.dat", j + 1); +#endif // WITH_MPI + threadpool_dump_log(&e.threadpool, dumpfile, 1); + } else { + threadpool_reset_log(&e.threadpool); + } +#endif // SWIFT_DEBUG_THREADPOOL } /* Print the values of the runner histogram. */ diff --git a/examples/plot_threadpool.py b/examples/plot_threadpool.py new file mode 100755 index 0000000000000000000000000000000000000000..68b038ed406fdac8c4fee3e250fbb397bb51ae21 --- /dev/null +++ b/examples/plot_threadpool.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python +""" +Usage: + plot_threadpool.py [options] input.dat output.png + +where input.dat is a threadpool info file for a step. Use the '-Y interval' +flag of the swift command to create these. The output plot will be called +'output.png'. The --limit option can be used to produce plots with the same +time span and the --expand option to expand each thread line into '*expand' +lines, so that adjacent tasks of the same type can be distinguished. Other +options can be seen using the --help flag. + +This file is part of SWIFT. +Copyright (c) 2015 Pedro Gonnet (pedro.gonnet@durham.ac.uk), + Bert Vandenbroucke (bert.vandenbroucke@ugent.be) + Matthieu Schaller (matthieu.schaller@durham.ac.uk) + (c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk) + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published +by the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. +""" + +import matplotlib +matplotlib.use("Agg") +import matplotlib.collections as collections +import matplotlib.ticker as plticker +import pylab as pl +import sys +import argparse + +# Handle the command line. +parser = argparse.ArgumentParser(description="Plot threadpool function graphs") + +parser.add_argument("input", help="Threadpool data file (-Y output)") +parser.add_argument("outpng", help="Name for output graphic file (PNG)") +parser.add_argument("-l", "--limit", dest="limit", + help="Upper time limit in millisecs (def: depends on data)", + default=0, type=int) +parser.add_argument("-e", "--expand", dest="expand", + help="Thread expansion factor (def: 1)", + default=1, type=int) +parser.add_argument("--height", dest="height", + help="Height of plot in inches (def: 4)", + default=4., type=float) +parser.add_argument("--width", dest="width", + help="Width of plot in inches (def: 16)", + default=16., type=float) +parser.add_argument("--nolegend", dest="nolegend", + help="Whether to show the legend (def: False)", + default=False, action="store_true") +parser.add_argument("-v", "--verbose", dest="verbose", + help="Show colour assignments and other details (def: False)", + default=False, action="store_true") + +args = parser.parse_args() +infile = args.input +outpng = args.outpng +delta_t = args.limit +expand = args.expand + +# Basic plot configuration. 
+PLOT_PARAMS = {"axes.labelsize": 10,
+               "axes.titlesize": 10,
+               "font.size": 12,
+               "legend.fontsize": 12,
+               "xtick.labelsize": 10,
+               "ytick.labelsize": 10,
+               "figure.figsize" : (args.width, args.height),
+               "figure.subplot.left" : 0.03,
+               "figure.subplot.right" : 0.995,
+               "figure.subplot.bottom" : 0.09,
+               "figure.subplot.top" : 0.99,
+               "figure.subplot.wspace" : 0.,
+               "figure.subplot.hspace" : 0.,
+               "lines.markersize" : 6,
+               "lines.linewidth" : 3.
+               }
+pl.rcParams.update(PLOT_PARAMS)
+
+# A number of colours for the various types. Recycled when there are
+# more task types than colours...
+colours = ["cyan", "lightgray", "darkblue", "yellow", "tan", "dodgerblue",
+           "sienna", "aquamarine", "bisque", "blue", "green", "lightgreen",
+           "brown", "purple", "moccasin", "olivedrab", "chartreuse",
+           "darksage", "darkgreen", "green", "mediumseagreen",
+           "mediumaquamarine", "darkslategrey", "mediumturquoise",
+           "black", "cadetblue", "skyblue", "red", "slategray", "gold",
+           "slateblue", "blueviolet", "mediumorchid", "firebrick",
+           "magenta", "hotpink", "pink", "orange", "lightgreen"]
+maxcolours = len(colours)
+
+# Read header. First two lines.
+with open(infile) as infid:
+    head = [next(infid) for x in xrange(2)]
+header = head[1][2:].strip()
+header = eval(header)
+nthread = int(header['num_threads']) + 1
+CPU_CLOCK = float(header['cpufreq']) / 1000.0
+print "Number of threads: ", nthread
+if args.verbose:
+    print "CPU frequency:", CPU_CLOCK * 1000.0
+
+# Read input.
+data = pl.genfromtxt(infile, dtype=None, delimiter=" ")
+
+# Mixed types, so need to separate.
+tics = []
+tocs = []
+funcs = []
+threads = []
+chunks = []
+for i in data:
+    if i[0] != "#":
+        funcs.append(i[0])
+        if i[1] < 0:
+            threads.append(nthread-1)
+        else:
+            threads.append(i[1])
+        chunks.append(i[2])
+        tics.append(i[3])
+        tocs.append(i[4])
+tics = pl.array(tics)
+tocs = pl.array(tocs)
+funcs = pl.array(funcs)
+threads = pl.array(threads)
+chunks = pl.array(chunks)
+
+# Recover the start and end time
+tic_step = min(tics)
+toc_step = max(tocs)
+
+# Calculate the time range, if not given.
+delta_t = delta_t * CPU_CLOCK
+if delta_t == 0:
+    dt = toc_step - tic_step
+    if dt > delta_t:
+        delta_t = dt
+    print "Data range: ", delta_t / CPU_CLOCK, "ms"
+
+# Once more doing the real gather and plots this time.
+start_t = float(tic_step)
+tics -= tic_step
+tocs -= tic_step
+end_t = (toc_step - start_t) / CPU_CLOCK
+
+# Get all "task" names and assign colours.
+TASKTYPES = pl.unique(funcs)
+print TASKTYPES
+
+# Set colours of task type.
+TASKCOLOURS = {}
+ncolours = 0
+for task in TASKTYPES:
+    TASKCOLOURS[task] = colours[ncolours]
+    ncolours = (ncolours + 1) % maxcolours
+
+# For fiddling with colours...
+if args.verbose:
+    print "#Selected colours:"
+    for task in sorted(TASKCOLOURS.keys()):
+        print "# " + task + ": " + TASKCOLOURS[task]
+
+tasks = {}
+tasks[-1] = []
+for i in range(nthread*expand):
+    tasks[i] = []
+
+# Counters for each thread when expanding.
+ecounter = []
+for i in range(nthread):
+    ecounter.append(0)
+
+for i in range(len(threads)):
+    thread = threads[i]
+
+    # Expand to cover extra lines if expanding.
+ ethread = thread * expand + (ecounter[thread] % expand) + ecounter[thread] = ecounter[thread] + 1 + thread = ethread + + tasks[thread].append({}) + tasks[thread][-1]["type"] = funcs[i] + tic = tics[i] / CPU_CLOCK + toc = tocs[i] / CPU_CLOCK + tasks[thread][-1]["tic"] = tic + tasks[thread][-1]["toc"] = toc + tasks[thread][-1]["colour"] = TASKCOLOURS[funcs[i]] + +# Use expanded threads from now on. +nthread = nthread * expand + +typesseen = [] +fig = pl.figure() +ax = fig.add_subplot(1,1,1) +ax.set_xlim(-delta_t * 0.01 / CPU_CLOCK, delta_t * 1.01 / CPU_CLOCK) +ax.set_ylim(0, nthread) + +# Fake thread is used to colour the whole range, do that first. +tictocs = [] +colours = [] +j = 0 +for task in tasks[nthread - expand]: + tictocs.append((task["tic"], task["toc"] - task["tic"])) + colours.append(task["colour"]) +ax.broken_barh(tictocs, [0,(nthread-1)], facecolors = colours, linewidth=0, alpha=0.15) + +# And we don't plot the fake thread. +nthread = nthread - expand +for i in range(nthread): + + # Collect ranges and colours into arrays. + tictocs = [] + colours = [] + j = 0 + for task in tasks[i]: + tictocs.append((task["tic"], task["toc"] - task["tic"])) + colours.append(task["colour"]) + + # Legend support, collections don't add to this. + qtask = task["type"] + if qtask not in typesseen: + pl.plot([], [], color=task["colour"], label=qtask) + typesseen.append(qtask) + + # Now plot. + ax.broken_barh(tictocs, [i+0.05,0.90], facecolors = colours, linewidth=0) + +# Legend and room for it. +nrow = len(typesseen) / 5 +if not args.nolegend: + ax.fill_between([0, 0], nthread+0.5, nthread + nrow + 0.5, facecolor="white") + ax.set_ylim(0, nthread + 0.5) + ax.legend(loc=1, shadow=True, bbox_to_anchor=(0., 1.05 ,1., 0.2), mode="expand", ncol=5) + box = ax.get_position() + ax.set_position([box.x0, box.y0, box.width, box.height*0.8]) + +# Start and end of time-step +ax.plot([0, 0], [0, nthread + nrow + 1], 'k--', linewidth=1) +ax.plot([end_t, end_t], [0, nthread + nrow + 1], 'k--', linewidth=1) + +ax.set_xlabel("Wall clock time [ms]", labelpad=0.) +if expand == 1: + ax.set_ylabel("Thread ID", labelpad=0 ) +else: + ax.set_ylabel("Thread ID * " + str(expand), labelpad=0 ) +ax.set_yticks(pl.array(range(nthread)), True) + +loc = plticker.MultipleLocator(base=expand) +ax.yaxis.set_major_locator(loc) +ax.grid(True, which='major', axis="y", linestyle="-") + +pl.show() +pl.savefig(outpng) +print "Graphics done, output written to", outpng + +sys.exit(0) diff --git a/examples/process_plot_threadpool b/examples/process_plot_threadpool new file mode 100755 index 0000000000000000000000000000000000000000..343c1559ee37d6714ac32e5305457eddbb7e6414 --- /dev/null +++ b/examples/process_plot_threadpool @@ -0,0 +1,108 @@ +#!/bin/bash +# +# Usage: +# process_plot_threadpool nprocess [time-range-ms] +# +# Description: +# Process all the threadpool info files in the current directory +# creating function graphs for steps and threads. +# +# The input files are created by a run using the "-Y interval" flag and +# should be named "threadpool_info-step<n>.dat" in the current directory. +# All located files will be processed using "nprocess" concurrent +# processes and all plots will have the same time range if one is given. +# An output HTML file "index.html" will be created to view all the plots. +# +# +# This file is part of SWIFT: +# +# Copyright (C) 2017 Peter W. Draper (p.w.draper@durham.ac.uk) +# All Rights Reserved. 
+# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# Handle command-line +if test "$1" = ""; then + echo "Usage: $0 nprocess [time-range-ms]" + exit 1 +fi +NPROCS=$1 +TIMERANGE=0 +LIMIT="(autoranged)" +if test "$2" != ""; then + TIMERANGE=$2 + LIMIT="" +fi + +# Find all thread info files. Use version sort to get into correct order. +files=$(ls -v threadpool_info-step*.dat) +if test $? != 0; then + echo "Failed to find any threadpool info files" + exit 1 +fi + +# Construct list of names, the step no and names for the graphics. +list="" +for f in $files; do + s=$(echo $f| sed 's,threadpool_info-step\(.*\).dat,\1,') + list="$list $f $s poolstep${s}r" +done + +# And process them, +echo "Processing threadpool info files..." +echo $list | xargs -P $NPROCS -n 3 /bin/bash -c "./plot_threadpool.py --expand 1 --limit $TIMERANGE --width 16 --height 4 \$0 \$2 " +echo $list | xargs -P $NPROCS -n 3 /bin/bash -c "./analyse_threadpool_tasks.py \$0 > \$2.stats" + +echo "Writing output threadpool-index.html file" +# Construct document - serial. +cat <<EOF > threadpool-index.html + <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> +<html> + <head> + <title>SWIFT threadpool tasks $LIMIT</title> + </head> + <body> + <h1>SWIFT threadpool tasks $LIMIT</h1> +EOF + +echo $list | xargs -n 3 | while read f s g; do + cat <<EOF >> threadpool-index.html +<h2>Step $s</h2> +EOF + cat <<EOF >> threadpool-index.html +<a href="poolstep${s}r${i}.html"><img src="poolstep${s}r${i}.png" width=400px/></a> +EOF + cat <<EOF > poolstep${s}r${i}.html + <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> +<html> +<body> +<img src="poolstep${s}r${i}.png"> +<pre> +EOF +cat poolstep${s}r${i}.stats >> poolstep${s}r${i}.html +cat <<EOF >> poolstep${s}r${i}.html +</body> +</html> +EOF + +done + +cat <<EOF >> threadpool-index.html + </body> +</html> +EOF + +echo "Finished" + +exit diff --git a/src/engine.c b/src/engine.c index ee48d6565dc62f725b0159dc44e8d9d92d7a4adf..89bb6c693e63b2705e1b8a9371713edc0945acb1 100644 --- a/src/engine.c +++ b/src/engine.c @@ -909,6 +909,9 @@ void engine_repartition(struct engine *e) { #else if (e->reparttype->type != REPART_NONE) error("SWIFT was not compiled with MPI and METIS support."); + + /* Clear the repartition flag. */ + e->forcerepart = 0; #endif } @@ -923,8 +926,9 @@ void engine_repartition_trigger(struct engine *e) { /* Do nothing if there have not been enough steps since the last * repartition, don't want to repeat this too often or immediately after - * a repartition step. */ - if (e->step - e->last_repartition >= 2) { + * a repartition step. Also nothing to do when requested. */ + if (e->step - e->last_repartition >= 2 && + e->reparttype->type != REPART_NONE) { /* Old style if trigger is >1 or this is the second step (want an early * repartition following the initial repartition). 
*/ @@ -985,8 +989,9 @@ void engine_repartition_trigger(struct engine *e) { if (e->forcerepart) e->last_repartition = e->step; } - /* We always reset CPU time for next check. */ - e->cputime_last_step = clocks_get_cputime_used(); + /* We always reset CPU time for next check, unless it will not be used. */ + if (e->reparttype->type != REPART_NONE) + e->cputime_last_step = clocks_get_cputime_used(); #endif } @@ -2402,21 +2407,6 @@ void engine_make_gravityrecursive_tasks(struct engine *e) { /* } */ } -void engine_check_sort_tasks(struct engine *e, struct cell *c) { - - /* Find the parent sort task, if any, and copy its flags. */ - if (c->sorts != NULL) { - struct cell *parent = c->parent; - while (parent != NULL && parent->sorts == NULL) parent = parent->parent; - if (parent != NULL) c->sorts->flags |= parent->sorts->flags; - } - - /* Recurse? */ - if (c->split) - for (int k = 0; k < 8; k++) - if (c->progeny[k] != NULL) engine_check_sort_tasks(e, c->progeny[k]); -} - /** * @brief Fill the #space's task list. * @@ -2495,9 +2485,6 @@ void engine_maketasks(struct engine *e) { for (int k = 0; k < nr_cells; k++) engine_make_hierarchical_tasks(e, &cells[k]); - /* Append hierarchical tasks to each cell. */ - for (int k = 0; k < nr_cells; k++) engine_check_sort_tasks(e, &cells[k]); - /* Run through the tasks and make force tasks for each density task. Each force task depends on the cell ghosts and unlocks the kick task of its super-cell. */ @@ -2792,7 +2779,7 @@ int engine_marktasks(struct engine *e) { /* Run through the tasks and mark as skip or not. */ size_t extra_data[3] = {(size_t)e, rebuild_space, (size_t)&e->sched}; threadpool_map(&e->threadpool, engine_marktasks_mapper, s->tasks, s->nr_tasks, - sizeof(struct task), 10000, extra_data); + sizeof(struct task), 0, extra_data); rebuild_space = extra_data[1]; if (e->verbose) @@ -3696,7 +3683,7 @@ void engine_drift_all(struct engine *e) { #endif threadpool_map(&e->threadpool, engine_do_drift_all_mapper, e->s->cells_top, - e->s->nr_cells, sizeof(struct cell), 1, e); + e->s->nr_cells, sizeof(struct cell), 0, e); /* Synchronize particle positions */ space_synchronize_particle_positions(e->s); @@ -3748,7 +3735,7 @@ void engine_drift_top_multipoles(struct engine *e) { const ticks tic = getticks(); threadpool_map(&e->threadpool, engine_do_drift_top_multipoles_mapper, - e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 10, e); + e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 0, e); #ifdef SWIFT_DEBUG_CHECKS /* Check that all cells have been drifted to the current time. 
*/ @@ -3786,7 +3773,7 @@ void engine_reconstruct_multipoles(struct engine *e) { const ticks tic = getticks(); threadpool_map(&e->threadpool, engine_do_reconstruct_multipoles_mapper, - e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 10, e); + e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 0, e); if (e->verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), diff --git a/src/engine.h b/src/engine.h index c7aaa08b57eb2e61b311deae7a7ccb102f7e3cf8..79ff45a69a39ee7d4d45589df51ce2d53f810fda 100644 --- a/src/engine.h +++ b/src/engine.h @@ -77,7 +77,7 @@ extern const char *engine_policy_names[]; #define engine_queue_scale 1.2 #define engine_maxtaskspercell 96 #define engine_maxproxies 64 -#define engine_tasksreweight 10 +#define engine_tasksreweight 1 #define engine_parts_size_grow 1.05 #define engine_redistribute_alloc_margin 1.2 #define engine_default_energy_file_name "energy" diff --git a/src/gravity.c b/src/gravity.c index 97b2955b32e1513c3d86d1d1f4da2169130feb77..49bbaca39b5278009543204a0d9f5e72d69806c4 100644 --- a/src/gravity.c +++ b/src/gravity.c @@ -207,7 +207,7 @@ void gravity_exact_force_compute(struct space *s, const struct engine *e) { data.const_G = e->physical_constants->const_newton_G; threadpool_map(&s->e->threadpool, gravity_exact_force_compute_mapper, - s->gparts, s->nr_gparts, sizeof(struct gpart), 1000, &data); + s->gparts, s->nr_gparts, sizeof(struct gpart), 0, &data); message("Computed exact gravity for %d gparts (took %.3f %s). ", data.counter_global, clocks_from_ticks(getticks() - tic), diff --git a/src/queue.h b/src/queue.h index 951a3e5a056d7ad0c3935f98341a0d93c805e3ad..c85cf0cabe30a03d163e2564fdc216c19495761a 100644 --- a/src/queue.h +++ b/src/queue.h @@ -29,7 +29,7 @@ #define queue_sizeinit 100 #define queue_sizegrow 2 #define queue_search_window 8 -#define queue_incoming_size 1024 +#define queue_incoming_size 10240 #define queue_struct_align 64 /* Counters. */ diff --git a/src/scheduler.c b/src/scheduler.c index 161be418046636c473362f061d4afa1c64ace04a..6b60419cd579e3dbec28fc8d9345adf376ac85d5 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -759,7 +759,7 @@ void scheduler_splittasks(struct scheduler *s) { /* Call the mapper on each current task. */ threadpool_map(s->threadpool, scheduler_splittasks_mapper, s->tasks, - s->nr_tasks, sizeof(struct task), 1000, s); + s->nr_tasks, sizeof(struct task), 0, s); } /** @@ -1174,7 +1174,7 @@ void scheduler_start(struct scheduler *s) { /* Re-wait the tasks. */ if (s->active_count > 1000) { threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tid_active, - s->active_count, sizeof(int), 1000, s); + s->active_count, sizeof(int), 0, s); } else { scheduler_rewait_mapper(s->tid_active, s->active_count, s); } @@ -1250,7 +1250,7 @@ void scheduler_start(struct scheduler *s) { /* Loop over the tasks and enqueue whoever is ready. */ if (s->active_count > 1000) { threadpool_map(s->threadpool, scheduler_enqueue_mapper, s->tid_active, - s->active_count, sizeof(int), 1000, s); + s->active_count, sizeof(int), 0, s); } else { scheduler_enqueue_mapper(s->tid_active, s->active_count, s); } diff --git a/src/space.c b/src/space.c index 23902a37501c7b13992f5423a3f002d526ba2c27..8ad571be3800fbeebd280dfd09b9ee29158bfdf8 100644 --- a/src/space.c +++ b/src/space.c @@ -378,7 +378,7 @@ void space_regrid(struct space *s, int verbose) { /* Free the old cells, if they were allocated. 
*/ if (s->cells_top != NULL) { threadpool_map(&s->e->threadpool, space_rebuild_recycle_mapper, - s->cells_top, s->nr_cells, sizeof(struct cell), 100, s); + s->cells_top, s->nr_cells, sizeof(struct cell), 0, s); free(s->cells_top); free(s->multipoles_top); s->maxdepth = 0; @@ -491,7 +491,7 @@ void space_regrid(struct space *s, int verbose) { /* Free the old cells, if they were allocated. */ threadpool_map(&s->e->threadpool, space_rebuild_recycle_mapper, - s->cells_top, s->nr_cells, sizeof(struct cell), 100, s); + s->cells_top, s->nr_cells, sizeof(struct cell), 0, s); s->maxdepth = 0; } @@ -970,7 +970,7 @@ void space_split(struct space *s, struct cell *cells, int nr_cells, const ticks tic = getticks(); threadpool_map(&s->e->threadpool, space_split_mapper, cells, nr_cells, - sizeof(struct cell), 1, s); + sizeof(struct cell), 0, s); if (verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -1004,7 +1004,7 @@ void space_sanitize(struct space *s) { if (s->e->nodeID == 0) message("Cleaning up unreasonable values of h"); threadpool_map(&s->e->threadpool, space_sanitize_mapper, s->cells_top, - s->nr_cells, sizeof(struct cell), 1, NULL); + s->nr_cells, sizeof(struct cell), 0, NULL); } /** @@ -1187,7 +1187,7 @@ void space_parts_get_cell_index(struct space *s, int *ind, struct cell *cells, data.ind = ind; threadpool_map(&s->e->threadpool, space_parts_get_cell_index_mapper, s->parts, - s->nr_parts, sizeof(struct part), 1000, &data); + s->nr_parts, sizeof(struct part), 0, &data); if (verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -1214,7 +1214,7 @@ void space_gparts_get_cell_index(struct space *s, int *gind, struct cell *cells, data.ind = gind; threadpool_map(&s->e->threadpool, space_gparts_get_cell_index_mapper, - s->gparts, s->nr_gparts, sizeof(struct gpart), 1000, &data); + s->gparts, s->nr_gparts, sizeof(struct gpart), 0, &data); if (verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -1241,7 +1241,7 @@ void space_sparts_get_cell_index(struct space *s, int *sind, struct cell *cells, data.ind = sind; threadpool_map(&s->e->threadpool, space_sparts_get_cell_index_mapper, - s->sparts, s->nr_sparts, sizeof(struct spart), 1000, &data); + s->sparts, s->nr_sparts, sizeof(struct spart), 0, &data); if (verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -2501,7 +2501,7 @@ void space_synchronize_particle_positions(struct space *s) { (s->nr_gparts > 0 && s->nr_sparts > 0)) threadpool_map(&s->e->threadpool, space_synchronize_particle_positions_mapper, s->gparts, - s->nr_gparts, sizeof(struct gpart), 1000, (void *)s); + s->nr_gparts, sizeof(struct gpart), 0, (void *)s); } /** diff --git a/src/statistics.c b/src/statistics.c index 57d60bcb1b247c9616c859b7ac8a475acdcd878f..5a3f1ff4f9a2232a14817e7e55fd2cff5bdcd80e 100644 --- a/src/statistics.c +++ b/src/statistics.c @@ -271,12 +271,12 @@ void stats_collect(const struct space *s, struct statistics *stats) { /* Run parallel collection of statistics for parts */ if (s->nr_parts > 0) threadpool_map(&s->e->threadpool, stats_collect_part_mapper, s->parts, - s->nr_parts, sizeof(struct part), 10000, &extra_data); + s->nr_parts, sizeof(struct part), 0, &extra_data); /* Run parallel collection of statistics for gparts */ if (s->nr_gparts > 0) threadpool_map(&s->e->threadpool, stats_collect_gpart_mapper, s->gparts, - s->nr_gparts, sizeof(struct gpart), 10000, &extra_data); + s->nr_gparts, sizeof(struct gpart), 0, &extra_data); } /** diff --git a/src/swift.h b/src/swift.h index 
20397eb24df478cba65a0e35d686b402f1d8ee70..1d1a7c7d04b3662c524504c292aa7d9eee2c3d09 100644 --- a/src/swift.h +++ b/src/swift.h @@ -57,6 +57,7 @@ #include "sourceterms.h" #include "space.h" #include "task.h" +#include "threadpool.h" #include "timeline.h" #include "timers.h" #include "tools.h" diff --git a/src/threadpool.c b/src/threadpool.c index c11fd8121bb02f36fce1796d79a7eb55a38102c4..f6476e2a929babae697c6ef03149c39252f6f015 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -26,13 +26,139 @@ #include <math.h> #include <stdlib.h> #include <string.h> +#ifdef SWIFT_DEBUG_THREADPOOL +#include <dlfcn.h> +#endif /* This object's header. */ #include "threadpool.h" /* Local headers. */ #include "atomic.h" +#include "clocks.h" #include "error.h" +#include "minmax.h" + +#ifdef SWIFT_DEBUG_THREADPOOL +/** + * @brief Store a log entry of the given chunk. + */ +void threadpool_log(struct threadpool *tp, int tid, size_t chunk_size, + ticks tic, ticks toc) { + struct mapper_log *log = &tp->logs[tid > 0 ? tid : 0]; + + /* Check if we need to re-allocate the log buffer. */ + if (log->count == log->size) { + log->size *= 2; + struct mapper_log_entry *new_log; + if ((new_log = (struct mapper_log_entry *)malloc( + sizeof(struct mapper_log_entry) * log->size)) == NULL) + error("Failed to re-allocate mapper log."); + memcpy(new_log, log->log, sizeof(struct mapper_log_entry) * log->count); + free(log->log); + log->log = new_log; + } + + /* Store the new entry. */ + struct mapper_log_entry *entry = &log->log[log->count]; + entry->tid = tid; + entry->chunk_size = chunk_size; + entry->tic = tic; + entry->toc = toc; + entry->map_function = tp->map_function; + log->count++; +} + +void threadpool_dump_log(struct threadpool *tp, const char *filename, + int reset) { + + /* Open the output file. */ + FILE *fd; + if ((fd = fopen(filename, "w")) == NULL) + error("Failed to create log file '%s'.", filename); + + /* Create a buffer of function names. */ + const int max_names = 100; + struct name_entry { + threadpool_map_function map_function; + const char *name; + }; + struct name_entry names[max_names]; + bzero(names, sizeof(struct name_entry) * max_names); + + /* Write a header. */ + fprintf(fd, "# map_function thread_id chunk_size tic toc\n"); + fprintf(fd, "# {'num_threads': %i, 'cpufreq': %lli}\n", tp->num_threads, + clocks_get_cpufreq()); + + /* Loop over the per-tid logs and dump them. */ + for (int k = 0; k < tp->num_threads; k++) { + struct mapper_log *log = &tp->logs[k]; + + /* Loop over the log entries and dump them. */ + for (int i = 0; i < log->count; i++) { + + struct mapper_log_entry *entry = &log->log[i]; + + /* Look for the function pointer in the buffer. */ + int nid = 0; + while (nid < max_names && names[nid].map_function != entry->map_function) + nid++; + + /* If the name was not found, make a new entry. */ + if (nid == max_names) { + for (int j = 1; j < max_names; j++) names[j - 1] = names[j]; + names[0].map_function = entry->map_function; + Dl_info dl_info; + dladdr(entry->map_function, &dl_info); + names[0].name = dl_info.dli_sname; + nid = 0; + } + + /* Log a line to the file. */ + fprintf(fd, "%s %i %i %lli %lli\n", names[nid].name, entry->tid, + entry->chunk_size, entry->tic, entry->toc); + } + + /* Clear the log if requested. */ + if (reset) log->count = 0; + } + + /* Close the file. */ + fclose(fd); +} +#endif // SWIFT_DEBUG_THREADPOOL + +/** + * @brief Runner main loop, get a chunk and call the mapper function. 
+ */ +void threadpool_chomp(struct threadpool *tp, int tid) { + + /* Loop until we can't get a chunk. */ + while (1) { + /* Desired chunk size. */ + size_t chunk_size = + (tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads); + if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk; + if (chunk_size < 1) chunk_size = 1; + + /* Get a chunk and check its size. */ + size_t task_ind = atomic_add(&tp->map_data_count, chunk_size); + if (task_ind >= tp->map_data_size) break; + if (task_ind + chunk_size > tp->map_data_size) + chunk_size = tp->map_data_size - task_ind; + +/* Call the mapper function. */ +#ifdef SWIFT_DEBUG_THREADPOOL + ticks tic = getticks(); +#endif + tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind), + chunk_size, tp->map_extra_data); +#ifdef SWIFT_DEBUG_THREADPOOL + threadpool_log(tp, tid, chunk_size, tic, getticks()); +#endif + } +} void *threadpool_runner(void *data) { @@ -43,39 +169,17 @@ void *threadpool_runner(void *data) { while (1) { /* Let the controller know that this thread is waiting. */ - pthread_mutex_lock(&tp->thread_mutex); - tp->num_threads_waiting += 1; - if (tp->num_threads_waiting == tp->num_threads) { - pthread_cond_signal(&tp->control_cond); - } + pthread_barrier_wait(&tp->wait_barrier); /* Wait for the controller. */ - pthread_cond_wait(&tp->thread_cond, &tp->thread_mutex); - tp->num_threads_waiting -= 1; - tp->num_threads_running += 1; - if (tp->num_threads_running == tp->num_threads) { - pthread_cond_signal(&tp->control_cond); - } - pthread_mutex_unlock(&tp->thread_mutex); - - /* The index of the mapping task we will work on next. */ - while (1) { - /* Desired chunk size. */ - size_t chunk_size = - (tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads); - if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk; - if (chunk_size < 1) chunk_size = 1; - - /* Get a chunk and check its size. */ - size_t task_ind = atomic_add(&tp->map_data_count, chunk_size); - if (task_ind >= tp->map_data_size) break; - if (task_ind + chunk_size > tp->map_data_size) - chunk_size = tp->map_data_size - task_ind; - - /* Call the mapper function. */ - tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind), - chunk_size, tp->map_extra_data); - } + pthread_barrier_wait(&tp->run_barrier); + + /* If no map function is specified, just die. We use this as a mechanism + to shut down threads without leaving the barriers in an invalid state. */ + if (tp->map_function == NULL) pthread_exit(NULL); + + /* Do actual work. */ + threadpool_chomp(tp, atomic_inc(&tp->num_threads_running)); } } @@ -89,18 +193,28 @@ void threadpool_init(struct threadpool *tp, int num_threads) { /* Initialize the thread counters. */ tp->num_threads = num_threads; - tp->num_threads_waiting = 0; + +#ifdef SWIFT_DEBUG_THREADPOOL + if ((tp->logs = (struct mapper_log *)malloc(sizeof(struct mapper_log) * + num_threads)) == NULL) + error("Failed to allocate mapper logs."); + for (int k = 0; k < num_threads; k++) { + tp->logs[k].size = threadpool_log_initial_size; + tp->logs[k].count = 0; + if ((tp->logs[k].log = (struct mapper_log_entry *)malloc( + sizeof(struct mapper_log_entry) * tp->logs[k].size)) == NULL) + error("Failed to allocate mapper log."); + } +#endif /* If there is only a single thread, do nothing more as of here as we will just do work in the (blocked) calling thread. */ if (num_threads == 1) return; - /* Init the threadpool mutexes. 
*/ - if (pthread_mutex_init(&tp->thread_mutex, NULL) != 0) - error("Failed to initialize mutexex."); - if (pthread_cond_init(&tp->control_cond, NULL) != 0 || - pthread_cond_init(&tp->thread_cond, NULL) != 0) - error("Failed to initialize condition variables."); + /* Init the barriers. */ + if (pthread_barrier_init(&tp->wait_barrier, NULL, num_threads) != 0 || + pthread_barrier_init(&tp->run_barrier, NULL, num_threads) != 0) + error("Failed to initialize barriers."); /* Set the task counter to zero. */ tp->map_data_size = 0; @@ -109,24 +223,21 @@ void threadpool_init(struct threadpool *tp, int num_threads) { tp->map_data_chunk = 0; tp->map_function = NULL; - /* Allocate the threads. */ - if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads)) == - NULL) { + /* Allocate the threads, one less than requested since the calling thread + works as well. */ + if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * + (num_threads - 1))) == NULL) { error("Failed to allocate thread array."); } /* Create and start the threads. */ - pthread_mutex_lock(&tp->thread_mutex); - for (int k = 0; k < num_threads; k++) { + for (int k = 0; k < num_threads - 1; k++) { if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0) error("Failed to create threadpool runner thread."); } /* Wait for all the threads to be up and running. */ - while (tp->num_threads_waiting < tp->num_threads) { - pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); - } - pthread_mutex_unlock(&tp->thread_mutex); + pthread_barrier_wait(&tp->wait_barrier); } /** @@ -140,7 +251,8 @@ void threadpool_init(struct threadpool *tp, int num_threads) { * @param map_data The data on which the mapping function will be called. * @param N Number of elements in @c map_data. * @param stride Size, in bytes, of each element of @c map_data. - * @param chunk Number of map data elements to pass to the function at a time. + * @param chunk Number of map data elements to pass to the function at a time, + * or zero to choose the number automatically. * @param extra_data Addtitional pointer that will be passed to the mapping * function, may contain additional data. */ @@ -148,37 +260,84 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function, void *map_data, size_t N, int stride, int chunk, void *extra_data) { +#ifdef SWIFT_DEBUG_THREADPOOL + ticks tic = getticks(); +#endif + /* If we just have a single thread, call the map function directly. */ if (tp->num_threads == 1) { map_function(map_data, N, extra_data); +#ifdef SWIFT_DEBUG_THREADPOOL + threadpool_log(tp, 0, N, tic, getticks()); +#endif return; } /* Set the map data and signal the threads. */ - pthread_mutex_lock(&tp->thread_mutex); tp->map_data_stride = stride; tp->map_data_size = N; tp->map_data_count = 0; - tp->map_data_chunk = chunk; + tp->map_data_chunk = + chunk ? chunk + : max((int)(N / (tp->num_threads * threadpool_default_chunk_ratio)), + 1); tp->map_function = map_function; tp->map_data = map_data; tp->map_extra_data = extra_data; tp->num_threads_running = 0; - pthread_cond_broadcast(&tp->thread_cond); /* Wait for all the threads to be up and running. */ - while (tp->num_threads_running < tp->num_threads) { - pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); - } + pthread_barrier_wait(&tp->run_barrier); + + /* Do some work while I'm at it. */ + threadpool_chomp(tp, tp->num_threads - 1); /* Wait for all threads to be done. 
*/ - while (tp->num_threads_waiting < tp->num_threads) { - pthread_cond_wait(&tp->control_cond, &tp->thread_mutex); - } - pthread_mutex_unlock(&tp->thread_mutex); + pthread_barrier_wait(&tp->wait_barrier); + +#ifdef SWIFT_DEBUG_THREADPOOL + /* Log the total call time to thread id -1. */ + threadpool_log(tp, -1, N, tic, getticks()); +#endif +} + +/** + * @brief Re-sets the log for this #threadpool. + */ +#ifdef SWIFT_DEBUG_THREADPOOL +void threadpool_reset_log(struct threadpool *tp) { + for (int k = 0; k < tp->num_threads; k++) tp->logs[k].count = 0; } +#endif /** * @brief Frees up the memory allocated for this #threadpool. */ -void threadpool_clean(struct threadpool *tp) { free(tp->threads); } +void threadpool_clean(struct threadpool *tp) { + + if (tp->num_threads > 1) { + /* Destroy the runner threads by calling them with a NULL mapper function + * and waiting for all the threads to terminate. This ensures that no + * thread is still waiting at a barrier. */ + tp->map_function = NULL; + pthread_barrier_wait(&tp->run_barrier); + for (int k = 0; k < tp->num_threads - 1; k++) { + void *retval; + pthread_join(tp->threads[k], &retval); + } + + /* Release the barriers. */ + if (pthread_barrier_destroy(&tp->wait_barrier) != 0 || + pthread_barrier_destroy(&tp->run_barrier) != 0) + error("Failed to destroy threadpool barriers."); + } + + /* Clean up memory. */ + free(tp->threads); +#ifdef SWIFT_DEBUG_THREADPOOL + for (int k = 0; k < tp->num_threads; k++) { + free(tp->logs[k].log); + } + free(tp->logs); +#endif +} diff --git a/src/threadpool.h b/src/threadpool.h index f9c7eeffb700adc579ec05902193b888cdd6363d..019403f658a22d36c4a6e1ec1ae1fdc47c62658d 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -25,10 +25,44 @@ /* Some standard headers. */ #include <pthread.h> +/* Local includes. */ +#include "cycle.h" + +/* Local defines. */ +#define threadpool_log_initial_size 1000 +#define threadpool_default_chunk_ratio 7 + /* Function type for mappings. */ typedef void (*threadpool_map_function)(void *map_data, int num_elements, void *extra_data); +/* Data for threadpool logging. */ +struct mapper_log_entry { + + /* ID of the thread executing the chunk. */ + int tid; + + /* Size of the chunk processed. */ + int chunk_size; + + /* Pointer to the mapper function. */ + threadpool_map_function map_function; + + /*! Start and end time of this task */ + ticks tic, toc; +}; + +struct mapper_log { + /* Log of threadpool mapper calls. */ + struct mapper_log_entry *log; + + /* Size of the allocated log. */ + int size; + + /* Number of entries in the log. */ + int count; +}; + /* Data of a threadpool. */ struct threadpool { @@ -36,8 +70,8 @@ struct threadpool { pthread_t *threads; /* This is where threads go to rest. */ - pthread_mutex_t thread_mutex; - pthread_cond_t control_cond, thread_cond; + pthread_barrier_t wait_barrier; + pthread_barrier_t run_barrier; /* Current map data and count. */ void *map_data, *map_extra_data; @@ -49,7 +83,11 @@ struct threadpool { int num_threads; /* Counter for the number of threads that are done. */ - volatile int num_threads_waiting, num_threads_running; + volatile int num_threads_running; + +#ifdef SWIFT_DEBUG_THREADPOOL + struct mapper_log *logs; +#endif }; /* Function prototypes. 
 */
@@ -58,5 +96,10 @@ void threadpool_map(struct threadpool *tp,
                     threadpool_map_function map_function, void *map_data,
                     size_t N, int stride, int chunk, void *extra_data);
 void threadpool_clean(struct threadpool *tp);
+#ifdef SWIFT_DEBUG_THREADPOOL
+void threadpool_reset_log(struct threadpool *tp);
+void threadpool_dump_log(struct threadpool *tp, const char *filename,
+                         int reset);
+#endif
 
 #endif /* SWIFT_THREADPOOL_H */
diff --git a/tests/testThreadpool.c b/tests/testThreadpool.c
index aa65d533a29afbe4e7e8384fb887281822a31e58..81fee0a15ae1287276ada61de5eed79051b77e1e 100644
--- a/tests/testThreadpool.c
+++ b/tests/testThreadpool.c
@@ -17,6 +17,8 @@
  *
  ******************************************************************************/
 
+#include "../config.h"
+
 // Standard includes.
 #include <stdio.h>
 #include <stdlib.h>
@@ -78,6 +80,11 @@ int main(int argc, char *argv[]) {
     threadpool_map(&tp, map_function_first, data, N, sizeof(int), 2, NULL);
   }
 
+/* If logging was enabled, dump the log. */
+#ifdef SWIFT_DEBUG_THREADPOOL
+  threadpool_dump_log(&tp, "threadpool_log.txt", 1);
+#endif
+
   /* Be clean */
   threadpool_clean(&tp);
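
For reference, a minimal sketch of how the new logging hooks fit together, mirroring the call pattern in tests/testThreadpool.c and examples/main.c. The pool size, input data, mapper function and output file name below are illustrative only, and the dump call assumes a build configured with --enable-threadpool-debugging (SWIFT_DEBUG_THREADPOOL):

/* Illustrative sketch only: a trivial mapper and a hypothetical example() driver. */
#include "threadpool.h"

/* Double every int in the chunk handed to this thread. */
void double_mapper(void *map_data, int num_elements, void *extra_data) {
  int *values = (int *)map_data;
  for (int i = 0; i < num_elements; i++) values[i] *= 2;
}

void example(void) {
  struct threadpool tp;
  threadpool_init(&tp, 4);

  int data[1000];
  for (int k = 0; k < 1000; k++) data[k] = k;

  /* chunk == 0 lets threadpool_map() pick the chunk size automatically,
     i.e. N / (num_threads * threadpool_default_chunk_ratio). */
  threadpool_map(&tp, double_mapper, data, 1000, sizeof(int), 0, NULL);

#ifdef SWIFT_DEBUG_THREADPOOL
  /* Write the per-thread tic/toc log and reset it, as main.c does for -Y. */
  threadpool_dump_log(&tp, "threadpool_info-step1.dat", 1);
#endif

  threadpool_clean(&tp);
}

A file written this way is what examples/analyse_threadpool_tasks.py and examples/plot_threadpool.py expect as input.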