plot_task_dependencies.py 11.49 KiB
#!/usr/bin/env python3
"""
This file generates a graphviz file that represents the SWIFT tasks
dependencies.
Example: ./plot_task_dependencies.py dependency_graph_*.csv
"""
from pandas import read_csv
import numpy as np
from subprocess import call
from optparse import OptionParser
def parseOption():
parser = OptionParser()
parser.add_option(
"-c",
"--with-calls",
dest="with_calls",
help="Add the function calls in the graph",
action="store_true",
)
opt, files = parser.parse_args()
if len(files) != 1:
raise Exception("You need to provide one file")
return opt, files
def getGitVersion(f, git):
"""
Read the git version from the file
Parameters
----------
f: str
Filename
git: str
Git version of previous file
Returns
-------
new_git: str
Git version of current file
"""
# read comment in csv file
with open(f, "r") as f:
line = f.readline()
# check if really a comment
if line[0] != "#":
return None
# remove trailing characters
new_git = line[2:].rstrip()
# check if previous and current are the same
if git is not None and git != new_git:
raise Exception("Files were not produced by the same version")
return new_git
def appendSingleData(data0, datai):
"""
Append two DataFrame together
Parameters
----------
data0: DataFrame
One of the dataframe
datai: DataFrame
The second dataframe
Returns
-------
data0: DataFrame
The updated dataframe
"""
# loop over all rows in datai
for i, row in datai.iterrows():
# get data
ta = datai["task_in"][i]
tb = datai["task_out"][i]
ind = np.logical_and(data0["task_in"] == ta, data0["task_out"] == tb)
# check number of ta->tb
N = np.sum(ind)
if N > 1:
raise Exception("Same dependency written multiple times %s->%s" % (ta, tb))
# if not present in data0
if N == 0:
data0.append(row)
else:
# otherwise just update the number of link
ind = ind[ind].index[0]
tmp = data0["number_link"][ind] + datai["number_link"][i]
data0.at[ind, "number_link"] = tmp
return data0
def appendData(data):
"""
Append all the dataframe together
Parameters
----------
data: list
List containing all the dataframe to append together
Returns
-------
data: DataFrame
The complete dataframe
"""
N = len(data)
if N == 1:
return data[0]
# add number link to data[0]
for i in range(N - 1):
i += 1
data[0] = appendSingleData(data[0], data[i])
return data[0]
def taskIsBlackHoles(name):
"""
Does the task concern black holes?
Parameters
----------
name: str
Task name
"""
if "bh" in name or "bpart" in name or "swallow" in name:
return True
return False
def taskIsStars(name):
"""
Does the task concern stars?
Parameters
----------
name: str
Task name
"""
if "stars" in name or "spart" in name:
return True
if "sf_count" in name:
return True
return False
def taskIsHydro(name):
"""
Does the task concern the hydro?
Parameters
----------
name: str
Task name
"""
if "_part" in name:
return True
if "density" in name and "stars" not in name and "bh" not in name:
return True
if "rho" in name and "bpart" not in name:
return True
if "gradient" in name:
return True
if "force" in name:
return True
if "xv" in name and "bpart" not in name:
return True
task_name = [
"sort",
"ghost_in",
"ghost",
"ghost_out",
"extra_ghost",
"cooling",
"star_formation",
]
if name in task_name:
return True
return False
def taskIsGravity(name):
"""
Does the task concern the gravity?
Parameters
----------
name: str
Task name
"""
if "gpart" in name:
return True
if "grav" in name:
return True
return False
def taskIsRT(name):
"""
Does the task concern Radiative Transfer?
Parameters
----------
name: str
Task name
"""
if "_rt" in name:
return True
elif name.startswith("rt_"):
return True
return False
def getFunctionCalls(name):
txt = None
if name == "ghost":
txt = """hydro_end_density, chemistry_end_density,<br/>
hydro_prepare_gradient, hydro_reset_gradient,<br/>
hydro_prepare_force, hydro_reset_acceleration,<br/>
hydro_init_part, chemistry_init_part,<br/>
hydro_has_no_neighbours, chemistry_part_has_no_neighbours
"""
elif name == "cooling":
txt = "cooling_cool_part"
elif name == "timestep":
txt = "tracers_after_timestep"
elif name == "drift_part":
txt = """drift_part, tracers_after_drift,<br/>
hydro_init_part, chemistry_init_part,<br/>
tracers_after_init
"""
elif name == "kick1":
txt = "kick_part, kick_gpart, kick_spart"
elif name == "kick2":
txt = """kick_part, kick_gpart, kick_spart,<br/>
hydro_reset_predicted_values,
gravity_reset_predicted_Values,<br/>
stars_reset_predicted_values,
"""
elif name == "end_force":
txt = """hydro_end_force, gravity_end_force,<br/>
stars_end_force"""
elif name == "drift_gpart":
txt = """drift_gpart, gravity_init_gpart,<br/>
drift_spart
"""
if "density" in name and "stars" not in name:
txt = """runner_iact_nonsym_chemistry, runner_iact_chemistry,<br/>
runner_iact_nonsym_density, runner_iact_density"""
if "force" in name and "end" not in name:
txt = "runner_iact_nonsym_density, runner_iact_density"
if txt is None:
return None
else:
pre = "<" + name + "<BR/> <Font POINT-SIZE='10'>Calls: "
app = "</Font>>"
return pre + txt + app
def writeTask(f, name, implicit, mpi, with_calls):
"""
Write the special task (e.g. implicit and mpi)
Parameters
----------
f: File
File where to write the data
name: str
Task name
implicit: int
Is the task implicit
mpi: int
Is the task MPI related
with_calls: bool
if true, write down the function calls
"""
# generate text
txt = "\t " + name + "["
if implicit:
txt += "style=filled,fillcolor=lightgrey,"
if mpi:
txt += "shape=diamond,"
if taskIsBlackHoles(name):
txt += "color=forestgreen,"
if taskIsStars(name):
txt += "color=darkorange1,"
if taskIsHydro(name):
txt += "color=blue3,"
if taskIsGravity(name):
txt += "color=red3,"
if taskIsRT(name):
txt += 'color="springgreen"'
if with_calls:
func = getFunctionCalls(name)
if func is not None:
txt += "label=" + func + ","
# remove extra ','
if txt[-1] == ",":
txt = txt[:-1]
txt += "];\n"
# write it
f.write(txt)
def writeHeader(f, data, git, opt):
"""
Write the header and the special tasks
Parameters
----------
f: File
File where to write the data
data: DataFrame
The dataframe to write
git: str
The git version
opt: object
The options provided to this script
"""
# write header
f.write("digraph task_dep {\n")
f.write("\t # Header\n")
f.write('\t label="Task dependencies for SWIFT %s";\n' % git)
f.write("\t compound=true;\n")
f.write("\t ratio=0.66;\n")
f.write("\t node[nodesep=0.15];\n")
f.write("\n")
# write the special task
f.write("\t # Special tasks\n")
N = len(data)
written = []
# do task in
for i in range(N):
ta = data["task_in"][i]
if ta in written:
continue
written.append(ta)
writeTask(f, ta, data["implicit_in"][i], data["mpi_in"][i], opt.with_calls)
# do task out
for i in range(N):
tb = data["task_out"][i]
if tb in written:
continue
written.append(tb)
writeTask(f, tb, data["implicit_out"][i], data["mpi_out"][i], opt.with_calls)
f.write("\n")
def writeCluster(f, tasks, cluster):
"""
Write a single cluster
Parameters
----------
f: File
File where to write the data
tasks: list
List of all tasks in the cluster
cluster: str
Cluster name
"""
f.write("\t subgraph cluster%s {\n" % cluster)
f.write('\t\t label="";\n')
for t in tasks:
f.write("\t\t %s;\n" % t)
f.write("\t };\n\n")
def writeClusters(f, data):
"""
Write all the clusters
Parameters
----------
f: File
File where to write the data
data: DataFrame
The dataframe to write
"""
f.write("\t # Clusters\n")
# get list of all the clusters
clusters = data[["cluster_in", "cluster_out"]]
clusters = np.unique(clusters)
cluster_in = data["cluster_in"]
cluster_out = data["cluster_out"]
# loop over all clusters
for cluster in clusters:
# is it a cluster?
if cluster == "None":
continue
# get all the task in current cluster
ta = data["task_in"][cluster_in == cluster]
tb = data["task_out"][cluster_out == cluster]
# make them unique
tasks = np.append(ta, tb)
tasks = np.unique(tasks)
# write current cluster
writeCluster(f, tasks, cluster)
f.write("\n")
def writeDependencies(f, data):
"""
Write all the dependencies between tasks
Parameters
----------
f: File
File where to write the data
data: DataFrame
The dataframe to write
"""
f.write("\t # Dependencies\n")
N = len(data)
written = []
max_rank = data["number_rank"].max()
for i in range(N):
# get data
ta = data["task_in"][i]
tb = data["task_out"][i]
number_link = data["number_link"][i]
# check if already done
name = "%s_%s" % (ta, tb)
if name in written:
raise Exception("Found two same task dependencies")
written.append(name)
# write relation
arrow = ""
if data["number_rank"][i] != max_rank:
arrow = ",style=dashed"
f.write("\t %s->%s[label=%i%s]\n" % (ta, tb, number_link, arrow))
def writeFooter(f):
"""
Write the footer
Parameters
----------
f: File
File where to write the data
"""
f.write("}")
if __name__ == "__main__":
opt, files = parseOption()
# output
dot_output = "dependency_graph.dot"
png_output = "dependency_graph.png"
# read files
data = []
git = None
for f in files:
tmp = read_csv(f, delimiter=",", comment="#")
git = getGitVersion(f, git)
data.append(tmp)
data = appendData(data)
# write output
with open(dot_output, "w") as f:
writeHeader(f, data, git, opt)
writeClusters(f, data)
writeDependencies(f, data)
writeFooter(f)
call(["dot", "-Tpng", dot_output, "-o", png_output])
print("You will find the graph in %s" % png_output)
if opt.with_calls:
print("We recommand to use the python package xdot available on pypi:")
print(" python -m xdot %s" % dot_output)