Commit b821896f authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Merge branch 'dumper-thread' into 'master'

Add an optional task dumper thread

See merge request !859
parents 8e668947 94e79f99
...@@ -407,6 +407,18 @@ if test "$enable_memuse_reports" = "yes"; then ...@@ -407,6 +407,18 @@ if test "$enable_memuse_reports" = "yes"; then
AC_DEFINE([SWIFT_MEMUSE_REPORTS],1,[Enable memory usage reports]) AC_DEFINE([SWIFT_MEMUSE_REPORTS],1,[Enable memory usage reports])
fi fi
# Check if we want to make the dumper thread active.
AC_ARG_ENABLE([dumper],
[AS_HELP_STRING([--enable-dumper],
[Dump active tasks and memory use (if configured)@<:@yes/no@:>@]
)],
[enable_dumper="$enableval"],
[enable_dumper="no"]
)
if test "$enable_dumper" = "yes"; then
AC_DEFINE([SWIFT_DUMPER_THREAD],1,[Enable dumper thread])
fi
# See if we want mpi reporting. # See if we want mpi reporting.
AC_ARG_ENABLE([mpiuse-reports], AC_ARG_ENABLE([mpiuse-reports],
[AS_HELP_STRING([--enable-mpiuse-reports], [AS_HELP_STRING([--enable-mpiuse-reports],
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
/* MPI headers. */ /* MPI headers. */
#ifdef WITH_MPI #ifdef WITH_MPI
...@@ -3383,6 +3384,67 @@ void engine_unpin(void) { ...@@ -3383,6 +3384,67 @@ void engine_unpin(void) {
#endif #endif
} }
#ifdef SWIFT_DUMPER_THREAD
/**
* @brief dumper thread action, checks got the existence of the .dump file
* every 5 seconds and does the dump if found.
*
* @param p the #engine
*/
static void *engine_dumper_poll(void *p) {
struct engine *e = (struct engine *)p;
while (1) {
if (access(".dump", F_OK) == 0) {
/* OK, do our work. */
message("Dumping engine tasks in step: %d", e->step);
task_dump_active(e);
#ifdef SWIFT_MEMUSE_REPORTS
/* Dump the currently logged memory. */
message("Dumping memory use report");
memuse_log_dump_error(e->nodeID);
#endif
/* Add more interesting diagnostics. */
scheduler_dump_queues(e);
/* Delete the file. */
unlink(".dump");
message("Dumping completed");
fflush(stdout);
}
/* Take a breath. */
sleep(5);
}
return NULL;
}
#endif /* SWIFT_DUMPER_THREAD */
#ifdef SWIFT_DUMPER_THREAD
/**
* @brief creates the dumper thread.
*
* This watches for the creation of a ".dump" file in the current directory
* and if found dumps the current state of the tasks and memory use (if also
* configured).
*
* @param e the #engine
*
*/
static void engine_dumper_init(struct engine *e) {
pthread_t dumper;
/* Make sure the .dump file is not present, that is bad when starting up. */
struct stat buf;
if (stat(".dump", &buf) == 0) unlink(".dump");
/* Thread does not exit, so nothing to do but create it. */
pthread_create(&dumper, NULL, &engine_dumper_poll, e);
}
#endif /* SWIFT_DUMPER_THREAD */
/** /**
* @brief init an engine struct with the necessary properties for the * @brief init an engine struct with the necessary properties for the
* simulation. * simulation.
...@@ -4283,6 +4345,12 @@ void engine_config(int restart, int fof, struct engine *e, ...@@ -4283,6 +4345,12 @@ void engine_config(int restart, int fof, struct engine *e,
} }
#endif #endif
#ifdef SWIFT_DUMPER_THREAD
/* Start the dumper thread.*/
engine_dumper_init(e);
#endif
/* Wait for the runner threads to be in place. */ /* Wait for the runner threads to be in place. */
swift_barrier_wait(&e->wait_barrier); swift_barrier_wait(&e->wait_barrier);
} }
......
...@@ -301,3 +301,34 @@ void queue_clean(struct queue *q) { ...@@ -301,3 +301,34 @@ void queue_clean(struct queue *q) {
free(q->tid); free(q->tid);
free(q->tid_incoming); free(q->tid_incoming);
} }
/**
* @brief Dump a formatted list of tasks in the queue to the given file stream.
*
* @param nodeID the node id of this rank.
* @param index a number for this queue, added to the output.
* @param file the FILE stream, should opened for write.
* @param q The task #queue.
*/
void queue_dump(int nodeID, int index, FILE *file, struct queue *q) {
swift_lock_type *qlock = &q->lock;
/* Grab the queue lock. */
if (lock_lock(qlock) != 0) error("Locking the qlock failed.\n");
/* Fill any tasks from the incoming DEQ. */
queue_get_incoming(q);
/* Loop over the queue entries. */
for (int k = 0; k < q->count; k++) {
struct task *t = &q->tasks[q->tid[k]];
fprintf(file, "%d %d %d %s %s %.2f\n", nodeID, index, k,
taskID_names[t->type], subtaskID_names[t->subtype], t->weight);
}
/* Release the task lock. */
if (lock_unlock(qlock) != 0) error("Unlocking the qlock failed.\n");
}
...@@ -67,4 +67,6 @@ void queue_init(struct queue *q, struct task *tasks); ...@@ -67,4 +67,6 @@ void queue_init(struct queue *q, struct task *tasks);
void queue_insert(struct queue *q, struct task *t); void queue_insert(struct queue *q, struct task *t);
void queue_clean(struct queue *q); void queue_clean(struct queue *q);
void queue_dump(int nodeID, int index, FILE *file, struct queue *q);
#endif /* SWIFT_QUEUE_H */ #endif /* SWIFT_QUEUE_H */
...@@ -2274,3 +2274,62 @@ void scheduler_write_task_level(const struct scheduler *s) { ...@@ -2274,3 +2274,62 @@ void scheduler_write_task_level(const struct scheduler *s) {
fclose(f); fclose(f);
free(count); free(count);
} }
/**
* @brief dump all the active queues of all the known schedulers into a file.
*
* @param e the #scheduler
*/
void scheduler_dump_queues(struct engine *e) {
struct scheduler *s = &e->sched;
/* Open the file and write the header. */
char dumpfile[35];
#ifdef WITH_MPI
snprintf(dumpfile, sizeof(dumpfile), "queue_dump_MPI-step%d.dat", e->step);
#else
snprintf(dumpfile, sizeof(dumpfile), "queue_dump-step%d.dat", e->step);
#endif
FILE *file_thread = NULL;
if (engine_rank == 0) {
file_thread = fopen(dumpfile, "w");
fprintf(file_thread, "# rank queue index type subtype weight\n");
}
#ifdef WITH_MPI
/* Make sure output file is closed and empty, then we reopen on each rank. */
if (engine_rank == 0) fclose(file_thread);
MPI_Barrier(MPI_COMM_WORLD);
for (int i = 0; i < e->nr_nodes; i++) {
/* Rank 0 decides the index of the writing node, this happens
* one-by-one. */
int kk = i;
MPI_Bcast(&kk, 1, MPI_INT, 0, MPI_COMM_WORLD);
if (i == engine_rank) {
/* Open file and position at end. */
file_thread = fopen(dumpfile, "a");
for (int l = 0; l < s->nr_queues; l++) {
queue_dump(engine_rank, l, file_thread, &s->queues[l]);
}
fclose(file_thread);
}
/* And we wait for all to synchronize. */
MPI_Barrier(MPI_COMM_WORLD);
}
#else
/* Non-MPI, so just a single schedulers worth of queues to dump. */
for (int l = 0; l < s->nr_queues; l++) {
queue_dump(engine_rank, l, file_thread, &s->queues[l]);
}
fclose(file_thread);
#endif // WITH_MPI
}
...@@ -202,5 +202,6 @@ void scheduler_clean(struct scheduler *s); ...@@ -202,5 +202,6 @@ void scheduler_clean(struct scheduler *s);
void scheduler_free_tasks(struct scheduler *s); void scheduler_free_tasks(struct scheduler *s);
void scheduler_write_dependencies(struct scheduler *s, int verbose); void scheduler_write_dependencies(struct scheduler *s, int verbose);
void scheduler_write_task_level(const struct scheduler *s); void scheduler_write_task_level(const struct scheduler *s);
void scheduler_dump_queues(struct engine *e);
#endif /* SWIFT_SCHEDULER_H */ #endif /* SWIFT_SCHEDULER_H */
...@@ -905,7 +905,7 @@ void task_free_mpi_comms(void) { ...@@ -905,7 +905,7 @@ void task_free_mpi_comms(void) {
* *
* Dumps the information to a file "thread_info-stepn.dat" where n is the * Dumps the information to a file "thread_info-stepn.dat" where n is the
* given step value, or "thread_info_MPI-stepn.dat", if we are running * given step value, or "thread_info_MPI-stepn.dat", if we are running
* under MPI. Note if running under MPIU all the ranks are dumped into this * under MPI. Note if running under MPI all the ranks are dumped into this
* one file, which has an additional field to identify the rank. * one file, which has an additional field to identify the rank.
* *
* @param e the #engine * @param e the #engine
...@@ -1172,3 +1172,116 @@ void task_dump_stats(const char *dumpfile, struct engine *e, int header, ...@@ -1172,3 +1172,116 @@ void task_dump_stats(const char *dumpfile, struct engine *e, int header,
} }
#endif #endif
} }
/**
* @brief dump all the active tasks of all the known engines into a file.
*
* Dumps the information to a file "task_dump-stepn.dat" where n is the given
* step value, or "task_dump_MPI-stepn.dat", if we are running under MPI. Note
* if running under MPI all the ranks are dumped into this one file, which has
* an additional field to identify the rank. Very similar to task_dump_all()
* except for the additional fields used in task debugging and we record tasks
* that have not ran (i.e !skip, but toc == 0) and how many waits are still
* active.
*
* @param e the #engine
*/
void task_dump_active(struct engine *e) {
/* Need this to convert ticks to seconds. */
unsigned long long cpufreq = clocks_get_cpufreq();
#ifdef WITH_MPI
/* Make sure output file is empty, only on one rank. */
char dumpfile[35];
snprintf(dumpfile, sizeof(dumpfile), "task_dump_MPI-step%d.dat", e->step);
FILE *file_thread;
if (engine_rank == 0) {
file_thread = fopen(dumpfile, "w");
fprintf(file_thread,
"# rank otherrank type subtype waits pair tic toc"
" ci.hydro.count cj.hydro.count ci.grav.count cj.grav.count"
" flags\n");
fclose(file_thread);
}
MPI_Barrier(MPI_COMM_WORLD);
for (int i = 0; i < e->nr_nodes; i++) {
/* Rank 0 decides the index of the writing node, this happens
* one-by-one. */
int kk = i;
MPI_Bcast(&kk, 1, MPI_INT, 0, MPI_COMM_WORLD);
if (i == engine_rank) {
/* Open file and position at end. */
file_thread = fopen(dumpfile, "a");
/* Add some information to help with the plots and conversion of ticks to
* seconds. */
fprintf(
file_thread, "%i 0 none none -1 0 %lld %lld %lld %lld %lld 0 %lld\n",
engine_rank, (long long int)e->tic_step, (long long int)e->toc_step,
e->updates, e->g_updates, e->s_updates, cpufreq);
int count = 0;
for (int l = 0; l < e->sched.nr_tasks; l++) {
struct task *t = &e->sched.tasks[l];
/* Not implicit and not skipped. */
if (!t->implicit && !t->skip) {
/* Get destination rank of MPI requests. */
int paired = (t->cj != NULL);
int otherrank = t->ci->nodeID;
if (paired) otherrank = t->cj->nodeID;
fprintf(file_thread, "%i %i %s %s %i %i %lli %lli %i %i %i %i %lli\n",
engine_rank, otherrank, taskID_names[t->type],
subtaskID_names[t->subtype], t->wait, paired,
(long long int)t->tic, (long long int)t->toc,
(t->ci != NULL) ? t->ci->hydro.count : 0,
(t->cj != NULL) ? t->cj->hydro.count : 0,
(t->ci != NULL) ? t->ci->grav.count : 0,
(t->cj != NULL) ? t->cj->grav.count : 0, t->flags);
}
count++;
}
fclose(file_thread);
}
/* And we wait for all to synchronize. */
MPI_Barrier(MPI_COMM_WORLD);
}
#else
/* Non-MPI, so just a single engine's worth of tasks to dump. */
char dumpfile[32];
snprintf(dumpfile, sizeof(dumpfile), "task_dump-step%d.dat", e->step);
FILE *file_thread;
file_thread = fopen(dumpfile, "w");
fprintf(file_thread,
"#type subtype waits pair tic toc ci.hydro.count cj.hydro.count "
"ci.grav.count cj.grav.count\n");
/* Add some information to help with the plots and conversion of ticks to
* seconds. */
fprintf(file_thread, "none none -1 0, %lld %lld %lld %lld %lld %lld\n",
(unsigned long long)e->tic_step, (unsigned long long)e->toc_step,
e->updates, e->g_updates, e->s_updates, cpufreq);
for (int l = 0; l < e->sched.nr_tasks; l++) {
struct task *t = &e->sched.tasks[l];
if (!t->implicit && !t->skip) {
fprintf(file_thread, "%s %s %i %i %lli %lli %i %i %i %i\n",
taskID_names[t->type], subtaskID_names[t->subtype], t->wait,
(t->cj == NULL), (unsigned long long)t->tic,
(unsigned long long)t->toc,
(t->ci == NULL) ? 0 : t->ci->hydro.count,
(t->cj == NULL) ? 0 : t->cj->hydro.count,
(t->ci == NULL) ? 0 : t->ci->grav.count,
(t->cj == NULL) ? 0 : t->cj->grav.count);
}
}
fclose(file_thread);
#endif // WITH_MPI
}
...@@ -239,6 +239,7 @@ void task_print(const struct task *t); ...@@ -239,6 +239,7 @@ void task_print(const struct task *t);
void task_dump_all(struct engine *e, int step); void task_dump_all(struct engine *e, int step);
void task_dump_stats(const char *dumpfile, struct engine *e, int header, void task_dump_stats(const char *dumpfile, struct engine *e, int header,
int allranks); int allranks);
void task_dump_active(struct engine *e);
void task_get_full_name(int type, int subtype, char *name); void task_get_full_name(int type, int subtype, char *name);
void task_get_group_name(int type, int subtype, char *cluster); void task_get_group_name(int type, int subtype, char *cluster);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment