Commit e8083205 authored by Matthieu Schaller

Merge branch 'master' into gravity_multi_dt

parents 76a2d1db b90e24d0
Merge request !328: Gravity multi dt
@@ -16,6 +16,8 @@ import glob
import re
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats
import ntpath
params = {'axes.labelsize': 14,
'axes.titlesize': 18,
@@ -48,29 +50,20 @@ threadList = []
hexcols = ['#332288', '#88CCEE', '#44AA99', '#117733', '#999933', '#DDCC77',
'#CC6677', '#882255', '#AA4499', '#661100', '#6699CC', '#AA4466',
'#4477AA']
linestyle = (hexcols[0],hexcols[1],hexcols[3],hexcols[5],hexcols[6],hexcols[8])
#cmdLine = './swift -s -t 16 cosmoVolume.yml'
#platform = 'KNL'
linestyle = (hexcols[0],hexcols[1],hexcols[3],hexcols[5],hexcols[6],hexcols[8],hexcols[2],hexcols[4],hexcols[7],hexcols[9])
numTimesteps = 0
legendTitle = ' '
inputFileNames = []
# Work out how many data series there are
if len(sys.argv) == 2:
inputFileNames = (sys.argv[1],"")
numOfSeries = 1
elif len(sys.argv) == 3:
inputFileNames = (sys.argv[1],sys.argv[2])
numOfSeries = 2
elif len(sys.argv) == 4:
inputFileNames = (sys.argv[1],sys.argv[2],sys.argv[3])
numOfSeries = 3
elif len(sys.argv) == 5:
inputFileNames = (sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4])
numOfSeries = 4
elif len(sys.argv) == 6:
inputFileNames = (sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4],sys.argv[5])
numOfSeries = 5
elif len(sys.argv) == 7:
inputFileNames = (sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4],sys.argv[5],sys.argv[6])
numOfSeries = 6
if len(sys.argv) == 1:
print "Please specify an input file in the arguments."
sys.exit()
else:
for fileName in sys.argv[1:]:
inputFileNames.append(fileName)
numOfSeries = int(len(sys.argv) - 1)
# Get the names of the branch, Git revision, hydro scheme and hydro kernel
def parse_header(inputFile):
@@ -105,23 +98,35 @@ def parse_header(inputFile):
# Parse file and return total time taken, speed up and parallel efficiency
def parse_files():
times = []
totalTime = []
serialTime = []
sumTotal = []
speedUp = []
parallelEff = []
for i in range(0,numOfSeries): # Loop over each data series
for i in range(0,numOfSeries): # Loop over each data series
# Get path to set of files
path, name = ntpath.split(inputFileNames[i])
# Get each file that starts with the cmd line arg
file_list = glob.glob(inputFileNames[i] + "*")
threadList.append([])
# Remove path from file names
for j in range(0,len(file_list)):
p, filename = ntpath.split(file_list[j])
file_list[j] = filename
# Create a list of threads using the list of files
for fileName in file_list:
s = re.split(r'[_.]+',fileName)
threadList[i].append(int(s[1]))
# Re-add path once each file has been found
if len(path) != 0:
for j in range(0,len(file_list)):
file_list[j] = path + '/' + file_list[j]
# Sort the thread list in ascending order and save the indices
sorted_indices = np.argsort(threadList[i])
@@ -133,38 +138,50 @@ def parse_files():
parse_header(file_list[0])
branch[i] = branch[i].replace("_", "\\_")
version.append("$\\textrm{%s}$"%str(branch[i]) + " " + revision[i] + "\n" + hydro_scheme[i] +
"\n" + hydro_kernel[i] + r", $N_{ngb}=%d$"%float(hydro_neighbours[i]) +
r", $\eta=%.3f$"%float(hydro_eta[i]))
times.append([])
#version.append("$\\textrm{%s}$"%str(branch[i]))# + " " + revision[i])# + "\n" + hydro_scheme[i] +
# "\n" + hydro_kernel[i] + r", $N_{ngb}=%d$"%float(hydro_neighbours[i]) +
# r", $\eta=%.3f$"%float(hydro_eta[i]))
totalTime.append([])
speedUp.append([])
parallelEff.append([])
# Loop over all files for a given series and load the times
for j in range(0,len(file_list)):
times[i].append([])
times[i][j].append(np.loadtxt(file_list[j],usecols=(5,), skiprows=11))
totalTime[i].append(np.sum(times[i][j]))
times = np.loadtxt(file_list[j],usecols=(6,))
updates = np.loadtxt(file_list[j],usecols=(3,))
totalTime[i].append(np.sum(times))
sumTotal.append(np.sum(totalTime[i]))
serialTime.append(totalTime[i][0])
# Loop over all files for a given series and calculate speed up and parallel efficiency
# Sort the total times in descending order
sorted_indices = np.argsort(sumTotal)[::-1]
totalTime = [ totalTime[j] for j in sorted_indices]
branchNew = [ branch[j] for j in sorted_indices]
for i in range(0,numOfSeries):
version.append("$\\textrm{%s}$"%str(branchNew[i]))
global numTimesteps
numTimesteps = len(times)
# Find speed-up and parallel efficiency
for i in range(0,numOfSeries):
for j in range(0,len(file_list)):
speedUp[i].append(serialTime[i] / totalTime[i][j])
speedUp[i].append(totalTime[i][0] / totalTime[i][j])
parallelEff[i].append(speedUp[i][j] / threadList[i][j])
return (times,totalTime,speedUp,parallelEff)
return (totalTime,speedUp,parallelEff)
def print_results(times,totalTime,parallelEff,version):
def print_results(totalTime,parallelEff,version):
for i in range(0,numOfSeries):
print " "
print "------------------------------------"
print version[i]
print "------------------------------------"
print "Wall clock time for: {} time steps".format(len(times[0][0][0]))
print "Wall clock time for: {} time steps".format(numTimesteps)
print "------------------------------------"
for j in range(0,len(threadList[i])):
@@ -172,7 +189,7 @@ def print_results(times,totalTime,parallelEff,version):
print " "
print "------------------------------------"
print "Parallel Efficiency for: {} time steps".format(len(times[0][0][0]))
print "Parallel Efficiency for: {} time steps".format(numTimesteps)
print "------------------------------------"
for j in range(0,len(threadList[i])):
@@ -180,8 +197,18 @@ def print_results(times,totalTime,parallelEff,version):
return
def plot_results(times,totalTime,speedUp,parallelEff):
# Returns a lighter/darker version of the colour
def color_variant(hex_color, brightness_offset=1):
rgb_hex = [hex_color[x:x+2] for x in [1, 3, 5]]
new_rgb_int = [int(hex_value, 16) + brightness_offset for hex_value in rgb_hex]
new_rgb_int = [min([255, max([0, i])]) for i in new_rgb_int] # make sure new values are between 0 and 255
# hex() produces "0x88", we want just "88"
return "#" + "".join([hex(i)[2:] for i in new_rgb_int])
def plot_results(totalTime,speedUp,parallelEff,numSeries):
fig, axarr = plt.subplots(2, 2, figsize=(10,10), frameon=True)
speedUpPlot = axarr[0, 0]
parallelEffPlot = axarr[0, 1]
@@ -190,27 +217,27 @@ def plot_results(times,totalTime,speedUp,parallelEff):
# Plot speed up
speedUpPlot.plot(threadList[0],threadList[0], linestyle='--', lw=1.5, color='0.2')
for i in range(0,numOfSeries):
speedUpPlot.plot(threadList[i],speedUp[i],linestyle[i],label=version[i])
for i in range(0,numSeries):
speedUpPlot.plot(threadList[0],speedUp[i],linestyle[i],label=version[i])
speedUpPlot.set_ylabel("${\\rm Speed\\textendash up}$", labelpad=0.)
speedUpPlot.set_xlabel("${\\rm Threads}$", labelpad=0.)
speedUpPlot.set_xlim([0.7,threadList[i][-1]+1])
speedUpPlot.set_ylim([0.7,threadList[i][-1]+1])
speedUpPlot.set_xlim([0.7,threadList[0][-1]+1])
speedUpPlot.set_ylim([0.7,threadList[0][-1]+1])
# Plot parallel efficiency
parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [1,1], 'k--', lw=1.5, color='0.2')
parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.9,0.9], 'k--', lw=1.5, color='0.2')
parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.75,0.75], 'k--', lw=1.5, color='0.2')
parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.5,0.5], 'k--', lw=1.5, color='0.2')
for i in range(0,numOfSeries):
parallelEffPlot.plot(threadList[i],parallelEff[i],linestyle[i])
for i in range(0,numSeries):
parallelEffPlot.plot(threadList[0],parallelEff[i],linestyle[i])
parallelEffPlot.set_xscale('log')
parallelEffPlot.set_ylabel("${\\rm Parallel~efficiency}$", labelpad=0.)
parallelEffPlot.set_xlabel("${\\rm Threads}$", labelpad=0.)
parallelEffPlot.set_ylim([0,1.1])
parallelEffPlot.set_xlim([0.9,10**(np.floor(np.log10(threadList[i][-1]))+0.5)])
parallelEffPlot.set_xlim([0.9,10**(np.floor(np.log10(threadList[0][-1]))+0.5)])
# Plot time to solution
for i in range(0,numOfSeries):
@@ -218,20 +245,15 @@ def plot_results(times,totalTime,speedUp,parallelEff):
totalTimePlot.loglog(pts,totalTime[i][0]/pts, 'k--', lw=1., color='0.2')
totalTimePlot.loglog(threadList[i],totalTime[i],linestyle[i],label=version[i])
y_min = 10**np.floor(np.log10(np.min(totalTime[:][-1])*0.6))
y_max = 1.2*10**np.floor(np.log10(np.max(totalTime[:][0]) * 1.5)+1)
y_min = 10**np.floor(np.log10(np.min(totalTime[:][0])*0.6))
y_max = 1.0*10**np.floor(np.log10(np.max(totalTime[:][0]) * 1.5)+1)
totalTimePlot.set_xscale('log')
totalTimePlot.set_xlabel("${\\rm Threads}$", labelpad=0.)
totalTimePlot.set_ylabel("${\\rm Time~to~solution}~[{\\rm ms}]$", labelpad=0.)
totalTimePlot.set_xlim([0.9, 10**(np.floor(np.log10(threadList[i][-1]))+0.5)])
totalTimePlot.set_xlim([0.9, 10**(np.floor(np.log10(threadList[0][-1]))+0.5)])
totalTimePlot.set_ylim(y_min, y_max)
ax2 = totalTimePlot.twinx()
ax2.set_yscale('log')
ax2.set_ylabel("${\\rm Time~to~solution}~[{\\rm hr}]$", labelpad=0.)
ax2.set_ylim(y_min / 3.6e6, y_max / 3.6e6)
totalTimePlot.legend(bbox_to_anchor=(1.21, 0.97), loc=2, borderaxespad=0.,prop={'size':12}, frameon=False)
totalTimePlot.legend(bbox_to_anchor=(1.21, 0.97), loc=2, borderaxespad=0.,prop={'size':12}, frameon=False,title=legendTitle)
emptyPlot.axis('off')
for i, txt in enumerate(threadList[0]):
@@ -240,17 +262,19 @@ def plot_results(times,totalTime,speedUp,parallelEff):
parallelEffPlot.annotate("$%s$"%txt, (threadList[0][i],parallelEff[0][i]), (threadList[0][i], parallelEff[0][i]+0.02), color=hexcols[0])
totalTimePlot.annotate("$%s$"%txt, (threadList[0][i],totalTime[0][i]), (threadList[0][i], totalTime[0][i]*1.1), color=hexcols[0])
#fig.suptitle("Thread Speed Up, Parallel Efficiency and Time To Solution for {} Time Steps of Cosmo Volume\n Cmd Line: {}, Platform: {}".format(len(times[0][0][0]),cmdLine,platform))
fig.suptitle("${\\rm Speed\\textendash up,~parallel~efficiency~and~time~to~solution~for}~%d~{\\rm time\\textendash steps}$"%len(times[0][0][0]), fontsize=16)
#fig.suptitle("Thread Speed Up, Parallel Efficiency and Time To Solution for {} Time Steps of Cosmo Volume\n Cmd Line: {}, Platform: {}".format(numTimesteps),cmdLine,platform))
fig.suptitle("${\\rm Speed\\textendash up,~parallel~efficiency~and~time~to~solution~for}~%d~{\\rm time\\textendash steps}$"%numTimesteps, fontsize=16)
return
# Calculate results
(times,totalTime,speedUp,parallelEff) = parse_files()
(totalTime,speedUp,parallelEff) = parse_files()
legendTitle = version[0]
plot_results(times,totalTime,speedUp,parallelEff)
plot_results(totalTime,speedUp,parallelEff,numOfSeries)
print_results(times,totalTime,parallelEff,version)
print_results(totalTime,parallelEff,version)
# And plot
plt.show()
#!/usr/bin/env python
#
# Usage:
# python plot_scaling_results.py input-file1-ext input-file2-ext ...
#
# Description:
# Plots speed up, parallel efficiency and time to solution given a "timesteps" output file generated by SWIFT.
#
# Example:
# python plot_scaling_results.py _threads_cosma_stdout.txt _threads_knl_stdout.txt
#
# The working directory should contain files 1_threads_cosma_stdout.txt - 64_threads_cosma_stdout.txt and 1_threads_knl_stdout.txt - 64_threads_knl_stdout.txt, i.e. the wall clock times for each run at a given number of threads
import sys
import glob
import re
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats
import ntpath
params = {'axes.labelsize': 14,
'axes.titlesize': 18,
'font.size': 12,
'legend.fontsize': 12,
'xtick.labelsize': 14,
'ytick.labelsize': 14,
'text.usetex': True,
'figure.subplot.left' : 0.055,
'figure.subplot.right' : 0.98 ,
'figure.subplot.bottom' : 0.05 ,
'figure.subplot.top' : 0.95 ,
'figure.subplot.wspace' : 0.14 ,
'figure.subplot.hspace' : 0.12 ,
'lines.markersize' : 6,
'lines.linewidth' : 3.,
'text.latex.unicode': True
}
plt.rcParams.update(params)
plt.rc('font',**{'family':'sans-serif','sans-serif':['Times']})
version = []
branch = []
revision = []
hydro_scheme = []
hydro_kernel = []
hydro_neighbours = []
hydro_eta = []
threadList = []
hexcols = ['#332288', '#88CCEE', '#44AA99', '#117733', '#999933', '#DDCC77',
'#CC6677', '#882255', '#AA4499', '#661100', '#6699CC', '#AA4466',
'#4477AA']
linestyle = (hexcols[0],hexcols[1],hexcols[3],hexcols[5],hexcols[6],hexcols[8],hexcols[2],hexcols[4],hexcols[7],hexcols[9])
numTimesteps = 0
legendTitle = ' '
inputFileNames = []
# Work out how many data series there are
if len(sys.argv) == 1:
print "Please specify an input file in the arguments."
sys.exit()
else:
for fileName in sys.argv[1:]:
inputFileNames.append(fileName)
numOfSeries = int(len(sys.argv) - 1)
# Get the names of the branch, Git revision, hydro scheme and hydro kernel
def parse_header(inputFile):
with open(inputFile, 'r') as f:
found_end = False
for line in f:
if 'Branch:' in line:
s = line.split()
line = s[2:]
branch.append(" ".join(line))
elif 'Revision:' in line:
s = line.split()
revision.append(s[2])
elif 'Hydrodynamic scheme:' in line:
line = line[2:-1]
s = line.split()
line = s[2:]
hydro_scheme.append(" ".join(line))
elif 'Hydrodynamic kernel:' in line:
line = line[2:-1]
s = line.split()
line = s[2:5]
hydro_kernel.append(" ".join(line))
elif 'neighbours:' in line:
s = line.split()
hydro_neighbours.append(s[4])
elif 'Eta:' in line:
s = line.split()
hydro_eta.append(s[2])
return
# Parse file and return total time taken, speed up and parallel efficiency
def parse_files():
totalTime = []
sumTotal = []
speedUp = []
parallelEff = []
for i in range(0,numOfSeries): # Loop over each data series
# Get path to set of files
path, name = ntpath.split(inputFileNames[i])
# Get each file that starts with the cmd line arg
file_list = glob.glob(inputFileNames[i] + "*")
threadList.append([])
# Remove path from file names
for j in range(0,len(file_list)):
p, filename = ntpath.split(file_list[j])
file_list[j] = filename
# Create a list of threads using the list of files
for fileName in file_list:
s = re.split(r'[_.]+',fileName)
threadList[i].append(int(s[1]))
# Re-add path once each file has been found
if len(path) != 0:
for j in range(0,len(file_list)):
file_list[j] = path + '/' + file_list[j]
# Sort the thread list in ascending order and save the indices
sorted_indices = np.argsort(threadList[i])
threadList[i].sort()
# Sort the file list in ascending order according to the thread number
file_list = [ file_list[j] for j in sorted_indices]
parse_header(file_list[0])
branch[i] = branch[i].replace("_", "\\_")
#version.append("$\\textrm{%s}$"%str(branch[i]))# + " " + revision[i])# + "\n" + hydro_scheme[i] +
# "\n" + hydro_kernel[i] + r", $N_{ngb}=%d$"%float(hydro_neighbours[i]) +
# r", $\eta=%.3f$"%float(hydro_eta[i]))
totalTime.append([])
speedUp.append([])
parallelEff.append([])
# Loop over all files for a given series and load the times
for j in range(0,len(file_list)):
times = np.loadtxt(file_list[j],usecols=(6,), skiprows=11)
updates = np.loadtxt(file_list[j],usecols=(3,), skiprows=11)
totalTime[i].append(np.sum(times))
sumTotal.append(np.sum(totalTime[i]))
# Sort the total times in descending order
sorted_indices = np.argsort(sumTotal)[::-1]
totalTime = [ totalTime[j] for j in sorted_indices]
branchNew = [ branch[j] for j in sorted_indices]
for i in range(0,numOfSeries):
version.append("$\\textrm{%s}$"%str(branchNew[i]))
global numTimesteps
numTimesteps = len(times)
# Find speed-up and parallel efficiency
for i in range(0,numOfSeries):
for j in range(0,len(file_list)):
speedUp[i].append(totalTime[i][0] / totalTime[i][j])
parallelEff[i].append(speedUp[i][j] / threadList[i][j])
return (totalTime,speedUp,parallelEff)
def print_results(totalTime,parallelEff,version):
for i in range(0,numOfSeries):
print " "
print "------------------------------------"
print version[i]
print "------------------------------------"
print "Wall clock time for: {} time steps".format(numTimesteps)
print "------------------------------------"
for j in range(0,len(threadList[i])):
print str(threadList[i][j]) + " threads: {}".format(totalTime[i][j])
print " "
print "------------------------------------"
print "Parallel Efficiency for: {} time steps".format(numTimesteps)
print "------------------------------------"
for j in range(0,len(threadList[i])):
print str(threadList[i][j]) + " threads: {}".format(parallelEff[i][j])
return
# Returns a lighter/darker version of the colour
def color_variant(hex_color, brightness_offset=1):
rgb_hex = [hex_color[x:x+2] for x in [1, 3, 5]]
new_rgb_int = [int(hex_value, 16) + brightness_offset for hex_value in rgb_hex]
new_rgb_int = [min([255, max([0, i])]) for i in new_rgb_int] # make sure new values are between 0 and 255
# hex() produces "0x88", we want just "88"
return "#" + "".join([hex(i)[2:] for i in new_rgb_int])
def plot_results(totalTime,speedUp,parallelEff,numSeries):
fig, axarr = plt.subplots(2, 2, figsize=(10,10), frameon=True)
speedUpPlot = axarr[0, 0]
parallelEffPlot = axarr[0, 1]
totalTimePlot = axarr[1, 0]
emptyPlot = axarr[1, 1]
# Plot speed up
speedUpPlot.plot(threadList[0],threadList[0], linestyle='--', lw=1.5, color='0.2')
for i in range(0,numSeries):
speedUpPlot.plot(threadList[0],speedUp[i],linestyle[i],label=version[i])
speedUpPlot.set_ylabel("${\\rm Speed\\textendash up}$", labelpad=0.)
speedUpPlot.set_xlabel("${\\rm Threads}$", labelpad=0.)
speedUpPlot.set_xlim([0.7,threadList[0][-1]+1])
speedUpPlot.set_ylim([0.7,threadList[0][-1]+1])
# Plot parallel efficiency
parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [1,1], 'k--', lw=1.5, color='0.2')
parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.9,0.9], 'k--', lw=1.5, color='0.2')
parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.75,0.75], 'k--', lw=1.5, color='0.2')
parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.5,0.5], 'k--', lw=1.5, color='0.2')
for i in range(0,numSeries):
parallelEffPlot.plot(threadList[0],parallelEff[i],linestyle[i])
parallelEffPlot.set_xscale('log')
parallelEffPlot.set_ylabel("${\\rm Parallel~efficiency}$", labelpad=0.)
parallelEffPlot.set_xlabel("${\\rm Threads}$", labelpad=0.)
parallelEffPlot.set_ylim([0,1.1])
parallelEffPlot.set_xlim([0.9,10**(np.floor(np.log10(threadList[0][-1]))+0.5)])
# Plot time to solution
for i in range(0,numSeries):
for j in range(0,len(threadList[0])):
totalTime[i][j] = totalTime[i][j] * threadList[i][j]
if i > 1:
totalTime[i][j] = totalTime[i][j] + totalTime[i-1][j]
totalTimePlot.plot(threadList[0],totalTime[i],linestyle[i],label=version[i])
if i > 1:
colour = color_variant(linestyle[i],100)
totalTimePlot.fill_between(threadList[0],np.array(totalTime[i]),np.array(totalTime[i-1]), facecolor=colour)
elif i==1:
colour = color_variant(linestyle[i],100)
totalTimePlot.fill_between(threadList[0], totalTime[i],facecolor=colour)
totalTimePlot.set_xscale('log')
totalTimePlot.ticklabel_format(style='sci', axis='y', scilimits=(0,0))
totalTimePlot.set_xlabel("${\\rm Threads}$", labelpad=0.)
totalTimePlot.set_ylabel("${\\rm Time~to~solution~x~No.~of~cores}~[{\\rm ms}]$", labelpad=0.)
totalTimePlot.set_xlim([0.9, 10**(np.floor(np.log10(threadList[0][-1]))+0.5)])
#totalTimePlot.set_ylim(y_min, y_max)
totalTimePlot.legend(bbox_to_anchor=(1.21, 0.97), loc=2, borderaxespad=0.,prop={'size':12}, frameon=False,title=legendTitle)
emptyPlot.axis('off')
for i, txt in enumerate(threadList[0]):
if 2**np.floor(np.log2(threadList[0][i])) == threadList[0][i]: # only powers of 2
speedUpPlot.annotate("$%s$"%txt, (threadList[0][i],speedUp[0][i]), (threadList[0][i],speedUp[0][i] + 0.3), color=hexcols[0])
parallelEffPlot.annotate("$%s$"%txt, (threadList[0][i],parallelEff[0][i]), (threadList[0][i], parallelEff[0][i]+0.02), color=hexcols[0])
totalTimePlot.annotate("$%s$"%txt, (threadList[0][i],totalTime[0][i]), (threadList[0][i], totalTime[0][i]*1.1), color=hexcols[0])
#fig.suptitle("Thread Speed Up, Parallel Efficiency and Time To Solution for {} Time Steps of Cosmo Volume\n Cmd Line: {}, Platform: {}".format(numTimesteps),cmdLine,platform))
fig.suptitle("${\\rm Speed\\textendash up,~parallel~efficiency~and~time~to~solution~x~no.~of~cores~for}~%d~{\\rm time\\textendash steps}$"%numTimesteps, fontsize=16)
return
# Calculate results
(totalTime,speedUp,parallelEff) = parse_files()
legendTitle = version[0]
plot_results(totalTime,speedUp,parallelEff,numOfSeries)
print_results(totalTime,parallelEff,version)
# And plot
plt.show()
@@ -46,7 +46,7 @@ include_HEADERS = space.h runner.h queue.h task.h lock.h cell.h part.h const.h \
hydro_properties.h riemann.h threadpool.h cooling.h cooling_struct.h sourceterms.h \
sourceterms_struct.h statistics.h memswap.h cache.h runner_doiact_vec.h profiler.h \
dump.h logger.h active.h timeline.h xmf.h gravity_properties.h gravity_derivatives.h \
vector_power.h hydro_space.h sort_part.h
vector_power.h collectgroup.h hydro_space.h sort_part.h
# Common source files
AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \
@@ -57,7 +57,7 @@ AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \
runner_doiact_fft.c threadpool.c cooling.c sourceterms.c \
statistics.c runner_doiact_vec.c profiler.c dump.c logger.c \
part_type.c xmf.c gravity_properties.c gravity.c \
hydro_space.c
collectgroup.c hydro_space.c
# Include files for distribution, not installation.
nobase_noinst_HEADERS = align.h approx_math.h atomic.h cycle.h error.h inline.h kernel_hydro.h kernel_gravity.h \
......
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Config parameters. */
#include "../config.h"
/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif
/* This object's header. */
#include "collectgroup.h"
/* Local headers. */
#include "engine.h"
#include "error.h"
#ifdef WITH_MPI
/* Local collections for MPI reduces. */
struct mpicollectgroup1 {
size_t updates, g_updates, s_updates;
integertime_t ti_end_min;
int forcerebuild;
};
/* Forward declarations. */
static void mpicollect_create_MPI_type();
/**
* @brief MPI datatype for the #mpicollectgroup1 structure.
*/
static MPI_Datatype mpicollectgroup1_type;
/**
* @brief MPI operator to reduce #mpicollectgroup1 structures.
*/
static MPI_Op mpicollectgroup1_reduce_op;
#endif
/**
* @brief Perform any once only initialisations. Must be called once.
*/
void collectgroup_init() {
#ifdef WITH_MPI
/* Initialise the MPI types. */
mpicollect_create_MPI_type();
#endif
}
/**
* @brief Apply the collectgroup1 values to the engine by copying all the
* values to the engine fields.
*
* @param grp1 The #collectgroup1
* @param e The #engine
*/
void collectgroup1_apply(struct collectgroup1 *grp1, struct engine *e) {
e->ti_end_min = grp1->ti_end_min;
e->ti_end_max = grp1->ti_end_max;
e->ti_beg_max = grp1->ti_beg_max;
e->updates = grp1->updates;
e->g_updates = grp1->g_updates;
e->s_updates = grp1->s_updates;
e->forcerebuild = grp1->forcerebuild;
}
/**
* @brief Initialises a collectgroup1 struct ready for processing.
*
* @param grp1 The #collectgroup1 to initialise
* @param updates the number of updated hydro particles on this node this step.
* @param g_updates the number of updated gravity particles on this node this step.
* @param s_updates the number of updated star particles on this node this step.
* @param ti_end_min the minimum end time for next time step after this step.
* @param ti_end_max the maximum end time for next time step after this step.
* @param ti_beg_max the maximum begin time for next time step after this step.
* @param forcerebuild whether a rebuild is required after this step.
*/
void collectgroup1_init(struct collectgroup1 *grp1, size_t updates,
size_t g_updates, size_t s_updates,
integertime_t ti_end_min,
integertime_t ti_end_max,
integertime_t ti_beg_max,
int forcerebuild) {
grp1->updates = updates;
grp1->g_updates = g_updates;
grp1->s_updates = s_updates;
grp1->ti_end_min = ti_end_min;
grp1->ti_end_max = ti_end_max;
grp1->ti_beg_max = ti_beg_max;
grp1->forcerebuild = forcerebuild;
}
/**
* @brief Do any processing necessary to the group before it can be used.
*
* This may involve an MPI reduction across all nodes.
*
* @param grp1 the #collectgroup1 struct already initialised by a call
* to collectgroup1_init.
*/
void collectgroup1_reduce(struct collectgroup1 *grp1) {
#ifdef WITH_MPI
/* Populate an MPI group struct and reduce this across all nodes. */
struct mpicollectgroup1 mpigrp11;
mpigrp11.updates = grp1->updates;
mpigrp11.g_updates = grp1->g_updates;
mpigrp11.s_updates = grp1->s_updates;
mpigrp11.ti_end_min = grp1->ti_end_min;
mpigrp11.forcerebuild = grp1->forcerebuild;
struct mpicollectgroup1 mpigrp12;
if (MPI_Allreduce(&mpigrp11, &mpigrp12, 1, mpicollectgroup1_type,
mpicollectgroup1_reduce_op, MPI_COMM_WORLD)
!= MPI_SUCCESS)
error("Failed to reduce mpicollection1.");
/* And update. */
grp1->updates = mpigrp12.updates;
grp1->g_updates = mpigrp12.g_updates;
grp1->s_updates = mpigrp12.s_updates;
grp1->ti_end_min = mpigrp12.ti_end_min;
grp1->forcerebuild = mpigrp12.forcerebuild;
#endif
}
#ifdef WITH_MPI
/**
* @brief Do the reduction of two structs.
*
* @param mpigrp11 the first struct, this is updated on exit.
* @param mpigrp12 the second struct
*/
static void doreduce1(struct mpicollectgroup1 *mpigrp11,
const struct mpicollectgroup1 *mpigrp12) {
/* Do what is needed for each part of the collection. */
/* Sum of updates. */
mpigrp11->updates += mpigrp12->updates;
mpigrp11->g_updates += mpigrp12->g_updates;
mpigrp11->s_updates += mpigrp12->s_updates;
/* Minimum end time. */
mpigrp11->ti_end_min = min(mpigrp11->ti_end_min, mpigrp12->ti_end_min);
/* Everyone must agree to not rebuild. */
if (mpigrp11->forcerebuild || mpigrp12->forcerebuild)
mpigrp11->forcerebuild = 1;
}
/**
* @brief MPI reduce operator for #mpicollectgroup1 structures.
*/
static void mpicollectgroup1_reduce(void *in, void *inout, int *len,
MPI_Datatype *datatype) {
for (int i = 0; i < *len; ++i)
doreduce1(&((struct mpicollectgroup1 *)inout)[0],
&((const struct mpicollectgroup1 *)in)[i]);
}
/**
* @brief Registers any MPI collection types and reduction functions.
*/
static void mpicollect_create_MPI_type() {
if (MPI_Type_contiguous(sizeof(struct mpicollectgroup1), MPI_BYTE,
&mpicollectgroup1_type) != MPI_SUCCESS ||
MPI_Type_commit(&mpicollectgroup1_type) != MPI_SUCCESS) {
error("Failed to create MPI type for mpicollection1.");
}
/* Create the reduction operation */
MPI_Op_create(mpicollectgroup1_reduce, 1, &mpicollectgroup1_reduce_op);
}
#endif
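The file above packs several per-step quantities into one struct and reduces them with a single MPI_Allreduce, using a contiguous MPI_BYTE type and a user-defined reduction operator. The stand-alone toy below is not part of the commit; the struct, field names and values are purely illustrative, but it compiles against any MPI implementation and shows the same pattern (a sum, a minimum and a logical OR combined in one collective call).

/* toy_collect.c -- illustrative only, not SWIFT source. */
#include <mpi.h>
#include <stdio.h>

struct toygroup {
  long long updates;    /* summed across ranks */
  long long ti_end_min; /* minimised across ranks */
  int forcerebuild;     /* logically OR-ed across ranks */
};

/* User-defined reduction: combine 'in' into 'inout', element by element. */
static void toy_reduce(void *in, void *inout, int *len, MPI_Datatype *dt) {
  (void)dt;
  struct toygroup *a = (struct toygroup *)inout;
  const struct toygroup *b = (const struct toygroup *)in;
  for (int i = 0; i < *len; ++i) {
    a[i].updates += b[i].updates;
    if (b[i].ti_end_min < a[i].ti_end_min) a[i].ti_end_min = b[i].ti_end_min;
    a[i].forcerebuild = a[i].forcerebuild || b[i].forcerebuild;
  }
}

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  /* Register the type and the reduction operator once. */
  MPI_Datatype toy_type;
  MPI_Type_contiguous(sizeof(struct toygroup), MPI_BYTE, &toy_type);
  MPI_Type_commit(&toy_type);
  MPI_Op toy_op;
  MPI_Op_create(toy_reduce, 1, &toy_op);

  /* Per-rank values, then one collective call for all of them. */
  struct toygroup local = {100 + rank, 1000 - rank, rank == 0};
  struct toygroup global;
  MPI_Allreduce(&local, &global, 1, toy_type, toy_op, MPI_COMM_WORLD);

  if (rank == 0)
    printf("updates=%lld ti_end_min=%lld rebuild=%d\n", global.updates,
           global.ti_end_min, global.forcerebuild);

  MPI_Op_free(&toy_op);
  MPI_Type_free(&toy_type);
  MPI_Finalize();
  return 0;
}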
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#ifndef SWIFT_COLLECTGROUP_H
#define SWIFT_COLLECTGROUP_H
/* Config parameters. */
#include "../config.h"
/* Standard headers. */
#include <stddef.h>
/* Local headers. */
#include "timeline.h"
/* Forward declaration of engine struct (to avoid cyclic include). */
struct engine;
/* A collection of global quantities that can be processed at the same time. */
struct collectgroup1 {
/* Number of particles updated */
size_t updates, g_updates, s_updates;
/* Times for the time-step */
integertime_t ti_end_min, ti_end_max, ti_beg_max;
/* Force the engine to rebuild? */
int forcerebuild;
};
void collectgroup_init();
void collectgroup1_apply(struct collectgroup1 *grp1, struct engine *e);
void collectgroup1_init(struct collectgroup1 *grp1, size_t updates,
size_t g_updates, size_t s_updates,
integertime_t ti_end_min,
integertime_t ti_end_max,
integertime_t ti_beg_max,
int forcerebuild);
void collectgroup1_reduce(struct collectgroup1 *grp1);
#endif /* SWIFT_COLLECTGROUP_H */
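As a quick orientation for the API declared above, here is a minimal sketch of the intended call order, mirroring what engine_collect_timestep_and_rebuild() does elsewhere in this merge. The wrapper name and the locally gathered values are hypothetical; only the collectgroup1_* calls are from the header.

#include "collectgroup.h"

/* Sketch only: in SWIFT the per-node counters come from the super-cell
 * recursion inside engine_collect_timestep_and_rebuild(). */
void sketch_collect_and_apply(struct engine *e, size_t updates,
                              size_t g_updates, size_t s_updates,
                              integertime_t ti_end_min,
                              integertime_t ti_end_max,
                              integertime_t ti_beg_max, int forcerebuild) {

  /* Pack the values gathered on this node into the temporary group. */
  struct collectgroup1 grp;
  collectgroup1_init(&grp, updates, g_updates, s_updates, ti_end_min,
                     ti_end_max, ti_beg_max, forcerebuild);

  /* One collective reduction across all ranks (a no-op without MPI).
   * collectgroup_init() must have been called once beforehand so that the
   * MPI type and reduction operator are registered. */
  collectgroup1_reduce(&grp);

  /* Copy the reduced values into the engine fields, now or at any later
   * point in the step. */
  collectgroup1_apply(&grp, e);
}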
@@ -2805,12 +2805,23 @@ void engine_collect_kick(struct cell *c) {
}
/**
* @brief Collects the next time-step by making each super-cell recurse
* to collect the minimal of ti_end and the number of updated particles.
* @brief Collects the next time-step and rebuild flag.
*
* The next time-step is determined by making each super-cell recurse to
* collect the minimum of ti_end and the number of updated particles. When in
* MPI mode this routine reduces these across all nodes and also collects the
* forcerebuild flag -- this is so that we only use a single collective MPI
* call per step for all these values.
*
* Note that the results are stored in the e->collect_group1 struct, not in the
* engine fields, unless apply is true. They can be applied field by field
* or all at once using collectgroup1_apply().
*
* @param e The #engine.
* @param apply whether to apply the results to the engine or just keep them
* in the group1 struct.
*/
void engine_collect_timestep(struct engine *e) {
void engine_collect_timestep_and_rebuild(struct engine *e, int apply) {
const ticks tic = getticks();
int updates = 0, g_updates = 0, s_updates = 0;
@@ -2840,37 +2851,58 @@ void engine_collect_timestep(struct engine *e) {
}
}
/* Aggregate the data from the different nodes. */
/* Store these in the temporary collection group. */
collectgroup1_init(&e->collect_group1, updates,g_updates, s_updates,
ti_end_min,ti_end_max,ti_beg_max,e->forcerebuild);
/* Aggregate collective data from the different nodes for this step. */
#ifdef WITH_MPI
collectgroup1_reduce(&e->collect_group1);
#ifdef SWIFT_DEBUG_CHECKS
{
/* Check the above using the original MPI calls. */
integertime_t in_i[1], out_i[1];
in_i[0] = 0;
out_i[0] = ti_end_min;
if (MPI_Allreduce(out_i, in_i, 1, MPI_LONG_LONG_INT, MPI_MIN,
MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to aggregate t_end_min.");
ti_end_min = in_i[0];
}
{
error("Failed to aggregate ti_end_min.");
if (in_i[0] != (long long)e->collect_group1.ti_end_min)
error("Failed to get same ti_end_min, is %lld, should be %lld",
in_i[0], e->collect_group1.ti_end_min);
long long in_ll[3], out_ll[3];
out_ll[0] = updates;
out_ll[1] = g_updates;
out_ll[2] = s_updates;
if (MPI_Allreduce(out_ll, in_ll, 3, MPI_LONG_LONG_INT, MPI_SUM,
MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to aggregate energies.");
updates = in_ll[0];
g_updates = in_ll[1];
s_updates = in_ll[2];
error("Failed to aggregate particle counts.");
if (in_ll[0] != (long long)e->collect_group1.updates)
error("Failed to get same updates, is %lld, should be %ld",
in_ll[0], e->collect_group1.updates);
if (in_ll[1] != (long long)e->collect_group1.g_updates)
error("Failed to get same g_updates, is %lld, should be %ld",
in_ll[1], e->collect_group1.g_updates);
if (in_ll[2] != (long long)e->collect_group1.s_updates)
error("Failed to get same s_updates, is %lld, should be %ld",
in_ll[2], e->collect_group1.s_updates);
int buff = 0;
if (MPI_Allreduce(&e->forcerebuild, &buff, 1, MPI_INT, MPI_MAX,
MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to aggregate the rebuild flag across nodes.");
if (!!buff != !!e->collect_group1.forcerebuild)
error("Failed to get same rebuild flag from all nodes, is %d,"
"should be %d", buff, e->collect_group1.forcerebuild);
}
#endif
#endif
e->ti_end_min = ti_end_min;
e->ti_end_max = ti_end_max;
e->ti_beg_max = ti_beg_max;
e->updates = updates;
e->g_updates = g_updates;
e->s_updates = s_updates;
/* Apply to the engine, if requested. */
if (apply)
collectgroup1_apply(&e->collect_group1, e);
if (e->verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
@@ -3112,7 +3144,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs) {
#endif
/* Recover the (integer) end of the next time-step */
engine_collect_timestep(e);
engine_collect_timestep_and_rebuild(e, 1);
clocks_gettime(&time2);
@@ -3212,14 +3244,12 @@ void engine_step(struct engine *e) {
gravity_exact_force_check(e->s, e, 1e-1);
#endif
/* Collect the values of rebuild from all nodes. */
#ifdef WITH_MPI
int buff = 0;
if (MPI_Allreduce(&e->forcerebuild, &buff, 1, MPI_INT, MPI_MAX,
MPI_COMM_WORLD) != MPI_SUCCESS)
error("Failed to aggregate the rebuild flag across nodes.");
e->forcerebuild = buff;
#endif
/* Collect the values of rebuild from all nodes and recover the (integer)
* end of the next time-step. Do these together to reduce the collective MPI
* calls per step, but some of the gathered information is not applied just
* yet (in case we save a snapshot or drift). */
engine_collect_timestep_and_rebuild(e, 0);
e->forcerebuild = e->collect_group1.forcerebuild;
/* Save some statistics ? */
if (e->time - e->timeLastStatistics >= e->deltaTimeStatistics) {
@@ -3255,8 +3285,8 @@ void engine_step(struct engine *e) {
e->timeLastStatistics += e->deltaTimeStatistics;
}
/* Recover the (integer) end of the next time-step */
engine_collect_timestep(e);
/* Now apply all the collected time step updates and particle counts. */
collectgroup1_apply(&e->collect_group1, e);
TIMER_TOC2(timer_step);
@@ -4041,12 +4071,15 @@ void engine_init(struct engine *e, struct space *s,
/* Find the time of the first output */
engine_compute_next_snapshot_time(e);
/* Construct types for MPI communications */
/* Construct types for MPI communications */
#ifdef WITH_MPI
part_create_mpi_types();
stats_create_MPI_type();
#endif
/* Initialise the collection group. */
collectgroup_init();
/* Initialize the threadpool. */
threadpool_init(&e->threadpool, e->nr_threads);
......
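Taken together, the engine changes above move the step code to a deferred pattern: collect everything in one call, read the rebuild flag straight away, and only push the remaining values into the engine after any statistics or snapshot output that still needs the pre-step state. The condensed sketch below paraphrases that flow; the function name is hypothetical and it assumes the prototype of engine_collect_timestep_and_rebuild() is exported via engine.h.

#include "engine.h" /* struct engine, collect_group1, collectgroup1_apply() */

/* Condensed paraphrase of the new per-step flow; the statistics and
 * snapshot logic that sits between the two stages is omitted. */
void sketch_step_flow(struct engine *e) {

  /* One combined collection (time-step info + rebuild flag); apply = 0
   * leaves the engine fields untouched for now. */
  engine_collect_timestep_and_rebuild(e, 0);

  /* The rebuild decision is needed immediately. */
  e->forcerebuild = e->collect_group1.forcerebuild;

  /* ... statistics / snapshot output using the pre-step values ... */

  /* Finally apply the collected time-step updates and particle counts. */
  collectgroup1_apply(&e->collect_group1, e);
}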
@@ -38,6 +38,7 @@
/* Includes. */
#include "clocks.h"
#include "collectgroup.h"
#include "cooling_struct.h"
#include "gravity_properties.h"
#include "parser.h"
@@ -243,6 +244,10 @@ struct engine {
/* The (parsed) parameter file */
const struct swift_params *parameter_file;
/* Temporary struct to hold a group of deferrable properties (in MPI mode
* these are reduced together, but may not be required just yet). */
struct collectgroup1 collect_group1;
};
/* Function prototypes. */
......