Commit b4bcb886 authored by Loic Hausammann's avatar Loic Hausammann
Browse files

Add an MPI implementation of the task graph

parent 4318bff5
......@@ -2785,7 +2785,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs,
gravity_exact_force_compute(e->s, e);
#endif
if (e->nodeID == 0) scheduler_write_dependencies(&e->sched, e->verbose);
scheduler_write_dependencies(&e->sched, e->verbose);
if (e->nodeID == 0) scheduler_write_task_level(&e->sched);
/* Run the 0th time-step */
......
......@@ -135,10 +135,66 @@ void scheduler_task_dependency_name(int ta_type, int ta_subtype,
if (ta_subtype == task_subtype_none)
sprintf(ta_name, "%s", taskID_names[ta_type]);
else
sprintf(ta_name, "\"%s %s\"", taskID_names[ta_type],
sprintf(ta_name, "%s_%s", taskID_names[ta_type],
subtaskID_names[ta_subtype]);
}
/**
 * @brief Get the cluster name of a task.
 *
 * @param ta The #task
 * @param cluster (output) The cluster name (should be allocated)
 */
void scheduler_get_cluster_name(const struct task *ta, char *cluster) {
  /* Pick the label for this task's cluster. The order of the checks
     matters: a task may match several criteria and the first match wins,
     with "None" as the fall-back for un-clustered tasks. */
  const char *name;
  if (ta->subtype == task_subtype_density)
    name = "Density";
  else if (ta->subtype == task_subtype_gradient)
    name = "Gradient";
  else if (ta->subtype == task_subtype_force)
    name = "Force";
  else if (ta->subtype == task_subtype_grav ||
           ta->type == task_type_grav_long_range ||
           ta->type == task_type_grav_mm || ta->type == task_type_grav_mesh)
    name = "Gravity";
  else if (ta->subtype == task_subtype_stars_density)
    name = "Stars";
  else
    name = "None";
  strcpy(cluster, name);
}
/**
 * @brief Compute the number of dependencies that share the same
 * (type, subtype) signature as the given (ta -> tb) relation.
 *
 * @param s The #scheduler
 * @param ta The #task
 * @param tb The dependent #task
 *
 * @return Number of dependencies
 */
int scheduler_get_number_relation(const struct scheduler *s,
                                  const struct task *ta, const struct task *tb) {

  int count = 0;

  /* loop over all tasks */
  for (int i = 0; i < s->nr_tasks; i++) {
    const struct task *ta_tmp = &s->tasks[i];

    /* and their dependencies.
       BUG FIX: walk the *current* task's unlock list (ta_tmp), not ta's;
       the original counted ta's own dependencies once per same-typed task. */
    for (int j = 0; j < ta_tmp->nr_unlock_tasks; j++) {
      const struct task *tb_tmp = ta_tmp->unlock_tasks[j];

      /* Same (type, subtype) signature on both ends of the relation? */
      if (ta->type == ta_tmp->type &&
          ta->subtype == ta_tmp->subtype &&
          tb->type == tb_tmp->type &&
          tb->subtype == tb_tmp->subtype) {
        count += 1;
      }
    }
  }
  return count;
}
/**
* @brief Write a dot file with the task dependencies.
*
......@@ -167,25 +223,18 @@ void scheduler_write_dependencies(struct scheduler *s, int verbose) {
if (table == NULL)
error("Error allocating memory for task-dependency graph (table).");
int *count_rel = (int *)malloc(nber_relation * sizeof(int) / 2);
if (count_rel == NULL)
error("Error allocating memory for task-dependency graph (count_rel).");
/* Reset everything */
for (int i = 0; i < nber_relation; i++) table[i] = -1;
for (int i = 0; i < nber_relation / 2; i++) count_rel[i] = 0;
/* Create file */
char filename[200] = "dependency_graph.dot";
char filename[200];
sprintf(filename, "dependency_graph_%04i.csv", s->nodeID);
FILE *f = fopen(filename, "w");
if (f == NULL) error("Error opening dependency graph file.");
/* Write header */
fprintf(f, "digraph task_dep {\n");
fprintf(f, "label=\"Task dependencies for SWIFT %s\";\n", git_revision());
fprintf(f, "\t compound=true;\n");
fprintf(f, "\t ratio=0.66;\n");
fprintf(f, "\t node[nodesep=0.15];\n");
fprintf(f, "# %s\n", git_revision());
fprintf(f, "task_in,task_out,implicit_in,implicit_out,mpi_in,mpi_out,cluster_in,cluster_out,number_link\n");
/* loop over all tasks */
for (int i = 0; i < s->nr_tasks; i++) {
......@@ -227,12 +276,11 @@ void scheduler_write_dependencies(struct scheduler *s, int verbose) {
if (k == max_nber_dep)
error("Not enough memory, please increase max_nber_dep");
/* Increase counter of relation */
count_rel[ind / 2 + k] += 1;
/* Not written yet => write it */
if (!written) {
int count = scheduler_get_number_relation(s, ta, tb);
/* text to write */
char ta_name[200];
char tb_name[200];
......@@ -241,154 +289,36 @@ void scheduler_write_dependencies(struct scheduler *s, int verbose) {
scheduler_task_dependency_name(ta->type, ta->subtype, ta_name);
scheduler_task_dependency_name(tb->type, tb->subtype, tb_name);
/* Change colour of implicit tasks */
if (ta->implicit)
fprintf(f, "\t %s [style = filled];\n\t %s [color = lightgrey];\n",
ta_name, ta_name);
if (tb->implicit)
fprintf(f, "\t %s [style = filled];\n\t %s [color = lightgrey];\n",
tb_name, tb_name);
/* Change shape of MPI communications */
/* Check if MPI */
int ta_mpi = 0;
if (ta->type == task_type_send || ta->type == task_type_recv)
fprintf(f, "\t \"%s %s\" [shape = diamond];\n",
taskID_names[ta->type], subtaskID_names[ta->subtype]);
if (tb->type == task_type_send || tb->type == task_type_recv)
fprintf(f, "\t \"%s %s\" [shape = diamond];\n",
taskID_names[tb->type], subtaskID_names[tb->subtype]);
}
}
}
ta_mpi = 1;
int density_cluster[4] = {0};
int gradient_cluster[4] = {0};
int force_cluster[4] = {0};
int gravity_cluster[5] = {0};
int stars_density_cluster[4] = {0};
/* Check whether we need to construct a group of tasks */
for (int type = 0; type < task_type_count; ++type) {
for (int subtype = 0; subtype < task_subtype_count; ++subtype) {
const int ind = 2 * (type * task_subtype_count + subtype) * max_nber_dep;
/* Does this task/sub-task exist? */
if (table[ind] != -1) {
for (int k = 0; k < 4; ++k) {
if (type == task_type_self + k && subtype == task_subtype_density)
density_cluster[k] = 1;
if (type == task_type_self + k && subtype == task_subtype_gradient)
gradient_cluster[k] = 1;
if (type == task_type_self + k && subtype == task_subtype_force)
force_cluster[k] = 1;
if (type == task_type_self + k && subtype == task_subtype_grav)
gravity_cluster[k] = 1;
if (type == task_type_self + k &&
subtype == task_subtype_stars_density)
stars_density_cluster[k] = 1;
}
if (type == task_type_grav_mesh) gravity_cluster[2] = 1;
if (type == task_type_grav_long_range) gravity_cluster[3] = 1;
if (type == task_type_grav_mm) gravity_cluster[4] = 1;
}
}
}
/* Make a cluster for the density tasks */
fprintf(f, "\t subgraph cluster0{\n");
fprintf(f, "\t\t label=\"\";\n");
for (int k = 0; k < 4; ++k)
if (density_cluster[k])
fprintf(f, "\t\t \"%s %s\";\n", taskID_names[task_type_self + k],
subtaskID_names[task_subtype_density]);
fprintf(f, "\t};\n");
/* Make a cluster for the force tasks */
fprintf(f, "\t subgraph cluster1{\n");
fprintf(f, "\t\t label=\"\";\n");
for (int k = 0; k < 4; ++k)
if (force_cluster[k])
fprintf(f, "\t\t \"%s %s\";\n", taskID_names[task_type_self + k],
subtaskID_names[task_subtype_force]);
fprintf(f, "\t};\n");
/* Make a cluster for the gradient tasks */
fprintf(f, "\t subgraph cluster2{\n");
fprintf(f, "\t\t label=\"\";\n");
for (int k = 0; k < 4; ++k)
if (gradient_cluster[k])
fprintf(f, "\t\t \"%s %s\";\n", taskID_names[task_type_self + k],
subtaskID_names[task_subtype_gradient]);
fprintf(f, "\t};\n");
/* Make a cluster for the gravity tasks */
fprintf(f, "\t subgraph cluster3{\n");
fprintf(f, "\t\t label=\"\";\n");
for (int k = 0; k < 2; ++k)
if (gravity_cluster[k])
fprintf(f, "\t\t \"%s %s\";\n", taskID_names[task_type_self + k],
subtaskID_names[task_subtype_grav]);
if (gravity_cluster[2])
fprintf(f, "\t\t %s;\n", taskID_names[task_type_grav_mesh]);
if (gravity_cluster[3])
fprintf(f, "\t\t %s;\n", taskID_names[task_type_grav_long_range]);
if (gravity_cluster[4])
fprintf(f, "\t\t %s;\n", taskID_names[task_type_grav_mm]);
fprintf(f, "\t};\n");
/* Make a cluster for the density tasks */
fprintf(f, "\t subgraph cluster4{\n");
fprintf(f, "\t\t label=\"\";\n");
for (int k = 0; k < 4; ++k)
if (stars_density_cluster[k])
fprintf(f, "\t\t \"%s %s\";\n", taskID_names[task_type_self + k],
subtaskID_names[task_subtype_stars_density]);
fprintf(f, "\t};\n");
/* Write down the number of relation */
for (int ta_type = 0; ta_type < task_type_count; ta_type++) {
for (int ta_subtype = 0; ta_subtype < task_subtype_count; ta_subtype++) {
/* Get task indice */
const int ind =
(ta_type * task_subtype_count + ta_subtype) * max_nber_dep;
/* Loop over dependencies */
for (int k = 0; k < max_nber_dep; k++) {
if (count_rel[ind + k] == 0) continue;
/* Get task type */
const int i = 2 * (ind + k);
int tb_type = table[i];
int tb_subtype = table[i + 1];
/* Get names */
char ta_name[200];
char tb_name[200];
scheduler_task_dependency_name(ta_type, ta_subtype, ta_name);
scheduler_task_dependency_name(tb_type, tb_subtype, tb_name);
int tb_mpi = 0;
if (tb->type == task_type_send || tb->type == task_type_recv)
tb_mpi = 1;
/* Get cluster name */
char ta_cluster[20];
scheduler_get_cluster_name(ta, ta_cluster);
char tb_cluster[20];
scheduler_get_cluster_name(tb, tb_cluster);
fprintf(f, "%s,%s,%i,%i,%i,%i,%s,%s,%i\n",
ta_name, tb_name, ta->implicit, tb->implicit,
ta_mpi, tb_mpi, ta_cluster, tb_cluster, count);
/* Write to the fle */
fprintf(f, "\t %s->%s[label=%i];\n", ta_name, tb_name,
count_rel[ind + k]);
}
}
}
/* Close the file */
fprintf(f, "}");
fclose(f);
/* Be clean */
free(table);
free(count_rel);
if (verbose)
if (verbose && s->nodeID == 0)
message("Printing task graph took %.3f %s.",
clocks_from_ticks(getticks() - tic), clocks_getunit());
}
......
#!/usr/bin/env python3
"""
This file generates a graphviz file that represents the SWIFT tasks
dependencies.
Example: ./plot_task_dependencies.py dependency_graph_*.csv
"""
import sys
from pandas import read_csv
import numpy as np
from subprocess import call
def getGitVersion(f, git):
    """
    Read the git version from the file

    Parameters
    ----------

    f: str
        Filename

    git: str
        Git version of previous file

    Returns
    -------

    new_git: str
        Git version of current file
    """
    # grab the first line of the csv file
    with open(f, "r") as handle:
        first_line = handle.readline()

    # the version is stored as a '#' comment; bail out otherwise
    if first_line[0] != "#":
        return None

    # strip the leading '# ' and trailing whitespace/newline
    current = first_line[2:].rstrip()

    # all files must have been produced by the same code version
    if git is not None and git != current:
        raise Exception("Files were not produced by the same version")

    return current
def appendSingleData(data0, datai):
    """
    Append two DataFrame together

    Parameters
    ----------

    data0: DataFrame
        One of the dataframe

    datai: DataFrame
        The second dataframe

    Returns
    -------

    data0: DataFrame
        The updated dataframe
    """
    # local import: only needed for the concat below; the module only
    # imports `read_csv` from pandas at top level
    import pandas as pd

    # loop over all rows in datai
    for i, row in datai.iterrows():
        # get data
        ta = datai["task_in"][i]
        tb = datai["task_out"][i]
        ind = np.logical_and(data0["task_in"] == ta,
                             data0["task_out"] == tb)

        # check number of ta->tb
        N = np.sum(ind)
        if N > 1:
            raise Exception("Same dependency written multiple times %s->%s" %
                            (ta, tb))
        # if not present in data0
        if N == 0:
            # BUG FIX: DataFrame.append returns a *new* frame (and is removed
            # in pandas >= 2); the original discarded the result, silently
            # dropping the new dependency.
            data0 = pd.concat([data0, row.to_frame().T], ignore_index=True)
        else:
            # otherwise just update the number of link
            ind = ind[ind].index[0]
            tmp = data0["number_link"][ind] + datai["number_link"][i]
            data0.at[ind, "number_link"] = tmp
    return data0
def appendData(data):
    """
    Append all the dataframe together

    Parameters
    ----------

    data: list
        List containing all the dataframe to append together

    Returns
    -------

    data: DataFrame
        The complete dataframe
    """
    N = len(data)
    # BUG FIX: with a single file the original returned the *list*,
    # not the DataFrame it contains, breaking every downstream access.
    if N == 1:
        return data[0]

    # merge every other frame into the first one
    for i in range(1, N):
        data[0] = appendSingleData(data[0], data[i])

    return data[0]
def writeTask(f, name, implicit, mpi):
    """
    Write the special task (e.g. implicit and mpi)

    Parameters
    ----------

    f: File
        File where to write the data

    name: str
        Task name

    implicit: int
        Is the task implicit

    mpi: int
        Is the task MPI related
    """
    # plain tasks need no extra attributes
    if not implicit and not mpi:
        return

    # collect the graphviz attributes, then emit a single node line
    attributes = []
    if implicit:
        attributes.append("style=filled, color=lightgrey")
    if mpi:
        attributes.append("shape=diamond")

    f.write("\t " + name + "[" + ",".join(attributes) + "];\n")
def writeHeader(f, data, git):
    """
    Write the header and the special tasks

    Parameters
    ----------

    f: File
        File where to write the data

    data: DataFrame
        The dataframe to write

    git: str
        The git version
    """
    # graphviz preamble
    f.write("digraph task_dep {\n")
    f.write("\t # Header\n")
    f.write('\t label="Task dependencies for SWIFT %s";\n' % git)
    f.write("\t compound=true;\n")
    f.write("\t ratio=0.66;\n")
    f.write("\t node[nodesep=0.15];\n")
    f.write("\n")

    # write the special tasks: every task at most once,
    # the "in" column first, then the "out" column
    f.write("\t # Special tasks\n")
    N = len(data)
    written = []
    columns = (("task_in", "implicit_in", "mpi_in"),
               ("task_out", "implicit_out", "mpi_out"))
    for task_col, implicit_col, mpi_col in columns:
        for i in range(N):
            task = data[task_col][i]
            if task in written:
                continue
            written.append(task)
            writeTask(f, task, data[implicit_col][i], data[mpi_col][i])
    f.write("\n")
def writeCluster(f, tasks, cluster):
    """
    Write a single cluster

    Parameters
    ----------

    f: File
        File where to write the data

    tasks: list
        List of all tasks in the cluster

    cluster: str
        Cluster name
    """
    # build the whole subgraph block, then write it in one go
    lines = ["\t subgraph cluster%s {\n" % cluster,
             '\t\t label="";\n']
    lines.extend("\t\t %s;\n" % task for task in tasks)
    lines.append("\t };\n\n")
    f.write("".join(lines))
def writeClusters(f, data):
    """
    Write all the clusters

    Parameters
    ----------

    f: File
        File where to write the data

    data: DataFrame
        The dataframe to write
    """
    f.write("\t # Clusters\n")

    # all distinct cluster names appearing on either side of a dependency
    names = np.unique(data[["cluster_in", "cluster_out"]])

    cluster_in = data["cluster_in"]
    cluster_out = data["cluster_out"]

    for name in names:
        # "None" marks tasks that belong to no cluster
        if name == "None":
            continue

        # gather the tasks appearing in this cluster, without duplicates
        members = np.append(data["task_in"][cluster_in == name],
                            data["task_out"][cluster_out == name])
        members = np.unique(members)

        writeCluster(f, members, name)
    f.write("\n")
def writeDependencies(f, data):
    """
    Write all the dependencies between tasks

    Parameters
    ----------

    f: File
        File where to write the data

    data: DataFrame
        The dataframe to write
    """
    f.write("\t # Dependencies\n")

    seen = []
    for i in range(len(data)):
        # get data
        ta = data["task_in"][i]
        tb = data["task_out"][i]
        count = data["number_link"][i]

        # each (ta, tb) pair must appear exactly once in the data
        key = "%s_%s" % (ta, tb)
        if key in seen:
            raise Exception("Found two same task dependencies")
        seen.append(key)

        # write relation
        f.write("\t %s->%s[label=%i]\n" % (ta, tb, count))
def writeFooter(f):
    """
    Write the footer

    Parameters
    ----------

    f: File
        File where to write the data
    """
    # close the top-level digraph block opened in writeHeader
    print("}", end="", file=f)
if __name__ == "__main__":
# get input
filenames = sys.argv[1:]