-
Mladen Ivkovic authoredMladen Ivkovic authored
plot_task_dependencies.py 13.89 KiB
#!/usr/bin/env python3
description = """
This file generates a graphviz file that represents the SWIFT tasks
dependencies.
Example: ./plot_task_dependencies.py dependency_graph_*.csv
"""
from pandas import read_csv
import numpy as np
from subprocess import call
import argparse
from os import path
# define task colours in the dict here:
task_colours = {
"black_holes": "forestgreen",
"stars": "darkorange1",
"hydro": "blue3",
"gravity": "red3",
"RT": "springgreen",
}
def parse_args():
"""
Parses command line arguments.
Returns
-------
args: Namespace
Namespace returned by argparse.ArgumentParser.parse_args()
containing all arguments
files:
List of files parsed from the command line.
Raises
------
FileNotFoundError
If any of the files provided on the command line doesn't exist
"""
# description is string at the top of this file.
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"-c",
"--with-calls",
dest="with_calls",
help="Add the function calls of the tasks to the graph",
action="store_true",
)
parser.add_argument(
"files",
nargs="+",
type=str,
help="Required file name(s) of .csv file(s) of the task dependencies generated by swift.",
)
args = parser.parse_args()
files = args.files
for f in files:
if not path.exists(f):
raise FileNotFoundError("You need to provide one file")
return args, files
def get_git_version(f, git):
"""
Read the git version from the file
Parameters
----------
f: str
Filename
git: str
Git version of previous file
Returns
-------
new_git: str
Git version of current file
"""
# read comment in csv file
with open(f, "r") as f:
line = f.readline()
# check if really a comment
if line[0] != "#":
return None
# remove trailing characters
new_git = line[2:].rstrip()
# check if previous and current are the same
if git is not None and git != new_git:
raise Exception("Files were not produced by the same version")
return new_git
def append_single_data(data0, datai):
"""
Append two DataFrame together
Parameters
----------
data0: DataFrame
One of the dataframe
datai: DataFrame
The second dataframe
Returns
-------
data0: DataFrame
The updated dataframe
"""
# loop over all rows in datai
for i, row in datai.iterrows():
# get data
ta = datai["task_in"][i]
tb = datai["task_out"][i]
ind = np.logical_and(data0["task_in"] == ta, data0["task_out"] == tb)
# check number of ta->tb
N = np.sum(ind)
if N > 1:
raise Exception("Same dependency written multiple times %s->%s" % (ta, tb))
# if not present in data0
if N == 0:
data0.append(row)
else:
# otherwise just update the number of link
ind = ind[ind].index[0]
tmp = data0["number_link"][ind] + datai["number_link"][i]
data0.at[ind, "number_link"] = tmp
return data0
def append_data(data):
"""
Append all the dataframe together, and add a column for colour type
Parameters
----------
data: list
List containing all the dataframe to append together
Returns
-------
data: DataFrame
The complete dataframe
"""
N = len(data)
if N == 1:
data[0]["task_colour"] = "black"
return data[0]
# add number link to data[0]
for i in range(N - 1):
i += 1
data[0] = append_single_data(data[0], data[i])
data[0]["task_colour"] = "black"
return data[0]
def get_task_colour(taskname):
"""
Get the task colour based on its name.
Default colour is black.
Parameters
----------
taskname: str
name of the task at hand
Returns
-------
colour: str
colour string directly applicable in the dot file.
"""
colour = "black"
if task_is_black_holes(taskname):
colour = task_colours["black_holes"]
elif task_is_stars(taskname):
colour = task_colours["stars"]
elif task_is_hydro(taskname):
colour = task_colours["hydro"]
elif task_is_gravity(taskname):
colour = task_colours["gravity"]
elif task_is_RT(taskname):
colour = task_colours["RT"]
return colour
def task_is_black_holes(name):
"""
Does the task concern black holes?
Parameters
----------
name: str
Task name
"""
if "bh" in name or "bpart" in name or "swallow" in name:
return True
return False
def task_is_stars(name):
"""
Does the task concern stars?
Parameters
----------
name: str
Task name
"""
if "stars" in name or "spart" in name:
return True
if "sf_count" in name:
return True
return False
def task_is_hydro(name):
"""
Does the task concern the hydro?
Parameters
----------
name: str
Task name
"""
if "_part" in name:
return True
if "density" in name and "stars" not in name and "bh" not in name:
return True
if "rho" in name and "bpart" not in name:
return True
if "gradient" in name:
return True
if "force" in name and "grav" not in name:
return True
if "xv" in name and "bpart" not in name:
return True
task_name = [
"sort",
"ghost_in",
"ghost",
"ghost_out",
"extra_ghost",
"cooling",
"star_formation",
]
if name in task_name:
return True
return False
def task_is_gravity(name):
"""
Does the task concern the gravity?
Parameters
----------
name: str
Task name
"""
if "gpart" in name:
return True
if "grav" in name:
return True
return False
def task_is_RT(name):
"""
Does the task concern Radiative Transfer?
Parameters
----------
name: str
Task name
"""
if "_rt" in name:
return True
elif name.startswith("rt_"):
return True
return False
def get_function_calls(name):
txt = None
if name == "ghost":
txt = """hydro_end_density, chemistry_end_density,<br/>
hydro_prepare_gradient, hydro_reset_gradient,<br/>
hydro_prepare_force, hydro_reset_acceleration,<br/>
hydro_init_part, chemistry_init_part,<br/>
hydro_has_no_neighbours, chemistry_part_has_no_neighbours
"""
elif name == "cooling":
txt = "cooling_cool_part"
elif name == "timestep":
txt = "tracers_after_timestep"
elif name == "drift_part":
txt = """drift_part, tracers_after_drift,<br/>
hydro_init_part, chemistry_init_part,<br/>
tracers_after_init
"""
elif name == "kick1":
txt = "kick_part, kick_gpart, kick_spart"
elif name == "kick2":
txt = """kick_part, kick_gpart, kick_spart,<br/>
hydro_reset_predicted_values,
gravity_reset_predicted_Values,<br/>
stars_reset_predicted_values,
"""
elif name == "end_force":
txt = """hydro_end_force, gravity_end_force,<br/>
stars_end_force"""
elif name == "drift_gpart":
txt = """drift_gpart, gravity_init_gpart,<br/>
drift_spart
"""
if "density" in name and "stars" not in name:
txt = """runner_iact_nonsym_chemistry, runner_iact_chemistry,<br/>
runner_iact_nonsym_density, runner_iact_density"""
if "force" in name and "end" not in name:
txt = "runner_iact_nonsym_density, runner_iact_density"
if txt is None:
return None
else:
pre = "<" + name + "<BR/> <Font POINT-SIZE='10'>Calls: "
app = "</Font>>"
return pre + txt + app
def write_task(f, name, implicit, mpi, with_calls):
"""
Write the special task (e.g. implicit and mpi)
Parameters
----------
f: File
File where to write the data
name: str
Task name
implicit: int
Is the task implicit
mpi: int
Is the task MPI related
with_calls: bool
if true, write down the function calls
"""
# generate text
txt = "\t " + name + "["
if implicit:
txt += "style=filled,fillcolor=grey90,"
if mpi:
txt += "shape=diamond,style=filled,fillcolor=azure,"
col = get_task_colour(name)
txt += "color=%s," % col
if with_calls:
func = get_function_calls(name)
if func is not None:
txt += "label=" + func + ","
# remove extra ','
if txt[-1] == ",":
txt = txt[:-1]
txt += "];\n"
# write it
f.write(txt)
def write_header(f, data, git, opt):
"""
Write the header and the special tasks
Parameters
----------
f: File
File where to write the data
data: DataFrame
The dataframe to write
git: str
The git version
opt: object
The options provided to this script
"""
# write header
f.write("digraph task_dep {\n")
f.write("\t # Header\n")
f.write('\t label="Task dependencies for SWIFT %s";\n' % git)
f.write("\t compound=true;\n")
f.write("\t ratio=0.66;\n")
f.write("\t node[nodesep=0.15, fontsize=30, penwidth=5.];\n")
f.write("\t ranksep=1.2;\n")
f.write("\n")
# write the special task
f.write("\t # Special tasks\n")
N = len(data)
written = []
# do task in
for i in range(N):
ta = data["task_in"][i]
if ta in written:
continue
written.append(ta)
write_task(f, ta, data["implicit_in"][i], data["mpi_in"][i], opt.with_calls)
# do task out
for i in range(N):
tb = data["task_out"][i]
if tb in written:
continue
written.append(tb)
write_task(f, tb, data["implicit_out"][i], data["mpi_out"][i], opt.with_calls)
f.write("\n")
def write_cluster(f, tasks, cluster):
"""
Write a single cluster
Parameters
----------
f: File
File where to write the data
tasks: list
List of all tasks in the cluster
cluster: str
Cluster name
"""
f.write("\t subgraph cluster%s {\n" % cluster)
f.write('\t\t label="";\n')
f.write('\t\t bgcolor="grey99";\n')
for t in tasks:
f.write("\t\t %s;\n" % t)
f.write("\t };\n\n")
def write_clusters(f, data):
"""
Write all the clusters
Parameters
----------
f: File
File where to write the data
data: DataFrame
The dataframe to write
"""
f.write("\t # Clusters\n")
# get list of all the clusters
clusters = data[["cluster_in", "cluster_out"]]
clusters = np.unique(clusters)
cluster_in = data["cluster_in"]
cluster_out = data["cluster_out"]
# loop over all clusters
for cluster in clusters:
# is it a cluster?
if cluster == "None":
continue
# get all the task in current cluster
ta = data["task_in"][cluster_in == cluster]
tb = data["task_out"][cluster_out == cluster]
# make them unique
tasks = np.append(ta, tb)
tasks = np.unique(tasks)
# write current cluster
write_cluster(f, tasks, cluster)
f.write("\n")
def write_dependencies(f, data):
"""
Write all the dependencies between tasks
Parameters
----------
f: File
File where to write the data
data: DataFrame
The dataframe to write
"""
f.write("\t # Dependencies\n")
N = len(data)
written = []
max_rank = data["number_rank"].max()
for i in range(N):
# get data
ta = data["task_in"][i]
tb = data["task_out"][i]
number_link = data["number_link"][i]
# check if already done
name = "%s_%s" % (ta, tb)
if name in written:
raise Exception("Found two same task dependencies")
written.append(name)
# write relation
arrow = ",color=%s" % data["task_colour"][i]
if data["number_rank"][i] != max_rank:
arrow += ",style=dashed"
f.write(
"\t %s->%s[label=%i%s,fontcolor=%s]\n"
% (ta, tb, number_link, arrow, data["task_colour"][i])
)
def write_footer(f):
"""
Write the footer
Parameters
----------
f: File
File where to write the data
"""
f.write("}")
def set_task_colours(data):
"""
Set the value of the task colour for the plot if you
want it non-black. the `task_colours` dict is defined
at the top of this script.
Parameters
----------
data: DataFrame
DataFrame of all the tasks
Returns
-------
data: DataFrame
modified DataFrame of all the tasks, now with more
colour
"""
N = len(data)
for i in range(N):
taskname = data["task_in"][i]
col = get_task_colour(taskname)
# set the colour
data.loc[i, "task_colour"] = col
return data
if __name__ == "__main__":
args, files = parse_args()
# output
dot_output = "dependency_graph.dot"
png_output = "dependency_graph.png"
# read files
data = []
git = None
for f in files:
tmp = read_csv(f, delimiter=",", comment="#")
git = get_git_version(f, git)
data.append(tmp)
data = append_data(data)
data = set_task_colours(data)
# write output
with open(dot_output, "w") as f:
write_header(f, data, git, args)
write_clusters(f, data)
write_dependencies(f, data)
write_footer(f)
call(["dot", "-Tpng", dot_output, "-o", png_output])
print("You will find the graph in %s" % png_output)
if args.with_calls:
print("We recommand to use the python package xdot available on pypi:")
print(" python -m xdot %s" % dot_output)