/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *                    Matthieu Schaller (schaller@strw.leidenuniv.nl)
 *               2016 Peter W. Draper (p.w.draper@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include <config.h>

/* Some standard headers. */
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

/* This object's header. */
#include "scheduler.h"

/* Local headers. */
#include "atomic.h"
#include "cycle.h"
#include "engine.h"
#include "error.h"
#include "intrinsics.h"
#include "kernel_hydro.h"
#include "memuse.h"
#include "mpiuse.h"
#include "queue.h"
#include "sort_part.h"
#include "space.h"
#include "space_getsid.h"
#include "task.h"
#include "threadpool.h"
#include "timers.h"
#include "version.h"

/**
 * @brief Re-set the list of active tasks.
 */
void scheduler_clear_active(struct scheduler *s) { s->active_count = 0; }

/**
 * @brief Increase the space available for unlocks. Only call when
 * current index == s->size_unlocks.
 */
static void scheduler_extend_unlocks(struct scheduler *s) {
  /* Allocate the new buffer. */
  const int size_unlocks_new = s->size_unlocks * 2;
  struct task **unlocks_new = (struct task **)swift_malloc(
      "unlocks", sizeof(struct task *) * size_unlocks_new);
  int *unlock_ind_new =
      (int *)swift_malloc("unlock_ind", sizeof(int) * size_unlocks_new);
  if (unlocks_new == NULL || unlock_ind_new == NULL)
    error("Failed to re-allocate unlocks.");

  /* Wait for all writes to the old buffer to complete. */
  while (s->completed_unlock_writes < s->size_unlocks) {
    /* Nothing to do here. */
  }

  /* Copy the buffers. */
  memcpy(unlocks_new, s->unlocks, sizeof(struct task *) * s->size_unlocks);
  memcpy(unlock_ind_new, s->unlock_ind, sizeof(int) * s->size_unlocks);
  swift_free("unlocks", s->unlocks);
  swift_free("unlock_ind", s->unlock_ind);
  s->unlocks = unlocks_new;
  s->unlock_ind = unlock_ind_new;

  /* Publish the new buffer size. */
  s->size_unlocks = size_unlocks_new;
}

/**
 * @brief Add an unlock_task to the given task.
 *
 * @param s The #scheduler.
 * @param ta The unlocking #task.
 * @param tb The #task that will be unlocked.
 */
void scheduler_addunlock(struct scheduler *s, struct task *ta,
                         struct task *tb) {
#ifdef SWIFT_DEBUG_CHECKS
  if (ta == NULL) error("Unlocking task is NULL.");
  if (tb == NULL) error("Unlocked task is NULL.");
#endif

  /* Get an index at which to store this unlock. */
  const int ind = atomic_inc(&s->nr_unlocks);

  /* Does the buffer need to be grown? */
  if (ind == s->size_unlocks) scheduler_extend_unlocks(s);

#ifdef SWIFT_DEBUG_CHECKS
  if (ind > s->size_unlocks * 2)
    message("unlocks guard enabled: %d / %d", ind, s->size_unlocks);
#endif

  /* Wait for there to actually be space at my index. */
  while (ind > s->size_unlocks) {
    /* Nothing to do here. */
  }
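  /* Illustrative sketch of the lock-free append protocol used here (not
   * part of the algorithm itself): every writer claims a slot with an
   * atomic increment; the single writer that lands exactly on the old
   * capacity grows the buffer, while writers that land beyond it spin
   * until the enlarged capacity has been published:
   *
   *   int slot = atomic_inc(&n);     // claim a slot
   *   if (slot == capacity) grow();  // exactly one thread resizes
   *   while (slot > capacity) ;      // late-comers wait for the resize
   *   buffer[slot] = value;          // now safe to write
   *
   * where grow() stands in for scheduler_extend_unlocks() above. */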
  /* Guard against the case when more than (old) s->size_unlocks unlocks
   * are now pending. */
  if (ind == s->size_unlocks) scheduler_extend_unlocks(s);

  /* Write the unlock to the scheduler. */
  s->unlocks[ind] = tb;
  s->unlock_ind[ind] = ta - s->tasks;
  atomic_inc(&s->completed_unlock_writes);
}

/* Conservative number of dependencies per task type */
#define MAX_NUMBER_DEP 128

/**
 * @brief Describe the level at which the tasks are done.
 * WARNING: the order is supposed to be sorted from the root
 * to the leaf.
 */
enum task_dependency_level {
  task_dependency_level_top = 0,
  task_dependency_level_super,
  task_dependency_level_super_hydro,
  task_dependency_level_super_grav,
  task_dependency_level_none,
};

/**
 * @brief Information about all the task dependencies of
 * a single task.
 */
struct task_dependency {
  /* Main task */

  /* ID of the task */
  int type_in;

  /* ID of the subtask */
  int subtype_in;

  /* Is the task implicit? */
  int implicit_in;

  /* Is the task_in at the top level? */
  int task_in_is_top;

  /* Is the task_in at the grav.super level? */
  int task_in_is_grav_super;

  /* Is the task_in at the hydro.super level? */
  int task_in_is_hydro_super;

  /* Dependent task */

  /* ID of the dependent task */
  int type_out[MAX_NUMBER_DEP];

  /* ID of the dependent subtask */
  int subtype_out[MAX_NUMBER_DEP];

  /* Is the dependent task implicit? */
  int implicit_out[MAX_NUMBER_DEP];

  /* Is the task_out at the top level? */
  int task_out_is_top[MAX_NUMBER_DEP];

  /* Is the task_out at the grav.super level? */
  int task_out_is_grav_super[MAX_NUMBER_DEP];

  /* Is the task_out at the hydro.super level? */
  int task_out_is_hydro_super[MAX_NUMBER_DEP];

  /* Statistics */

  /* Number of links between the two task types */
  int number_link[MAX_NUMBER_DEP];

  /* Number of ranks having this relation */
  int number_rank[MAX_NUMBER_DEP];
};

#ifdef WITH_MPI
/**
 * @brief Define the #task_dependency for MPI
 *
 * @param tstype The MPI_Datatype to initialize
 */
void task_dependency_define(MPI_Datatype *tstype) {
  /* Define the variables */
  const int count = 14;
  int blocklens[count];
  MPI_Datatype types[count];
  MPI_Aint disps[count];

  /* All the types are int */
  for (int i = 0; i < count; i++) {
    types[i] = MPI_INT;
  }

  /* Task in */
  disps[0] = offsetof(struct task_dependency, type_in);
  blocklens[0] = 1;
  disps[1] = offsetof(struct task_dependency, subtype_in);
  blocklens[1] = 1;
  disps[2] = offsetof(struct task_dependency, implicit_in);
  blocklens[2] = 1;
  disps[3] = offsetof(struct task_dependency, task_in_is_top);
  blocklens[3] = 1;
  disps[4] = offsetof(struct task_dependency, task_in_is_hydro_super);
  blocklens[4] = 1;
  disps[5] = offsetof(struct task_dependency, task_in_is_grav_super);
  blocklens[5] = 1;

  /* Task out */
  disps[6] = offsetof(struct task_dependency, type_out);
  blocklens[6] = MAX_NUMBER_DEP;
  disps[7] = offsetof(struct task_dependency, subtype_out);
  blocklens[7] = MAX_NUMBER_DEP;
  disps[8] = offsetof(struct task_dependency, implicit_out);
  blocklens[8] = MAX_NUMBER_DEP;
  disps[9] = offsetof(struct task_dependency, task_out_is_top);
  blocklens[9] = MAX_NUMBER_DEP;
  disps[10] = offsetof(struct task_dependency, task_out_is_hydro_super);
  blocklens[10] = MAX_NUMBER_DEP;
  disps[11] = offsetof(struct task_dependency, task_out_is_grav_super);
  blocklens[11] = MAX_NUMBER_DEP;

  /* Statistics */
  disps[12] = offsetof(struct task_dependency, number_link);
  blocklens[12] = MAX_NUMBER_DEP;
  disps[13] = offsetof(struct task_dependency, number_rank);
  blocklens[13] = MAX_NUMBER_DEP;

  /* Define it for MPI */
  MPI_Type_create_struct(count, blocklens, disps, types, tstype);
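  /* The struct layout above maps every field of task_dependency onto a
   * block of MPI_INTs; the type must be committed before it can be used
   * by the MPI_Reduce in scheduler_write_dependencies(). */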
  MPI_Type_commit(tstype);
}

/**
 * @brief Sum operator of #task_dependency for MPI
 *
 * @param in_p The #task_dependency to add
 * @param out_p The #task_dependency where in_p is added
 * @param len The length of the arrays
 * @param type The MPI datatype
 */
void task_dependency_sum(void *in_p, void *out_p, int *len,
                         MPI_Datatype *type) {
  /* Change the pointer types */
  struct task_dependency *in = (struct task_dependency *)in_p;
  struct task_dependency *out = (struct task_dependency *)out_p;

  /* Loop over all the current objects */
  for (int i = 0; i < *len; i++) {
    /* Loop over all the objects set in invals */
    for (int j = 0; j < MAX_NUMBER_DEP; j++) {
      /* Have we reached the end of the links? */
      if (in[i].number_link[j] == -1) {
        break;
      }

      /* Get a few variables */
      int tb_type = in[i].type_out[j];
      int tb_subtype = in[i].subtype_out[j];

#ifdef SWIFT_DEBUG_CHECKS
      /* Check the tasks */
      if (tb_type >= task_type_count) {
        error("Unknown task type %i", tb_type);
      }
      if (tb_subtype >= task_subtype_count) {
        error("Unknown subtask type %i", tb_subtype);
      }
#endif

      /* Find the corresponding id */
      int k = 0;
      while (k < MAX_NUMBER_DEP) {
        /* Have we reached the end of the links? */
        if (out[i].number_link[k] == -1) {
          /* Reset the counter in order to be safe */
          out[i].number_link[k] = 0;
          out[i].number_rank[k] = 0;

          /* Set the relation */
          out[i].type_in = in[i].type_in;
          out[i].subtype_in = in[i].subtype_in;
          out[i].implicit_in = in[i].implicit_in;

          out[i].type_out[k] = in[i].type_out[j];
          out[i].subtype_out[k] = in[i].subtype_out[j];
          out[i].implicit_out[k] = in[i].implicit_out[j];
          break;
        }

        /* Do we have the same relation? */
        if (out[i].type_out[k] == tb_type &&
            out[i].subtype_out[k] == tb_subtype) {
          break;
        }

        k++;
      }

      /* Check that we are still in the memory */
      if (k == MAX_NUMBER_DEP) {
        error("Not enough memory, please increase MAX_NUMBER_DEP");
      }

#ifdef SWIFT_DEBUG_CHECKS
      /* Check if correct relation */
      if (out[i].type_in != in[i].type_in ||
          out[i].subtype_in != in[i].subtype_in ||
          out[i].implicit_in != in[i].implicit_in ||
          out[i].type_out[k] != in[i].type_out[j] ||
          out[i].subtype_out[k] != in[i].subtype_out[j] ||
          out[i].implicit_out[k] != in[i].implicit_out[j]) {
        error("Tasks do not correspond");
      }
#endif

      /* Sum the contributions */
      out[i].number_link[k] += in[i].number_link[j];
      out[i].number_rank[k] += in[i].number_rank[j];

      /* Get the task in level */
      out[i].task_in_is_top = min(out[i].task_in_is_top, in[i].task_in_is_top);
      out[i].task_in_is_hydro_super =
          min(out[i].task_in_is_hydro_super, in[i].task_in_is_hydro_super);
      out[i].task_in_is_grav_super =
          min(out[i].task_in_is_grav_super, in[i].task_in_is_grav_super);

      /* Get the task out level. Note: the matched slot in 'out' is k,
       * while the slot in 'in' is j. */
      out[i].task_out_is_top[k] =
          min(out[i].task_out_is_top[k], in[i].task_out_is_top[j]);
      out[i].task_out_is_hydro_super[k] = min(
          out[i].task_out_is_hydro_super[k], in[i].task_out_is_hydro_super[j]);
      out[i].task_out_is_grav_super[k] = min(out[i].task_out_is_grav_super[k],
                                             in[i].task_out_is_grav_super[j]);
    }
  }

  return;
}
#endif  // WITH_MPI

/**
 * @brief Write a csv file with the task dependencies.
 *
 * Run plot_task_dependencies.py for an example of how to use it
 * to generate the figure.
 *
 * @param s The #scheduler we are working in.
 * @param verbose Are we verbose about this?
 * @param step The current step number.
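 *
 * Example post-processing (hypothetical invocation; the exact location of
 * the plotting script in the repository is an assumption):
 *   python3 tools/plot_task_dependencies.py dependency_graph_0.csv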
 */
void scheduler_write_dependencies(struct scheduler *s, int verbose, int step) {
  const ticks tic = getticks();

  /* Number of possible relations between tasks */
  const int nber_tasks = task_type_count * task_subtype_count;

  /* To get the table for a task:
   * ind = (ta * task_subtype_count + sa)
   * where ta is the value of task_type and sa is the value of
   * task_subtype */
  struct task_dependency *task_dep = (struct task_dependency *)malloc(
      nber_tasks * sizeof(struct task_dependency));

  /* Keep track of whether a task exists in this run */
  int *task_exists = (int *)malloc(nber_tasks * sizeof(int));

  /* Keep track of whether a task has a dependency or an unlock,
   * and hence will be drawn in the task graph */
  int *task_has_deps = (int *)malloc(nber_tasks * sizeof(int));

  /* Special marker for tasks with no dependencies. */
  const int no_dependency = -3;

  if (task_dep == NULL || task_exists == NULL || task_has_deps == NULL)
    error("Error allocating memory for task-dependency graph (table).");

  /* Reset the counters */
  for (int i = 0; i < nber_tasks; i++) {
    /* Assume that the tasks are at all levels and correct later. */
    task_dep[i].task_in_is_top = 1;
    task_dep[i].task_in_is_grav_super = 1;
    task_dep[i].task_in_is_hydro_super = 1;

    const int tt = i / task_subtype_count;
    const int tst = i % task_subtype_count;
    task_dep[i].type_in = tt;
    task_dep[i].subtype_in = tst;

    for (int j = 0; j < MAX_NUMBER_DEP; j++) {
      /* Use number_link as indicator of the existence of a relation */
      task_dep[i].number_link[j] = -1;

      /* Assume that the tasks are at all levels and correct later. */
      task_dep[i].task_out_is_top[j] = 1;
      task_dep[i].task_out_is_grav_super[j] = 1;
      task_dep[i].task_out_is_hydro_super[j] = 1;
    }
    task_exists[i] = 0;
    task_has_deps[i] = 0;
  }

  /* Loop over all tasks */
  for (int i = 0; i < s->nr_tasks; i++) {
    const struct task *ta = &s->tasks[i];

    /* Are we using this task?
     * For the 0-step, we wish to show all the tasks (even the inactive
     * ones). */
    if (step != 0 && ta->skip) continue;

    /* Current index */
    const int ind = ta->type * task_subtype_count + ta->subtype;
    struct task_dependency *cur = &task_dep[ind];

    task_exists[ind]++;

#ifdef SWIFT_DEBUG_CHECKS
    if (cur->type_in != ta->type)
      error("wrong indexing for task %d: Expect type %d got %d", i,
            cur->type_in, ta->type);
    if (cur->subtype_in != ta->subtype)
      error("wrong indexing for task %d: Expect subtype %d got %d", i,
            cur->subtype_in, ta->subtype);
#endif

    /* Is ta implicit? */
    cur->implicit_in = ta->implicit;

    /* Set the task level. */
    const struct cell *ci = ta->ci;
    const struct cell *cj = ta->cj;
    const int is_ci_top = ci == NULL || (ci != NULL && ci == ci->top);
    const int is_cj_top = cj == NULL || (cj != NULL && cj == cj->top);
    const int is_hydro_super =
        (cj == NULL || (cj != NULL && cj == cj->hydro.super)) &&
        (ci == NULL || (ci != NULL && ci == ci->hydro.super));
    const int is_grav_super =
        (cj == NULL || (cj != NULL && cj == cj->grav.super)) &&
        (ci == NULL || (ci != NULL && ci == ci->grav.super));

    /* Are we dealing with a task at the top level? */
    if (!(is_ci_top && is_cj_top)) {
      cur->task_in_is_top = 0;
    }

    /* At the hydro level? */
    if (!is_hydro_super) {
      cur->task_in_is_hydro_super = 0;
    }

    /* At the gravity level? */
    if (!is_grav_super) {
      cur->task_in_is_grav_super = 0;
    }
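    /* Bookkeeping note (describing the block below): a task that unlocks
     * nothing gets the no_dependency sentinel (-3) written into its first
     * free out-slot, so that the CSV writer can later emit it as an
     * isolated node instead of silently dropping it from the graph. */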
    /* If this task unlocks nothing, make a note of it. */
    if (ta->nr_unlock_tasks == 0) {
      int k = 0;
      while (k < MAX_NUMBER_DEP) {
        /* Not written yet */
        if (cur->number_link[k] == -1) {
          cur->type_out[k] = no_dependency;
          cur->subtype_out[k] = no_dependency;
          cur->implicit_out[k] = no_dependency;

          /* Statistics */
          cur->number_link[k] = 0;
          cur->number_rank[k] = 1;

          /* Are we dealing with a task at the top level? */
          cur->task_out_is_top[k] = no_dependency;
          cur->task_out_is_hydro_super[k] = no_dependency;
          cur->task_out_is_grav_super[k] = no_dependency;
          break;
        }

        /* Already written */
        if (cur->type_out[k] == no_dependency &&
            cur->subtype_out[k] == no_dependency) {
          break;
        }

        k += 1;
      }

      /* If this task unlocks nothing, we have nothing left to do.
       * Go to the next one. */
      continue;
    }

    /* This task unlocks stuff, so check the dependencies */
    for (int j = 0; j < ta->nr_unlock_tasks; j++) {
      const struct task *tb = ta->unlock_tasks[j];

      /* Are we using this task?
       * For the 0-step, we wish to show all the tasks (even the inactive
       * ones). */
      if (step != 0 && tb->skip) continue;

      int indj = tb->type * task_subtype_count + tb->subtype;

#ifdef SWIFT_DEBUG_CHECKS
      const struct task_dependency *target = &task_dep[indj];
      if (target->type_in != tb->type)
        error("wrong indexing for task %d: Expect type %d got %d", i,
              target->type_in, tb->type);
      if (target->subtype_in != tb->subtype)
        error("wrong indexing for task %d: Expect subtype %d got %d", i,
              target->subtype_in, tb->subtype);
#endif

      task_exists[indj]++;

      const struct cell *ci_b = tb->ci;
      const struct cell *cj_b = tb->cj;
      const int is_ci_b_top =
          ci_b == NULL || (ci_b != NULL && ci_b == ci_b->top);
      const int is_cj_b_top =
          cj_b == NULL || (cj_b != NULL && cj_b == cj_b->top);
      const int is_b_hydro_super =
          (cj_b == NULL || (cj_b != NULL && cj_b == cj_b->hydro.super)) &&
          (ci_b == NULL || (ci_b != NULL && ci_b == ci_b->hydro.super));
      const int is_b_grav_super =
          (cj_b == NULL || (cj_b != NULL && cj_b == cj_b->grav.super)) &&
          (ci_b == NULL || (ci_b != NULL && ci_b == ci_b->grav.super));

      int k = 0;
      while (k < MAX_NUMBER_DEP) {
        /* Not written yet */
        if (cur->number_link[k] == -1) {
          /* Set tb */
          cur->type_out[k] = tb->type;
          cur->subtype_out[k] = tb->subtype;
          cur->implicit_out[k] = tb->implicit;

          /* Statistics */
          cur->number_link[k] = 1;
          cur->number_rank[k] = 1;

          /* Are we dealing with a task at the top level? */
          if (!(is_ci_b_top && is_cj_b_top)) {
            cur->task_out_is_top[k] = 0;
          }

          /* At the hydro level? */
          if (!is_b_hydro_super) {
            cur->task_out_is_hydro_super[k] = 0;
          }

          /* At the gravity level? */
          if (!is_b_grav_super) {
            cur->task_out_is_grav_super[k] = 0;
          }
          break;
        }

        /* Already written */
        if (cur->type_out[k] == tb->type &&
            cur->subtype_out[k] == tb->subtype) {
          /* Increase the number of links. */
          cur->number_link[k] += 1;

          /* Are we dealing with a task at the top level? */
          if (!(is_ci_b_top && is_cj_b_top)) {
            cur->task_out_is_top[k] = 0;
          }

          /* At the hydro level? */
          if (!is_b_hydro_super) {
            cur->task_out_is_hydro_super[k] = 0;
          }

          /* At the gravity level? */
          if (!is_b_grav_super) {
            cur->task_out_is_grav_super[k] = 0;
          }

          break;
        }

        k += 1;
      }

      /* MAX_NUMBER_DEP is too small */
      if (k == MAX_NUMBER_DEP)
        error("Not enough memory, please increase MAX_NUMBER_DEP");
    }
  }

#ifdef WITH_MPI
  /* Create the MPI operator */
  MPI_Datatype dependency_data_type;
  task_dependency_define(&dependency_data_type);

  MPI_Op dependency_sum;
  MPI_Op_create(task_dependency_sum, /* commute */ 1, &dependency_sum);

  /* Create the recv buffer */
  struct task_dependency *recv = NULL;
  int *recv_exists = NULL;

  if (s->nodeID == 0) {
    recv = (struct task_dependency *)malloc(nber_tasks *
                                            sizeof(struct task_dependency));
    recv_exists = (int *)malloc(nber_tasks * sizeof(int));

    /* Reset the counters */
    for (int i = 0; i < nber_tasks; i++) {
      for (int j = 0; j < MAX_NUMBER_DEP; j++) {
        /* Use number_link as indicator of the existence of a relation */
        recv[i].number_link[j] = -1;
      }
      recv_exists[i] = 0;
    }
  }

  /* Do the reduction */
  int test = MPI_Reduce(task_dep, recv, nber_tasks, dependency_data_type,
                        dependency_sum, 0, MPI_COMM_WORLD);
  if (test != MPI_SUCCESS) error("MPI reduce failed");

  test = MPI_Reduce(task_exists, recv_exists, nber_tasks, MPI_INT, MPI_SUM, 0,
                    MPI_COMM_WORLD);
  if (test != MPI_SUCCESS) error("MPI reduce failed");

  /* Free some memory */
  if (s->nodeID == 0) {
    free(task_dep);
    task_dep = recv;
    free(task_exists);
    task_exists = recv_exists;
  }
#endif

  if (s->nodeID == 0) {
    /* Create the file */
    char filename[50];
    sprintf(filename, "dependency_graph_%i.csv", step);
    FILE *f = fopen(filename, "w");
    if (f == NULL) error("Error opening dependency graph file.");

    /* Write the header */
    fprintf(f, "# %s\n", git_revision());
    fprintf(
        f,
        "task_in,task_out,implicit_in,implicit_out,mpi_in,mpi_out,cluster_in,"
        "cluster_out,number_link,number_rank,task_in_is_top,task_in_is_hydro_"
        "super,task_in_is_grav_super,task_out_is_top,task_out_is_hydro_super,"
        "task_out_is_grav_super,cell_has_active_task\n");

    for (int i = 0; i < nber_tasks; i++) {
      for (int j = 0; j < MAX_NUMBER_DEP; j++) {
        /* Does this link exist? */
        if (task_dep[i].number_link[j] == -1) continue;

        /* Don't write tasks without dependencies (yet) */
        if (task_dep[i].type_out[j] == no_dependency) continue;

        /* Define a few variables */
        const int ta_type = task_dep[i].type_in;
        const int ta_subtype = task_dep[i].subtype_in;
        const int ta_implicit = task_dep[i].implicit_in;

        const int tb_type = task_dep[i].type_out[j];
        const int tb_subtype = task_dep[i].subtype_out[j];
        const int tb_implicit = task_dep[i].implicit_out[j];

        const int count = task_dep[i].number_link[j];
        const int number_rank = task_dep[i].number_rank[j];

        const int task_in_is_top = task_dep[i].task_in_is_top;
        const int task_in_is_grav_super = task_dep[i].task_in_is_grav_super;
        const int task_in_is_hydro_super = task_dep[i].task_in_is_hydro_super;

        const int task_out_is_top = task_dep[i].task_out_is_top[j];
        const int task_out_is_grav_super =
            task_dep[i].task_out_is_grav_super[j];
        const int task_out_is_hydro_super =
            task_dep[i].task_out_is_hydro_super[j];

        /* Text to write */
        char ta_name[200];
        char tb_name[200];

        /* Take note that these tasks have dependencies and unlocks */
        task_has_deps[i]++;
        int indj = tb_type * task_subtype_count + tb_subtype;
        task_has_deps[indj]++;

        /* Construct the line */
        task_get_full_name(ta_type, ta_subtype, ta_name);
        task_get_full_name(tb_type, tb_subtype, tb_name);

        /* Check if MPI */
        int ta_mpi = 0;
        if (ta_type == task_type_send || ta_type == task_type_recv) ta_mpi = 1;

        int tb_mpi = 0;
        if (tb_type == task_type_send || tb_type == task_type_recv) tb_mpi = 1;

        /* Get the group names */
        char ta_cluster[20];
        char tb_cluster[20];
        task_get_group_name(ta_type, ta_subtype, ta_cluster);
        task_get_group_name(tb_type, tb_subtype, tb_cluster);

        fprintf(f, "%s,%s,%d,%d,%d,%d,%s,%s,%d,%d,%d,%d,%d,%d,%d,%d,%d\n",
                ta_name, tb_name, ta_implicit, tb_implicit, ta_mpi, tb_mpi,
                ta_cluster, tb_cluster, count, number_rank, task_in_is_top,
                task_in_is_hydro_super, task_in_is_grav_super, task_out_is_top,
                task_out_is_hydro_super, task_out_is_grav_super,
                /*cell_has_active_task=*/1);
      }
    }

    /* Now write the tasks without dependencies */
    for (int i = 0; i < nber_tasks; i++) {
      /* There may be several tasks that don't unlock anything,
       * e.g. timestep_collect or kick1 tasks. Those are covered
       * in the graph by them being unlocked by some other task.
       * If a task however doesn't unlock anything, nor is unlocked
       * by any other task, it needs special treatment, which it
       * receives now. The conditions for it to be written down are a) the
       * task must exist, i.e. we must have encountered it in the
       * list of tasks, and b) it must not have been unlocked by
       * anything. */
      if (task_exists[i] && !task_has_deps[i]) {
        /* Define a few variables */
        const int ta_type = task_dep[i].type_in;
        const int ta_subtype = task_dep[i].subtype_in;
        const int ta_implicit = task_dep[i].implicit_in;

        const int tb_implicit = 0;
        const int count = 0;
        const int number_rank = 1;

        const int task_in_is_top = task_dep[i].task_in_is_top;
        const int task_in_is_grav_super = task_dep[i].task_in_is_grav_super;
        const int task_in_is_hydro_super = task_dep[i].task_in_is_hydro_super;

        const int task_out_is_top = -1;
        const int task_out_is_grav_super = -1;
        const int task_out_is_hydro_super = -1;

        /* Text to write */
        char ta_name[200];
        task_get_full_name(ta_type, ta_subtype, ta_name);
        const char *tb_name = "task_unlocks_nothing";

        /* Check if MPI */
        int ta_mpi = 0;
        if (ta_type == task_type_send || ta_type == task_type_recv) ta_mpi = 1;
        int tb_mpi = 0;

        /* Get the group names */
        char ta_cluster[20];
        task_get_group_name(ta_type, ta_subtype, ta_cluster);
        const char *tb_cluster = "None";

        fprintf(f, "%s,%s,%d,%d,%d,%d,%s,%s,%d,%d,%d,%d,%d,%d,%d,%d,%d\n",
                ta_name, tb_name, ta_implicit, tb_implicit, ta_mpi, tb_mpi,
                ta_cluster, tb_cluster, count, number_rank, task_in_is_top,
                task_in_is_hydro_super, task_in_is_grav_super, task_out_is_top,
                task_out_is_hydro_super, task_out_is_grav_super,
                /*cell_has_active_task=*/1);
      }
    }

    /* Close the file */
    fclose(f);
  }

#if defined(SWIFT_DEBUG_CHECKS)
  /* Check that we have the correct number of dependencies. */
  if (step == 0) {
    int count_total = 0;
    for (int i = 0; i < nber_tasks; i++) {
      for (int j = 0; j < MAX_NUMBER_DEP; j++) {
        if (task_dep[i].number_link[j] != -1)
          count_total += task_dep[i].number_link[j];
      }
    }

    /* Get the number of unlocks from all the ranks */
    int nr_unlocks = s->nr_unlocks;
#ifdef WITH_MPI
    MPI_Allreduce(MPI_IN_PLACE, &nr_unlocks, 1, MPI_INT, MPI_SUM,
                  MPI_COMM_WORLD);
#endif

    if (s->nodeID == 0 && count_total != nr_unlocks) {
      error("Not all the dependencies were found: %i != %i", count_total,
            nr_unlocks);
    }
  }
#endif

  /* Be clean */
  free(task_dep);
  free(task_exists);
  free(task_has_deps);
#ifdef WITH_MPI
  MPI_Type_free(&dependency_data_type);
  MPI_Op_free(&dependency_sum);
#endif

  if (verbose)
    message("Printing task graph took %.3f %s.",
            clocks_from_ticks(getticks() - tic), clocks_getunit());
}

/**
 * @brief Write a csv file with the task dependencies for a single cell.
 *
 * Run plot_task_dependencies.py for an example of how to use it
 * to generate the figure.
 *
 * @param s The #scheduler we are working in.
 * @param verbose Are we verbose about this?
 * @param step The current step number.
 */
void scheduler_write_cell_dependencies(struct scheduler *s, int verbose,
                                       int step) {
#if defined(SWIFT_DEBUG_CHECKS) || defined(SWIFT_CELL_GRAPH)
  const ticks tic = getticks();

  const long long cellID = s->dependency_graph_cellID;
  if (cellID == 0LL) return;

  /* Number of possible relations between tasks */
  const int nber_tasks = task_type_count * task_subtype_count;

  /* To get the table for a task:
   * ind = (ta * task_subtype_count + sa)
   * where ta is the value of task_type and sa is the value of
   * task_subtype */
  struct task_dependency *task_dep = (struct task_dependency *)malloc(
      nber_tasks * sizeof(struct task_dependency));

  /* Keep track of whether the requested cell is also involved in the
   * dependency */
  int cell_involved[nber_tasks][MAX_NUMBER_DEP];

  if (task_dep == NULL)
    error("Error allocating memory for task-dependency graph (table).");

  /* Reset the counters */
  for (int i = 0; i < nber_tasks; i++) {
    /* Assume that the tasks are at all levels and correct later. */
    task_dep[i].task_in_is_top = 1;
    task_dep[i].task_in_is_grav_super = 1;
    task_dep[i].task_in_is_hydro_super = 1;

    const int tt = i / task_subtype_count;
    const int tst = i % task_subtype_count;
    task_dep[i].type_in = tt;
    task_dep[i].subtype_in = tst;

    for (int j = 0; j < MAX_NUMBER_DEP; j++) {
      /* Use number_link as indicator of the existence of a relation */
      task_dep[i].number_link[j] = -1;

      /* Assume that the tasks are at all levels and correct later. */
      task_dep[i].task_out_is_top[j] = 1;
      task_dep[i].task_out_is_grav_super[j] = 1;
      task_dep[i].task_out_is_hydro_super[j] = 1;

      cell_involved[i][j] = 0;
    }
  }

  /* Loop over all tasks */
  int local_count = 0;
  for (int i = 0; i < s->nr_tasks; i++) {
    const struct task *ta = &s->tasks[i];

    /* Are we using this task?
     * For the 0-step, we wish to show all the tasks (even the inactive
     * ones). */
    if (step != 0 && ta->skip) continue;

    /* Note: task_type_none may have t->ci==NULL too */
    if (!(((ta->ci != NULL) && ta->ci->cellID == cellID) ||
          ((ta->cj != NULL) && ta->cj->cellID == cellID)))
      continue;

    /* Current index */
    const int ind = ta->type * task_subtype_count + ta->subtype;
    struct task_dependency *cur = &task_dep[ind];

#ifdef SWIFT_DEBUG_CHECKS
    if (cur->type_in != ta->type)
      error("wrong indexing for task %d: Expect type %d got %d", i,
            cur->type_in, ta->type);
    if (cur->subtype_in != ta->subtype)
      error("wrong indexing for task %d: Expect subtype %d got %d", i,
            cur->subtype_in, ta->subtype);
#endif

    /* Is ta implicit? */
    cur->implicit_in = ta->implicit;

    /* Set the task level. */
    const struct cell *ci = ta->ci;
    const struct cell *cj = ta->cj;
    const int is_ci_top = ci == NULL || (ci != NULL && ci == ci->top);
    const int is_cj_top = cj == NULL || (cj != NULL && cj == cj->top);
    const int is_hydro_super =
        (cj == NULL || (cj != NULL && cj == cj->hydro.super)) &&
        (ci == NULL || (ci != NULL && ci == ci->hydro.super));
    const int is_grav_super =
        (cj == NULL || (cj != NULL && cj == cj->grav.super)) &&
        (ci == NULL || (ci != NULL && ci == ci->grav.super));

    /* Are we dealing with a task at the top level? */
    if (!(is_ci_top && is_cj_top)) {
      cur->task_in_is_top = 0;
    }

    /* At the hydro level? */
    if (!is_hydro_super) {
      cur->task_in_is_hydro_super = 0;
    }

    /* At the gravity level? */
    if (!is_grav_super) {
      cur->task_in_is_grav_super = 0;
    }

    /* ... and their dependencies */
    for (int j = 0; j < ta->nr_unlock_tasks; j++) {
      const struct task *tb = ta->unlock_tasks[j];

      /* Are we using this task?
       * For the 0-step, we wish to show all the tasks (even the inactive
       * ones). */
      if (step != 0 && tb->skip) continue;

      /* Found a task with a dependency. */
      local_count++;

      const struct cell *ci_b = tb->ci;
      const struct cell *cj_b = tb->cj;
      const int is_ci_b_top =
          ci_b == NULL || (ci_b != NULL && ci_b == ci_b->top);
      const int is_cj_b_top =
          cj_b == NULL || (cj_b != NULL && cj_b == cj_b->top);
      const int is_b_hydro_super =
          (cj_b == NULL || (cj_b != NULL && cj_b == cj_b->hydro.super)) &&
          (ci_b == NULL || (ci_b != NULL && ci_b == ci_b->hydro.super));
      const int is_b_grav_super =
          (cj_b == NULL || (cj_b != NULL && cj_b == cj_b->grav.super)) &&
          (ci_b == NULL || (ci_b != NULL && ci_b == ci_b->grav.super));

      int k = 0;
      while (k < MAX_NUMBER_DEP) {
        /* Not written yet */
        if (cur->number_link[k] == -1) {
          /* Set tb */
          cur->type_out[k] = tb->type;
          cur->subtype_out[k] = tb->subtype;
          cur->implicit_out[k] = tb->implicit;

          /* Statistics */
          cur->number_link[k] = 1;
          cur->number_rank[k] = 1;

          /* Are we dealing with a task at the top level? */
          if (!(is_ci_b_top && is_cj_b_top)) {
            cur->task_out_is_top[k] = 0;
          }

          /* At the hydro level? */
          if (!is_b_hydro_super) {
            cur->task_out_is_hydro_super[k] = 0;
          }

          /* At the gravity level? */
          if (!is_b_grav_super) {
            cur->task_out_is_grav_super[k] = 0;
          }

          /* Guard against ci_b == NULL (see note on task_type_none above) */
          if (ci_b != NULL && ci_b->cellID == cellID) cell_involved[ind][k]++;
          if (cj_b != NULL && cj_b->cellID == cellID) cell_involved[ind][k]++;

          break;
        }

        /* Already written */
        if (cur->type_out[k] == tb->type &&
            cur->subtype_out[k] == tb->subtype) {
          /* Increase the number of links. */
          cur->number_link[k] += 1;

          /* Are we dealing with a task at the top level? */
          if (!(is_ci_b_top && is_cj_b_top)) {
            cur->task_out_is_top[k] = 0;
          }

          /* At the hydro level? */
          if (!is_b_hydro_super) {
            cur->task_out_is_hydro_super[k] = 0;
          }

          /* At the gravity level? */
          if (!is_b_grav_super) {
            cur->task_out_is_grav_super[k] = 0;
          }

          if (ci_b != NULL && ci_b->cellID == cellID) cell_involved[ind][k]++;
          if (cj_b != NULL && cj_b->cellID == cellID) cell_involved[ind][k]++;

          break;
        }

        k += 1;
      }

      /* MAX_NUMBER_DEP is too small */
      if (k == MAX_NUMBER_DEP)
        error("Not enough memory, please increase MAX_NUMBER_DEP");
    }

    /* Some tasks might not unlock anything, like the kick1 tasks. This is
     * expected, and they should turn up in the task graph because they are
     * being unlocked by some other task. However, if a dev missed a
     * dependency and has tasks with no unlocks nor dependencies, they
     * wouldn't show up in the graph. So we write these tasks with no
     * unlocks down too, but as a special case. */
    if (ta->nr_unlock_tasks == 0) {
      cur->number_link[0] = 0;
      cur->type_out[0] = -1;
      cur->subtype_out[0] = -1;
      cur->implicit_out[0] = -1;
      cur->number_rank[0] = 1;
      cell_involved[ind][0] = 1;
    }
  }

  if (local_count > 0) {
    /* We have tasks involving the requested cell on this node */

    /* Create the file */
    char filename[50];
    sprintf(filename, "dependency_graph_cell_%lld_step_%i_rank_%i.csv", cellID,
            step, engine_rank);
    FILE *f = fopen(filename, "w");
    if (f == NULL) error("Error opening dependency graph file.");

    /* Write the header */
    fprintf(f, "# %s\n", git_revision());
    fprintf(
        f,
        "task_in,task_out,implicit_in,implicit_out,mpi_in,mpi_out,cluster_in,"
        "cluster_out,number_link,number_rank,task_in_is_top,task_in_is_hydro_"
        "super,task_in_is_grav_super,task_out_is_top,task_out_is_hydro_super,"
        "task_out_is_grav_super,cell_has_active_task\n");

    for (int i = 0; i < nber_tasks; i++) {
      for (int j = 0; j < MAX_NUMBER_DEP; j++) {
        /* Does this link exist? */
        if (task_dep[i].number_link[j] == -1) {
          continue;
        }

        /* Define a few variables */
        const int ta_type = task_dep[i].type_in;
        const int ta_subtype = task_dep[i].subtype_in;
        const int ta_implicit = task_dep[i].implicit_in;

        const int tb_type = task_dep[i].type_out[j];
        const int tb_subtype = task_dep[i].subtype_out[j];
        const int tb_implicit = task_dep[i].implicit_out[j];

        const int count = task_dep[i].number_link[j];
        const int number_rank = task_dep[i].number_rank[j];

        const int task_in_is_top = task_dep[i].task_in_is_top;
        const int task_in_is_grav_super = task_dep[i].task_in_is_grav_super;
        const int task_in_is_hydro_super = task_dep[i].task_in_is_hydro_super;

        const int task_out_is_top = task_dep[i].task_out_is_top[j];
        const int task_out_is_grav_super =
            task_dep[i].task_out_is_grav_super[j];
        const int task_out_is_hydro_super =
            task_dep[i].task_out_is_hydro_super[j];

        /* Text to write */
        char ta_name[200];
        char tb_name[200];

        /* Construct the line */
        task_get_full_name(ta_type, ta_subtype, ta_name);
        if (tb_type == -1) {
          /* Special handling of tasks which have no unlocks */
          strcpy(tb_name, "task_unlocks_nothing");
        } else {
          task_get_full_name(tb_type, tb_subtype, tb_name);
        }

        /* Check if MPI */
        int ta_mpi = 0;
        if (ta_type == task_type_send || ta_type == task_type_recv) ta_mpi = 1;

        int tb_mpi = 0;
        if (tb_type == task_type_send || tb_type == task_type_recv) tb_mpi = 1;

        /* Get the group names */
        char ta_cluster[20];
        char tb_cluster[20];
        task_get_group_name(ta_type, ta_subtype, ta_cluster);
        task_get_group_name(tb_type, tb_subtype, tb_cluster);

        fprintf(f, "%s,%s,%d,%d,%d,%d,%s,%s,%d,%d,%d,%d,%d,%d,%d,%d,%d\n",
                ta_name, tb_name, ta_implicit, tb_implicit, ta_mpi, tb_mpi,
                ta_cluster, tb_cluster, count, number_rank, task_in_is_top,
                task_in_is_hydro_super, task_in_is_grav_super, task_out_is_top,
                task_out_is_hydro_super, task_out_is_grav_super,
                cell_involved[i][j]);
      }
    }

    /* Close the file */
    fclose(f);
  }

  /* Clean up after yourself */
  free(task_dep);

  if (verbose)
    message("Printing task graph took %.3f %s.",
            clocks_from_ticks(getticks() - tic), clocks_getunit());
#endif /* defined(SWIFT_DEBUG_CHECKS) || defined(SWIFT_CELL_GRAPH) */
}

/**
 * @brief Split a hydrodynamic task if too large.
 *
 * @param t The #task
 * @param s The #scheduler we are working in.
 */
static void scheduler_splittask_hydro(struct task *t, struct scheduler *s) {
  /* Are we considering both stars and hydro when splitting? */
  /* Note this is not very clean as the scheduler should not really access
   * the engine...
*/ const int with_feedback = (s->space->e->policy & engine_policy_feedback); const int with_stars = (s->space->e->policy & engine_policy_stars); const int with_sinks = (s->space->e->policy & engine_policy_sinks); const int with_black_holes = (s->space->e->policy & engine_policy_black_holes); /* Iterate on this task until we're done with it. */ int redo = 1; while (redo) { /* Reset the redo flag. */ redo = 0; /* Is this a non-empty self-task? */ const int is_self = (t->type == task_type_self) && (t->ci != NULL) && ((t->ci->hydro.count > 0) || (with_stars && t->ci->stars.count > 0) || (with_sinks && t->ci->sinks.count > 0) || (with_black_holes && t->ci->black_holes.count > 0)); /* Is this a non-empty pair-task? */ const int is_pair = (t->type == task_type_pair) && (t->ci != NULL) && (t->cj != NULL) && ((t->ci->hydro.count > 0) || (with_feedback && t->ci->stars.count > 0) || (with_sinks && t->ci->sinks.count > 0) || (with_black_holes && t->ci->black_holes.count > 0)) && ((t->cj->hydro.count > 0) || (with_feedback && t->cj->stars.count > 0) || (with_sinks && t->cj->sinks.count > 0) || (with_black_holes && t->cj->black_holes.count > 0)); /* Empty task? */ if (!is_self && !is_pair) { t->type = task_type_none; t->subtype = task_subtype_none; t->ci = NULL; t->cj = NULL; t->skip = 1; break; } /* Self-interaction? */ if (t->type == task_type_self) { /* Get a handle on the cell involved. */ struct cell *ci = t->ci; /* Foreign task? */ if (ci->nodeID != s->nodeID) { t->skip = 1; break; } /* Is this cell even split and the task does not violate h ? */ if (cell_can_split_self_hydro_task(ci)) { /* Make a sub? */ if (scheduler_dosub && (ci->hydro.count < space_subsize_self_hydro) && (ci->stars.count < space_subsize_self_stars)) { /* convert to a self-subtask. */ t->type = task_type_sub_self; /* Otherwise, make tasks explicitly. */ } else { /* Take a step back (we're going to recycle the current task)... */ redo = 1; /* Add the self tasks. */ int first_child = 0; while (ci->progeny[first_child] == NULL) first_child++; t->ci = ci->progeny[first_child]; cell_set_flag(t->ci, cell_flag_has_tasks); for (int k = first_child + 1; k < 8; k++) { /* Do we have a non-empty progenitor? */ if (ci->progeny[k] != NULL && (ci->progeny[k]->hydro.count || (with_stars && ci->progeny[k]->stars.count))) { scheduler_splittask_hydro( scheduler_addtask(s, task_type_self, t->subtype, 0, 0, ci->progeny[k], NULL), s); } } /* Make a task for each pair of progeny */ for (int j = 0; j < 8; j++) { /* Do we have a non-empty progenitor? */ if (ci->progeny[j] != NULL && (ci->progeny[j]->hydro.count || (with_feedback && ci->progeny[j]->stars.count))) { for (int k = j + 1; k < 8; k++) { /* Do we have a second non-empty progenitor? */ if (ci->progeny[k] != NULL && (ci->progeny[k]->hydro.count || (with_feedback && ci->progeny[k]->stars.count))) { scheduler_splittask_hydro( scheduler_addtask(s, task_type_pair, t->subtype, sub_sid_flag[j][k], 0, ci->progeny[j], ci->progeny[k]), s); } } } } } } /* Cell is split */ } /* Self interaction */ /* Pair interaction? */ else if (t->type == task_type_pair) { /* Get a handle on the cells involved. */ struct cell *ci = t->ci; struct cell *cj = t->cj; /* Foreign task? */ if (ci->nodeID != s->nodeID && cj->nodeID != s->nodeID) { t->skip = 1; break; } /* Get the sort ID, use space_getsid_and_swap_cells and not t->flags to make sure we get ci and cj swapped if needed. 
*/ double shift[3]; const int sid = space_getsid_and_swap_cells(s->space, &ci, &cj, shift); #ifdef SWIFT_DEBUG_CHECKS if (sid != t->flags) error("Got pair task with incorrect flags: sid=%d flags=%lld", sid, t->flags); #endif /* Should this task be split-up? */ if (cell_can_split_pair_hydro_task(ci) && cell_can_split_pair_hydro_task(cj)) { const int h_count_i = ci->hydro.count; const int h_count_j = cj->hydro.count; const int s_count_i = ci->stars.count; const int s_count_j = cj->stars.count; int do_sub_hydro = 1; int do_sub_stars_i = 1; int do_sub_stars_j = 1; if (h_count_i > 0 && h_count_j > 0) { /* Note: Use division to avoid integer overflow. */ do_sub_hydro = h_count_i * sid_scale[sid] < space_subsize_pair_hydro / h_count_j; } if (s_count_i > 0 && h_count_j > 0) { /* Note: Use division to avoid integer overflow. */ do_sub_stars_i = s_count_i * sid_scale[sid] < space_subsize_pair_stars / h_count_j; } if (s_count_j > 0 && h_count_i > 0) { /* Note: Use division to avoid integer overflow. */ do_sub_stars_j = s_count_j * sid_scale[sid] < space_subsize_pair_stars / h_count_i; } /* Replace by a single sub-task? */ if (scheduler_dosub && (do_sub_hydro && do_sub_stars_i && do_sub_stars_j) && !sort_is_corner(sid)) { /* Make this task a sub task. */ t->type = task_type_sub_pair; /* Otherwise, split it. */ } else { /* Take a step back (we're going to recycle the current task)... */ redo = 1; /* Loop over the sub-cell pairs for the current sid and add new tasks * for them. */ struct cell_split_pair *csp = &cell_split_pairs[sid]; t->ci = ci->progeny[csp->pairs[0].pid]; t->cj = cj->progeny[csp->pairs[0].pjd]; if (t->ci != NULL) cell_set_flag(t->ci, cell_flag_has_tasks); if (t->cj != NULL) cell_set_flag(t->cj, cell_flag_has_tasks); t->flags = csp->pairs[0].sid; for (int k = 1; k < csp->count; k++) { scheduler_splittask_hydro( scheduler_addtask(s, task_type_pair, t->subtype, csp->pairs[k].sid, 0, ci->progeny[csp->pairs[k].pid], cj->progeny[csp->pairs[k].pjd]), s); } } /* Otherwise, break it up if it is too large? */ } else if (scheduler_doforcesplit && ci->split && cj->split && (ci->hydro.count > space_maxsize / cj->hydro.count)) { // message( "force splitting pair with %i and %i parts." , // ci->hydro.count , cj->hydro.count ); /* Replace the current task. */ t->type = task_type_none; for (int j = 0; j < 8; j++) if (ci->progeny[j] != NULL && ci->progeny[j]->hydro.count) for (int k = 0; k < 8; k++) if (cj->progeny[k] != NULL && cj->progeny[k]->hydro.count) { struct task *tl = scheduler_addtask(s, task_type_pair, t->subtype, 0, 0, ci->progeny[j], cj->progeny[k]); scheduler_splittask_hydro(tl, s); tl->flags = space_getsid_and_swap_cells(s->space, &t->ci, &t->cj, shift); } } } /* pair interaction? */ } /* iterate over the current task. */ } /** * @brief Split a gravity task if too large. * * @param t The #task * @param s The #scheduler we are working in. */ static void scheduler_splittask_gravity(struct task *t, struct scheduler *s) { const struct space *sp = s->space; struct engine *e = sp->e; /* Iterate on this task until we're done with it. */ int redo = 1; while (redo) { /* Reset the redo flag. */ redo = 0; /* Non-splittable task? */ if ((t->ci == NULL) || (t->type == task_type_pair && t->cj == NULL)) { t->type = task_type_none; t->subtype = task_subtype_none; t->ci = NULL; t->cj = NULL; t->skip = 1; break; } /* Self-interaction? */ if (t->type == task_type_self) { /* Get a handle on the cell involved. */ const struct cell *ci = t->ci; /* Foreign task? 
 */
      if (ci->nodeID != s->nodeID) {
        t->skip = 1;
        break;
      }

      /* Should we split this task? */
      if (cell_can_split_self_gravity_task(ci)) {
        if (scheduler_dosub && ci->grav.count < space_subsize_self_grav) {
          /* Otherwise, split it. */
        } else {
          /* Take a step back (we're going to recycle the current task)... */
          redo = 1;

          /* Add the self tasks. */
          int first_child = 0;
          while (ci->progeny[first_child] == NULL) first_child++;

          t->ci = ci->progeny[first_child];
          cell_set_flag(t->ci, cell_flag_has_tasks);

          for (int k = first_child + 1; k < 8; k++)
            if (ci->progeny[k] != NULL)
              scheduler_splittask_gravity(
                  scheduler_addtask(s, task_type_self, t->subtype, 0, 0,
                                    ci->progeny[k], NULL),
                  s);

          /* Make a task for each pair of progeny */
          if (t->subtype != task_subtype_external_grav) {
            for (int j = 0; j < 8; j++)
              if (ci->progeny[j] != NULL)
                for (int k = j + 1; k < 8; k++)
                  if (ci->progeny[k] != NULL)
                    scheduler_splittask_gravity(
                        scheduler_addtask(s, task_type_pair, t->subtype,
                                          sub_sid_flag[j][k], 0,
                                          ci->progeny[j], ci->progeny[k]),
                        s);
          } /* Self-gravity only */
        }   /* Make tasks explicitly */
      }     /* Cell is split */
    }       /* Self interaction */

    /* Pair interaction? */
    else if (t->type == task_type_pair) {
      /* Get a handle on the cells involved. */
      struct cell *ci = t->ci;
      struct cell *cj = t->cj;

      /* Foreign task? */
      if (ci->nodeID != s->nodeID && cj->nodeID != s->nodeID) {
        t->skip = 1;
        break;
      }

      /* Should this task be split-up? */
      if (cell_can_split_pair_gravity_task(ci) &&
          cell_can_split_pair_gravity_task(cj)) {
        const long long gcount_i = ci->grav.count;
        const long long gcount_j = cj->grav.count;

        /* Replace by a single sub-task? */
        if (scheduler_dosub &&
            gcount_i * gcount_j < ((long long)space_subsize_pair_grav)) {
          /* Otherwise, split it. */
        } else {
          /* Turn the task into a M-M task that will take care of all the
           * progeny pairs */
          t->type = task_type_grav_mm;
          t->subtype = task_subtype_none;
          t->flags = 0;

          /* Make a task for every other pair of progeny */
          for (int i = 0; i < 8; i++) {
            if (ci->progeny[i] != NULL) {
              for (int j = 0; j < 8; j++) {
                if (cj->progeny[j] != NULL) {
                  /* Can we use a M-M interaction here? */
                  if (cell_can_use_pair_mm(ci->progeny[i], cj->progeny[j], e,
                                           sp, /*use_rebuild_data=*/1,
                                           /*is_tree_walk=*/1)) {
                    /* Flag this pair as being treated by the M-M task.
                     * We use the 64 bits in the task->flags field to store
                     * this information. The corresponding tasks will unpack
                     * the information and operate according to the choices
                     * made here. */
                    const int flag = i * 8 + j;
                    t->flags |= (1ULL << flag);

                  } else {
                    /* Ok, we actually have to create a task */
                    scheduler_splittask_gravity(
                        scheduler_addtask(s, task_type_pair, task_subtype_grav,
                                          0, 0, ci->progeny[i],
                                          cj->progeny[j]),
                        s);
                  }
                }
              }
            }
          }

          /* Can none of the progeny pairs use M-M calculations? */
          if (t->flags == 0) {
            t->type = task_type_none;
            t->subtype = task_subtype_none;
            t->ci = NULL;
            t->cj = NULL;
            t->skip = 1;
          }
        } /* Split the pair */
      }
    } /* pair interaction? */
  }   /* iterate over the current task. */
}

/**
 * @brief Split a FOF task if too large.
 *
 * @param t The #task
 * @param s The #scheduler we are working in.
 */
static void scheduler_splittask_fof(struct task *t, struct scheduler *s) {
  /* Iterate on this task until we're done with it. */
  int redo = 1;
  while (redo) {
    /* Reset the redo flag. */
    redo = 0;

    /* Non-splittable task?
*/ if ((t->ci == NULL) || (t->type == task_type_fof_pair && t->cj == NULL) || t->ci->grav.count == 0 || (t->cj != NULL && t->cj->grav.count == 0)) { t->type = task_type_none; t->subtype = task_subtype_none; t->ci = NULL; t->cj = NULL; t->skip = 1; break; } /* Self-interaction? */ if (t->type == task_type_fof_self) { /* Get a handle on the cell involved. */ struct cell *ci = t->ci; /* Foreign task? */ if (ci->nodeID != s->nodeID) { t->skip = 1; break; } /* Is this cell even split? */ if (cell_can_split_self_fof_task(ci)) { /* Take a step back (we're going to recycle the current task)... */ redo = 1; /* Add the self tasks. */ int first_child = 0; while (ci->progeny[first_child] == NULL) first_child++; t->ci = ci->progeny[first_child]; for (int k = first_child + 1; k < 8; k++) if (ci->progeny[k] != NULL && ci->progeny[k]->grav.count) scheduler_splittask_fof( scheduler_addtask(s, task_type_fof_self, t->subtype, 0, 0, ci->progeny[k], NULL), s); /* Make a task for each pair of progeny */ for (int j = 0; j < 8; j++) if (ci->progeny[j] != NULL && ci->progeny[j]->grav.count) for (int k = j + 1; k < 8; k++) if (ci->progeny[k] != NULL && ci->progeny[k]->grav.count) scheduler_splittask_fof( scheduler_addtask(s, task_type_fof_pair, t->subtype, 0, 0, ci->progeny[j], ci->progeny[k]), s); } /* Cell is split */ } /* Self interaction */ } /* iterate over the current task. */ } /** * @brief Mapper function to split FOF tasks that may be too large. * * @param map_data the tasks to process * @param num_elements the number of tasks. * @param extra_data The #scheduler we are working in. */ void scheduler_splittasks_fof_mapper(void *map_data, int num_elements, void *extra_data) { /* Extract the parameters. */ struct scheduler *s = (struct scheduler *)extra_data; struct task *tasks = (struct task *)map_data; for (int ind = 0; ind < num_elements; ind++) { struct task *t = &tasks[ind]; /* Invoke the correct splitting strategy */ if (t->type == task_type_fof_self || t->type == task_type_fof_pair) { scheduler_splittask_fof(t, s); } } } /** * @brief Mapper function to split non-FOF tasks that may be too large. * * @param map_data the tasks to process * @param num_elements the number of tasks. * @param extra_data The #scheduler we are working in. */ void scheduler_splittasks_mapper(void *map_data, int num_elements, void *extra_data) { /* Extract the parameters. */ struct scheduler *s = (struct scheduler *)extra_data; struct task *tasks = (struct task *)map_data; for (int ind = 0; ind < num_elements; ind++) { struct task *t = &tasks[ind]; /* Invoke the correct splitting strategy */ if (t->subtype == task_subtype_density) { scheduler_splittask_hydro(t, s); } else if (t->subtype == task_subtype_external_grav) { scheduler_splittask_gravity(t, s); } else if (t->subtype == task_subtype_grav) { scheduler_splittask_gravity(t, s); } else { #ifdef SWIFT_DEBUG_CHECKS error("Unexpected task sub-type %s/%s", taskID_names[t->type], subtaskID_names[t->subtype]); #endif } } } /** * @brief Splits all the tasks in the scheduler that are too large. * * @param s The #scheduler. * @param fof_tasks Are we splitting the FOF tasks (1)? Or the regular tasks * (0)? * @param verbose Are we talkative? 
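 *
 * Example call, as one might issue it from the engine (a sketch only;
 * the engine variable e is an assumption here):
 *   scheduler_splittasks(&e->sched, /*fof_tasks=*/0, /*verbose=*/1);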
 */
void scheduler_splittasks(struct scheduler *s, const int fof_tasks,
                          const int verbose) {
  if (verbose) {
    message("space_subsize_self_hydro= %d", space_subsize_self_hydro);
    message("space_subsize_pair_hydro= %d", space_subsize_pair_hydro);
    message("space_subsize_self_stars= %d", space_subsize_self_stars);
    message("space_subsize_pair_stars= %d", space_subsize_pair_stars);
    message("space_subsize_self_grav= %d", space_subsize_self_grav);
    message("space_subsize_pair_grav= %d", space_subsize_pair_grav);
  }

  if (fof_tasks) {
    /* Call the mapper on each current task. */
    threadpool_map(s->threadpool, scheduler_splittasks_fof_mapper, s->tasks,
                   s->nr_tasks, sizeof(struct task),
                   threadpool_auto_chunk_size, s);

  } else {
    /* Call the mapper on each current task. */
    threadpool_map(s->threadpool, scheduler_splittasks_mapper, s->tasks,
                   s->nr_tasks, sizeof(struct task),
                   threadpool_auto_chunk_size, s);
  }
}

/**
 * @brief Add a #task to the #scheduler.
 *
 * @param s The #scheduler we are working in.
 * @param type The type of the task.
 * @param subtype The sub-type of the task.
 * @param flags The flags of the task.
 * @param implicit If true, only use this task to unlock dependencies, i.e.
 *        this task is never enqueued.
 * @param ci The first cell to interact.
 * @param cj The second cell to interact.
 */
struct task *scheduler_addtask(struct scheduler *s, enum task_types type,
                               enum task_subtypes subtype, long long flags,
                               int implicit, struct cell *ci,
                               struct cell *cj) {
  /* Get the next free task. */
  const int ind = atomic_inc(&s->tasks_next);

  /* Overflow? */
  if (ind >= s->size)
    error(
        "Task list overflow (%d). Need to increase "
        "Scheduler:tasks_per_cell.",
        ind);

  /* Get a pointer to the new task. */
  struct task *t = &s->tasks[ind];

  /* Copy the data. */
  t->type = type;
  t->subtype = subtype;
  t->flags = flags;
  t->wait = 0;
  t->ci = ci;
  t->cj = cj;
  t->skip = 1; /* Mark tasks as skip by default. */
  t->implicit = implicit;
  t->weight = 0;
  t->rank = 0;
  t->nr_unlock_tasks = 0;
#ifdef SWIFT_DEBUG_TASKS
  t->rid = -1;
#endif
  t->tic = 0;
  t->toc = 0;
  t->total_ticks = 0;

  if (ci != NULL) cell_set_flag(ci, cell_flag_has_tasks);
  if (cj != NULL) cell_set_flag(cj, cell_flag_has_tasks);

  /* Add an index for it. */
  // lock_lock( &s->lock );
  s->tasks_ind[atomic_inc(&s->nr_tasks)] = ind;
  // lock_unlock_blind( &s->lock );

  /* Return a pointer to the new task. */
  return t;
}

struct unlock_extra_data {
  struct scheduler *s;
  int *counts;
  int *offsets;
  struct task **unlocks;
  struct task **scheduler_unlocks;
};

/**
 * @brief Count the number of tasks unlocking each task.
 *
 * @param map_data the index of unlocks in this pool thread.
 * @param num_elements the number of indexes in this pool thread
 * @param extra_data The scheduler and the count array.
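 *
 * This is the histogram pass of a counting sort: a prefix sum over the
 * resulting counts yields the offset of each task's block in the sorted
 * unlocks array (see scheduler_set_unlocks() below).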
 */
void scheduler_set_unlock_counts_mapper(void *map_data, int num_elements,
                                        void *extra_data) {
  struct unlock_extra_data *data = (struct unlock_extra_data *)extra_data;
  int *counts = (int *)data->counts;
  struct scheduler *s = data->s;
  int *volatile unlock_ind = (int *)map_data;

  for (int k = 0; k < num_elements; k++) {
    atomic_inc(&counts[unlock_ind[k]]);

    /* Check that we are not overflowing */
    if (counts[unlock_ind[k]] < 0)
      error(
          "Task (type=%s/%s) unlocking more than %lld other tasks!\n"
          "This is likely a result of having tasks at vastly different "
          "levels in the tree.\nYou may want to play with the 'Scheduler' "
          "parameters to modify the task splitting strategy and reduce "
          "the difference in task depths.",
          taskID_names[s->tasks[unlock_ind[k]].type],
          subtaskID_names[s->tasks[unlock_ind[k]].subtype],
          (1LL << (8 * sizeof(int) - 1)) - 1);
  }
}

/**
 * @brief Sort the unlocks into each task's block of the unlocks array.
 *
 * @param map_data the index of unlocks in this pool thread.
 * @param num_elements the number of indexes in this pool thread
 * @param extra_data The scheduler and the list of offsets
 */
void scheduler_set_unlock_sorts_mapper(void *map_data, int num_elements,
                                       void *extra_data) {
  struct unlock_extra_data *data = (struct unlock_extra_data *)extra_data;
  const struct scheduler *s = data->s;
  struct task **unlocks = data->unlocks;
  int *volatile offsets = data->offsets;
  int *unlock_ind = (int *)map_data;

  const size_t delta = unlock_ind - s->unlock_ind;

  for (int k = 0; k < num_elements; k++) {
    const int ind = unlock_ind[k];
    unlocks[atomic_inc(&offsets[ind])] = s->unlocks[k + delta];
  }
}

/**
 * @brief Set the unlock pointers in each task.
 *
 * @param s The #scheduler.
 * @param tp the #threadpool.
 */
void scheduler_set_unlocks(struct scheduler *s, struct threadpool *tp) {
  /* Temporary extra data for the threadpool */
  struct unlock_extra_data extra_data;

  /* Store the counts for each task. */
  int *counts;
  if ((counts = (int *)swift_malloc("counts", sizeof(int) * s->nr_tasks)) ==
      NULL)
    error("Failed to allocate temporary counts array.");
  bzero(counts, sizeof(int) * s->nr_tasks);

  extra_data.s = s;
  extra_data.counts = counts;
  threadpool_map(tp, scheduler_set_unlock_counts_mapper, s->unlock_ind,
                 s->nr_unlocks, sizeof(int), threadpool_auto_chunk_size,
                 &extra_data);

  /* Compute the offset for each unlock block. */
  int *offsets;
  if ((offsets = (int *)swift_malloc("offsets",
                                     sizeof(int) * (s->nr_tasks + 1))) == NULL)
    error("Failed to allocate temporary offsets array.");
  offsets[0] = 0;
  for (int k = 0; k < s->nr_tasks; k++) {
    offsets[k + 1] = offsets[k] + counts[k];

#ifdef SWIFT_DEBUG_CHECKS
    /* Check that we are not overflowing */
    if (offsets[k + 1] < 0) error("Task unlock offset array overflowing");
#endif
  }

  /* Create and fill a temporary array with the sorted unlocks. */
  struct task **unlocks;
  if ((unlocks = (struct task **)swift_malloc(
           "unlocks", sizeof(struct task *) * s->size_unlocks)) == NULL)
    error("Failed to allocate temporary unlocks array.");

  extra_data.offsets = offsets;
  extra_data.unlocks = unlocks;
  threadpool_map(tp, scheduler_set_unlock_sorts_mapper, s->unlock_ind,
                 s->nr_unlocks, sizeof(int), threadpool_auto_chunk_size,
                 &extra_data);

  /* Swap the unlocks. */
  swift_free("unlocks", s->unlocks);
  s->unlocks = unlocks;

  /* Re-set the offsets (the sorting mapper advanced them). */
  offsets[0] = 0;
  for (int k = 1; k < s->nr_tasks; k++)
    offsets[k] = offsets[k - 1] + counts[k - 1];

  /* Set the unlocks in the tasks. */
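  /* Worked example of the layout built above (illustrative only): with
   * three tasks receiving counts = {2, 0, 3}, the exclusive prefix sum
   * gives offsets = {0, 2, 2}, so task 0 owns unlocks[0..1], task 1 owns
   * an empty range, and task 2 owns unlocks[2..4]. */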
  for (int k = 0; k < s->nr_tasks; k++) {
    struct task *t = &s->tasks[k];
    t->nr_unlock_tasks = counts[k];
    t->unlock_tasks = &s->unlocks[offsets[k]];
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that there are no duplicate unlocks. */
  for (int k = 0; k < s->nr_tasks; k++) {
    struct task *t = &s->tasks[k];
    for (int i = 0; i < t->nr_unlock_tasks; i++) {
      for (int j = i + 1; j < t->nr_unlock_tasks; j++) {
        if (t->unlock_tasks[i] == t->unlock_tasks[j])
          error("duplicate unlock! t->type=%s/%s unlocking type=%s/%s",
                taskID_names[t->type], subtaskID_names[t->subtype],
                taskID_names[t->unlock_tasks[i]->type],
                subtaskID_names[t->unlock_tasks[i]->subtype]);
      }
    }
  }
#endif

  /* Clean up. */
  swift_free("counts", counts);
  swift_free("offsets", offsets);
}

/**
 * @brief Sort the tasks in topological order over all queues.
 *
 * @param s The #scheduler.
 */
void scheduler_ranktasks(struct scheduler *s) {
  struct task *tasks = s->tasks;
  int *tid = s->tasks_ind;
  const int nr_tasks = s->nr_tasks;

  /* Run through the tasks and get all the waits right. */
  for (int i = 0; i < nr_tasks; i++) {
    struct task *t = &tasks[i];

    /* Increment the waits of the dependencies */
    for (int k = 0; k < t->nr_unlock_tasks; k++) {
      t->unlock_tasks[k]->wait++;
    }
  }

  /* Load the tids of tasks with no waits. */
  int left = 0;
  for (int k = 0; k < nr_tasks; k++)
    if (tasks[k].wait == 0) {
      tid[left] = k;
      left += 1;
    }

  /* Main loop. */
  for (int j = 0, rank = 0; j < nr_tasks; rank++) {
    /* Did we get anything? */
    if (j == left) error("Unsatisfiable task dependencies detected.");

    /* Unlock the next layer of tasks. */
    const int left_old = left;
    for (; j < left_old; j++) {
      struct task *t = &tasks[tid[j]];
      t->rank = rank;
      /* message( "task %i of type %s has rank %i." , i ,
          (t->type == task_type_self) ? "self" :
          (t->type == task_type_pair) ? "pair" : "sort" , rank ); */
      for (int k = 0; k < t->nr_unlock_tasks; k++) {
        struct task *u = t->unlock_tasks[k];
        if (--u->wait == 0) {
          tid[left] = u - tasks;
          left += 1;
        }
      }
    }

    /* Move back to the old left (like Sanders!). */
    j = left_old;
  }

#ifdef SWIFT_DEBUG_CHECKS
  /* Verify that the tasks were ranked correctly. */
  for (int k = 1; k < s->nr_tasks; k++)
    if (tasks[tid[k - 1]].rank > tasks[tid[k]].rank)
      error("Task ranking failed.");
#endif
}

/**
 * @brief (Re)allocate the task arrays.
 *
 * @param s The #scheduler.
 * @param size The maximum number of tasks in the #scheduler.
 */
void scheduler_reset(struct scheduler *s, int size) {
  /* Do we need to re-allocate? */
  if (size > s->size) {
    /* Free the existing task lists if necessary. */
    scheduler_free_tasks(s);

    /* Allocate the new lists. */
    if (swift_memalign("tasks", (void **)&s->tasks, task_align,
                       size * sizeof(struct task)) != 0)
      error("Failed to allocate task array.");

    if ((s->tasks_ind =
             (int *)swift_malloc("tasks_ind", sizeof(int) * size)) == NULL)
      error("Failed to allocate task lists.");

    if ((s->tid_active =
             (int *)swift_malloc("tid_active", sizeof(int) * size)) == NULL)
      error("Failed to allocate active task lists.");
  }

  /* Reset the counters. */
  s->size = size;
  s->nr_tasks = 0;
  s->tasks_next = 0;
  s->waiting = 0;
  s->nr_unlocks = 0;
  s->completed_unlock_writes = 0;
  s->active_count = 0;
  s->total_ticks = 0;

  /* Set the task pointers in the queues. */
  for (int k = 0; k < s->nr_queues; k++) s->queues[k].tasks = s->tasks;
}

/**
 * @brief Compute the task weights.
 *
 * @param s The #scheduler.
 * @param verbose Are we talkative?
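 *
 * The weights below follow simple operation-count estimates: e.g. a self
 * density task over N particles is charged wscale * N * N, a pair task
 * roughly count_i * count_j (scaled by sid_scale for the pair geometry,
 * with an extra factor for foreign cells), and a sort over N entries
 * roughly wscale * N * log2(N).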
*/ void scheduler_reweight(struct scheduler *s, int verbose) { const int nr_tasks = s->nr_tasks; int *tid = s->tasks_ind; struct task *tasks = s->tasks; const int nodeID = s->nodeID; const float wscale = 0.001f; const ticks tic = getticks(); /* Run through the tasks backwards and set their weights. */ for (int k = nr_tasks - 1; k >= 0; k--) { struct task *t = &tasks[tid[k]]; float cost = 0.f; t->weight = 0.f; for (int j = 0; j < t->nr_unlock_tasks; j++) t->weight += t->unlock_tasks[j]->weight; const float count_i = (t->ci != NULL) ? t->ci->hydro.count : 0.f; const float count_j = (t->cj != NULL) ? t->cj->hydro.count : 0.f; const float gcount_i = (t->ci != NULL) ? t->ci->grav.count : 0.f; const float gcount_j = (t->cj != NULL) ? t->cj->grav.count : 0.f; const float scount_i = (t->ci != NULL) ? t->ci->stars.count : 0.f; const float scount_j = (t->cj != NULL) ? t->cj->stars.count : 0.f; const float sink_count_i = (t->ci != NULL) ? t->ci->sinks.count : 0.f; const float sink_count_j = (t->cj != NULL) ? t->cj->sinks.count : 0.f; const float bcount_i = (t->ci != NULL) ? t->ci->black_holes.count : 0.f; const float bcount_j = (t->cj != NULL) ? t->cj->black_holes.count : 0.f; switch (t->type) { case task_type_sort: case task_type_rt_sort: cost = wscale * intrinsics_popcount(t->flags) * count_i * (sizeof(int) * 8 - (count_i ? intrinsics_clz(count_i) : 0)); break; case task_type_stars_sort: cost = wscale * intrinsics_popcount(t->flags) * scount_i * (sizeof(int) * 8 - (scount_i ? intrinsics_clz(scount_i) : 0)); break; case task_type_stars_resort: cost = wscale * intrinsics_popcount(t->flags) * scount_i * (sizeof(int) * 8 - (scount_i ? intrinsics_clz(scount_i) : 0)); break; case task_type_self: if (t->subtype == task_subtype_grav) { cost = 1.f * (wscale * gcount_i) * gcount_i; } else if (t->subtype == task_subtype_external_grav) cost = 1.f * wscale * gcount_i; else if (t->subtype == task_subtype_stars_density || t->subtype == task_subtype_stars_prep1 || t->subtype == task_subtype_stars_prep2 || t->subtype == task_subtype_stars_feedback) cost = 1.f * wscale * scount_i * count_i; else if (t->subtype == task_subtype_sink_density || t->subtype == task_subtype_sink_swallow || t->subtype == task_subtype_sink_do_gas_swallow) cost = 1.f * wscale * count_i * sink_count_i; else if (t->subtype == task_subtype_sink_do_sink_swallow) cost = 1.f * wscale * sink_count_i * sink_count_i; else if (t->subtype == task_subtype_bh_density || t->subtype == task_subtype_bh_swallow || t->subtype == task_subtype_bh_feedback) cost = 1.f * wscale * bcount_i * count_i; else if (t->subtype == task_subtype_do_gas_swallow) cost = 1.f * wscale * count_i; else if (t->subtype == task_subtype_do_bh_swallow) cost = 1.f * wscale * bcount_i; else if (t->subtype == task_subtype_density || t->subtype == task_subtype_gradient || t->subtype == task_subtype_force || t->subtype == task_subtype_limiter) cost = 1.f * (wscale * count_i) * count_i; else if (t->subtype == task_subtype_rt_gradient) cost = 1.f * wscale * count_i * count_i; else if (t->subtype == task_subtype_rt_transport) cost = 1.f * wscale * count_i * count_i; else error("Untreated sub-type for selfs: %s", subtaskID_names[t->subtype]); break; case task_type_pair: if (t->subtype == task_subtype_grav) { if (t->ci->nodeID != nodeID || t->cj->nodeID != nodeID) cost = 3.f * (wscale * gcount_i) * gcount_j; else cost = 2.f * (wscale * gcount_i) * gcount_j; } else if (t->subtype == task_subtype_stars_density || t->subtype == task_subtype_stars_prep1 || t->subtype == task_subtype_stars_prep2 || 
      case task_type_pair:
        if (t->subtype == task_subtype_grav) {
          if (t->ci->nodeID != nodeID || t->cj->nodeID != nodeID)
            cost = 3.f * (wscale * gcount_i) * gcount_j;
          else
            cost = 2.f * (wscale * gcount_i) * gcount_j;

        } else if (t->subtype == task_subtype_stars_density ||
                   t->subtype == task_subtype_stars_prep1 ||
                   t->subtype == task_subtype_stars_prep2 ||
                   t->subtype == task_subtype_stars_feedback) {
          if (t->ci->nodeID != nodeID)
            cost = 3.f * wscale * count_i * scount_j * sid_scale[t->flags];
          else if (t->cj->nodeID != nodeID)
            cost = 3.f * wscale * scount_i * count_j * sid_scale[t->flags];
          else
            cost = 2.f * wscale * (scount_i * count_j + scount_j * count_i) *
                   sid_scale[t->flags];

        } else if (t->subtype == task_subtype_sink_density ||
                   t->subtype == task_subtype_sink_swallow ||
                   t->subtype == task_subtype_sink_do_gas_swallow) {
          if (t->ci->nodeID != nodeID)
            cost = 3.f * wscale * count_i * sink_count_j * sid_scale[t->flags];
          else if (t->cj->nodeID != nodeID)
            cost = 3.f * wscale * sink_count_i * count_j * sid_scale[t->flags];
          else
            cost = 2.f * wscale *
                   (sink_count_i * count_j + sink_count_j * count_i) *
                   sid_scale[t->flags];

        } else if (t->subtype == task_subtype_sink_do_sink_swallow) {
          if (t->ci->nodeID != nodeID)
            cost = 3.f * wscale * sink_count_i * sink_count_j *
                   sid_scale[t->flags];
          else if (t->cj->nodeID != nodeID)
            cost = 3.f * wscale * sink_count_i * sink_count_j *
                   sid_scale[t->flags];
          else
            cost = 2.f * wscale *
                   (sink_count_i * sink_count_j + sink_count_j * sink_count_i) *
                   sid_scale[t->flags];

        } else if (t->subtype == task_subtype_bh_density ||
                   t->subtype == task_subtype_bh_swallow ||
                   t->subtype == task_subtype_bh_feedback) {
          if (t->ci->nodeID != nodeID)
            cost = 3.f * wscale * count_i * bcount_j * sid_scale[t->flags];
          else if (t->cj->nodeID != nodeID)
            cost = 3.f * wscale * bcount_i * count_j * sid_scale[t->flags];
          else
            cost = 2.f * wscale * (bcount_i * count_j + bcount_j * count_i) *
                   sid_scale[t->flags];

        } else if (t->subtype == task_subtype_do_gas_swallow) {
          cost = 1.f * wscale * (count_i + count_j);

        } else if (t->subtype == task_subtype_do_bh_swallow) {
          cost = 1.f * wscale * (bcount_i + bcount_j);

        } else if (t->subtype == task_subtype_density ||
                   t->subtype == task_subtype_gradient ||
                   t->subtype == task_subtype_force ||
                   t->subtype == task_subtype_limiter) {
          if (t->ci->nodeID != nodeID || t->cj->nodeID != nodeID)
            cost = 3.f * (wscale * count_i) * count_j * sid_scale[t->flags];
          else
            cost = 2.f * (wscale * count_i) * count_j * sid_scale[t->flags];

        } else if (t->subtype == task_subtype_rt_gradient) {
          cost = 1.f * wscale * count_i * count_j;
        } else if (t->subtype == task_subtype_rt_transport) {
          cost = 1.f * wscale * count_i * count_j;
        } else {
          error("Untreated sub-type for pairs: %s",
                subtaskID_names[t->subtype]);
        }
        break;
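
      /* Note on the 2x/3x factors used above and below: a pair involving a
       * foreign cell is given a larger weight than a purely local one,
       * presumably so that tasks whose results other ranks are waiting for
       * are pushed towards the front of the queues. Schematically:
       *
       *   local   : cost = 2.f * (wscale * count_i) * count_j * sid_scale[sid];
       *   foreign : cost = 3.f * (wscale * count_i) * count_j * sid_scale[sid];
       *
       * where sid is the sort ID stored in t->flags. */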
      case task_type_sub_pair:
#ifdef SWIFT_DEBUG_CHECKS
        if (t->flags < 0) error("Negative flag value!");
#endif
        if (t->subtype == task_subtype_stars_density ||
            t->subtype == task_subtype_stars_prep1 ||
            t->subtype == task_subtype_stars_prep2 ||
            t->subtype == task_subtype_stars_feedback) {
          if (t->ci->nodeID != nodeID) {
            cost = 3.f * (wscale * count_i) * scount_j * sid_scale[t->flags];
          } else if (t->cj->nodeID != nodeID) {
            cost = 3.f * (wscale * scount_i) * count_j * sid_scale[t->flags];
          } else {
            cost = 2.f * wscale * (scount_i * count_j + scount_j * count_i) *
                   sid_scale[t->flags];
          }

        } else if (t->subtype == task_subtype_sink_density ||
                   t->subtype == task_subtype_sink_swallow ||
                   t->subtype == task_subtype_sink_do_gas_swallow) {
          if (t->ci->nodeID != nodeID) {
            cost =
                3.f * (wscale * count_i) * sink_count_j * sid_scale[t->flags];
          } else if (t->cj->nodeID != nodeID) {
            cost =
                3.f * (wscale * sink_count_i) * count_j * sid_scale[t->flags];
          } else {
            cost = 2.f * wscale *
                   (sink_count_i * count_j + sink_count_j * count_i) *
                   sid_scale[t->flags];
          }

        } else if (t->subtype == task_subtype_sink_do_sink_swallow) {
          if (t->ci->nodeID != nodeID) {
            cost = 3.f * (wscale * sink_count_i) * sink_count_j *
                   sid_scale[t->flags];
          } else if (t->cj->nodeID != nodeID) {
            cost = 3.f * (wscale * sink_count_i) * sink_count_j *
                   sid_scale[t->flags];
          } else {
            cost = 2.f * wscale *
                   (sink_count_i * sink_count_j + sink_count_j * sink_count_i) *
                   sid_scale[t->flags];
          }

        } else if (t->subtype == task_subtype_bh_density ||
                   t->subtype == task_subtype_bh_swallow ||
                   t->subtype == task_subtype_bh_feedback) {
          if (t->ci->nodeID != nodeID) {
            cost = 3.f * (wscale * count_i) * bcount_j * sid_scale[t->flags];
          } else if (t->cj->nodeID != nodeID) {
            cost = 3.f * (wscale * bcount_i) * count_j * sid_scale[t->flags];
          } else {
            cost = 2.f * wscale * (bcount_i * count_j + bcount_j * count_i) *
                   sid_scale[t->flags];
          }

        } else if (t->subtype == task_subtype_do_gas_swallow) {
          cost = 1.f * wscale * (count_i + count_j);

        } else if (t->subtype == task_subtype_do_bh_swallow) {
          cost = 1.f * wscale * (bcount_i + bcount_j);

        } else if (t->subtype == task_subtype_density ||
                   t->subtype == task_subtype_gradient ||
                   t->subtype == task_subtype_force ||
                   t->subtype == task_subtype_limiter) {
          if (t->ci->nodeID != nodeID || t->cj->nodeID != nodeID) {
            cost = 3.f * (wscale * count_i) * count_j * sid_scale[t->flags];
          } else {
            cost = 2.f * (wscale * count_i) * count_j * sid_scale[t->flags];
          }

        } else if (t->subtype == task_subtype_rt_gradient) {
          cost = 1.f * wscale * count_i * count_j;
        } else if (t->subtype == task_subtype_rt_transport) {
          cost = 1.f * wscale * count_i * count_j;
        } else {
          error("Untreated sub-type for sub-pairs: %s",
                subtaskID_names[t->subtype]);
        }
        break;

      case task_type_sub_self:
        if (t->subtype == task_subtype_stars_density ||
            t->subtype == task_subtype_stars_prep1 ||
            t->subtype == task_subtype_stars_prep2 ||
            t->subtype == task_subtype_stars_feedback) {
          cost = 1.f * (wscale * scount_i) * count_i;
        } else if (t->subtype == task_subtype_sink_density ||
                   t->subtype == task_subtype_sink_swallow ||
                   t->subtype == task_subtype_sink_do_gas_swallow) {
          cost = 1.f * (wscale * sink_count_i) * count_i;
        } else if (t->subtype == task_subtype_sink_do_sink_swallow) {
          cost = 1.f * (wscale * sink_count_i) * sink_count_i;
        } else if (t->subtype == task_subtype_bh_density ||
                   t->subtype == task_subtype_bh_swallow ||
                   t->subtype == task_subtype_bh_feedback) {
          cost = 1.f * (wscale * bcount_i) * count_i;
        } else if (t->subtype == task_subtype_do_gas_swallow) {
          cost = 1.f * wscale * count_i;
        } else if (t->subtype == task_subtype_do_bh_swallow) {
          cost = 1.f * wscale * bcount_i;
        } else if (t->subtype == task_subtype_density ||
                   t->subtype == task_subtype_gradient ||
                   t->subtype == task_subtype_force ||
                   t->subtype == task_subtype_limiter) {
          cost = 1.f * (wscale * count_i) * count_i;
        } else if (t->subtype == task_subtype_rt_gradient) {
          cost = 1.f * wscale * scount_i * count_i;
        } else if (t->subtype == task_subtype_rt_transport) {
          cost = 1.f * wscale * scount_i * count_i;
        } else {
          error("Untreated sub-type for sub-selfs: %s",
                subtaskID_names[t->subtype]);
        }
        break;

      case task_type_ghost:
        if (t->ci == t->ci->hydro.super) cost = wscale * count_i;
        break;
      case task_type_extra_ghost:
        if (t->ci == t->ci->hydro.super) cost = wscale * count_i;
        break;
      case task_type_stars_ghost:
        if (t->ci == t->ci->hydro.super) cost = wscale * scount_i;
        break;
      case task_type_bh_density_ghost:
        if (t->ci == t->ci->hydro.super) cost = wscale * bcount_i;
        break;
      case task_type_bh_swallow_ghost2:
        if (t->ci == t->ci->hydro.super) cost = wscale * bcount_i;
        break;
      case task_type_sink_density_ghost:
        if (t->ci == t->ci->hydro.super) cost = wscale * sink_count_i;
        break;
      case task_type_drift_part:
        cost = wscale * count_i;
        break;
      case task_type_drift_gpart:
        cost = wscale * gcount_i;
        break;
      case task_type_drift_spart:
        cost = wscale * scount_i;
        break;
      case task_type_drift_sink:
        cost = wscale * sink_count_i;
        break;
      case task_type_drift_bpart:
        cost = wscale * bcount_i;
        break;
      case task_type_init_grav:
        cost = wscale * gcount_i;
        break;
      case task_type_grav_down:
        cost = wscale * gcount_i;
        break;
      case task_type_grav_long_range:
        cost = wscale * gcount_i;
        break;
      case task_type_grav_mm:
        cost = wscale * (gcount_i + gcount_j);
        break;
      case task_type_end_hydro_force:
        cost = wscale * count_i;
        break;
      case task_type_end_grav_force:
        cost = wscale * gcount_i;
        break;
      case task_type_cooling:
        cost = wscale * count_i;
        break;
      case task_type_star_formation:
        cost = wscale * (count_i + scount_i);
        break;
      case task_type_star_formation_sink:
        cost = wscale * (sink_count_i + scount_i);
        break;
      case task_type_sink_formation:
        cost = wscale * (count_i + sink_count_i);
        break;
      case task_type_rt_ghost1:
        cost = wscale * count_i;
        break;
      case task_type_rt_ghost2:
        cost = wscale * count_i;
        break;
      case task_type_rt_tchem:
        cost = wscale * count_i;
        break;
      case task_type_rt_advance_cell_time:
      case task_type_rt_collect_times:
        cost = wscale;
        break;
      case task_type_csds:
        cost =
            wscale * (count_i + gcount_i + scount_i + sink_count_i + bcount_i);
        break;
      case task_type_kick1:
        cost =
            wscale * (count_i + gcount_i + scount_i + sink_count_i + bcount_i);
        break;
      case task_type_kick2:
        cost =
            wscale * (count_i + gcount_i + scount_i + sink_count_i + bcount_i);
        break;
      case task_type_timestep:
        cost =
            wscale * (count_i + gcount_i + scount_i + sink_count_i + bcount_i);
        break;
      case task_type_timestep_limiter:
        cost = wscale * count_i;
        break;
      case task_type_timestep_sync:
        cost = wscale * count_i;
        break;
      case task_type_send:
        if (count_i < 1e5)
          cost = 10.f * (wscale * count_i) * count_i;
        else
          cost = 2e9;
        break;
      case task_type_recv:
        if (count_i < 1e5)
          cost = 5.f * (wscale * count_i) * count_i;
        else
          cost = 1e9;
        break;
      default:
        cost = 0;
        break;
    }

    t->weight += cost;
  }

  if (verbose)
    message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

  /* int min = tasks[0].weight, max = tasks[0].weight;
  for ( int k = 1 ; k < nr_tasks ; k++ )
    if ( tasks[k].weight < min )
      min = tasks[k].weight;
    else if ( tasks[k].weight > max )
      max = tasks[k].weight;
  message( "task weights are in [ %i , %i ]." , min , max ); */
}
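
/* A small worked example of the weighting above (illustrative only): the
 * backwards sweep over the topologically sorted task list gives every task
 *
 *   weight(t) = cost(t) + sum over u in unlock_tasks(t) of weight(u)
 *
 * so a chain A -> B -> C with costs 1, 2 and 4 ends up with weight(C) = 4,
 * weight(B) = 6 and weight(A) = 7, i.e. each task's weight estimates the
 * amount of work still hiding behind it. */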
/**
 * @brief #threadpool_map function which runs through the task
 *        graph and re-computes the task wait counters.
 */
void scheduler_rewait_mapper(void *map_data, int num_elements,
                             void *extra_data) {
  struct scheduler *s = (struct scheduler *)extra_data;
  const int *tid = (int *)map_data;

  for (int ind = 0; ind < num_elements; ind++) {
    struct task *t = &s->tasks[tid[ind]];

    /* Ignore skipped tasks. */
    if (t->skip) continue;

    /* Increment the task's own wait counter for the enqueueing. */
    atomic_inc(&t->wait);

#ifdef SWIFT_DEBUG_CHECKS
    /* Check that we don't have more waits than can be stored. */
    if (t->wait < 0)
      error("Task (type=%s/%s) unlocked by more than %lld tasks!",
            taskID_names[t->type], subtaskID_names[t->subtype],
            (1LL << (8 * sizeof(t->wait) - 1)) - 1);
#endif

    /* Set the waits of the dependencies. */
    for (int k = 0; k < t->nr_unlock_tasks; k++) {
      struct task *u = t->unlock_tasks[k];
      atomic_inc(&u->wait);
    }
  }
}

void scheduler_enqueue_mapper(void *map_data, int num_elements,
                              void *extra_data) {
  struct scheduler *s = (struct scheduler *)extra_data;
  const int *tid = (int *)map_data;
  struct task *tasks = s->tasks;
  for (int ind = 0; ind < num_elements; ind++) {
    struct task *t = &tasks[tid[ind]];
    if (atomic_dec(&t->wait) == 1 && !t->skip) {
      scheduler_enqueue(s, t);
    }
  }
  pthread_cond_broadcast(&s->sleep_cond);
}

/**
 * @brief Start the scheduler, i.e. fill the queues with ready tasks.
 *
 * @param s The #scheduler.
 */
void scheduler_start(struct scheduler *s) {

  /* Re-wait the tasks. */
  if (s->active_count > 1000) {
    threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tid_active,
                   s->active_count, sizeof(int), threadpool_auto_chunk_size,
                   s);
  } else {
    scheduler_rewait_mapper(s->tid_active, s->active_count, s);
  }

  /* Loop over the tasks and enqueue whoever is ready. */
  if (s->active_count > 1000) {
    threadpool_map(s->threadpool, scheduler_enqueue_mapper, s->tid_active,
                   s->active_count, sizeof(int), threadpool_auto_chunk_size,
                   s);
  } else {
    scheduler_enqueue_mapper(s->tid_active, s->active_count, s);
  }

  /* Clear the list of active tasks. */
  s->active_count = 0;

  /* To be safe, fire off one last sleep_cond broadcast while holding the
   * mutex. */
  pthread_mutex_lock(&s->sleep_mutex);
  pthread_cond_broadcast(&s->sleep_cond);
  pthread_mutex_unlock(&s->sleep_mutex);
}
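
/* Sketch of the invariant set up by scheduler_start() (illustrative only):
 * after the re-wait pass, every active task t satisfies
 *
 *   t->wait == 1 + (number of active tasks that unlock t)
 *
 * The extra 1 accounts for the enqueueing itself, so a task becomes ready
 * exactly when atomic_dec(&t->wait) returns 1. That is the test used in
 * scheduler_enqueue_mapper() above and in scheduler_done() below. */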
/**
 * @brief Put a task on one of the queues.
 *
 * @param s The #scheduler.
 * @param t The #task.
 */
void scheduler_enqueue(struct scheduler *s, struct task *t) {

  /* Ignore skipped tasks */
  if (t->skip) return;

  /* If this is an implicit task, just pretend it's done. */
  if (t->implicit) {
#ifdef SWIFT_DEBUG_CHECKS
    t->ti_run = s->space->e->ti_current;

    /* Mark that we have run this task on these cells */
    if (t->ci != NULL) {
      t->ci->tasks_executed[t->type]++;
      t->ci->subtasks_executed[t->subtype]++;
    }
    if (t->cj != NULL) {
      t->cj->tasks_executed[t->type]++;
      t->cj->subtasks_executed[t->subtype]++;
    }
#endif
    t->skip = 1;
    for (int j = 0; j < t->nr_unlock_tasks; j++) {
      struct task *t2 = t->unlock_tasks[j];
      if (atomic_dec(&t2->wait) == 1) scheduler_enqueue(s, t2);
    }
  }

  /* Otherwise, look for a suitable queue. */
  else {
#ifdef WITH_MPI
    int err = MPI_SUCCESS;
#endif

    /* Find the previous owner for each task type, and do
     * any pre-processing needed. */
    short int qid = -1;
    short int *owner = NULL;
    switch (t->type) {
      case task_type_self:
      case task_type_sub_self:
        if (t->subtype == task_subtype_grav ||
            t->subtype == task_subtype_external_grav) {
          qid = t->ci->grav.super->owner;
          owner = &t->ci->grav.super->owner;
        } else {
          qid = t->ci->hydro.super->owner;
          owner = &t->ci->hydro.super->owner;
        }
        break;
      case task_type_sort:
      case task_type_ghost:
      case task_type_drift_part:
        qid = t->ci->hydro.super->owner;
        owner = &t->ci->hydro.super->owner;
        break;
      case task_type_drift_gpart:
        qid = t->ci->grav.super->owner;
        owner = &t->ci->grav.super->owner;
        break;
      case task_type_kick1:
      case task_type_kick2:
      case task_type_stars_ghost:
      case task_type_csds:
      case task_type_stars_sort:
      case task_type_timestep:
        qid = t->ci->super->owner;
        owner = &t->ci->super->owner;
        break;
      case task_type_pair:
      case task_type_sub_pair:
        qid = t->ci->super->owner;
        owner = &t->ci->super->owner;
        if ((qid < 0) ||
            ((t->cj->super->owner > -1) &&
             (s->queues[qid].count > s->queues[t->cj->super->owner].count))) {
          qid = t->cj->super->owner;
          owner = &t->cj->super->owner;
        }
        break;
      case task_type_recv:
#ifdef WITH_MPI
      {
        size_t size = 0;              /* Size in bytes. */
        size_t count = 0;             /* Number of elements to receive */
        MPI_Datatype type = MPI_BYTE; /* Type of the elements */
        void *buff = NULL;            /* Buffer to accept elements */

        if (t->subtype == task_subtype_tend) {
          count = size = t->ci->mpi.pcell_size * sizeof(struct pcell_step);
          buff = t->buff = malloc(count);

        } else if (t->subtype == task_subtype_part_swallow) {
          count = size =
              t->ci->hydro.count * sizeof(struct black_holes_part_data);
          buff = t->buff = malloc(count);

        } else if (t->subtype == task_subtype_bpart_merger) {
          count = size =
              sizeof(struct black_holes_bpart_data) * t->ci->black_holes.count;
          buff = t->buff = malloc(count);

        } else if (t->subtype == task_subtype_xv ||
                   t->subtype == task_subtype_rho ||
                   t->subtype == task_subtype_gradient ||
                   t->subtype == task_subtype_rt_gradient ||
                   t->subtype == task_subtype_rt_transport ||
                   t->subtype == task_subtype_part_prep1) {
          count = t->ci->hydro.count;
          size = count * sizeof(struct part);
          type = part_mpi_type;
          buff = t->ci->hydro.parts;

        } else if (t->subtype == task_subtype_limiter) {
          size = count = t->ci->hydro.count * sizeof(timebin_t);
          if (posix_memalign((void **)&buff, SWIFT_CACHE_ALIGNMENT, count) !=
              0)
            error("Error allocating timebin recv buffer");
          type = MPI_BYTE;
          t->buff = buff;
          task_get_unique_dependent(t)->buff = buff;

        } else if (t->subtype == task_subtype_gpart) {
          count = t->ci->grav.count;
          size = count * sizeof(struct gpart);
          type = gpart_mpi_type;
          buff = t->ci->grav.parts;

        } else if (t->subtype == task_subtype_spart_density ||
                   t->subtype == task_subtype_spart_prep2) {
          count = t->ci->stars.count;
          size = count * sizeof(struct spart);
          type = spart_mpi_type;
          buff = t->ci->stars.parts;

        } else if (t->subtype == task_subtype_bpart_rho ||
                   t->subtype == task_subtype_bpart_feedback) {
          count = t->ci->black_holes.count;
          size = count * sizeof(struct bpart);
          type = bpart_mpi_type;
          buff = t->ci->black_holes.parts;

        } else if (t->subtype == task_subtype_sf_counts) {
          count = size = t->ci->mpi.pcell_size * sizeof(struct pcell_sf_stars);
          buff = t->buff = malloc(count);

        } else if (t->subtype == task_subtype_grav_counts) {
          count = size = t->ci->mpi.pcell_size * sizeof(struct pcell_sf_grav);
          buff = t->buff = malloc(count);

        } else {
          error("Unknown communication sub-type");
        }

        err = MPI_Irecv(buff, count, type, t->ci->nodeID, t->flags,
                        subtaskMPI_comms[t->subtype], &t->req);
        if (err != MPI_SUCCESS) {
          mpi_error(err, "Failed to emit irecv for particle data.");
        }

        /* And log, if logging enabled. */
        mpiuse_log_allocation(t->type, t->subtype, &t->req, 1, size,
                              t->ci->nodeID, t->flags);

        qid = 1 % s->nr_queues;
      }
#else
        error("SWIFT was not compiled with MPI support.");
#endif
        break;
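
      /* Note: communication tasks are pinned to fixed queues (recvs to
       * queue 1 % nr_queues above, sends to queue 0 below) rather than
       * inheriting a cell owner. This matches the "qid > 1" test in
       * scheduler_gettask(), which presumably keeps the first two queues
       * polling for MPI work instead of going to sleep. */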
      case task_type_send:
#ifdef WITH_MPI
      {
        size_t size = 0;              /* Size in bytes. */
        size_t count = 0;             /* Number of elements to send */
        MPI_Datatype type = MPI_BYTE; /* Type of the elements */
        void *buff = NULL;            /* Buffer to send */

        if (t->subtype == task_subtype_tend) {
          size = count = t->ci->mpi.pcell_size * sizeof(struct pcell_step);
          buff = t->buff = malloc(size);
          cell_pack_end_step(t->ci, (struct pcell_step *)buff);

        } else if (t->subtype == task_subtype_part_swallow) {
          size = count =
              t->ci->hydro.count * sizeof(struct black_holes_part_data);
          buff = t->buff = malloc(size);
          cell_pack_part_swallow(t->ci, (struct black_holes_part_data *)buff);

        } else if (t->subtype == task_subtype_bpart_merger) {
          size = count =
              sizeof(struct black_holes_bpart_data) * t->ci->black_holes.count;
          buff = t->buff = malloc(size);
          cell_pack_bpart_swallow(t->ci,
                                  (struct black_holes_bpart_data *)t->buff);

        } else if (t->subtype == task_subtype_xv ||
                   t->subtype == task_subtype_rho ||
                   t->subtype == task_subtype_gradient ||
                   t->subtype == task_subtype_rt_gradient ||
                   t->subtype == task_subtype_rt_transport ||
                   t->subtype == task_subtype_part_prep1) {
          count = t->ci->hydro.count;
          size = count * sizeof(struct part);
          type = part_mpi_type;
          buff = t->ci->hydro.parts;

        } else if (t->subtype == task_subtype_limiter) {
          size = count = t->ci->hydro.count * sizeof(timebin_t);
          type = MPI_BYTE;
          buff = t->buff;

        } else if (t->subtype == task_subtype_gpart) {
          count = t->ci->grav.count;
          size = count * sizeof(struct gpart);
          type = gpart_mpi_type;
          buff = t->ci->grav.parts;

        } else if (t->subtype == task_subtype_spart_density ||
                   t->subtype == task_subtype_spart_prep2) {
          count = t->ci->stars.count;
          size = count * sizeof(struct spart);
          type = spart_mpi_type;
          buff = t->ci->stars.parts;

        } else if (t->subtype == task_subtype_bpart_rho ||
                   t->subtype == task_subtype_bpart_feedback) {
          count = t->ci->black_holes.count;
          size = count * sizeof(struct bpart);
          type = bpart_mpi_type;
          buff = t->ci->black_holes.parts;

        } else if (t->subtype == task_subtype_sf_counts) {
          size = count = t->ci->mpi.pcell_size * sizeof(struct pcell_sf_stars);
          buff = t->buff = malloc(size);
          cell_pack_sf_counts(t->ci, (struct pcell_sf_stars *)t->buff);

        } else if (t->subtype == task_subtype_grav_counts) {
          size = count = t->ci->mpi.pcell_size * sizeof(struct pcell_sf_grav);
          buff = t->buff = malloc(size);
          cell_pack_grav_counts(t->ci, (struct pcell_sf_grav *)t->buff);

        } else {
          error("Unknown communication sub-type");
        }

        if (size > s->mpi_message_limit) {
          err = MPI_Isend(buff, count, type, t->cj->nodeID, t->flags,
                          subtaskMPI_comms[t->subtype], &t->req);
        } else {
          err = MPI_Issend(buff, count, type, t->cj->nodeID, t->flags,
                           subtaskMPI_comms[t->subtype], &t->req);
        }
        if (err != MPI_SUCCESS) {
          mpi_error(err, "Failed to emit isend for particle data.");
        }

        /* And log, if logging enabled. */
        mpiuse_log_allocation(t->type, t->subtype, &t->req, 1, size,
                              t->cj->nodeID, t->flags);

        qid = 0;
      }
#else
        error("SWIFT was not compiled with MPI support.");
#endif
        break;
      default:
        qid = -1;
    }

    if (qid >= s->nr_queues) error("Bad computed qid.");

    /* If no qid, pick a random queue. */
    if (qid < 0) qid = rand() % s->nr_queues;

    /* Save qid as owner for next time a task accesses this cell. */
    if (owner != NULL) *owner = qid;

    /* Increase the waiting counter. */
    atomic_inc(&s->waiting);

    /* Insert the task into that queue. */
    queue_insert(&s->queues[qid], t);
  }
}
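
/* Minimal sketch of the release protocol used throughout this file
 * (illustrative only): atomic_dec() returns the value *before* the
 * decrement, so whichever caller sees a 1 has just dropped the wait
 * counter to zero and owns the right to enqueue the task:
 *
 *   for (int k = 0; k < t->nr_unlock_tasks; k++) {
 *     struct task *t2 = t->unlock_tasks[k];
 *     if (atomic_dec(&t2->wait) == 1) scheduler_enqueue(s, t2);
 *   }
 *
 * scheduler_done() and scheduler_unlock() below use the same pattern, with
 * an added check against negative wait counters. */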
/**
 * @brief Take care of a task's dependencies.
 *
 * @param s The #scheduler.
 * @param t The finished #task.
 *
 * @return A pointer to the next task, if a suitable one has
 *         been identified.
 */
struct task *scheduler_done(struct scheduler *s, struct task *t) {

  /* Release whatever locks this task held. */
  if (!t->implicit) task_unlock(t);

  /* Loop through the dependencies and add them to a queue if they
     are ready. */
  for (int k = 0; k < t->nr_unlock_tasks; k++) {
    struct task *t2 = t->unlock_tasks[k];
    if (t2->skip) continue;

    const int res = atomic_dec(&t2->wait);
    if (res < 1) {
      error("Negative wait!");
    } else if (res == 1) {
      scheduler_enqueue(s, t2);
    }
  }

  /* Task definitely done, signal any sleeping runners. */
  if (!t->implicit) {
    t->toc = getticks();
    t->total_ticks += t->toc - t->tic;
    pthread_mutex_lock(&s->sleep_mutex);
    atomic_dec(&s->waiting);
    pthread_cond_broadcast(&s->sleep_cond);
    pthread_mutex_unlock(&s->sleep_mutex);
  }

  /* Mark the task as skip. */
  t->skip = 1;

  /* Return the next best task. Note that we currently do not
     implement anything that does this, as getting it to respect
     priorities is too tricky and currently unnecessary. */
  return NULL;
}

/**
 * @brief Resolve a single dependency by hand.
 *
 * @param s The #scheduler.
 * @param t The dependent #task.
 *
 * @return A pointer to the next task, if a suitable one has
 *         been identified.
 */
struct task *scheduler_unlock(struct scheduler *s, struct task *t) {

  /* Loop through the dependencies and add them to a queue if they
     are ready. */
  for (int k = 0; k < t->nr_unlock_tasks; k++) {
    struct task *t2 = t->unlock_tasks[k];
    const int res = atomic_dec(&t2->wait);
    if (res < 1) {
      error("Negative wait!");
    } else if (res == 1) {
      scheduler_enqueue(s, t2);
    }
  }

  /* Task definitely done. */
  if (!t->implicit) {
    t->toc = getticks();
    t->total_ticks += t->toc - t->tic;
    pthread_mutex_lock(&s->sleep_mutex);
    atomic_dec(&s->waiting);
    pthread_cond_broadcast(&s->sleep_cond);
    pthread_mutex_unlock(&s->sleep_mutex);
  }

  /* Return the next best task. Note that we currently do not
     implement anything that does this, as getting it to respect
     priorities is too tricky and currently unnecessary. */
  return NULL;
}
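
/* The two functions below implement an optional deadlock watchdog. It is
 * only compiled in with SWIFT_DEBUG_CHECKS, and both functions bail out
 * early via
 *
 *   if (s->deadlock_waiting_time_ms <= 0.f) return;
 *
 * so a non-positive waiting time disables the watchdog at runtime. */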
/**
 * Take note of the time at which a task was successfully fetched from the
 * queue.
 *
 * @param s The #scheduler.
 */
void scheduler_mark_last_fetch(struct scheduler *s) {
#if defined(SWIFT_DEBUG_CHECKS)
  if (s->deadlock_waiting_time_ms <= 0.f) return;

  ticks now = getticks();
  ticks last = s->last_successful_task_fetch;
  while (atomic_cas(&s->last_successful_task_fetch, last, now) != last) {
    now = getticks();
    last = s->last_successful_task_fetch;
  }
#endif
}

/**
 * Abort the run if the scheduler has been stuck doing nothing for too long.
 * This function is intended to abort the run when a deadlock occurs,
 * wherever and however it may have arisen. You might get core dumps this
 * way. Alternatively, you can manually set a breakpoint with gdb on this
 * function.
 *
 * @param s The #scheduler.
 */
void scheduler_check_deadlock(struct scheduler *s) {
#if defined(SWIFT_DEBUG_CHECKS)

  if (s->deadlock_waiting_time_ms <= 0.f) return;

  /* lock_lock(&s->last_task_fetch_lock); */
  ticks now = getticks();
  ticks last = s->last_successful_task_fetch;

  if (last == 0LL) {
    /* Ensure that the first check each engine_launch doesn't fail. There is
     * no guarantee how long it will take from the point where
     * last_successful_task_fetch was reset to get to this point. A poorly
     * chosen scheduler->deadlock_waiting_time_ms may abort a big run in
     * places where there is no deadlock. Better safe than sorry, so at
     * start-up, the last successful task fetch time is marked as 0 and we
     * just exit without checking the time. */
    while (atomic_cas(&s->last_successful_task_fetch, last, now) != last) {
      now = getticks();
      last = s->last_successful_task_fetch;
    }
    return;
  }

  /* ticks on different CPUs may disagree a bit. So we may end up
   * with last > now, and consequently negative idle time, which
   * then overflows unsigned long longs and gives false positives. */
  const ticks big = max(now, last);
  const ticks small = min(now, last);
  const double idle_time = clocks_diff_ticks(big, small);

  if (idle_time > s->deadlock_waiting_time_ms) {
    message(
        "Detected what looks like a deadlock after %g ms of no new task being "
        "fetched from queues. Dumping diagnostic data.",
        idle_time);
    engine_dump_diagnostic_data(s->e);
    error("Aborting now.");
  }
#endif
}

/**
 * @brief Get a task, preferably from the given queue.
 *
 * @param s The #scheduler.
 * @param qid The ID of the preferred #queue.
 * @param prev The previous task that was run.
 *
 * @return A pointer to a #task or @c NULL if there are no available tasks.
 */
struct task *scheduler_gettask(struct scheduler *s, int qid,
                               const struct task *prev) {

  struct task *res = NULL;
  const int nr_queues = s->nr_queues;
  unsigned int seed = qid;

  /* Check qid. */
  if (qid >= nr_queues || qid < 0) error("Bad queue ID.");

  /* Loop as long as there are tasks... */
  while (s->waiting > 0 && res == NULL) {

    /* Try more than once before sleeping. */
    for (int tries = 0;
         res == NULL && s->waiting && tries < scheduler_maxtries; tries++) {

      /* Try to get a task from the suggested queue. */
      if (s->queues[qid].count > 0 || s->queues[qid].count_incoming > 0) {
        TIMER_TIC
        res = queue_gettask(&s->queues[qid], prev, 0);
        TIMER_TOC(timer_qget);
        if (res != NULL) break;
      }

      /* If unsuccessful, try stealing from the other queues. */
      if (s->flags & scheduler_flag_steal) {
        int count = 0, qids[nr_queues];
        for (int k = 0; k < nr_queues; k++)
          if (s->queues[k].count > 0 || s->queues[k].count_incoming > 0) {
            qids[count++] = k;
          }
        for (int k = 0; k < scheduler_maxsteal && count > 0; k++) {
          const int ind = rand_r(&seed) % count;
          TIMER_TIC
          res = queue_gettask(&s->queues[qids[ind]], prev, 0);
          TIMER_TOC(timer_qsteal);
          if (res != NULL) {
            break;
          } else {
            qids[ind] = qids[--count];
          }
        }
        if (res != NULL) break;
      }
    }

    /* If we failed, take a short nap. */
#ifdef WITH_MPI
    if (res == NULL && qid > 1)
#else
    if (res == NULL)
#endif
    {
      pthread_mutex_lock(&s->sleep_mutex);
      res = queue_gettask(&s->queues[qid], prev, 1);
      if (res == NULL && s->waiting > 0) {
        pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
      }
      pthread_mutex_unlock(&s->sleep_mutex);
    }

    scheduler_check_deadlock(s);
  }

  if (res != NULL) {
    scheduler_mark_last_fetch(s);
    /* Start the timer on this task, if we got one. */
    res->tic = getticks();
#ifdef SWIFT_DEBUG_TASKS
    res->rid = qid;
#endif
  }

  /* No milk today. */
  return res;
}
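
/* Illustrative runner loop built on top of scheduler_gettask() (a sketch
 * only; the real loop lives in runner_main() and is driven by the engine's
 * barriers. The qid variable stands for the runner's queue ID):
 *
 *   struct task *t = NULL;
 *   while ((t = scheduler_gettask(s, qid, t)) != NULL) {
 *     ... execute the task ...
 *     scheduler_done(s, t);
 *   }
 *
 * The loop ends once s->waiting drops to zero and no task is returned. */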
/**
 * @brief Initialize the #scheduler.
 *
 * @param s The #scheduler.
 * @param space The #space we are working with
 * @param nr_tasks The number of tasks to allocate initially.
 * @param nr_queues The number of queues in this scheduler.
 * @param flags The #scheduler flags.
 * @param nodeID The MPI rank
 * @param tp Parallel processing threadpool.
 */
void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
                    int nr_queues, unsigned int flags, int nodeID,
                    struct threadpool *tp) {

  /* Init the lock. */
  lock_init(&s->lock);

  /* Allocate the queues. */
  if (swift_memalign("queues", (void **)&s->queues, queue_struct_align,
                     sizeof(struct queue) * nr_queues) != 0)
    error("Failed to allocate queues.");

  /* Initialize each queue. */
  for (int k = 0; k < nr_queues; k++) queue_init(&s->queues[k], NULL);

  /* Init the sleep mutex and cond. */
  if (pthread_cond_init(&s->sleep_cond, NULL) != 0 ||
      pthread_mutex_init(&s->sleep_mutex, NULL) != 0)
    error("Failed to initialize sleep barrier.");

  /* Init the unlocks. */
  if ((s->unlocks = (struct task **)swift_malloc(
           "unlocks", sizeof(struct task *) * scheduler_init_nr_unlocks)) ==
          NULL ||
      (s->unlock_ind = (int *)swift_malloc(
           "unlock_ind", sizeof(int) * scheduler_init_nr_unlocks)) == NULL)
    error("Failed to allocate unlocks.");
  s->nr_unlocks = 0;
  s->size_unlocks = scheduler_init_nr_unlocks;

  /* Set the scheduler variables. */
  s->nr_queues = nr_queues;
  s->flags = flags;
  s->space = space;
  s->nodeID = nodeID;
  s->threadpool = tp;

  /* Init the tasks array. */
  s->size = 0;
  s->tasks = NULL;
  s->tasks_ind = NULL;
  scheduler_reset(s, nr_tasks);

#if defined(SWIFT_DEBUG_CHECKS)
  s->e = space->e;
  s->last_successful_task_fetch = 0LL;
#endif
}

/**
 * @brief Prints the list of tasks to a file
 *
 * @param s The #scheduler
 * @param fileName Name of the file to write to
 */
void scheduler_print_tasks(const struct scheduler *s, const char *fileName) {

  const int nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
  struct task *t, *tasks = s->tasks;

  FILE *file = fopen(fileName, "w");
  if (file == NULL) error("Could not create file '%s'.", fileName);

  fprintf(file, "# Rank Name Subname unlocks waits\n");

  for (int k = nr_tasks - 1; k >= 0; k--) {
    t = &tasks[tid[k]];
    if (t->skip) continue;
    fprintf(file, "%d %s %s %d %d\n", k, taskID_names[t->type],
            subtaskID_names[t->subtype], t->nr_unlock_tasks, t->wait);
  }

  fclose(file);
}

/**
 * @brief Frees up the memory allocated for this #scheduler.
 */
void scheduler_clean(struct scheduler *s) {

  scheduler_free_tasks(s);
  swift_free("unlocks", s->unlocks);
  swift_free("unlock_ind", s->unlock_ind);
  for (int i = 0; i < s->nr_queues; ++i) queue_clean(&s->queues[i]);
  swift_free("queues", s->queues);
}

/**
 * @brief Free the task arrays allocated by this #scheduler.
 */
void scheduler_free_tasks(struct scheduler *s) {

  if (s->tasks != NULL) {
    swift_free("tasks", s->tasks);
    s->tasks = NULL;
  }
  if (s->tasks_ind != NULL) {
    swift_free("tasks_ind", s->tasks_ind);
    s->tasks_ind = NULL;
  }
  if (s->tid_active != NULL) {
    swift_free("tid_active", s->tid_active);
    s->tid_active = NULL;
  }
  s->size = 0;
  s->nr_tasks = 0;
}
/**
 * @brief Write down the levels and the number of tasks at each level.
 *
 * Run plot_task_level.py for an example of how to use it
 * to generate the figure.
 *
 * @param s The #scheduler we are working in.
 * @param step The current step number.
 */
void scheduler_write_task_level(const struct scheduler *s, int step) {

  /* init */
  const int max_depth = 30;
  const struct task *tasks = s->tasks;
  int nr_tasks = s->nr_tasks;

  /* Init counter */
  int size = task_type_count * task_subtype_count * max_depth;
  int *count = (int *)malloc(size * sizeof(int));
  if (count == NULL) error("Failed to allocate memory");

  for (int i = 0; i < size; i++) count[i] = 0;

  /* Count tasks */
  for (int i = 0; i < nr_tasks; i++) {
    const struct task *t = &tasks[i];
    if (t->ci) {
      if ((int)t->ci->depth >= max_depth)
        error("Cell is too deep, you need to increase max_depth");

      int ind = t->type * task_subtype_count * max_depth;
      ind += t->subtype * max_depth;
      ind += (int)t->ci->depth;

      count[ind] += 1;
    }
  }

  /* Generate filename */
  char filename[200] = "task_level_\0";
#ifdef WITH_MPI
  char rankstr[6];
  sprintf(rankstr, "%04d_", s->nodeID);
  strcat(filename, rankstr);
#endif
  char stepstr[100];
  sprintf(stepstr, "%d.txt", step);
  strcat(filename, stepstr);

  /* Open file */
  FILE *f = fopen(filename, "w");
  if (f == NULL) error("Error opening task level file.");

  /* Print header */
  fprintf(f, "# task_type, task_subtype, depth, count\n");

  /* Print tasks level */
  for (int i = 0; i < size; i++) {
    if (count[i] == 0) continue;

    int type = i / (task_subtype_count * max_depth);
    int subtype = i - task_subtype_count * max_depth * type;
    subtype /= max_depth;
    int depth = i - task_subtype_count * max_depth * type;
    depth -= subtype * max_depth;
    fprintf(f, "%s %s %i %i\n", taskID_names[type], subtaskID_names[subtype],
            depth, count[i]);
  }

  /* clean up */
  fclose(f);
  free(count);
}

/**
 * @brief Dump all the active queues of all the known schedulers into files.
 *
 * @param e The #engine whose scheduler queues should be dumped.
 */
void scheduler_dump_queues(struct engine *e) {

  struct scheduler *s = &e->sched;
  char dumpfile[35];

#ifdef WITH_MPI
  /* Open a file per rank and write the header. Use per rank to avoid MPI
   * calls that can interact with other blocking ones. */
  snprintf(dumpfile, sizeof(dumpfile), "queue_dump_MPI-step%d.dat_%d", e->step,
           e->nodeID);
#else
  snprintf(dumpfile, sizeof(dumpfile), "queue_dump-step%d.dat", e->step);
#endif

  FILE *file_thread = fopen(dumpfile, "w");
  if (file_thread == NULL) error("Could not create file '%s'.", dumpfile);
  fprintf(file_thread, "# rank queue index type subtype weight\n");
  for (int l = 0; l < s->nr_queues; l++) {
    queue_dump(engine_rank, l, file_thread, &s->queues[l]);
  }
  fclose(file_thread);
}

void scheduler_report_task_times_mapper(void *map_data, int num_elements,
                                        void *extra_data) {

  struct task *tasks = (struct task *)map_data;
  float time_local[task_category_count] = {0};
  float *time_global = (float *)extra_data;

  /* Gather the times spent in the different task categories */
  for (int i = 0; i < num_elements; ++i) {
    const struct task *t = &tasks[i];
    const float total_time = clocks_from_ticks(t->total_ticks);
    const enum task_categories cat = task_get_category(t);
    time_local[cat] += total_time;
  }

  /* Update the global counters */
  for (int i = 0; i < task_category_count; ++i) {
    atomic_add_f(&time_global[i], time_local[i]);
  }
}

/**
 * @brief Display the time spent in the different task categories.
 *
 * @param s The #scheduler.
 * @param nr_threads The number of threads used in the engine.
 */
void scheduler_report_task_times(const struct scheduler *s,
                                 const int nr_threads) {

  const ticks tic = getticks();

  /* Total CPU time spent in engine_launch() */
  const float total_tasks_time =
      clocks_from_ticks(s->total_ticks) * nr_threads;

  if (total_tasks_time > 0.) {
    /* Initialise counters */
    float time[task_category_count] = {0};
    threadpool_map(s->threadpool, scheduler_report_task_times_mapper,
                   s->tasks, s->nr_tasks, sizeof(struct task),
                   threadpool_auto_chunk_size, time);

    /* Compute the dead time */
    float total_time = 0.;
    for (int i = 0; i < task_category_count; ++i) {
      total_time += time[i];
    }
    const float dead_time = total_tasks_time - total_time;

    message("*** CPU time spent in different task categories:");
    for (int i = 0; i < task_category_count; ++i) {
      message("*** %20s: %8.2f %s (%.2f %%)", task_category_names[i], time[i],
              clocks_getunit(), time[i] / total_tasks_time * 100.);
    }
    message("*** %20s: %8.2f %s (%.2f %%)", "dead time", dead_time,
            clocks_getunit(), dead_time / total_tasks_time * 100.);
    message("*** %20s: %8.2f %s (%.2f %%)", "total", total_tasks_time,
            clocks_getunit(), total_tasks_time / total_tasks_time * 100.);
  }

  /* Done. Report the time spent doing this analysis */
  message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
          clocks_getunit());
}
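
/* End-to-end sketch of how this scheduler is driven (illustrative only; in
 * SWIFT proper these calls are spread across engine_init(), engine_launch()
 * and the runner threads):
 *
 *   struct scheduler s;
 *   scheduler_init(&s, space, nr_tasks, nr_queues, scheduler_flag_steal,
 *                  nodeID, &threadpool);
 *   ... create the tasks, then declare their dependencies ...
 *   scheduler_addunlock(&s, ta, tb);  // ta must complete before tb
 *   scheduler_start(&s);              // fill the queues with ready tasks
 *   ... runner threads drain the queues via scheduler_gettask() and
 *       report completion via scheduler_done() ...
 *   scheduler_clean(&s);              // release queues, unlocks and tasks
 */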