Commit eac93513 authored by Loic Hausammann's avatar Loic Hausammann
Browse files

Use MPI_Reduce for the task graph

parent ae456d37
......@@ -146,6 +146,188 @@ int scheduler_get_number_relation(const struct scheduler *s,
return count;
}
/* Conservative number of dependencies per task type */
#define max_nber_dep 128
/**
* @brief Informations about all the task dependencies of
* a single task.
*/
struct task_dependency {
/* Main task */
/* ID of the task */
int type_in;
/* ID of the subtask */
int subtype_in;
/* Is the task implicit */
int implicit_in;
/* Dependent task */
/* ID of the dependent task */
int type_out[max_nber_dep];
/* ID of the dependent subtask */
int subtype_out[max_nber_dep];
/* Is the dependent task implicit */
int implicit_out[max_nber_dep];
/* Statistics */
/* number of link between the two task type */
int number_link[max_nber_dep];
/* number of ranks having this relation */
int number_rank[max_nber_dep];
};
#ifdef WITH_MPI
/**
* @brief Define the #task_dependency for MPI
*
* @param tstype The #MPI_Datatype to initialize
*/
void task_dependency_define(MPI_Datatype *tstype) {
/* Define the variables */
const int count = 8;
int blocklens[count];
MPI_Datatype types[count];
MPI_Aint disps[count];
/* all the type are int */
for (int i=0; i < count; i++) {
types[i] = MPI_INT;
}
/* Task in */
disps[0] = offsetof(struct task_dependency, type_in);
blocklens[0] = 1;
disps[1] = offsetof(struct task_dependency, subtype_in);
blocklens[1] = 1;
disps[2] = offsetof(struct task_dependency, implicit_in);
blocklens[2] = 1;
/* Task out */
disps[3] = offsetof(struct task_dependency, type_out);
blocklens[3] = max_nber_dep;
disps[4] = offsetof(struct task_dependency, subtype_out);
blocklens[4] = max_nber_dep;
disps[5] = offsetof(struct task_dependency, implicit_out);
blocklens[5] = max_nber_dep;
/* statistics */
disps[6] = offsetof(struct task_dependency, number_link);
blocklens[6] = max_nber_dep;
disps[7] = offsetof(struct task_dependency, number_rank);
blocklens[7] = max_nber_dep;
/* define it for MPI */
MPI_Type_create_struct(count, blocklens, disps, types, tstype);
MPI_Type_commit(tstype);
}
/**
* @brief Sum operator of #task_dependency for MPI
*
* @param in_p The #task_dependency to add
* @param out_p The #task_dependency where in_p is added
* @param len The length of the arrays
* @param type The MPI datatype
*/
void task_dependency_sum(
void *in_p, void *out_p, int *len, MPI_Datatype *type){
/* change pointer type */
struct task_dependency *in = in_p;
struct task_dependency *out = out_p;
/* Loop over all the current objects */
for (int i = 0; i < *len; i++) {
/* loop over all the object set in invals */
for (int j = 0; j < max_nber_dep; j++) {
/* Have we reached the end of the links? */
if (in[i].number_link[j] == -1) {
break;
}
/* get a few variables */
int tb_type = in[i].type_out[j];
int tb_subtype = in[i].subtype_out[j];
#ifdef SWIFT_DEBUG_CHECKS
/* Check tasks */
if (tb_type >= task_type_count) {
error("Unknown task type %i", tb_type);
}
if (tb_subtype >= task_subtype_count) {
error("Unknown subtask type %i", tb_subtype);
}
#endif
/* find the corresponding id */
int k = 0;
while (k < max_nber_dep) {
/* have we reached the end of the links? */
if (out[i].number_link[k] == -1) {
/* reset the counter in order to be safe */
out[i].number_link[k] = 0;
out[i].number_rank[k] = 0;
/* set the relation */
out[i].type_in = in[i].type_in;
out[i].subtype_in = in[i].subtype_in;
out[i].implicit_in = in[i].implicit_in;
out[i].type_out[k] = in[i].type_out[j];
out[i].subtype_out[k] = in[i].subtype_out[j];
out[i].implicit_out[k] = in[i].implicit_out[j];
break;
}
/* do we have the same relation? */
if (out[i].type_out[k] == tb_type &&
out[i].subtype_out[k] == tb_subtype) {
break;
}
k++;
}
/* Check if we are still in the memory */
if (k == max_nber_dep) {
error("Not enough memory, please increase max_nber_dep");
}
#ifdef SWIFT_DEBUG_CHECKS
/* Check if correct relation */
if (out[i].type_in != in[i].type_in ||
out[i].subtype_in != in[i].subtype_in ||
out[i].implicit_in != in[i].implicit_in ||
out[i].type_out[k] != in[i].type_out[j] ||
out[i].subtype_out[k] != in[i].subtype_out[j] ||
out[i].implicit_out[k] != in[i].implicit_out[j]) {
error("Tasks do not correspond");
}
#endif
/* sum the contributions */
out[i].number_link[k] += in[i].number_link[j];
out[i].number_rank[k] += in[i].number_rank[j];
}
}
return;
}
#endif // WITH_MPI
/**
* @brief Write a dot file with the task dependencies.
*
......@@ -159,134 +341,180 @@ void scheduler_write_dependencies(struct scheduler *s, int verbose) {
const ticks tic = getticks();
/* Conservative number of dependencies per task type */
const int max_nber_dep = 128;
/* Number of possible relations between tasks */
const int nber_relation =
2 * task_type_count * task_subtype_count * max_nber_dep;
const int nber_tasks =
task_type_count * task_subtype_count;
/* To get the table of max_nber_dep for a task:
* ind = (ta * task_subtype_count + sa) * max_nber_dep * 2
/* To get the table for a task:
* ind = (ta * task_subtype_count + sa)
* where ta is the value of task_type and sa is the value of
* task_subtype */
int *table = (int *)malloc(nber_relation * sizeof(int));
if (table == NULL)
error("Error allocating memory for task-dependency graph (table).");
struct task_dependency *task_dep = (struct task_dependency *)malloc(
nber_tasks * sizeof(struct task_dependency));
/* Reset everything */
for (int i = 0; i < nber_relation; i++) table[i] = -1;
if (task_dep == NULL)
error("Error allocating memory for task-dependency graph (table).");
/* Master create directory */
char dir_name[100] = "task_dependencies";
if (s->nodeID == 0) {
struct stat st = {0};
if (stat(dir_name, &st) == -1) {
int test = mkdir(dir_name, 0700);
if (test != 0) error("Failed to create the task dependencies directory");
/* Reset counter */
for (int i = 0; i < nber_tasks; i++) {
for (int j = 0; j < max_nber_dep; j++) {
/* Use number_link as indicator of the existance of a relation */
task_dep[i].number_link[j] = -1;
}
}
#ifdef WITH_MPI
/* wait on master */
int result = MPI_Barrier(MPI_COMM_WORLD);
if (result != MPI_SUCCESS) {
error("Unable to wait on other MPI rank");
}
#endif
/* Create file */
char filename[200];
sprintf(filename, "%s/dependency_graph_%04i.csv", dir_name, s->nodeID);
FILE *f = fopen(filename, "w");
if (f == NULL) error("Error opening dependency graph file.");
/* Write header */
fprintf(f, "# %s\n", git_revision());
fprintf(f,
"task_in,task_out,implicit_in,implicit_out,mpi_in,mpi_out,cluster_in,"
"cluster_out,number_link\n");
/* loop over all tasks */
for (int i = 0; i < s->nr_tasks; i++) {
const struct task *ta = &s->tasks[i];
/* Current index */
int ind = ta->type * task_subtype_count + ta->subtype;
struct task_dependency *cur = &task_dep[ind];
/* Set ta */
cur->type_in = ta->type;
cur->subtype_in = ta->subtype;
cur->implicit_in = ta->implicit;
/* and their dependencies */
for (int j = 0; j < ta->nr_unlock_tasks; j++) {
const struct task *tb = ta->unlock_tasks[j];
/* check if dependency already written */
int written = 0;
/* Current index */
int ind = ta->type * task_subtype_count + ta->subtype;
ind *= 2 * max_nber_dep;
int k = 0;
int *cur = &table[ind];
while (k < max_nber_dep) {
/* not written yet */
if (cur[0] == -1) {
cur[0] = tb->type;
cur[1] = tb->subtype;
if (cur->number_link[k] == -1) {
/* set tb */
cur->type_out[k] = tb->type;
cur->subtype_out[k] = tb->subtype;
cur->implicit_out[k] = tb->implicit;
/* statistics */
int count = scheduler_get_number_relation(s, ta, tb);
cur->number_link[k] = count;
cur->number_rank[k] = 1;
break;
}
/* already written */
if (cur[0] == tb->type && cur[1] == tb->subtype) {
written = 1;
if (cur->type_out[k] == tb->type && cur->subtype_out[k] == tb->subtype) {
break;
}
k += 1;
cur = &cur[2];
}
/* max_nber_dep is too small */
if (k == max_nber_dep)
error("Not enough memory, please increase max_nber_dep");
}
}
/* Not written yet => write it */
if (!written) {
int count = scheduler_get_number_relation(s, ta, tb);
#ifdef WITH_MPI
/* create MPI operator */
MPI_Datatype data_type;
task_dependency_define(&data_type);
/* text to write */
char ta_name[200];
char tb_name[200];
MPI_Op sum;
MPI_Op_create(task_dependency_sum, /* commute */1, &sum);
/* construct line */
task_get_full_name(ta->type, ta->subtype, ta_name);
task_get_full_name(tb->type, tb->subtype, tb_name);
/* create recv buffer */
struct task_dependency *recv = NULL;
/* Check if MPI */
int ta_mpi = 0;
if (ta->type == task_type_send || ta->type == task_type_recv)
ta_mpi = 1;
if (s->nodeID == 0) {
recv = (struct task_dependency *)malloc(
nber_tasks * sizeof(struct task_dependency));
/* reset counter */
for (int i = 0; i < nber_tasks; i++) {
for (int j = 0; j < max_nber_dep; j++) {
/* Use number_link as indicator of the existance of a relation */
recv[i].number_link[j] = -1;
}
}
}
int tb_mpi = 0;
if (tb->type == task_type_send || tb->type == task_type_recv)
tb_mpi = 1;
/* Do the reduction */
int test = MPI_Reduce(task_dep, recv, nber_tasks, data_type, sum, 0, MPI_COMM_WORLD);
if (test != MPI_SUCCESS)
error("MPI reduce failed");
/* Get group name */
char ta_cluster[20];
char tb_cluster[20];
task_get_group_name(ta, ta_cluster);
task_get_group_name(tb, tb_cluster);
/* free some memory */
if (s->nodeID == 0) {
free(task_dep);
task_dep = recv;
}
#endif
fprintf(f, "%s,%s,%d,%d,%d,%d,%s,%s,%d\n", ta_name, tb_name,
ta->implicit, tb->implicit, ta_mpi, tb_mpi, ta_cluster,
tb_cluster, count);
if (s->nodeID == 0) {
/* Create file */
char *filename = "dependency_graph.csv";
FILE *f = fopen(filename, "w");
if (f == NULL) error("Error opening dependency graph file.");
/* Write header */
fprintf(f, "# %s\n", git_revision());
fprintf(f,
"task_in,task_out,implicit_in,implicit_out,mpi_in,mpi_out,cluster_in,"
"cluster_out,number_link,number_rank\n");
for (int i = 0; i < nber_tasks; i++){
for (int j = 0; j < max_nber_dep; j++) {
/* Does this link exists */
if (task_dep[i].number_link[j] == -1) {
continue;
}
/* Define a few variables */
int ta_type = task_dep[i].type_in;
int ta_subtype = task_dep[i].subtype_in;
int ta_implicit = task_dep[i].implicit_in;
int tb_type = task_dep[i].type_out[j];
int tb_subtype = task_dep[i].subtype_out[j];
int tb_implicit = task_dep[i].implicit_out[j];
int count = task_dep[i].number_link[j];
int number_rank = task_dep[i].number_rank[j];
/* text to write */
char ta_name[200];
char tb_name[200];
/* construct line */
task_get_full_name(ta_type, ta_subtype, ta_name);
task_get_full_name(tb_type, tb_subtype, tb_name);
/* Check if MPI */
int ta_mpi = 0;
if (ta_type == task_type_send || ta_type == task_type_recv)
ta_mpi = 1;
int tb_mpi = 0;
if (tb_type == task_type_send || tb_type == task_type_recv)
tb_mpi = 1;
/* Get group name */
char ta_cluster[20];
char tb_cluster[20];
task_get_group_name(ta_type, ta_subtype, ta_cluster);
task_get_group_name(tb_type, tb_subtype, tb_cluster);
fprintf(f, "%s,%s,%d,%d,%d,%d,%s,%s,%d,%d\n", ta_name, tb_name,
ta_implicit, tb_implicit, ta_mpi, tb_mpi, ta_cluster,
tb_cluster, count, number_rank);
}
}
}
/* Close the file */
fclose(f);
/* Close the file */
fclose(f);
}
/* Be clean */
free(table);
free(task_dep);
if (verbose && s->nodeID == 0)
message("Printing task graph took %.3f %s.",
......
......@@ -572,19 +572,20 @@ void task_print(const struct task *t) {
* This is used to group tasks with similar actions in the task dependency
* graph.
*
* @param t The #task.
* @param group (return) The group name (should be allocated)
* @param type The #task type.
* @param subtype The #subtask type.
* @param cluster (return) The group name (should be allocated)
*/
void task_get_group_name(const struct task *t, char *cluster) {
void task_get_group_name(int type, int subtype, char *cluster) {
if (t->type == task_type_grav_long_range || t->type == task_type_grav_mm ||
t->type == task_type_grav_mesh) {
if (type == task_type_grav_long_range || type == task_type_grav_mm ||
type == task_type_grav_mesh) {
strcpy(cluster, "Gravity");
return;
}
switch (t->subtype) {
switch (subtype) {
case task_subtype_density:
strcpy(cluster, "Density");
break;
......@@ -618,10 +619,10 @@ void task_get_full_name(enum task_types type, enum task_subtypes subtype,
#ifdef SWIFT_DEBUG_CHECKS
/* Check input */
if ((type < 0) || (type >= task_type_count))
if (type >= task_type_count)
error("Unknown task type %i", type);
if ((subtype < 0) || (subtype >= task_subtype_count))
if (subtype >= task_subtype_count)
error("Unknown task subtype %i with type %s", subtype, taskID_names[type]);
#endif
......
......@@ -205,7 +205,7 @@ void task_do_rewait(struct task *t);
void task_print(const struct task *t);
void task_get_full_name(enum task_types type, enum task_subtypes subtype,
char *name);
void task_get_group_name(const struct task *t, char *cluster);
void task_get_group_name(int type, int subtype, char *cluster);
#ifdef WITH_MPI
void task_create_mpi_comms(void);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment