diff --git a/examples/plot_scaling_results.py b/examples/plot_scaling_results.py index 26864fe675f502b025f0bf10d73bbba6f8162957..9e938c720f81dcf2246d2e48922ec5b4dd404f3e 100755 --- a/examples/plot_scaling_results.py +++ b/examples/plot_scaling_results.py @@ -16,6 +16,8 @@ import glob import re import numpy as np import matplotlib.pyplot as plt +import scipy.stats +import ntpath params = {'axes.labelsize': 14, 'axes.titlesize': 18, @@ -48,29 +50,20 @@ threadList = [] hexcols = ['#332288', '#88CCEE', '#44AA99', '#117733', '#999933', '#DDCC77', '#CC6677', '#882255', '#AA4499', '#661100', '#6699CC', '#AA4466', '#4477AA'] -linestyle = (hexcols[0],hexcols[1],hexcols[3],hexcols[5],hexcols[6],hexcols[8]) -#cmdLine = './swift -s -t 16 cosmoVolume.yml' -#platform = 'KNL' +linestyle = (hexcols[0],hexcols[1],hexcols[3],hexcols[5],hexcols[6],hexcols[8],hexcols[2],hexcols[4],hexcols[7],hexcols[9]) +numTimesteps = 0 +legendTitle = ' ' + +inputFileNames = [] # Work out how many data series there are -if len(sys.argv) == 2: - inputFileNames = (sys.argv[1],"") - numOfSeries = 1 -elif len(sys.argv) == 3: - inputFileNames = (sys.argv[1],sys.argv[2]) - numOfSeries = 2 -elif len(sys.argv) == 4: - inputFileNames = (sys.argv[1],sys.argv[2],sys.argv[3]) - numOfSeries = 3 -elif len(sys.argv) == 5: - inputFileNames = (sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4]) - numOfSeries = 4 -elif len(sys.argv) == 6: - inputFileNames = (sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4],sys.argv[5]) - numOfSeries = 5 -elif len(sys.argv) == 7: - inputFileNames = (sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4],sys.argv[5],sys.argv[6]) - numOfSeries = 6 +if len(sys.argv) == 1: + print "Please specify an input file in the arguments." + sys.exit() +else: + for fileName in sys.argv[1:]: + inputFileNames.append(fileName) + numOfSeries = int(len(sys.argv) - 1) # Get the names of the branch, Git revision, hydro scheme and hydro kernel def parse_header(inputFile): @@ -105,23 +98,35 @@ def parse_header(inputFile): # Parse file and return total time taken, speed up and parallel efficiency def parse_files(): - times = [] totalTime = [] - serialTime = [] + sumTotal = [] speedUp = [] parallelEff = [] - - for i in range(0,numOfSeries): # Loop over each data series + for i in range(0,numOfSeries): # Loop over each data series + + # Get path to set of files + path, name = ntpath.split(inputFileNames[i]) + # Get each file that starts with the cmd line arg file_list = glob.glob(inputFileNames[i] + "*") - + threadList.append([]) + # Remove path from file names + for j in range(0,len(file_list)): + p, filename = ntpath.split(file_list[j]) + file_list[j] = filename + # Create a list of threads using the list of files for fileName in file_list: s = re.split(r'[_.]+',fileName) threadList[i].append(int(s[1])) + + # Re-add path once each file has been found + if len(path) != 0: + for j in range(0,len(file_list)): + file_list[j] = path + '/' + file_list[j] # Sort the thread list in ascending order and save the indices sorted_indices = np.argsort(threadList[i]) @@ -133,38 +138,50 @@ def parse_files(): parse_header(file_list[0]) branch[i] = branch[i].replace("_", "\\_") - - version.append("$\\textrm{%s}$"%str(branch[i]) + " " + revision[i] + "\n" + hydro_scheme[i] + - "\n" + hydro_kernel[i] + r", $N_{ngb}=%d$"%float(hydro_neighbours[i]) + - r", $\eta=%.3f$"%float(hydro_eta[i])) - times.append([]) + + #version.append("$\\textrm{%s}$"%str(branch[i]))# + " " + revision[i])# + "\n" + hydro_scheme[i] + +# "\n" + hydro_kernel[i] + r", 
$N_{ngb}=%d$"%float(hydro_neighbours[i]) + +# r", $\eta=%.3f$"%float(hydro_eta[i])) totalTime.append([]) speedUp.append([]) parallelEff.append([]) - + # Loop over all files for a given series and load the times for j in range(0,len(file_list)): - times[i].append([]) - times[i][j].append(np.loadtxt(file_list[j],usecols=(5,), skiprows=11)) - totalTime[i].append(np.sum(times[i][j])) + times = np.loadtxt(file_list[j],usecols=(6,)) + updates = np.loadtxt(file_list[j],usecols=(3,)) + totalTime[i].append(np.sum(times)) + + sumTotal.append(np.sum(totalTime[i])) - serialTime.append(totalTime[i][0]) - - # Loop over all files for a given series and calculate speed up and parallel efficiency + # Sort the total times in descending order + sorted_indices = np.argsort(sumTotal)[::-1] + + totalTime = [ totalTime[j] for j in sorted_indices] + branchNew = [ branch[j] for j in sorted_indices] + + for i in range(0,numOfSeries): + version.append("$\\textrm{%s}$"%str(branchNew[i])) + + global numTimesteps + numTimesteps = len(times) + + # Find speed-up and parallel efficiency + for i in range(0,numOfSeries): for j in range(0,len(file_list)): - speedUp[i].append(serialTime[i] / totalTime[i][j]) + speedUp[i].append(totalTime[i][0] / totalTime[i][j]) parallelEff[i].append(speedUp[i][j] / threadList[i][j]) - return (times,totalTime,speedUp,parallelEff) + return (totalTime,speedUp,parallelEff) -def print_results(times,totalTime,parallelEff,version): +def print_results(totalTime,parallelEff,version): for i in range(0,numOfSeries): print " " print "------------------------------------" print version[i] print "------------------------------------" - print "Wall clock time for: {} time steps".format(len(times[0][0][0])) + print "Wall clock time for: {} time steps".format(numTimesteps) print "------------------------------------" for j in range(0,len(threadList[i])): @@ -172,7 +189,7 @@ def print_results(times,totalTime,parallelEff,version): print " " print "------------------------------------" - print "Parallel Efficiency for: {} time steps".format(len(times[0][0][0])) + print "Parallel Efficiency for: {} time steps".format(numTimesteps) print "------------------------------------" for j in range(0,len(threadList[i])): @@ -180,8 +197,18 @@ def print_results(times,totalTime,parallelEff,version): return -def plot_results(times,totalTime,speedUp,parallelEff): +# Returns a lighter/darker version of the colour +def color_variant(hex_color, brightness_offset=1): + + rgb_hex = [hex_color[x:x+2] for x in [1, 3, 5]] + new_rgb_int = [int(hex_value, 16) + brightness_offset for hex_value in rgb_hex] + new_rgb_int = [min([255, max([0, i])]) for i in new_rgb_int] # make sure new values are between 0 and 255 + # hex() produces "0x88", we want just "88" + return "#" + "".join([hex(i)[2:] for i in new_rgb_int]) + +def plot_results(totalTime,speedUp,parallelEff,numSeries): + fig, axarr = plt.subplots(2, 2, figsize=(10,10), frameon=True) speedUpPlot = axarr[0, 0] parallelEffPlot = axarr[0, 1] @@ -190,27 +217,27 @@ def plot_results(times,totalTime,speedUp,parallelEff): # Plot speed up speedUpPlot.plot(threadList[0],threadList[0], linestyle='--', lw=1.5, color='0.2') - for i in range(0,numOfSeries): - speedUpPlot.plot(threadList[i],speedUp[i],linestyle[i],label=version[i]) - + for i in range(0,numSeries): + speedUpPlot.plot(threadList[0],speedUp[i],linestyle[i],label=version[i]) + speedUpPlot.set_ylabel("${\\rm Speed\\textendash up}$", labelpad=0.) speedUpPlot.set_xlabel("${\\rm Threads}$", labelpad=0.) 
- speedUpPlot.set_xlim([0.7,threadList[i][-1]+1]) - speedUpPlot.set_ylim([0.7,threadList[i][-1]+1]) + speedUpPlot.set_xlim([0.7,threadList[0][-1]+1]) + speedUpPlot.set_ylim([0.7,threadList[0][-1]+1]) # Plot parallel efficiency parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [1,1], 'k--', lw=1.5, color='0.2') parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.9,0.9], 'k--', lw=1.5, color='0.2') parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.75,0.75], 'k--', lw=1.5, color='0.2') parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.5,0.5], 'k--', lw=1.5, color='0.2') - for i in range(0,numOfSeries): - parallelEffPlot.plot(threadList[i],parallelEff[i],linestyle[i]) + for i in range(0,numSeries): + parallelEffPlot.plot(threadList[0],parallelEff[i],linestyle[i]) parallelEffPlot.set_xscale('log') parallelEffPlot.set_ylabel("${\\rm Parallel~efficiency}$", labelpad=0.) parallelEffPlot.set_xlabel("${\\rm Threads}$", labelpad=0.) parallelEffPlot.set_ylim([0,1.1]) - parallelEffPlot.set_xlim([0.9,10**(np.floor(np.log10(threadList[i][-1]))+0.5)]) + parallelEffPlot.set_xlim([0.9,10**(np.floor(np.log10(threadList[0][-1]))+0.5)]) # Plot time to solution for i in range(0,numOfSeries): @@ -218,20 +245,15 @@ def plot_results(times,totalTime,speedUp,parallelEff): totalTimePlot.loglog(pts,totalTime[i][0]/pts, 'k--', lw=1., color='0.2') totalTimePlot.loglog(threadList[i],totalTime[i],linestyle[i],label=version[i]) - y_min = 10**np.floor(np.log10(np.min(totalTime[:][-1])*0.6)) - y_max = 1.2*10**np.floor(np.log10(np.max(totalTime[:][0]) * 1.5)+1) + y_min = 10**np.floor(np.log10(np.min(totalTime[:][0])*0.6)) + y_max = 1.0*10**np.floor(np.log10(np.max(totalTime[:][0]) * 1.5)+1) totalTimePlot.set_xscale('log') totalTimePlot.set_xlabel("${\\rm Threads}$", labelpad=0.) totalTimePlot.set_ylabel("${\\rm Time~to~solution}~[{\\rm ms}]$", labelpad=0.) - totalTimePlot.set_xlim([0.9, 10**(np.floor(np.log10(threadList[i][-1]))+0.5)]) + totalTimePlot.set_xlim([0.9, 10**(np.floor(np.log10(threadList[0][-1]))+0.5)]) totalTimePlot.set_ylim(y_min, y_max) - ax2 = totalTimePlot.twinx() - ax2.set_yscale('log') - ax2.set_ylabel("${\\rm Time~to~solution}~[{\\rm hr}]$", labelpad=0.) 
- ax2.set_ylim(y_min / 3.6e6, y_max / 3.6e6) - - totalTimePlot.legend(bbox_to_anchor=(1.21, 0.97), loc=2, borderaxespad=0.,prop={'size':12}, frameon=False) + totalTimePlot.legend(bbox_to_anchor=(1.21, 0.97), loc=2, borderaxespad=0.,prop={'size':12}, frameon=False,title=legendTitle) emptyPlot.axis('off') for i, txt in enumerate(threadList[0]): @@ -240,17 +262,19 @@ def plot_results(times,totalTime,speedUp,parallelEff): parallelEffPlot.annotate("$%s$"%txt, (threadList[0][i],parallelEff[0][i]), (threadList[0][i], parallelEff[0][i]+0.02), color=hexcols[0]) totalTimePlot.annotate("$%s$"%txt, (threadList[0][i],totalTime[0][i]), (threadList[0][i], totalTime[0][i]*1.1), color=hexcols[0]) - #fig.suptitle("Thread Speed Up, Parallel Efficiency and Time To Solution for {} Time Steps of Cosmo Volume\n Cmd Line: {}, Platform: {}".format(len(times[0][0][0]),cmdLine,platform)) - fig.suptitle("${\\rm Speed\\textendash up,~parallel~efficiency~and~time~to~solution~for}~%d~{\\rm time\\textendash steps}$"%len(times[0][0][0]), fontsize=16) + #fig.suptitle("Thread Speed Up, Parallel Efficiency and Time To Solution for {} Time Steps of Cosmo Volume\n Cmd Line: {}, Platform: {}".format(numTimesteps),cmdLine,platform)) + fig.suptitle("${\\rm Speed\\textendash up,~parallel~efficiency~and~time~to~solution~for}~%d~{\\rm time\\textendash steps}$"%numTimesteps, fontsize=16) return # Calculate results -(times,totalTime,speedUp,parallelEff) = parse_files() +(totalTime,speedUp,parallelEff) = parse_files() + +legendTitle = version[0] -plot_results(times,totalTime,speedUp,parallelEff) +plot_results(totalTime,speedUp,parallelEff,numOfSeries) -print_results(times,totalTime,parallelEff,version) +print_results(totalTime,parallelEff,version) # And plot plt.show() diff --git a/examples/plot_scaling_results_breakdown.py b/examples/plot_scaling_results_breakdown.py new file mode 100755 index 0000000000000000000000000000000000000000..92a9564c326b9a4d33c047a87f7792d257c96b69 --- /dev/null +++ b/examples/plot_scaling_results_breakdown.py @@ -0,0 +1,289 @@ +#!/usr/bin/env python +# +# Usage: +# python plot_scaling_results.py input-file1-ext input-file2-ext ... +# +# Description: +# Plots speed up, parallel efficiency and time to solution given a "timesteps" output file generated by SWIFT. 
+# +# Example: +# python plot_scaling_results.py _hreads_cosma_stdout.txt _threads_knl_stdout.txt +# +# The working directory should contain files 1_threads_cosma_stdout.txt - 64_threads_cosma_stdout.txt and 1_threads_knl_stdout.txt - 64_threads_knl_stdout.txt, i.e wall clock time for each run using a given number of threads + +import sys +import glob +import re +import numpy as np +import matplotlib.pyplot as plt +import scipy.stats +import ntpath + +params = {'axes.labelsize': 14, +'axes.titlesize': 18, +'font.size': 12, +'legend.fontsize': 12, +'xtick.labelsize': 14, +'ytick.labelsize': 14, +'text.usetex': True, +'figure.subplot.left' : 0.055, +'figure.subplot.right' : 0.98 , +'figure.subplot.bottom' : 0.05 , +'figure.subplot.top' : 0.95 , +'figure.subplot.wspace' : 0.14 , +'figure.subplot.hspace' : 0.12 , +'lines.markersize' : 6, +'lines.linewidth' : 3., +'text.latex.unicode': True +} +plt.rcParams.update(params) +plt.rc('font',**{'family':'sans-serif','sans-serif':['Times']}) + +version = [] +branch = [] +revision = [] +hydro_scheme = [] +hydro_kernel = [] +hydro_neighbours = [] +hydro_eta = [] +threadList = [] +hexcols = ['#332288', '#88CCEE', '#44AA99', '#117733', '#999933', '#DDCC77', + '#CC6677', '#882255', '#AA4499', '#661100', '#6699CC', '#AA4466', + '#4477AA'] +linestyle = (hexcols[0],hexcols[1],hexcols[3],hexcols[5],hexcols[6],hexcols[8],hexcols[2],hexcols[4],hexcols[7],hexcols[9]) +numTimesteps = 0 +legendTitle = ' ' + +inputFileNames = [] + +# Work out how many data series there are +if len(sys.argv) == 1: + print "Please specify an input file in the arguments." + sys.exit() +else: + for fileName in sys.argv[1:]: + inputFileNames.append(fileName) + numOfSeries = int(len(sys.argv) - 1) + +# Get the names of the branch, Git revision, hydro scheme and hydro kernel +def parse_header(inputFile): + with open(inputFile, 'r') as f: + found_end = False + for line in f: + if 'Branch:' in line: + s = line.split() + line = s[2:] + branch.append(" ".join(line)) + elif 'Revision:' in line: + s = line.split() + revision.append(s[2]) + elif 'Hydrodynamic scheme:' in line: + line = line[2:-1] + s = line.split() + line = s[2:] + hydro_scheme.append(" ".join(line)) + elif 'Hydrodynamic kernel:' in line: + line = line[2:-1] + s = line.split() + line = s[2:5] + hydro_kernel.append(" ".join(line)) + elif 'neighbours:' in line: + s = line.split() + hydro_neighbours.append(s[4]) + elif 'Eta:' in line: + s = line.split() + hydro_eta.append(s[2]) + return + +# Parse file and return total time taken, speed up and parallel efficiency +def parse_files(): + + totalTime = [] + sumTotal = [] + speedUp = [] + parallelEff = [] + + for i in range(0,numOfSeries): # Loop over each data series + + # Get path to set of files + path, name = ntpath.split(inputFileNames[i]) + + # Get each file that starts with the cmd line arg + file_list = glob.glob(inputFileNames[i] + "*") + + threadList.append([]) + + # Remove path from file names + for j in range(0,len(file_list)): + p, filename = ntpath.split(file_list[j]) + file_list[j] = filename + + # Create a list of threads using the list of files + for fileName in file_list: + s = re.split(r'[_.]+',fileName) + threadList[i].append(int(s[1])) + + # Re-add path once each file has been found + if len(path) != 0: + for j in range(0,len(file_list)): + file_list[j] = path + '/' + file_list[j] + + # Sort the thread list in ascending order and save the indices + sorted_indices = np.argsort(threadList[i]) + threadList[i].sort() + + # Sort the file list in ascending order acording 
to the thread number + file_list = [ file_list[j] for j in sorted_indices] + + parse_header(file_list[0]) + + branch[i] = branch[i].replace("_", "\\_") + + + #version.append("$\\textrm{%s}$"%str(branch[i]))# + " " + revision[i])# + "\n" + hydro_scheme[i] + +# "\n" + hydro_kernel[i] + r", $N_{ngb}=%d$"%float(hydro_neighbours[i]) + +# r", $\eta=%.3f$"%float(hydro_eta[i])) + totalTime.append([]) + speedUp.append([]) + parallelEff.append([]) + + # Loop over all files for a given series and load the times + for j in range(0,len(file_list)): + times = np.loadtxt(file_list[j],usecols=(6,), skiprows=11) + updates = np.loadtxt(file_list[j],usecols=(3,), skiprows=11) + totalTime[i].append(np.sum(times)) + + sumTotal.append(np.sum(totalTime[i])) + + # Sort the total times in descending order + sorted_indices = np.argsort(sumTotal)[::-1] + + totalTime = [ totalTime[j] for j in sorted_indices] + branchNew = [ branch[j] for j in sorted_indices] + + for i in range(0,numOfSeries): + version.append("$\\textrm{%s}$"%str(branchNew[i])) + + global numTimesteps + numTimesteps = len(times) + + # Find speed-up and parallel efficiency + for i in range(0,numOfSeries): + for j in range(0,len(file_list)): + speedUp[i].append(totalTime[i][0] / totalTime[i][j]) + parallelEff[i].append(speedUp[i][j] / threadList[i][j]) + + return (totalTime,speedUp,parallelEff) + +def print_results(totalTime,parallelEff,version): + + for i in range(0,numOfSeries): + print " " + print "------------------------------------" + print version[i] + print "------------------------------------" + print "Wall clock time for: {} time steps".format(numTimesteps) + print "------------------------------------" + + for j in range(0,len(threadList[i])): + print str(threadList[i][j]) + " threads: {}".format(totalTime[i][j]) + + print " " + print "------------------------------------" + print "Parallel Efficiency for: {} time steps".format(numTimesteps) + print "------------------------------------" + + for j in range(0,len(threadList[i])): + print str(threadList[i][j]) + " threads: {}".format(parallelEff[i][j]) + + return + +# Returns a lighter/darker version of the colour +def color_variant(hex_color, brightness_offset=1): + + rgb_hex = [hex_color[x:x+2] for x in [1, 3, 5]] + new_rgb_int = [int(hex_value, 16) + brightness_offset for hex_value in rgb_hex] + new_rgb_int = [min([255, max([0, i])]) for i in new_rgb_int] # make sure new values are between 0 and 255 + # hex() produces "0x88", we want just "88" + + return "#" + "".join([hex(i)[2:] for i in new_rgb_int]) + +def plot_results(totalTime,speedUp,parallelEff,numSeries): + + fig, axarr = plt.subplots(2, 2, figsize=(10,10), frameon=True) + speedUpPlot = axarr[0, 0] + parallelEffPlot = axarr[0, 1] + totalTimePlot = axarr[1, 0] + emptyPlot = axarr[1, 1] + + # Plot speed up + speedUpPlot.plot(threadList[0],threadList[0], linestyle='--', lw=1.5, color='0.2') + for i in range(0,numSeries): + speedUpPlot.plot(threadList[0],speedUp[i],linestyle[i],label=version[i]) + + speedUpPlot.set_ylabel("${\\rm Speed\\textendash up}$", labelpad=0.) + speedUpPlot.set_xlabel("${\\rm Threads}$", labelpad=0.) 
+ speedUpPlot.set_xlim([0.7,threadList[0][-1]+1]) + speedUpPlot.set_ylim([0.7,threadList[0][-1]+1]) + + # Plot parallel efficiency + parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [1,1], 'k--', lw=1.5, color='0.2') + parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.9,0.9], 'k--', lw=1.5, color='0.2') + parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.75,0.75], 'k--', lw=1.5, color='0.2') + parallelEffPlot.plot([threadList[0][0], 10**np.floor(np.log10(threadList[0][-1])+1)], [0.5,0.5], 'k--', lw=1.5, color='0.2') + for i in range(0,numSeries): + parallelEffPlot.plot(threadList[0],parallelEff[i],linestyle[i]) + + parallelEffPlot.set_xscale('log') + parallelEffPlot.set_ylabel("${\\rm Parallel~efficiency}$", labelpad=0.) + parallelEffPlot.set_xlabel("${\\rm Threads}$", labelpad=0.) + parallelEffPlot.set_ylim([0,1.1]) + parallelEffPlot.set_xlim([0.9,10**(np.floor(np.log10(threadList[0][-1]))+0.5)]) + + # Plot time to solution + for i in range(0,numSeries): + for j in range(0,len(threadList[0])): + totalTime[i][j] = totalTime[i][j] * threadList[i][j] + if i > 1: + totalTime[i][j] = totalTime[i][j] + totalTime[i-1][j] + totalTimePlot.plot(threadList[0],totalTime[i],linestyle[i],label=version[i]) + + if i > 1: + colour = color_variant(linestyle[i],100) + totalTimePlot.fill_between(threadList[0],np.array(totalTime[i]),np.array(totalTime[i-1]), facecolor=colour) + elif i==1: + colour = color_variant(linestyle[i],100) + totalTimePlot.fill_between(threadList[0], totalTime[i],facecolor=colour) + + totalTimePlot.set_xscale('log') + totalTimePlot.ticklabel_format(style='sci', axis='y', scilimits=(0,0)) + totalTimePlot.set_xlabel("${\\rm Threads}$", labelpad=0.) + totalTimePlot.set_ylabel("${\\rm Time~to~solution~x~No.~of~cores}~[{\\rm ms}]$", labelpad=0.) 
+ totalTimePlot.set_xlim([0.9, 10**(np.floor(np.log10(threadList[0][-1]))+0.5)]) + #totalTimePlot.set_ylim(y_min, y_max) + + totalTimePlot.legend(bbox_to_anchor=(1.21, 0.97), loc=2, borderaxespad=0.,prop={'size':12}, frameon=False,title=legendTitle) + emptyPlot.axis('off') + + for i, txt in enumerate(threadList[0]): + if 2**np.floor(np.log2(threadList[0][i])) == threadList[0][i]: # only powers of 2 + speedUpPlot.annotate("$%s$"%txt, (threadList[0][i],speedUp[0][i]), (threadList[0][i],speedUp[0][i] + 0.3), color=hexcols[0]) + parallelEffPlot.annotate("$%s$"%txt, (threadList[0][i],parallelEff[0][i]), (threadList[0][i], parallelEff[0][i]+0.02), color=hexcols[0]) + totalTimePlot.annotate("$%s$"%txt, (threadList[0][i],totalTime[0][i]), (threadList[0][i], totalTime[0][i]*1.1), color=hexcols[0]) + + #fig.suptitle("Thread Speed Up, Parallel Efficiency and Time To Solution for {} Time Steps of Cosmo Volume\n Cmd Line: {}, Platform: {}".format(numTimesteps),cmdLine,platform)) + fig.suptitle("${\\rm Speed\\textendash up,~parallel~efficiency~and~time~to~solution~x~no.~of~cores~for}~%d~{\\rm time\\textendash steps}$"%numTimesteps, fontsize=16) + + return + +# Calculate results +(totalTime,speedUp,parallelEff) = parse_files() + +legendTitle = version[0] + +plot_results(totalTime,speedUp,parallelEff,numOfSeries) + +print_results(totalTime,parallelEff,version) + +# And plot +plt.show() diff --git a/src/Makefile.am b/src/Makefile.am index 14e435f663f01d8faa5f12720398b58633300093..1da7a0d955e488c0b96ee209080b5438356a36bc 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -46,7 +46,7 @@ include_HEADERS = space.h runner.h queue.h task.h lock.h cell.h part.h const.h \ hydro_properties.h riemann.h threadpool.h cooling.h cooling_struct.h sourceterms.h \ sourceterms_struct.h statistics.h memswap.h cache.h runner_doiact_vec.h profiler.h \ dump.h logger.h active.h timeline.h xmf.h gravity_properties.h gravity_derivatives.h \ - vector_power.h hydro_space.h sort_part.h + vector_power.h collectgroup.h hydro_space.h sort_part.h # Common source files AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \ @@ -57,7 +57,7 @@ AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \ runner_doiact_fft.c threadpool.c cooling.c sourceterms.c \ statistics.c runner_doiact_vec.c profiler.c dump.c logger.c \ part_type.c xmf.c gravity_properties.c gravity.c \ - hydro_space.c + collectgroup.c hydro_space.c # Include files for distribution, not installation. nobase_noinst_HEADERS = align.h approx_math.h atomic.h cycle.h error.h inline.h kernel_hydro.h kernel_gravity.h \ diff --git a/src/collectgroup.c b/src/collectgroup.c new file mode 100644 index 0000000000000000000000000000000000000000..d115f5ab1b612fc0c82fb219fca392c5e28d7a6f --- /dev/null +++ b/src/collectgroup.c @@ -0,0 +1,200 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Copyright (c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ + +/* Config parameters. */ +#include "../config.h" + +/* MPI headers. */ +#ifdef WITH_MPI +#include <mpi.h> +#endif + +/* This object's header. */ +#include "collectgroup.h" + +/* Local headers. */ +#include "engine.h" +#include "error.h" + +#ifdef WITH_MPI + +/* Local collections for MPI reduces. */ +struct mpicollectgroup1 { + size_t updates, g_updates, s_updates; + integertime_t ti_end_min; + int forcerebuild; +}; + +/* Forward declarations. */ +static void mpicollect_create_MPI_type(); + +/** + * @brief MPI datatype for the #mpicollectgroup1 structure. + */ +static MPI_Datatype mpicollectgroup1_type; + +/** + * @brief MPI operator to reduce #mpicollectgroup1 structures. + */ +static MPI_Op mpicollectgroup1_reduce_op; + +#endif + +/** + * @brief Perform any once only initialisations. Must be called once. + */ +void collectgroup_init() { + +#ifdef WITH_MPI + /* Initialise the MPI types. */ + mpicollect_create_MPI_type(); +#endif +} + +/** + * @brief Apply the collectgroup1 values to the engine by copying all the + * values to the engine fields. + * + * @param grp1 The #collectgroup1 + * @param e The #engine + */ +void collectgroup1_apply(struct collectgroup1 *grp1, struct engine *e) { + e->ti_end_min = grp1->ti_end_min; + e->ti_end_max = grp1->ti_end_max; + e->ti_beg_max = grp1->ti_beg_max; + e->updates = grp1->updates; + e->g_updates = grp1->g_updates; + e->s_updates = grp1->s_updates; + e->forcerebuild = grp1->forcerebuild; +} + +/** + * @brief Initialises a collectgroup1 struct ready for processing. + * + * @param grp1 The #collectgroup1 to initialise + * @param updates the number of updated hydro particles on this node this step. + * @param g_updates the number of updated gravity particles on this node this step. + * @param s_updates the number of updated star particles on this node this step. + * @param ti_end_min the minimum end time for next time step after this step. + * @param ti_end_max the maximum end time for next time step after this step. + * @param ti_beg_max the maximum begin time for next time step after this step. + * @param forcerebuild whether a rebuild is required after this step. + */ +void collectgroup1_init(struct collectgroup1 *grp1, size_t updates, + size_t g_updates, size_t s_updates, + integertime_t ti_end_min, + integertime_t ti_end_max, + integertime_t ti_beg_max, + int forcerebuild) { + grp1->updates = updates; + grp1->g_updates = g_updates; + grp1->s_updates = s_updates; + grp1->ti_end_min = ti_end_min; + grp1->ti_end_max = ti_end_max; + grp1->ti_beg_max = ti_beg_max; + grp1->forcerebuild = forcerebuild; +} + +/** + * @brief Do any processing necessary to the group before it can be used. + * + * This may involve an MPI reduction across all nodes. + * + * @param grp1 the #collectgroup1 struct already initialised by a call + * to collectgroup1_init. + */ +void collectgroup1_reduce(struct collectgroup1 *grp1) { + +#ifdef WITH_MPI + + /* Populate an MPI group struct and reduce this across all nodes. 
*/ + struct mpicollectgroup1 mpigrp11; + mpigrp11.updates = grp1->updates; + mpigrp11.g_updates = grp1->g_updates; + mpigrp11.s_updates = grp1->s_updates; + mpigrp11.ti_end_min = grp1->ti_end_min; + mpigrp11.forcerebuild = grp1->forcerebuild; + + struct mpicollectgroup1 mpigrp12; + if (MPI_Allreduce(&mpigrp11, &mpigrp12, 1, mpicollectgroup1_type, + mpicollectgroup1_reduce_op, MPI_COMM_WORLD) + != MPI_SUCCESS) + error("Failed to reduce mpicollection1."); + + /* And update. */ + grp1->updates = mpigrp12.updates; + grp1->g_updates = mpigrp12.g_updates; + grp1->s_updates = mpigrp12.s_updates; + grp1->ti_end_min = mpigrp12.ti_end_min; + grp1->forcerebuild = mpigrp12.forcerebuild; + +#endif +} + + +#ifdef WITH_MPI +/** + * @brief Do the reduction of two structs. + * + * @param mpigrp11 the first struct, this is updated on exit. + * @param mpigrp12 the second struct + */ +static void doreduce1(struct mpicollectgroup1 *mpigrp11, + const struct mpicollectgroup1 *mpigrp12) { + + /* Do what is needed for each part of the collection. */ + /* Sum of updates. */ + mpigrp11->updates += mpigrp12->updates; + mpigrp11->g_updates += mpigrp12->g_updates; + mpigrp11->s_updates += mpigrp12->s_updates; + + /* Minimum end time. */ + mpigrp11->ti_end_min = min(mpigrp11->ti_end_min, mpigrp12->ti_end_min); + + /* Everyone must agree to not rebuild. */ + if (mpigrp11->forcerebuild || mpigrp12->forcerebuild) + mpigrp11->forcerebuild = 1; +} + +/** + * @brief MPI reduce operator for #mpicollectgroup structures. + */ +static void mpicollectgroup1_reduce(void *in, void *inout, int *len, + MPI_Datatype *datatype) { + + for (int i = 0; i < *len; ++i) + doreduce1(&((struct mpicollectgroup1 *)inout)[0], + &((const struct mpicollectgroup1 *)in)[i]); +} + +/** + * @brief Registers any MPI collection types and reduction functions. + */ +static void mpicollect_create_MPI_type() { + + if (MPI_Type_contiguous(sizeof(struct mpicollectgroup1), MPI_BYTE, + &mpicollectgroup1_type) != MPI_SUCCESS || + MPI_Type_commit(&mpicollectgroup1_type) != MPI_SUCCESS) { + error("Failed to create MPI type for mpicollection1."); + } + + /* Create the reduction operation */ + MPI_Op_create(mpicollectgroup1_reduce, 1, &mpicollectgroup1_reduce_op); +} +#endif diff --git a/src/collectgroup.h b/src/collectgroup.h new file mode 100644 index 0000000000000000000000000000000000000000..f9b8e9ccca335a1fdfe2d6cd60af1573754feccd --- /dev/null +++ b/src/collectgroup.h @@ -0,0 +1,57 @@ +/******************************************************************************* + * This file is part of SWIFT. + * Copyright (c) 2017 Peter W. Draper (p.w.draper@durham.ac.uk) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + ******************************************************************************/ +#ifndef SWIFT_COLLECTGROUP_H +#define SWIFT_COLLECTGROUP_H + +/* Config parameters. */ +#include "../config.h" + +/* Standard headers. 
*/ +#include <stddef.h> + +/* Local headers. */ +#include "timeline.h" + +/* Forward declaration of engine struct (to avoid cyclic include). */ +struct engine; + +/* A collection of global quantities that can be processed at the same time. */ +struct collectgroup1 { + + /* Number of particles updated */ + size_t updates, g_updates, s_updates; + + /* Times for the time-step */ + integertime_t ti_end_min, ti_end_max, ti_beg_max; + + /* Force the engine to rebuild? */ + int forcerebuild; +}; + +void collectgroup_init(); +void collectgroup1_apply(struct collectgroup1 *grp1, struct engine *e); +void collectgroup1_init(struct collectgroup1 *grp1, size_t updates, + size_t g_updates, size_t s_updates, + integertime_t ti_end_min, + integertime_t ti_end_max, + integertime_t ti_beg_max, + int forcerebuild); +void collectgroup1_reduce(struct collectgroup1 *grp1); + +#endif /* SWIFT_COLLECTGROUP_H */ diff --git a/src/engine.c b/src/engine.c index e22e5d569aa70f4add5a57db3e0b6f5ac9c66404..0a0091a38bc198e79960ba74fb6edf4e61b1245d 100644 --- a/src/engine.c +++ b/src/engine.c @@ -2805,12 +2805,23 @@ void engine_collect_kick(struct cell *c) { } /** - * @brief Collects the next time-step by making each super-cell recurse - * to collect the minimal of ti_end and the number of updated particles. + * @brief Collects the next time-step and rebuild flag. + * + * The next time-step is determined by making each super-cell recurse to + * collect the minimal of ti_end and the number of updated particles. When in + * MPI mode this routines reduces these across all nodes and also collects the + * forcerebuild flag -- this is so that we only use a single collective MPI + * call per step for all these values. + * + * Note that the results are stored in e->collect_group1 struct not in the + * engine fields, unless apply is true. These can be applied field-by-field + * or all at once using collectgroup1_copy(); * * @param e The #engine. + * @param apply whether to apply the results to the engine or just keep in the + * group1 struct. */ -void engine_collect_timestep(struct engine *e) { +void engine_collect_timestep_and_rebuild(struct engine *e, int apply) { const ticks tic = getticks(); int updates = 0, g_updates = 0, s_updates = 0; @@ -2840,37 +2851,58 @@ void engine_collect_timestep(struct engine *e) { } } -/* Aggregate the data from the different nodes. */ + /* Store these in the temporary collection group. */ + collectgroup1_init(&e->collect_group1, updates,g_updates, s_updates, + ti_end_min,ti_end_max,ti_beg_max,e->forcerebuild); + +/* Aggregate collective data from the different nodes for this step. */ #ifdef WITH_MPI + collectgroup1_reduce(&e->collect_group1); + +#ifdef SWIFT_DEBUG_CHECKS { + /* Check the above using the original MPI calls. 
*/ integertime_t in_i[1], out_i[1]; in_i[0] = 0; out_i[0] = ti_end_min; if (MPI_Allreduce(out_i, in_i, 1, MPI_LONG_LONG_INT, MPI_MIN, MPI_COMM_WORLD) != MPI_SUCCESS) - error("Failed to aggregate t_end_min."); - ti_end_min = in_i[0]; - } - { + error("Failed to aggregate ti_end_min."); + if (in_i[0] != (long long)e->collect_group1.ti_end_min) + error("Failed to get same ti_end_min, is %lld, should be %lld", + in_i[0], e->collect_group1.ti_end_min); + long long in_ll[3], out_ll[3]; out_ll[0] = updates; out_ll[1] = g_updates; out_ll[2] = s_updates; if (MPI_Allreduce(out_ll, in_ll, 3, MPI_LONG_LONG_INT, MPI_SUM, MPI_COMM_WORLD) != MPI_SUCCESS) - error("Failed to aggregate energies."); - updates = in_ll[0]; - g_updates = in_ll[1]; - s_updates = in_ll[2]; + error("Failed to aggregate particle counts."); + if (in_ll[0] != (long long)e->collect_group1.updates) + error("Failed to get same updates, is %lld, should be %ld", + in_ll[0], e->collect_group1.updates); + if (in_ll[1] != (long long)e->collect_group1.g_updates) + error("Failed to get same g_updates, is %lld, should be %ld", + in_ll[1], e->collect_group1.g_updates); + if (in_ll[2] != (long long)e->collect_group1.s_updates) + error("Failed to get same s_updates, is %lld, should be %ld", + in_ll[2], e->collect_group1.s_updates); + + int buff = 0; + if (MPI_Allreduce(&e->forcerebuild, &buff, 1, MPI_INT, MPI_MAX, + MPI_COMM_WORLD) != MPI_SUCCESS) + error("Failed to aggregate the rebuild flag across nodes."); + if (!!buff != !!e->collect_group1.forcerebuild) + error("Failed to get same rebuild flag from all nodes, is %d," + "should be %d", buff, e->collect_group1.forcerebuild); } +#endif #endif - e->ti_end_min = ti_end_min; - e->ti_end_max = ti_end_max; - e->ti_beg_max = ti_beg_max; - e->updates = updates; - e->g_updates = g_updates; - e->s_updates = s_updates; + /* Apply to the engine, if requested. */ + if (apply) + collectgroup1_apply(&e->collect_group1, e); if (e->verbose) message("took %.3f %s.", clocks_from_ticks(getticks() - tic), @@ -3112,7 +3144,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs) { #endif /* Recover the (integer) end of the next time-step */ - engine_collect_timestep(e); + engine_collect_timestep_and_rebuild(e, 1); clocks_gettime(&time2); @@ -3212,14 +3244,12 @@ void engine_step(struct engine *e) { gravity_exact_force_check(e->s, e, 1e-1); #endif -/* Collect the values of rebuild from all nodes. */ -#ifdef WITH_MPI - int buff = 0; - if (MPI_Allreduce(&e->forcerebuild, &buff, 1, MPI_INT, MPI_MAX, - MPI_COMM_WORLD) != MPI_SUCCESS) - error("Failed to aggregate the rebuild flag across nodes."); - e->forcerebuild = buff; -#endif + /* Collect the values of rebuild from all nodes and recover the (integer) + * end of the next time-step. Do these together to reduce the collective MPI + * calls per step, but some of the gathered information is not applied just + * yet (in case we save a snapshot or drift). */ + engine_collect_timestep_and_rebuild(e, 0); + e->forcerebuild = e->collect_group1.forcerebuild; /* Save some statistics ? */ if (e->time - e->timeLastStatistics >= e->deltaTimeStatistics) { @@ -3255,8 +3285,8 @@ void engine_step(struct engine *e) { e->timeLastStatistics += e->deltaTimeStatistics; } - /* Recover the (integer) end of the next time-step */ - engine_collect_timestep(e); + /* Now apply all the collected time step updates and particle counts. 
*/ + collectgroup1_apply(&e->collect_group1, e); TIMER_TOC2(timer_step); @@ -4041,12 +4071,15 @@ void engine_init(struct engine *e, struct space *s, /* Find the time of the first output */ engine_compute_next_snapshot_time(e); -/* Construct types for MPI communications */ + /* Construct types for MPI communications */ #ifdef WITH_MPI part_create_mpi_types(); stats_create_MPI_type(); #endif + /* Initialise the collection group. */ + collectgroup_init(); + /* Initialize the threadpool. */ threadpool_init(&e->threadpool, e->nr_threads); diff --git a/src/engine.h b/src/engine.h index e4611aec9a07d88ba01335286d16950580d7f629..22ee122bb082895131584942bde50509952b98ff 100644 --- a/src/engine.h +++ b/src/engine.h @@ -38,6 +38,7 @@ /* Includes. */ #include "clocks.h" +#include "collectgroup.h" #include "cooling_struct.h" #include "gravity_properties.h" #include "parser.h" @@ -243,6 +244,10 @@ struct engine { /* The (parsed) parameter file */ const struct swift_params *parameter_file; + + /* Temporary struct to hold a group of deferable properties (in MPI mode + * these are reduced together, but may not be required just yet). */ + struct collectgroup1 collect_group1; }; /* Function prototypes. */
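
Note on the plotting scripts: the derived quantities shown by both plot_scaling_results.py and plot_scaling_results_breakdown.py come straight from the per-file totals accumulated in parse_files(). Within each data series the run with the smallest thread count is taken as the baseline, i.e.

    S_i(n) = T_i(n_0) / T_i(n)        (speedUp[i][j])
    E_i(n) = S_i(n) / n               (parallelEff[i][j])

where T_i(n) is the summed wall-clock time of series i on n threads (totalTime[i][j]) and n_0 is the first entry of the ascending-sorted threadList[i].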
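
Note on the C side of the change: collectgroup1_init() gathers the node-local counters and time-step bounds, collectgroup1_reduce() performs a single MPI_Allreduce over the packed struct with a user-defined operator (and is a no-op without MPI), and collectgroup1_apply() copies the result into the engine only when it is actually needed. This is why engine_step() can read the rebuild flag immediately while deferring the time-step fields until after any snapshot or drift. The standalone program below is a minimal sketch of the same MPI pattern in isolation, a struct described to MPI as a contiguous block of bytes plus a custom reduction operator; the field names and values are illustrative, not SWIFT's.

/*
 * Minimal sketch (not SWIFT code) of the reduction pattern used by
 * collectgroup.c above.  Build and run with, e.g.:
 *   mpicc reduce_sketch.c -o reduce_sketch && mpirun -n 4 ./reduce_sketch
 */
#include <mpi.h>
#include <stdio.h>

/* Illustrative per-node payload: a sum, a minimum and a flag. */
struct payload {
  long long updates;     /* summed across ranks */
  long long ti_end_min;  /* minimised across ranks */
  int forcerebuild;      /* logical OR across ranks */
};

/* Custom reduction, applied element-wise by MPI to 'len' payloads. */
static void payload_reduce(void *in, void *inout, int *len,
                           MPI_Datatype *datatype) {
  struct payload *a = (struct payload *)inout;
  const struct payload *b = (const struct payload *)in;
  for (int i = 0; i < *len; ++i) {
    a[i].updates += b[i].updates;
    if (b[i].ti_end_min < a[i].ti_end_min) a[i].ti_end_min = b[i].ti_end_min;
    a[i].forcerebuild = a[i].forcerebuild || b[i].forcerebuild;
  }
}

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  /* Describe the struct as an opaque, contiguous block of bytes, as
   * mpicollect_create_MPI_type() does for mpicollectgroup1. */
  MPI_Datatype payload_type;
  MPI_Type_contiguous(sizeof(struct payload), MPI_BYTE, &payload_type);
  MPI_Type_commit(&payload_type);

  /* Register the reduction operator (commutative). */
  MPI_Op payload_op;
  MPI_Op_create(payload_reduce, 1, &payload_op);

  /* One collective call reduces all quantities at once. */
  struct payload local = {100 + rank, 1000 - rank, rank == 2};
  struct payload global;
  MPI_Allreduce(&local, &global, 1, payload_type, payload_op, MPI_COMM_WORLD);

  if (rank == 0)
    printf("updates=%lld ti_end_min=%lld forcerebuild=%d\n",
           global.updates, global.ti_end_min, global.forcerebuild);

  MPI_Op_free(&payload_op);
  MPI_Type_free(&payload_type);
  MPI_Finalize();
  return 0;
}

Describing the struct as MPI_BYTE keeps the type definition trivial at the cost of the operator having to know the real layout; that is the same trade-off taken in collectgroup.c, and it is what allows several separate Allreduce calls per step to collapse into one.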