Commit c5321323 authored by Peter W. Draper

Merge branch 'threadpool_task_plots2' into 'master'

Threadpool task plots2

This should fix the problems seen in the rolled-back merge request from the previous branch, `threadpool_task_plots`.

This branch adds the following:

 - a `--enable-threadpool-debugging` option for `./configure`,
 - a `-Y` command-line option in `main.c` that dumps the threadpool timers, much like the existing task timers.

This generates `threadpool_info-step%d.dat` files every `-Y` steps; each file contains the mapper function name, the id of the thread that executed it, the number of chunks processed, and the tic and toc timers for the mapper function.
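As an aside, a dump file of this form can be read back with a few lines of Python; the sketch below (assuming the same Python 2 environment as the new scripts, and an illustrative file name) mirrors what the analysis script added in this merge does:

```python
# Minimal sketch of reading one threadpool dump; the file name is an example.
infile = "threadpool_info-step1.dat"

with open(infile) as f:
    lines = f.readlines()

# The second header line is a dict holding at least 'num_threads' and 'cpufreq'.
header = eval(lines[1][2:].strip())
ticks_per_ms = float(header['cpufreq']) / 1000.0

# Each data row: mapper function name, thread id, chunk count, tic, toc.
for line in lines:
    if line.startswith("#"):
        continue
    func, thread, chunks, tic, toc = line.split()
    print "%s (thread %s, %s chunks): %.4f ms" % \
        (func, thread, chunks, (int(toc) - int(tic)) / ticks_per_ms)
```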

This merge also adds new scripts to plot and analyse these files, and introduces a new
technique that uses barriers to start and control the threadpool threads. This gives
significant speedups in the start-up time and in the time to the collective wait
(i.e. the threads coming together at task completion).
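The threadpool changes themselves are not part of the diff shown here, but the barrier idea can be sketched conceptually as follows (an illustration only, written against Python 3's threading.Barrier; the names and the fixed work split are invented for the example and are not SWIFT's actual C implementation):

```python
import threading

NUM_THREADS = 4

# Workers sleep on run_barrier until released; everyone meets again at wait_barrier.
run_barrier = threading.Barrier(NUM_THREADS + 1)   # workers + the controlling thread
wait_barrier = threading.Barrier(NUM_THREADS + 1)
work = {"mapper": None, "data": []}

def worker(tid):
    while True:
        run_barrier.wait()                           # released when the controller arrives
        for item in work["data"][tid::NUM_THREADS]:  # illustrative static split
            work["mapper"](item)
        wait_barrier.wait()                          # collective wait: all work done

def threadpool_map(mapper, data):
    work["mapper"], work["data"] = mapper, list(data)
    run_barrier.wait()                               # start the workers
    wait_barrier.wait()                              # wait for them all to finish

for tid in range(NUM_THREADS):
    threading.Thread(target=worker, args=(tid,), daemon=True).start()

threadpool_map(lambda x: x * x, range(1000))
```

In the sketch the same two barriers both release the workers and implement the collective wait at completion, so the worker threads are created only once.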


See merge request !385
parents 3196f542 f8f55372
@@ -38,6 +38,7 @@ Valid options are:
1: MPI-rank 0 writes,
2: All MPI-ranks write.
-y {int} Time-step frequency at which task graphs are dumped.
-Y {int} Time-step frequency at which threadpool tasks are dumped.
-h Print this help message and exit.
See the file parameter_example.yml for an example of parameter file.
@@ -189,6 +189,19 @@ if test "$enable_task_debugging" = "yes"; then
AC_DEFINE([SWIFT_DEBUG_TASKS],1,[Enable task debugging])
fi
# Check if threadpool debugging is on.
AC_ARG_ENABLE([threadpool-debugging],
[AS_HELP_STRING([--enable-threadpool-debugging],
[Store threadpool mapper timing information and generate threadpool dump files @<:@yes/no@:>@]
)],
[enable_threadpool_debugging="$enableval"],
[enable_threadpool_debugging="no"]
)
if test "$enable_threadpool_debugging" = "yes"; then
AC_DEFINE([SWIFT_DEBUG_THREADPOOL],1,[Enable threadpool debugging])
LDFLAGS="$LDFLAGS -rdynamic"
fi
# Check if the general timers are switched on.
AC_ARG_ENABLE([timers],
[AS_HELP_STRING([--enable-timers],
@@ -897,9 +910,10 @@ AC_MSG_RESULT([
Multipole order : $with_multipole_order
No gravity below ID : $no_gravity_below_id
Individual timers : $enable_timers
Task debugging : $enable_task_debugging
Threadpool debugging : $enable_threadpool_debugging
Debugging checks : $enable_debugging_checks
Gravity checks : $gravity_force_checks
------------------------])
#!/usr/bin/env python
"""
Usage:
analyse_threadpool_tasks.py [options] input.dat
where input.dat is a threadpool dump for a step. Use the '-Y interval' flag
of the swift command to create these.
The output is an analysis of the threadpool task timings, including the deadtime
per thread and step, the total time spent in each task type (for the whole step
and per thread), and the minimum and maximum times spent per task type.
This file is part of SWIFT.
Copyright (c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import matplotlib
matplotlib.use("Agg")
import matplotlib.collections as collections
import matplotlib.ticker as plticker
import pylab as pl
import sys
import argparse
# Handle the command line.
parser = argparse.ArgumentParser(description="Analyse task dumps")
parser.add_argument("input", help="Threadpool data file (-y output)")
parser.add_argument("-v", "--verbose", dest="verbose",
help="Verbose output (default: False)",
default=False, action="store_true")
args = parser.parse_args()
infile = args.input
# Read header. First two lines.
with open(infile) as infid:
head = [next(infid) for x in xrange(2)]
header = head[1][2:].strip()
header = eval(header)
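# One extra slot: entries logged with a negative thread id (calls made outside
# the pool) are assigned to a pseudo-thread below.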
nthread = int(header['num_threads']) + 1
CPU_CLOCK = float(header['cpufreq']) / 1000.0
print "Number of threads: ", nthread - 1
if args.verbose:
print "CPU frequency:", CPU_CLOCK * 1000.0
# Read input.
data = pl.genfromtxt(infile, dtype=None, delimiter=" ")
# Mixed types, so need to separate.
tics = []
tocs = []
funcs = []
threads = []
chunks = []
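# Each data row holds: mapper function name, thread id, chunk count, tic, toc.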
for i in data:
if i[0] != "#":
funcs.append(i[0])
if i[1] < 0:
threads.append(nthread-1)
else:
threads.append(i[1])
chunks.append(i[2])
tics.append(i[3])
tocs.append(i[4])
tics = pl.array(tics)
tocs = pl.array(tocs)
funcs = pl.array(funcs)
threads = pl.array(threads)
chunks = pl.array(chunks)
# Recover the start and end time
tic_step = min(tics)
toc_step = max(tocs)
# Calculate the time range.
total_t = (toc_step - tic_step)/ CPU_CLOCK
print "# Data range: ", total_t, "ms"
print
# Correct times to relative millisecs.
start_t = float(tic_step)
tics = (tics - start_t) / CPU_CLOCK
tocs = (tocs - start_t) / CPU_CLOCK
tasks = {}
tasks[-1] = []
for i in range(nthread):
tasks[i] = []
# Gather into by thread data.
for i in range(len(tics)):
tasks[threads[i]].append([tics[i],tocs[i],funcs[i]])
# Don't actually process the fake thread.
nthread = nthread - 1
# Sort by tic and gather used thread ids.
threadids = []
for i in range(nthread):
if len(tasks[i]) > 0:
tasks[i] = sorted(tasks[i], key=lambda task: task[0])
threadids.append(i)
# Times per task.
print "# Task times:"
print "# -----------"
print "# {0:<31s}: {1:>7s} {2:>9s} {3:>9s} {4:>9s} {5:>9s} {6:>9s}"\
.format("type/subtype", "count","minimum", "maximum",
"sum", "mean", "percent")
alltasktimes = {}
sidtimes = {}
for i in threadids:
tasktimes = {}
for task in tasks[i]:
key = task[2]
dt = task[1] - task[0]
if not key in tasktimes:
tasktimes[key] = []
tasktimes[key].append(dt)
if not key in alltasktimes:
alltasktimes[key] = []
alltasktimes[key].append(dt)
print "# Thread : ", i
for key in sorted(tasktimes.keys()):
taskmin = min(tasktimes[key])
taskmax = max(tasktimes[key])
tasksum = sum(tasktimes[key])
print "{0:33s}: {1:7d} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.4f} {6:9.2f}"\
.format(key, len(tasktimes[key]), taskmin, taskmax, tasksum,
tasksum / len(tasktimes[key]), tasksum / total_t * 100.0)
print
print "# All threads : "
for key in sorted(alltasktimes.keys()):
taskmin = min(alltasktimes[key])
taskmax = max(alltasktimes[key])
tasksum = sum(alltasktimes[key])
print "{0:33s}: {1:7d} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.4f} {6:9.2f}"\
.format(key, len(alltasktimes[key]), taskmin, taskmax, tasksum,
tasksum / len(alltasktimes[key]),
tasksum / (len(threadids) * total_t) * 100.0)
print
# Dead times.
print "# Times not in tasks (deadtimes)"
print "# ------------------------------"
print "# Time before first task:"
print "# no. : {0:>9s} {1:>9s}".format("value", "percent")
predeadtimes = []
for i in threadids:
predeadtime = tasks[i][0][0]
print "thread {0:2d}: {1:9.4f} {2:9.4f}"\
.format(i, predeadtime, predeadtime / total_t * 100.0)
predeadtimes.append(predeadtime)
predeadmin = min(predeadtimes)
predeadmax = max(predeadtimes)
predeadsum = sum(predeadtimes)
print "# : {0:>9s} {1:>9s} {2:>9s} {3:>9s} {4:>9s} {5:>9s}"\
.format("count", "minimum", "maximum", "sum", "mean", "percent")
print "all : {0:9d} {1:9.4f} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.2f}"\
.format(len(predeadtimes), predeadmin, predeadmax, predeadsum,
predeadsum / len(predeadtimes),
predeadsum / (len(threadids) * total_t ) * 100.0)
print
print "# Time after last task:"
print "# no. : {0:>9s} {1:>9s}".format("value", "percent")
postdeadtimes = []
for i in threadids:
postdeadtime = total_t - tasks[i][-1][1]
print "thread {0:2d}: {1:9.4f} {2:9.4f}"\
.format(i, postdeadtime, postdeadtime / total_t * 100.0)
postdeadtimes.append(postdeadtime)
postdeadmin = min(postdeadtimes)
postdeadmax = max(postdeadtimes)
postdeadsum = sum(postdeadtimes)
print "# : {0:>9s} {1:>9s} {2:>9s} {3:>9s} {4:>9s} {5:>9s}"\
.format("count", "minimum", "maximum", "sum", "mean", "percent")
print "all : {0:9d} {1:9.4f} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.2f}"\
.format(len(postdeadtimes), postdeadmin, postdeadmax, postdeadsum,
postdeadsum / len(postdeadtimes),
postdeadsum / (len(threadids) * total_t ) * 100.0)
print
# Time in threadpool, i.e. from first to last tasks.
print "# Time between tasks (threadpool deadtime):"
print "# no. : {0:>9s} {1:>9s} {2:>9s} {3:>9s} {4:>9s} {5:>9s}"\
.format("count", "minimum", "maximum", "sum", "mean", "percent")
threadpooldeadtimes = []
for i in threadids:
deadtimes = []
last = tasks[i][0][0]
for task in tasks[i]:
dt = task[0] - last
deadtimes.append(dt)
last = task[1]
# Drop first value, last value already gone.
if len(deadtimes) > 1:
deadtimes = deadtimes[1:]
else:
# Only one task, so no deadtime by definition.
deadtimes = [0.0]
deadmin = min(deadtimes)
deadmax = max(deadtimes)
deadsum = sum(deadtimes)
print "thread {0:2d}: {1:9d} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.4f} {6:9.2f}"\
.format(i, len(deadtimes), deadmin, deadmax, deadsum,
deadsum / len(deadtimes), deadsum / total_t * 100.0)
threadpooldeadtimes.extend(deadtimes)
deadmin = min(threadpooldeadtimes)
deadmax = max(threadpooldeadtimes)
deadsum = sum(threadpooldeadtimes)
print "all : {0:9d} {1:9.4f} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.2f}"\
.format(len(threadpooldeadtimes), deadmin, deadmax, deadsum,
deadsum / len(threadpooldeadtimes),
deadsum / (len(threadids) * total_t ) * 100.0)
print
# All times in step.
print "# All deadtimes:"
print "# no. : {0:>9s} {1:>9s} {2:>9s} {3:>9s} {4:>9s} {5:>9s}"\
.format("count", "minimum", "maximum", "sum", "mean", "percent")
alldeadtimes = []
for i in threadids:
deadtimes = []
last = 0
for task in tasks[i]:
dt = task[0] - last
deadtimes.append(dt)
last = task[1]
dt = total_t - last
deadtimes.append(dt)
deadmin = min(deadtimes)
deadmax = max(deadtimes)
deadsum = sum(deadtimes)
print "thread {0:2d}: {1:9d} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.4f} {6:9.2f}"\
.format(i, len(deadtimes), deadmin, deadmax, deadsum,
deadsum / len(deadtimes), deadsum / total_t * 100.0)
alldeadtimes.extend(deadtimes)
deadmin = min(alldeadtimes)
deadmax = max(alldeadtimes)
deadsum = sum(alldeadtimes)
print "all : {0:9d} {1:9.4f} {2:9.4f} {3:9.4f} {4:9.4f} {5:9.2f}"\
.format(len(alldeadtimes), deadmin, deadmax, deadsum,
deadsum / len(alldeadtimes),
deadsum / (len(threadids) * total_t ) * 100.0)
print
sys.exit(0)
@@ -103,6 +103,8 @@ void print_help_message() {
printf(" %2s %14s %s\n", "", "", "2: All MPI-ranks write.");
printf(" %2s %14s %s\n", "-y", "{int}",
"Time-step frequency at which task graphs are dumped.");
printf(" %2s %14s %s\n", "-Y", "{int}",
"Time-step frequency at which threadpool tasks are dumped.");
printf(" %2s %14s %s\n", "-h", "", "Print this help message and exit.");
printf(
"\nSee the file parameter_example.yml for an example of "
@@ -153,7 +155,8 @@ int main(int argc, char *argv[]) {
/* Let's pin the main thread */
#if defined(HAVE_SETAFFINITY) && defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE)
if (((ENGINE_POLICY) & engine_policy_setaffinity) ==
engine_policy_setaffinity)
engine_pin();
#endif
@@ -163,6 +166,7 @@ int main(int argc, char *argv[]) {
int with_aff = 0;
int dry_run = 0;
int dump_tasks = 0;
int dump_threadpool = 0;
int nsteps = -2;
int with_cosmology = 0;
int with_external_gravity = 0;
@@ -184,7 +188,7 @@ int main(int argc, char *argv[]) {
/* Parse the parameters */
int c;
while ((c = getopt(argc, argv, "acCdDef:FgGhMn:P:sSt:Tv:y:Y:")) != -1)
switch (c) {
case 'a':
with_aff = 1;
@@ -273,6 +277,21 @@ int main(int argc, char *argv[]) {
"Task dumping is only possible if SWIFT was configured with the "
"--enable-task-debugging option.");
}
#endif
break;
case 'Y':
if (sscanf(optarg, "%d", &dump_threadpool) != 1) {
if (myrank == 0) printf("Error parsing dump_threadpool (-Y). \n");
if (myrank == 0) print_help_message();
return 1;
}
#ifndef SWIFT_DEBUG_THREADPOOL
if (dump_threadpool) {
error(
"Threadpool dumping is only possible if SWIFT was configured "
"with the "
"--enable-threadpool-debugging option.");
}
#endif
break;
case '?':
@@ -753,6 +772,22 @@ int main(int argc, char *argv[]) {
#endif // WITH_MPI
}
#endif // SWIFT_DEBUG_TASKS
#ifdef SWIFT_DEBUG_THREADPOOL
/* Dump the threadpool data using the given frequency. */
if (dump_threadpool && (dump_threadpool == 1 || j % dump_threadpool == 1)) {
char dumpfile[40];
#ifdef WITH_MPI
snprintf(dumpfile, 40, "threadpool_info-rank%d-step%d.dat", engine_rank,
j + 1);
#else
snprintf(dumpfile, 40, "threadpool_info-step%d.dat", j + 1);
#endif // WITH_MPI
threadpool_dump_log(&e.threadpool, dumpfile, 1);
} else {
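/* Not dumping this step, so discard the log entries accumulated so far. */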
threadpool_reset_log(&e.threadpool);
}
#endif // SWIFT_DEBUG_THREADPOOL
}
/* Print the values of the runner histogram. */
#!/usr/bin/env python
"""
Usage:
plot_threadpool.py [options] input.dat output.png
where input.dat is a threadpool info file for a step. Use the '-Y interval'
flag of the swift command to create these. The output plot will be called
'output.png'. The --limit option can be used to produce plots with the same
time span (useful when comparing different steps), and the --expand option
expands each thread line into 'expand' lines, so that adjacent tasks of the
same type can be distinguished. Other options can be seen using the --help
flag.
This file is part of SWIFT.
Copyright (c) 2015 Pedro Gonnet (pedro.gonnet@durham.ac.uk),
Bert Vandenbroucke (bert.vandenbroucke@ugent.be)
Matthieu Schaller (matthieu.schaller@durham.ac.uk)
(c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import matplotlib
matplotlib.use("Agg")
import matplotlib.collections as collections
import matplotlib.ticker as plticker
import pylab as pl
import sys
import argparse
# Handle the command line.
parser = argparse.ArgumentParser(description="Plot threadpool function graphs")
parser.add_argument("input", help="Threadpool data file (-Y output)")
parser.add_argument("outpng", help="Name for output graphic file (PNG)")
parser.add_argument("-l", "--limit", dest="limit",
help="Upper time limit in millisecs (def: depends on data)",
default=0, type=int)
parser.add_argument("-e", "--expand", dest="expand",
help="Thread expansion factor (def: 1)",
default=1, type=int)
parser.add_argument("--height", dest="height",
help="Height of plot in inches (def: 4)",
default=4., type=float)
parser.add_argument("--width", dest="width",
help="Width of plot in inches (def: 16)",
default=16., type=float)
parser.add_argument("--nolegend", dest="nolegend",
help="Whether to show the legend (def: False)",
default=False, action="store_true")
parser.add_argument("-v", "--verbose", dest="verbose",
help="Show colour assignments and other details (def: False)",
default=False, action="store_true")
args = parser.parse_args()
infile = args.input
outpng = args.outpng
delta_t = args.limit
expand = args.expand
# Basic plot configuration.
PLOT_PARAMS = {"axes.labelsize": 10,
"axes.titlesize": 10,
"font.size": 12,
"legend.fontsize": 12,
"xtick.labelsize": 10,
"ytick.labelsize": 10,
"figure.figsize" : (args.width, args.height),
"figure.subplot.left" : 0.03,
"figure.subplot.right" : 0.995,
"figure.subplot.bottom" : 0.09,
"figure.subplot.top" : 0.99,
"figure.subplot.wspace" : 0.,
"figure.subplot.hspace" : 0.,
"lines.markersize" : 6,
"lines.linewidth" : 3.
}
pl.rcParams.update(PLOT_PARAMS)
# A number of colours for the various types. Recycled when there are
# more task types than colours...
colours = ["cyan", "lightgray", "darkblue", "yellow", "tan", "dodgerblue",
"sienna", "aquamarine", "bisque", "blue", "green", "lightgreen",
"brown", "purple", "moccasin", "olivedrab", "chartreuse",
"darksage", "darkgreen", "green", "mediumseagreen",
"mediumaquamarine", "darkslategrey", "mediumturquoise",
"black", "cadetblue", "skyblue", "red", "slategray", "gold",
"slateblue", "blueviolet", "mediumorchid", "firebrick",
"magenta", "hotpink", "pink", "orange", "lightgreen"]
maxcolours = len(colours)
# Read header. First two lines.
with open(infile) as infid:
head = [next(infid) for x in xrange(2)]
header = head[1][2:].strip()
header = eval(header)
nthread = int(header['num_threads']) + 1
CPU_CLOCK = float(header['cpufreq']) / 1000.0
print "Number of threads: ", nthread
if args.verbose:
print "CPU frequency:", CPU_CLOCK * 1000.0
# Read input.
data = pl.genfromtxt(infile, dtype=None, delimiter=" ")
# Mixed types, so need to separate.
tics = []
tocs = []
funcs = []
threads = []
chunks = []
for i in data:
if i[0] != "#":
funcs.append(i[0])
if i[1] < 0:
threads.append(nthread-1)
else:
threads.append(i[1])
chunks.append(i[2])
tics.append(i[3])
tocs.append(i[4])
tics = pl.array(tics)
tocs = pl.array(tocs)
funcs = pl.array(funcs)
threads = pl.array(threads)
chunks = pl.array(chunks)
# Recover the start and end time
tic_step = min(tics)
toc_step = max(tocs)
# Calculate the time range, if not given.
delta_t = delta_t * CPU_CLOCK
if delta_t == 0:
dt = toc_step - tic_step
if dt > delta_t:
delta_t = dt
print "Data range: ", delta_t / CPU_CLOCK, "ms"
# Once more doing the real gather and plots this time.
start_t = float(tic_step)
tics -= tic_step
tocs -= tic_step
end_t = (toc_step - start_t) / CPU_CLOCK
# Get all "task" names and assign colours.
TASKTYPES = pl.unique(funcs)
print TASKTYPES
# Set colours of task/subtype.
TASKCOLOURS = {}
ncolours = 0
for task in TASKTYPES:
TASKCOLOURS[task] = colours[ncolours]
ncolours = (ncolours + 1) % maxcolours
# For fiddling with colours...
if args.verbose:
print "#Selected colours:"
for task in sorted(TASKCOLOURS.keys()):
print "# " + task + ": " + TASKCOLOURS[task]
tasks = {}
tasks[-1] = []
for i in range(nthread*expand):
tasks[i] = []
# Counters for each thread when expanding.
ecounter = []
for i in range(nthread):
ecounter.append(0)
for i in range(len(threads)):
thread = threads[i]
# Expand to cover extra lines if expanding.
ethread = thread * expand + (ecounter[thread] % expand)
ecounter[thread] = ecounter[thread] + 1
thread = ethread
tasks[thread].append({})
tasks[thread][-1]["type"] = funcs[i]
tic = tics[i] / CPU_CLOCK
toc = tocs[i] / CPU_CLOCK
tasks[thread][-1]["tic"] = tic
tasks[thread][-1]["toc"] = toc
tasks[thread][-1]["colour"] = TASKCOLOURS[funcs[i]]
# Use expanded threads from now on.
nthread = nthread * expand
typesseen = []
fig = pl.figure()
ax = fig.add_subplot(1,1,1)
ax.set_xlim(-delta_t * 0.01 / CPU_CLOCK, delta_t * 1.01 / CPU_CLOCK)
ax.set_ylim(0, nthread)